mirror of
https://github.com/neoarz/Syntrel.git
synced 2025-12-25 03:40:11 +01:00
Added a _require_group_prefix check to all hybrid commands in each cog to ensure commands are only executed when invoked with the appropriate group prefix. Also updated error handling in bot.py to silently ignore CheckFailure errors. This improves command organization and prevents accidental command execution outside their intended context.
307 lines
12 KiB
Python
307 lines
12 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import random
|
|
import signal
|
|
import sys
|
|
import time
|
|
|
|
import aiosqlite
|
|
import discord
|
|
from discord.ext import commands, tasks
|
|
from discord.ext.commands import Context
|
|
from dotenv import load_dotenv
|
|
|
|
from database import DatabaseManager
|
|
from utils import ascii, setup_logger, get_uptime, setup_signal_handlers
|
|
|
|
# Load variables from a .env file so the os.getenv() calls further down
# (PREFIX, INVITE_LINK, DISABLED_COGS, TOKEN) can see them.
load_dotenv()

# Reference list kept from the original file.
#
# Default Intents:
#   intents.emojis_and_stickers = True
#   intents.guild_messages = True
#   intents.guild_reactions = True
#   intents.guild_scheduled_events = True
#   intents.guild_typing = True
#   intents.guilds = True
#   intents.integrations = True
#   intents.invites = True
#   intents.reactions = True
#   intents.typing = True
#   intents.voice_states = True
#   intents.webhooks = True
#
#   intents.members = True
#   intents.presences = True

# Start from the library defaults, then switch on the flags this bot relies on.
intents = discord.Intents.default()

# Needed to read message text for prefix commands.
intents.message_content = True

# Direct-message related flags.
intents.dm_messages = True
intents.dm_reactions = True
intents.dm_typing = True

# Remaining flags that the original file enables explicitly.
intents.bans = True
intents.emojis = True
intents.messages = True

# Shared logger; DiscordBot stores this same object as ``self.logger``.
logger = setup_logger()
|
|
|
|
|
|
class DiscordBot(commands.Bot):
    """``commands.Bot`` subclass that owns the database, the cogs and the event hooks.

    Configuration is read from the environment: ``PREFIX`` (text command
    prefix), ``INVITE_LINK`` and ``DISABLED_COGS`` (comma-separated extension
    names that must not be loaded).
    """

    # Everything on disk is resolved relative to the directory of this file.
    _BASE_DIR = os.path.realpath(os.path.dirname(__file__))
    _DATABASE_PATH = f"{_BASE_DIR}/database/database.db"
    _SCHEMA_PATH = f"{_BASE_DIR}/database/schema.sql"
    _COGS_PATH = f"{_BASE_DIR}/cogs"

    def __init__(self) -> None:
        """Create the bot with the module-level ``intents`` and no default help command."""
        prefix = os.getenv("PREFIX")
        # Passing None to when_mentioned_or() would put None into the prefix
        # list, and discord.py then raises TypeError while matching prefixes on
        # every incoming message. Without PREFIX, accept mentions only.
        text_prefixes = () if prefix is None else (prefix,)
        super().__init__(
            command_prefix=commands.when_mentioned_or(*text_prefixes),
            intents=intents,
            help_command=None,
        )
        self.logger = logger
        if prefix is None:
            self.logger.warning(
                "PREFIX is not set; text commands can only be invoked by mentioning the bot"
            )
        # Set in setup_hook() once the SQLite connection is open.
        self.database = None
        self.bot_prefix = prefix
        self.invite_link = os.getenv("INVITE_LINK")
        # Unix timestamp of construction; get_uptime() measures from here.
        self.start_time = time.time()
        # Guards close() against running twice.
        self._shutdown = False

    async def init_db(self) -> None:
        """Run ``database/schema.sql`` against ``database/database.db`` and commit."""
        with open(self._SCHEMA_PATH, encoding="utf-8") as file:
            schema = file.read()
        async with aiosqlite.connect(self._DATABASE_PATH) as db:
            await db.executescript(schema)
            await db.commit()

    async def _load_extension_logged(
        self, module: str, label: str, *, announce: bool = True
    ) -> bool:
        """Load one extension, logging the outcome instead of raising.

        :param module: Dotted module path handed to ``load_extension``.
        :param label: Short name used in the log messages.
        :param announce: Whether a successful load is logged.
        :return: ``True`` if the extension loaded, ``False`` otherwise.
        """
        try:
            await self.load_extension(module)
        except Exception as e:
            # One broken cog must not prevent the remaining ones from loading.
            exception = f"{type(e).__name__}: {e}"
            self.logger.error(f"Failed to load extension {label}\n{exception}")
            return False
        if announce:
            self.logger.info(f"Loaded extension '{label}'")
        return True

    async def load_cogs(self) -> None:
        """Load every extension found under the ``cogs`` directory.

        Three layouts are handled:

        * ``cogs/<folder>/__init__.py`` - the folder is loaded as one extension;
        * ``cogs/<folder>/<file>.py`` (no ``__init__.py``) - each file is loaded;
        * ``cogs/<file>.py`` - each file is loaded.

        Names starting with ``__`` are ignored. Single-file extensions listed
        in ``DISABLED_COGS`` (as ``name`` or ``folder.name``, case-insensitive)
        are skipped.
        """
        disabled_cogs = {
            entry.strip().lower()
            for entry in os.getenv("DISABLED_COGS", "").split(",")
            if entry.strip()
        }
        # Sorted so the load order does not depend on the file system.
        entries = sorted(os.listdir(self._COGS_PATH))

        for folder in entries:
            folder_path = os.path.join(self._COGS_PATH, folder)
            if not os.path.isdir(folder_path) or folder.startswith("__"):
                continue

            if os.path.exists(os.path.join(folder_path, "__init__.py")):
                # NOTE(review): package extensions are not checked against
                # DISABLED_COGS, and only "owner"/"help" are announced here -
                # kept as in the original; confirm this is intended.
                await self._load_extension_logged(
                    f"cogs.{folder}", folder, announce=folder in ("owner", "help")
                )
                continue

            for file in sorted(os.listdir(folder_path)):
                if not file.endswith(".py") or file.startswith("__"):
                    continue
                extension = file[:-3]
                full_name = f"{folder}.{extension}".lower()
                if extension.lower() in disabled_cogs or full_name in disabled_cogs:
                    self.logger.info(f"Skipped disabled extension '{full_name}'")
                    continue
                await self._load_extension_logged(
                    f"cogs.{folder}.{extension}", f"{folder}.{extension}"
                )

        for file in entries:
            if not file.endswith(".py") or file.startswith("__"):
                continue
            extension = file[:-3]
            if extension.lower() in disabled_cogs:
                self.logger.info(f"Skipped disabled extension '{extension}'")
                continue
            await self._load_extension_logged(f"cogs.{extension}", extension)

    @tasks.loop(minutes=1.0)
    async def status_task(self) -> None:
        """Once a minute, show a randomly chosen name as the bot's "Playing" status."""
        statuses = ["SideStore", "MeloNX", "ARMSX2", "StikDebug", "Feather"]
        await self.change_presence(activity=discord.Game(random.choice(statuses)))

    @status_task.before_loop
    async def before_status_task(self) -> None:
        """Delay the first status update until the bot is ready."""
        await self.wait_until_ready()

    async def setup_hook(self) -> None:
        """Log environment details, prepare the database, load cogs, start the status loop."""
        self.logger.info(f"Logged in as {self.user.name}")
        self.logger.info(f"discord.py API version: {discord.__version__}")
        self.logger.info(f"Python version: {platform.python_version()}")
        self.logger.info(
            f"Running on: {platform.system()} {platform.release()} ({os.name})"
        )

        # Ownership details are informational only; a failure here is logged
        # and start-up continues.
        try:
            app_info = await self.application_info()
            if app_info.team:
                self.logger.info(f"Bot owned by team: {app_info.team.name}")
                for member in app_info.team.members:
                    self.logger.info(f"Team member: {member.name} (ID: {member.id})")
            else:
                self.logger.info(
                    f"Bot owner: {app_info.owner.name} (ID: {app_info.owner.id})"
                )
        except Exception as e:
            self.logger.error(f"Error fetching application info: {e}")

        await self.init_db()
        # Open the long-lived connection before loading cogs so that
        # ``self.database`` is already usable while extensions are set up.
        self.database = DatabaseManager(
            connection=await aiosqlite.connect(self._DATABASE_PATH)
        )
        await self.load_cogs()
        self.status_task.start()

    def get_uptime(self) -> str:
        """Return the uptime computed by ``utils.get_uptime`` from ``start_time``."""
        return get_uptime(self.start_time)

    async def close(self) -> None:
        """Cancel the status loop, close the database, then close the client.

        Only the first call does any work; later calls return immediately.
        """
        if self._shutdown:
            return
        self._shutdown = True

        self.logger.info("Starting shutdown process...")

        if self.status_task and not self.status_task.is_being_cancelled():
            self.status_task.cancel()
            self.logger.info("Status task cancelled")

        if self.database and self.database.connection:
            try:
                await self.database.connection.close()
                self.logger.warning("Database connection closed")
            except Exception as e:
                self.logger.error(f"Error closing database connection: {e}")

        try:
            await super().close()
            self.logger.critical("Bot shutdown complete")
        except Exception as e:
            self.logger.error(f"Error during bot shutdown: {e}")

    async def on_message(self, message: discord.Message) -> None:
        """Ignore bots, react when the bot is mentioned, then process commands."""
        if message.author == self.user or message.author.bot:
            return

        if self.user in message.mentions:
            # Best effort: a failed reaction must never stop command handling.
            try:
                emoji_string = "<a:PandaPing:1417550314260926575>"
                self.logger.debug(f"Attempting to react with PandaPing emoji: {emoji_string}")
                await message.add_reaction(emoji_string)
                self.logger.debug("Successfully reacted with PandaPing emoji")
            except Exception as e:
                self.logger.debug(f"Failed to react with PandaPing emoji: {e}")
                try:
                    self.logger.debug("Falling back to wave emoji")
                    await message.add_reaction("👋")
                    self.logger.debug("Successfully reacted with wave emoji")
                except Exception as fallback_error:
                    self.logger.debug(f"Failed to react with fallback emoji: {fallback_error}")

        await self.process_commands(message)

    async def on_command_completion(self, context: Context) -> None:
        """Log every successfully executed command except ``shutdown``."""
        # Only the first word of the qualified name (the root command) is logged.
        executed_command = context.command.qualified_name.split(" ")[0]

        # NOTE(review): presumably skipped because the bot is already closing
        # when this event fires - confirm.
        if executed_command.lower() == "shutdown":
            return

        if context.guild is not None:
            self.logger.info(
                f"Executed {executed_command} command in {context.guild.name} (ID: {context.guild.id}) by {context.author} (ID: {context.author.id})"
            )
        else:
            self.logger.info(
                f"Executed {executed_command} command by {context.author} (ID: {context.author.id}) in DMs"
            )

    @staticmethod
    def _format_cooldown(retry_after: float) -> str:
        """Render a cooldown in seconds as e.g. ``"1 hour 5 minutes 3 seconds"``.

        The value is rounded up to whole seconds (minimum one second), so the
        text is never empty and never tells the user to retry too early.
        """
        total = int(retry_after)
        if total < retry_after:
            total += 1
        total = max(total, 1)

        minutes, seconds = divmod(total, 60)
        hours, minutes = divmod(minutes, 60)
        days, hours = divmod(hours, 24)

        units = ((days, "day"), (hours, "hour"), (minutes, "minute"), (seconds, "second"))
        return " ".join(
            f"{value} {unit}" if value == 1 else f"{value} {unit}s"
            for value, unit in units
            if value
        )

    async def _send_error_embed(self, context: Context, embed: discord.Embed) -> None:
        """Send an error embed; log instead of raising if Discord rejects it."""
        try:
            await context.send(embed=embed)
        except discord.HTTPException as e:
            # e.g. the bot may not be allowed to send messages in this channel.
            self.logger.warning(f"Could not send error message: {type(e).__name__}: {e}")

    async def on_command_error(self, context: Context, error) -> None:
        """Handle errors raised while invoking a command.

        Unknown commands and non-owner attempts are only logged. Cooldowns,
        missing permissions and missing arguments are reported to the user.
        Any other ``CheckFailure`` is ignored silently. Everything else is
        re-raised.

        :param context: Invocation context of the failed command.
        :param error: The exception raised by the command framework.
        """
        if isinstance(error, commands.CommandNotFound):
            if context.guild is not None:
                self.logger.info(
                    f"Unknown command in {context.guild.name} (ID: {context.guild.id}) by {context.author} (ID: {context.author.id}): {context.message.content}"
                )
            else:
                self.logger.info(
                    f"Unknown command in DMs by {context.author} (ID: {context.author.id}): {context.message.content}"
                )
            return

        # The specific CheckFailure subclasses are tested before the generic
        # CheckFailure branch at the end, which would otherwise swallow them.
        if isinstance(error, commands.CommandOnCooldown):
            embed = discord.Embed(
                description=f"**Please slow down** - You can use this command again in {self._format_cooldown(error.retry_after)}.",
                color=0xE02B2B,
            )
            await self._send_error_embed(context, embed)
        elif isinstance(error, commands.NotOwner):
            if context.guild:
                self.logger.warning(
                    f"{context.author} (ID: {context.author.id}) tried to execute an owner only command in the guild {context.guild.name} (ID: {context.guild.id}), but the user is not an owner of the bot."
                )
            else:
                self.logger.warning(
                    f"{context.author} (ID: {context.author.id}) tried to execute an owner only command in the bot's DMs, but the user is not an owner of the bot."
                )
        elif isinstance(error, commands.MissingPermissions):
            embed = discord.Embed(
                description="You are missing the permission(s) `"
                + ", ".join(error.missing_permissions)
                + "` to execute this command!",
                color=0xE02B2B,
            )
            await self._send_error_embed(context, embed)
        elif isinstance(error, commands.BotMissingPermissions):
            embed = discord.Embed(
                description="I am missing the permission(s) `"
                + ", ".join(error.missing_permissions)
                + "` to fully perform this command!",
                color=0xE02B2B,
            )
            await self._send_error_embed(context, embed)
        elif isinstance(error, commands.MissingRequiredArgument):
            embed = discord.Embed(
                title="Error!",
                description=str(error).capitalize(),
                color=0xE02B2B,
            )
            await self._send_error_embed(context, embed)
        elif isinstance(error, commands.CheckFailure):
            # Deliberately silent: failed checks are not reported to the user.
            return
        else:
            raise error
|
|
|
|
|
|
# Created at import time so other modules can import ``bot`` from here.
bot = DiscordBot()

if __name__ == "__main__":
    # Start with a clean terminal and the banner.
    os.system('clear' if os.name == 'posix' else 'cls')

    print(ascii)

    setup_signal_handlers(bot)

    token = os.getenv("TOKEN")
    if not token:
        # Without a token bot.run() can only fail; say so instead of exiting silently.
        logger.critical("TOKEN is not set; the bot cannot start")
    else:
        try:
            bot.run(token)
        except KeyboardInterrupt:
            # Ctrl+C is a normal way to stop the bot.
            pass
        except Exception as e:
            # Previously every failure was discarded by a bare ``except``,
            # leaving no trace of why the bot stopped.
            logger.error(f"Bot stopped unexpectedly: {type(e).__name__}: {e}")
|