import os import platform import random import time import aiosqlite import discord from discord.ext import commands, tasks from discord.ext.commands import Context from dotenv import load_dotenv from database import DatabaseManager from utils import ascii, setup_logger, get_uptime, setup_signal_handlers load_dotenv() intents = discord.Intents.default() intents.message_content = True intents.bans = True intents.dm_messages = True intents.dm_reactions = True intents.dm_typing = True intents.emojis = True intents.messages = True intents.reactions = True intents.typing = True intents.voice_states = True intents.webhooks = True intents.emojis_and_stickers = True intents.guild_messages = True intents.guild_reactions = True intents.guild_scheduled_events = True intents.guild_typing = True intents.guilds = True intents.integrations = True intents.invites = True intents.presences = True intents.members = True logger = setup_logger() class DiscordBot(commands.Bot): def __init__(self) -> None: super().__init__( command_prefix=commands.when_mentioned_or(os.getenv("PREFIX")), intents=intents, help_command=None, ) self.logger = logger self.database = None self.bot_prefix = os.getenv("PREFIX") self.invite_link = os.getenv("INVITE_LINK") self.start_time = time.time() self._shutdown = False async def init_db(self) -> None: async with aiosqlite.connect( f"{os.path.realpath(os.path.dirname(__file__))}/database/database.db" ) as db: with open( f"{os.path.realpath(os.path.dirname(__file__))}/database/schema.sql", encoding="utf-8", ) as file: await db.executescript(file.read()) await db.commit() async def load_cogs(self) -> None: cogs_path = f"{os.path.realpath(os.path.dirname(__file__))}/cogs" disabled_env = os.getenv("DISABLED_COGS", "") disabled_cogs = { entry.strip().lower() for entry in disabled_env.split(",") if entry.strip() } for folder in os.listdir(cogs_path): folder_path = os.path.join(cogs_path, folder) if os.path.isdir(folder_path) and not folder.startswith("__"): init_file = os.path.join(folder_path, "__init__.py") if os.path.exists(init_file): try: await self.load_extension(f"cogs.{folder}") if folder in ["owner", "help", "botinfo"]: self.logger.info(f"Loaded extension '{folder}'") except Exception as e: exception = f"{type(e).__name__}: {e}" self.logger.error( f"Failed to load extension {folder}\n{exception}" ) else: for file in os.listdir(folder_path): if file.endswith(".py") and not file.startswith("__"): extension = file[:-3] full_name = f"{folder}.{extension}".lower() if ( extension.lower() in disabled_cogs or full_name in disabled_cogs ): self.logger.info( f"Skipped disabled extension '{full_name}'" ) continue try: await self.load_extension(f"cogs.{folder}.{extension}") self.logger.info( f"Loaded extension '{folder}.{extension}'" ) except Exception as e: exception = f"{type(e).__name__}: {e}" self.logger.error( f"Failed to load extension {folder}.{extension}\n{exception}" ) for file in os.listdir(cogs_path): if file.endswith(".py") and not file.startswith("__"): extension = file[:-3] if extension.lower() in disabled_cogs: self.logger.info(f"Skipped disabled extension '{extension}'") continue try: await self.load_extension(f"cogs.{extension}") if extension in ["owner", "help", "botinfo"]: self.logger.info(f"Loaded extension '{extension}'") except Exception as e: exception = f"{type(e).__name__}: {e}" self.logger.error( f"Failed to load extension {extension}\n{exception}" ) @tasks.loop(minutes=1.0) async def status_task(self) -> None: statuses = ["SideStore", "MeloNX", "ARMSX2", "StikDebug", "Feather"] await self.change_presence(activity=discord.Game(random.choice(statuses))) @status_task.before_loop async def before_status_task(self) -> None: await self.wait_until_ready() async def setup_hook(self) -> None: self.logger.info(f"Logged in as {self.user.name}") self.logger.info(f"discord.py API version: {discord.__version__}") self.logger.info(f"Python version: {platform.python_version()}") self.logger.info( f"Running on: {platform.system()} {platform.release()} ({os.name})" ) try: app_info = await self.application_info() if app_info.team: self.logger.info(f"Bot owned by team: {app_info.team.name}") for member in app_info.team.members: self.logger.info(f"Team member: {member.name} (ID: {member.id})") else: self.logger.info( f"Bot owner: {app_info.owner.name} (ID: {app_info.owner.id})" ) except Exception as e: self.logger.error(f"Error fetching application info: {e}") await self.init_db() await self.load_cogs() self.status_task.start() self.database = DatabaseManager( connection=await aiosqlite.connect( f"{os.path.realpath(os.path.dirname(__file__))}/database/database.db" ) ) def get_uptime(self) -> str: return get_uptime(self.start_time) async def close(self) -> None: if self._shutdown: return self._shutdown = True self.logger.info("Starting shutdown process...") if self.status_task and not self.status_task.is_being_cancelled(): self.status_task.cancel() self.logger.info("Status task cancelled") if self.database and self.database.connection: try: await self.database.connection.close() self.logger.warning("Database connection closed") except Exception as e: self.logger.error(f"Error closing database connection: {e}") try: await super().close() self.logger.critical("Bot shutdown complete") except Exception as e: self.logger.error(f"Error during bot shutdown: {e}") async def on_command_completion(self, context: Context) -> None: full_command_name = context.command.qualified_name split = full_command_name.split(" ") executed_command = str(split[0]) if executed_command.lower() in ["shutdown", "say", "embed"]: return if context.guild is not None: self.logger.info( f"Executed {executed_command} command in {context.guild.name} (ID: {context.guild.id}) by {context.author} (ID: {context.author.id})" ) else: self.logger.info( f"Executed {executed_command} command by {context.author} (ID: {context.author.id}) in DMs" ) async def on_command_error(self, context: Context, error) -> None: if isinstance(error, commands.CommandNotFound): if context.guild is not None: self.logger.info( f"Unknown command in {context.guild.name} (ID: {context.guild.id}) by {context.author} (ID: {context.author.id}): {context.message.content}" ) else: self.logger.info( f"Unknown command in DMs by {context.author} (ID: {context.author.id}): {context.message.content}" ) return if isinstance(error, commands.CommandOnCooldown): minutes, seconds = divmod(error.retry_after, 60) hours, minutes = divmod(minutes, 60) hours = hours % 24 embed = discord.Embed( description=f"**Please slow down** - You can use this command again in {f'{round(hours)} hours' if round(hours) > 0 else ''} {f'{round(minutes)} minutes' if round(minutes) > 0 else ''} {f'{round(seconds)} seconds' if round(seconds) > 0 else ''}.", color=0xE02B2B, ) await context.send(embed=embed) elif isinstance(error, commands.NotOwner): if context.guild: self.logger.warning( f"{context.author} (ID: {context.author.id}) tried to execute an owner only command in the guild {context.guild.name} (ID: {context.guild.id}), but the user is not an owner of the bot." ) else: self.logger.warning( f"{context.author} (ID: {context.author.id}) tried to execute an owner only command in the bot's DMs, but the user is not an owner of the bot." ) elif isinstance(error, commands.MissingPermissions): embed = discord.Embed( description="You are missing the permission(s) `" + ", ".join(error.missing_permissions) + "` to execute this command!", color=0xE02B2B, ) await context.send(embed=embed) elif isinstance(error, commands.BotMissingPermissions): embed = discord.Embed( description="I am missing the permission(s) `" + ", ".join(error.missing_permissions) + "` to fully perform this command!", color=0xE02B2B, ) await context.send(embed=embed) elif isinstance(error, commands.MissingRequiredArgument): embed = discord.Embed( title="Error!", description=str(error).capitalize(), color=0xE02B2B, ) await context.send(embed=embed) elif isinstance(error, commands.CheckFailure): return else: raise error bot = DiscordBot() if __name__ == "__main__": os.system("clear" if os.name == "posix" else "cls") print(ascii) setup_signal_handlers(bot) try: bot.run(os.getenv("TOKEN")) except KeyboardInterrupt: logger.info("Received keyboard interrupt") except Exception as e: logger.critical(f"Fatal error during bot execution: {type(e).__name__}: {e}")