mirror of
https://github.com/neoarz/Syntrel.git
synced 2025-12-25 03:40:11 +01:00
293 lines
11 KiB
Python
293 lines
11 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import random
|
|
import signal
|
|
import sys
|
|
import time
|
|
|
|
import aiosqlite
|
|
import discord
|
|
from discord.ext import commands, tasks
|
|
from discord.ext.commands import Context
|
|
from dotenv import load_dotenv
|
|
|
|
from database import DatabaseManager
|
|
from utils import ascii, setup_logger, get_uptime, setup_signal_handlers
|
|
|
|
load_dotenv()
|
|
|
|
"""
|
|
Default Intents:
|
|
intents.emojis_and_stickers = True
|
|
intents.guild_messages = True
|
|
intents.guild_reactions = True
|
|
intents.guild_scheduled_events = True
|
|
intents.guild_typing = True
|
|
intents.guilds = True
|
|
intents.integrations = True
|
|
intents.invites = True
|
|
intents.reactions = True
|
|
intents.typing = True
|
|
intents.voice_states = True
|
|
intents.webhooks = True
|
|
|
|
intents.members = True
|
|
intents.presences = True
|
|
"""
|
|
|
|
intents = discord.Intents.default()
|
|
intents.message_content = True
|
|
intents.bans = True
|
|
intents.dm_messages = True
|
|
intents.dm_reactions = True
|
|
intents.dm_typing = True
|
|
intents.emojis = True
|
|
intents.messages = True
|
|
|
|
|
|
|
|
|
|
logger = setup_logger()
|
|
|
|
|
|
class DiscordBot(commands.Bot):
|
|
def __init__(self) -> None:
|
|
super().__init__(
|
|
command_prefix=commands.when_mentioned_or(os.getenv("PREFIX")),
|
|
intents=intents,
|
|
help_command=None,
|
|
)
|
|
self.logger = logger
|
|
self.database = None
|
|
self.bot_prefix = os.getenv("PREFIX")
|
|
self.invite_link = os.getenv("INVITE_LINK")
|
|
self.start_time = time.time()
|
|
self._shutdown = False
|
|
|
|
async def init_db(self) -> None:
|
|
async with aiosqlite.connect(
|
|
f"{os.path.realpath(os.path.dirname(__file__))}/database/database.db"
|
|
) as db:
|
|
with open(
|
|
f"{os.path.realpath(os.path.dirname(__file__))}/database/schema.sql",
|
|
encoding = "utf-8"
|
|
) as file:
|
|
await db.executescript(file.read())
|
|
await db.commit()
|
|
|
|
async def load_cogs(self) -> None:
|
|
cogs_path = f"{os.path.realpath(os.path.dirname(__file__))}/cogs"
|
|
disabled_env = os.getenv("DISABLED_COGS", "")
|
|
disabled_cogs = {entry.strip().lower() for entry in disabled_env.split(",") if entry.strip()}
|
|
|
|
for folder in os.listdir(cogs_path):
|
|
folder_path = os.path.join(cogs_path, folder)
|
|
if os.path.isdir(folder_path) and not folder.startswith('__'):
|
|
for file in os.listdir(folder_path):
|
|
if file.endswith(".py") and not file.startswith('__'):
|
|
extension = file[:-3]
|
|
full_name = f"{folder}.{extension}".lower()
|
|
if extension.lower() in disabled_cogs or full_name in disabled_cogs:
|
|
self.logger.info(f"Skipped disabled extension '{full_name}'")
|
|
continue
|
|
try:
|
|
await self.load_extension(f"cogs.{folder}.{extension}")
|
|
self.logger.info(f"Loaded extension '{folder}.{extension}'")
|
|
except Exception as e:
|
|
exception = f"{type(e).__name__}: {e}"
|
|
self.logger.error(
|
|
f"Failed to load extension {folder}.{extension}\n{exception}"
|
|
)
|
|
|
|
for file in os.listdir(cogs_path):
|
|
if file.endswith(".py") and not file.startswith('__'):
|
|
extension = file[:-3]
|
|
if extension.lower() in disabled_cogs:
|
|
self.logger.info(f"Skipped disabled extension '{extension}'")
|
|
continue
|
|
try:
|
|
await self.load_extension(f"cogs.{extension}")
|
|
self.logger.info(f"Loaded extension '{extension}'")
|
|
except Exception as e:
|
|
exception = f"{type(e).__name__}: {e}"
|
|
self.logger.error(
|
|
f"Failed to load extension {extension}\n{exception}"
|
|
)
|
|
|
|
@tasks.loop(minutes=1.0)
|
|
async def status_task(self) -> None:
|
|
statuses = ["SideStore", "MeloNX", "ARMSX2", "StikDebug", "Feather"]
|
|
await self.change_presence(activity=discord.Game(random.choice(statuses)))
|
|
|
|
@status_task.before_loop
|
|
async def before_status_task(self) -> None:
|
|
await self.wait_until_ready()
|
|
|
|
async def setup_hook(self) -> None:
|
|
self.logger.info(f"Logged in as {self.user.name}")
|
|
self.logger.info(f"discord.py API version: {discord.__version__}")
|
|
self.logger.info(f"Python version: {platform.python_version()}")
|
|
self.logger.info(
|
|
f"Running on: {platform.system()} {platform.release()} ({os.name})"
|
|
)
|
|
|
|
try:
|
|
app_info = await self.application_info()
|
|
if app_info.team:
|
|
self.logger.info(f"Bot owned by team: {app_info.team.name}")
|
|
for member in app_info.team.members:
|
|
self.logger.info(f"Team member: {member.name} (ID: {member.id})")
|
|
else:
|
|
self.logger.info(f"Bot owner: {app_info.owner.name} (ID: {app_info.owner.id})")
|
|
except Exception as e:
|
|
self.logger.error(f"Error fetching application info: {e}")
|
|
|
|
|
|
await self.init_db()
|
|
await self.load_cogs()
|
|
self.status_task.start()
|
|
self.database = DatabaseManager(
|
|
connection=await aiosqlite.connect(
|
|
f"{os.path.realpath(os.path.dirname(__file__))}/database/database.db"
|
|
)
|
|
)
|
|
|
|
def get_uptime(self) -> str:
|
|
return get_uptime(self.start_time)
|
|
|
|
async def close(self) -> None:
|
|
if self._shutdown:
|
|
return
|
|
self._shutdown = True
|
|
|
|
self.logger.info("Starting shutdown process...")
|
|
|
|
if self.status_task and not self.status_task.is_being_cancelled():
|
|
self.status_task.cancel()
|
|
self.logger.info("Status task cancelled")
|
|
|
|
if self.database and self.database.connection:
|
|
try:
|
|
await self.database.connection.close()
|
|
self.logger.warning("Database connection closed")
|
|
except Exception as e:
|
|
self.logger.error(f"Error closing database connection: {e}")
|
|
|
|
try:
|
|
await super().close()
|
|
self.logger.critical("Bot shutdown complete")
|
|
except Exception as e:
|
|
self.logger.error(f"Error during bot shutdown: {e}")
|
|
|
|
async def on_message(self, message: discord.Message) -> None:
|
|
if message.author == self.user or message.author.bot:
|
|
return
|
|
|
|
if self.user in message.mentions:
|
|
try:
|
|
emoji_string = "<a:PandaPing:1417550314260926575>"
|
|
self.logger.debug(f"Attempting to react with PandaPing emoji: {emoji_string}")
|
|
await message.add_reaction(emoji_string)
|
|
self.logger.debug("Successfully reacted with PandaPing emoji")
|
|
except Exception as e:
|
|
self.logger.debug(f"Failed to react with PandaPing emoji: {e}")
|
|
try:
|
|
self.logger.debug("Falling back to wave emoji")
|
|
await message.add_reaction("👋")
|
|
self.logger.debug("Successfully reacted with wave emoji")
|
|
except Exception as fallback_error:
|
|
self.logger.debug(f"Failed to react with fallback emoji: {fallback_error}")
|
|
await self.process_commands(message)
|
|
|
|
async def on_command_completion(self, context: Context) -> None:
|
|
full_command_name = context.command.qualified_name
|
|
split = full_command_name.split(" ")
|
|
executed_command = str(split[0])
|
|
|
|
if executed_command.lower() == "shutdown":
|
|
return
|
|
|
|
if context.guild is not None:
|
|
self.logger.info(
|
|
f"Executed {executed_command} command in {context.guild.name} (ID: {context.guild.id}) by {context.author} (ID: {context.author.id})"
|
|
)
|
|
else:
|
|
self.logger.info(
|
|
f"Executed {executed_command} command by {context.author} (ID: {context.author.id}) in DMs"
|
|
)
|
|
|
|
async def on_command_error(self, context: Context, error) -> None:
|
|
if isinstance(error, commands.CommandNotFound):
|
|
if context.guild is not None:
|
|
self.logger.info(
|
|
f"Unknown command in {context.guild.name} (ID: {context.guild.id}) by {context.author} (ID: {context.author.id}): {context.message.content}"
|
|
)
|
|
else:
|
|
self.logger.info(
|
|
f"Unknown command in DMs by {context.author} (ID: {context.author.id}): {context.message.content}"
|
|
)
|
|
return
|
|
if isinstance(error, commands.CommandOnCooldown):
|
|
minutes, seconds = divmod(error.retry_after, 60)
|
|
hours, minutes = divmod(minutes, 60)
|
|
hours = hours % 24
|
|
embed = discord.Embed(
|
|
description=f"**Please slow down** - You can use this command again in {f'{round(hours)} hours' if round(hours) > 0 else ''} {f'{round(minutes)} minutes' if round(minutes) > 0 else ''} {f'{round(seconds)} seconds' if round(seconds) > 0 else ''}.",
|
|
color=0xE02B2B,
|
|
)
|
|
await context.send(embed=embed)
|
|
elif isinstance(error, commands.NotOwner):
|
|
if context.guild:
|
|
self.logger.warning(
|
|
f"{context.author} (ID: {context.author.id}) tried to execute an owner only command in the guild {context.guild.name} (ID: {context.guild.id}), but the user is not an owner of the bot."
|
|
)
|
|
else:
|
|
self.logger.warning(
|
|
f"{context.author} (ID: {context.author.id}) tried to execute an owner only command in the bot's DMs, but the user is not an owner of the bot."
|
|
)
|
|
elif isinstance(error, commands.MissingPermissions):
|
|
embed = discord.Embed(
|
|
description="You are missing the permission(s) `"
|
|
+ ", ".join(error.missing_permissions)
|
|
+ "` to execute this command!",
|
|
color=0xE02B2B,
|
|
)
|
|
await context.send(embed=embed)
|
|
elif isinstance(error, commands.BotMissingPermissions):
|
|
embed = discord.Embed(
|
|
description="I am missing the permission(s) `"
|
|
+ ", ".join(error.missing_permissions)
|
|
+ "` to fully perform this command!",
|
|
color=0xE02B2B,
|
|
)
|
|
await context.send(embed=embed)
|
|
elif isinstance(error, commands.MissingRequiredArgument):
|
|
embed = discord.Embed(
|
|
title="Error!",
|
|
description=str(error).capitalize(),
|
|
color=0xE02B2B,
|
|
)
|
|
await context.send(embed=embed)
|
|
else:
|
|
raise error
|
|
|
|
|
|
bot = DiscordBot()
|
|
|
|
if __name__ == "__main__":
|
|
os.system('clear' if os.name == 'posix' else 'cls')
|
|
|
|
print(ascii)
|
|
|
|
setup_signal_handlers(bot)
|
|
|
|
try:
|
|
bot.run(os.getenv("TOKEN"))
|
|
except KeyboardInterrupt:
|
|
pass
|
|
except:
|
|
pass
|