mirror of
https://github.com/neoarz/Syntrel.git
synced 2025-12-25 11:40:12 +01:00
400 lines
15 KiB
Python
400 lines
15 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import random
|
|
import signal
|
|
import sys
|
|
import time
|
|
|
|
import aiosqlite
|
|
import discord
|
|
from discord.ext import commands, tasks
|
|
from discord.ext.commands import Context
|
|
from dotenv import load_dotenv
|
|
|
|
from database import DatabaseManager
|
|
from utils.ascii_art import ascii
|
|
|
|
load_dotenv()
|
|
|
|
"""
|
|
Default Intents:
|
|
intents.emojis_and_stickers = True
|
|
intents.guild_messages = True
|
|
intents.guild_reactions = True
|
|
intents.guild_scheduled_events = True
|
|
intents.guild_typing = True
|
|
intents.guilds = True
|
|
intents.integrations = True
|
|
intents.invites = True
|
|
intents.reactions = True
|
|
intents.typing = True
|
|
intents.voice_states = True
|
|
intents.webhooks = True
|
|
|
|
intents.members = True
|
|
intents.presences = True
|
|
"""
|
|
|
|
intents = discord.Intents.default()
|
|
intents.message_content = True
|
|
intents.bans = True
|
|
intents.dm_messages = True
|
|
intents.dm_reactions = True
|
|
intents.dm_typing = True
|
|
intents.emojis = True
|
|
intents.messages = True
|
|
|
|
|
|
|
|
|
|
class LoggingFormatter(logging.Formatter):
|
|
# Colors
|
|
black = "\x1b[30m"
|
|
red = "\x1b[31m"
|
|
green = "\x1b[32m"
|
|
yellow = "\x1b[33m"
|
|
blue = "\x1b[34m"
|
|
gray = "\x1b[38m"
|
|
# Styles
|
|
reset = "\x1b[0m"
|
|
bold = "\x1b[1m"
|
|
|
|
COLORS = {
|
|
logging.DEBUG: gray + bold,
|
|
logging.INFO: blue + bold,
|
|
logging.WARNING: yellow + bold,
|
|
logging.ERROR: red,
|
|
logging.CRITICAL: red + bold,
|
|
}
|
|
|
|
def format(self, record):
|
|
log_color = self.COLORS[record.levelno]
|
|
format = "(black){asctime}(reset) (levelcolor){levelname:<8}(reset) (green){name}(reset) {message}"
|
|
format = format.replace("(black)", self.black + self.bold)
|
|
format = format.replace("(reset)", self.reset)
|
|
format = format.replace("(levelcolor)", log_color)
|
|
format = format.replace("(green)", self.green + self.bold)
|
|
formatter = logging.Formatter(format, "%Y-%m-%d %H:%M:%S", style="{")
|
|
return formatter.format(record)
|
|
|
|
|
|
logger = logging.getLogger("discord_bot")
|
|
logger.setLevel(logging.INFO)
|
|
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setFormatter(LoggingFormatter())
|
|
log_file_path = os.getenv("LOG_FILE", "logs/discord.log")
|
|
log_dir = os.path.dirname(log_file_path)
|
|
if log_dir:
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
file_handler = logging.FileHandler(filename=log_file_path, encoding="utf-8", mode="w")
|
|
file_handler_formatter = logging.Formatter(
|
|
"[{asctime}] [{levelname:<8}] {name}: {message}", "%Y-%m-%d %H:%M:%S", style="{"
|
|
)
|
|
file_handler.setFormatter(file_handler_formatter)
|
|
|
|
logger.addHandler(console_handler)
|
|
logger.addHandler(file_handler)
|
|
|
|
|
|
class DiscordBot(commands.Bot):
|
|
def __init__(self) -> None:
|
|
super().__init__(
|
|
command_prefix=commands.when_mentioned_or(os.getenv("PREFIX")),
|
|
intents=intents,
|
|
help_command=None,
|
|
)
|
|
"""
|
|
This creates custom bot variables so that we can access these variables in cogs more easily.
|
|
|
|
For example, The logger is available using the following code:
|
|
- self.logger # In this class
|
|
- bot.logger # In this file
|
|
- self.bot.logger # In cogs
|
|
"""
|
|
self.logger = logger
|
|
self.database = None
|
|
self.bot_prefix = os.getenv("PREFIX")
|
|
self.invite_link = os.getenv("INVITE_LINK")
|
|
self.start_time = time.time()
|
|
self._shutdown = False
|
|
|
|
async def init_db(self) -> None:
|
|
async with aiosqlite.connect(
|
|
f"{os.path.realpath(os.path.dirname(__file__))}/database/database.db"
|
|
) as db:
|
|
with open(
|
|
f"{os.path.realpath(os.path.dirname(__file__))}/database/schema.sql",
|
|
encoding = "utf-8"
|
|
) as file:
|
|
await db.executescript(file.read())
|
|
await db.commit()
|
|
|
|
async def load_cogs(self) -> None:
|
|
"""
|
|
The code in this function is executed whenever the bot will start.
|
|
"""
|
|
cogs_path = f"{os.path.realpath(os.path.dirname(__file__))}/cogs"
|
|
disabled_env = os.getenv("DISABLED_COGS", "")
|
|
disabled_cogs = {entry.strip().lower() for entry in disabled_env.split(",") if entry.strip()}
|
|
|
|
for folder in os.listdir(cogs_path):
|
|
folder_path = os.path.join(cogs_path, folder)
|
|
if os.path.isdir(folder_path) and not folder.startswith('__'):
|
|
for file in os.listdir(folder_path):
|
|
if file.endswith(".py") and not file.startswith('__'):
|
|
extension = file[:-3]
|
|
full_name = f"{folder}.{extension}".lower()
|
|
if extension.lower() in disabled_cogs or full_name in disabled_cogs:
|
|
self.logger.info(f"Skipped disabled extension '{full_name}'")
|
|
continue
|
|
try:
|
|
await self.load_extension(f"cogs.{folder}.{extension}")
|
|
self.logger.info(f"Loaded extension '{folder}.{extension}'")
|
|
except Exception as e:
|
|
exception = f"{type(e).__name__}: {e}"
|
|
self.logger.error(
|
|
f"Failed to load extension {folder}.{extension}\n{exception}"
|
|
)
|
|
|
|
for file in os.listdir(cogs_path):
|
|
if file.endswith(".py") and not file.startswith('__'):
|
|
extension = file[:-3]
|
|
if extension.lower() in disabled_cogs:
|
|
self.logger.info(f"Skipped disabled extension '{extension}'")
|
|
continue
|
|
try:
|
|
await self.load_extension(f"cogs.{extension}")
|
|
self.logger.info(f"Loaded extension '{extension}'")
|
|
except Exception as e:
|
|
exception = f"{type(e).__name__}: {e}"
|
|
self.logger.error(
|
|
f"Failed to load extension {extension}\n{exception}"
|
|
)
|
|
|
|
@tasks.loop(minutes=1.0)
|
|
async def status_task(self) -> None:
|
|
"""
|
|
Setup the game status task of the bot.
|
|
"""
|
|
statuses = ["SideStore", "MeloNX", "ARMSX2", "StikDebug", "Feather"]
|
|
await self.change_presence(activity=discord.Game(random.choice(statuses)))
|
|
|
|
@status_task.before_loop
|
|
async def before_status_task(self) -> None:
|
|
"""
|
|
Before starting the status changing task, we make sure the bot is ready
|
|
"""
|
|
await self.wait_until_ready()
|
|
|
|
async def setup_hook(self) -> None:
|
|
"""
|
|
This will just be executed when the bot starts the first time.
|
|
"""
|
|
self.logger.info(f"Logged in as {self.user.name}")
|
|
self.logger.info(f"discord.py API version: {discord.__version__}")
|
|
self.logger.info(f"Python version: {platform.python_version()}")
|
|
self.logger.info(
|
|
f"Running on: {platform.system()} {platform.release()} ({os.name})"
|
|
)
|
|
|
|
try:
|
|
app_info = await self.application_info()
|
|
if app_info.team:
|
|
self.logger.info(f"Bot owned by team: {app_info.team.name}")
|
|
for member in app_info.team.members:
|
|
self.logger.info(f"Team member: {member.name} (ID: {member.id})")
|
|
else:
|
|
self.logger.info(f"Bot owner: {app_info.owner.name} (ID: {app_info.owner.id})")
|
|
except Exception as e:
|
|
self.logger.error(f"Error fetching application info: {e}")
|
|
|
|
|
|
await self.init_db()
|
|
await self.load_cogs()
|
|
self.status_task.start()
|
|
self.database = DatabaseManager(
|
|
connection=await aiosqlite.connect(
|
|
f"{os.path.realpath(os.path.dirname(__file__))}/database/database.db"
|
|
)
|
|
)
|
|
|
|
def get_uptime(self) -> str:
|
|
uptime_seconds = int(time.time() - self.start_time)
|
|
days = uptime_seconds // 86400
|
|
hours = (uptime_seconds % 86400) // 3600
|
|
minutes = (uptime_seconds % 3600) // 60
|
|
seconds = uptime_seconds % 60
|
|
|
|
if days > 0:
|
|
return f"{days}d {hours}h {minutes}m {seconds}s"
|
|
elif hours > 0:
|
|
return f"{hours}h {minutes}m {seconds}s"
|
|
elif minutes > 0:
|
|
return f"{minutes}m {seconds}s"
|
|
else:
|
|
return f"{seconds}s"
|
|
|
|
async def close(self) -> None:
|
|
if self._shutdown:
|
|
return
|
|
self._shutdown = True
|
|
|
|
self.logger.info("Starting shutdown process...")
|
|
|
|
if self.status_task and not self.status_task.is_being_cancelled():
|
|
self.status_task.cancel()
|
|
self.logger.info("Status task cancelled")
|
|
|
|
if self.database and self.database.connection:
|
|
try:
|
|
await self.database.connection.close()
|
|
self.logger.warning("Database connection closed")
|
|
except Exception as e:
|
|
self.logger.error(f"Error closing database connection: {e}")
|
|
|
|
try:
|
|
await super().close()
|
|
self.logger.critical("Bot shutdown complete")
|
|
except Exception as e:
|
|
self.logger.error(f"Error during bot shutdown: {e}")
|
|
|
|
async def on_message(self, message: discord.Message) -> None:
|
|
"""
|
|
The code in this event is executed every time someone sends a message, with or without the prefix
|
|
|
|
:param message: The message that was sent.
|
|
"""
|
|
if message.author == self.user or message.author.bot:
|
|
return
|
|
|
|
# Reacts to messages which mention the bot
|
|
if self.user in message.mentions:
|
|
try:
|
|
emoji_string = "<a:PandaPing:1417550314260926575>"
|
|
self.logger.debug(f"Attempting to react with PandaPing emoji: {emoji_string}")
|
|
await message.add_reaction(emoji_string)
|
|
self.logger.debug("Successfully reacted with PandaPing emoji")
|
|
except Exception as e:
|
|
self.logger.debug(f"Failed to react with PandaPing emoji: {e}")
|
|
try:
|
|
self.logger.debug("Falling back to wave emoji")
|
|
await message.add_reaction("👋")
|
|
self.logger.debug("Successfully reacted with wave emoji")
|
|
except Exception as fallback_error:
|
|
self.logger.debug(f"Failed to react with fallback emoji: {fallback_error}")
|
|
await self.process_commands(message)
|
|
|
|
async def on_command_completion(self, context: Context) -> None:
|
|
"""
|
|
The code in this event is executed every time a normal command has been *successfully* executed.
|
|
|
|
:param context: The context of the command that has been executed.
|
|
"""
|
|
full_command_name = context.command.qualified_name
|
|
split = full_command_name.split(" ")
|
|
executed_command = str(split[0])
|
|
|
|
# Skip logging for shutdown command as it logs manually before shutdown
|
|
if executed_command.lower() == "shutdown":
|
|
return
|
|
|
|
if context.guild is not None:
|
|
self.logger.info(
|
|
f"Executed {executed_command} command in {context.guild.name} (ID: {context.guild.id}) by {context.author} (ID: {context.author.id})"
|
|
)
|
|
else:
|
|
self.logger.info(
|
|
f"Executed {executed_command} command by {context.author} (ID: {context.author.id}) in DMs"
|
|
)
|
|
|
|
async def on_command_error(self, context: Context, error) -> None:
|
|
"""
|
|
The code in this event is executed every time a normal valid command catches an error.
|
|
|
|
:param context: The context of the normal command that failed executing.
|
|
:param error: The error that has been faced.
|
|
"""
|
|
if isinstance(error, commands.CommandNotFound):
|
|
if context.guild is not None:
|
|
self.logger.info(
|
|
f"Unknown command in {context.guild.name} (ID: {context.guild.id}) by {context.author} (ID: {context.author.id}): {context.message.content}"
|
|
)
|
|
else:
|
|
self.logger.info(
|
|
f"Unknown command in DMs by {context.author} (ID: {context.author.id}): {context.message.content}"
|
|
)
|
|
return
|
|
if isinstance(error, commands.CommandOnCooldown):
|
|
minutes, seconds = divmod(error.retry_after, 60)
|
|
hours, minutes = divmod(minutes, 60)
|
|
hours = hours % 24
|
|
embed = discord.Embed(
|
|
description=f"**Please slow down** - You can use this command again in {f'{round(hours)} hours' if round(hours) > 0 else ''} {f'{round(minutes)} minutes' if round(minutes) > 0 else ''} {f'{round(seconds)} seconds' if round(seconds) > 0 else ''}.",
|
|
color=0xE02B2B,
|
|
)
|
|
await context.send(embed=embed)
|
|
elif isinstance(error, commands.NotOwner):
|
|
if context.guild:
|
|
self.logger.warning(
|
|
f"{context.author} (ID: {context.author.id}) tried to execute an owner only command in the guild {context.guild.name} (ID: {context.guild.id}), but the user is not an owner of the bot."
|
|
)
|
|
else:
|
|
self.logger.warning(
|
|
f"{context.author} (ID: {context.author.id}) tried to execute an owner only command in the bot's DMs, but the user is not an owner of the bot."
|
|
)
|
|
elif isinstance(error, commands.MissingPermissions):
|
|
embed = discord.Embed(
|
|
description="You are missing the permission(s) `"
|
|
+ ", ".join(error.missing_permissions)
|
|
+ "` to execute this command!",
|
|
color=0xE02B2B,
|
|
)
|
|
await context.send(embed=embed)
|
|
elif isinstance(error, commands.BotMissingPermissions):
|
|
embed = discord.Embed(
|
|
description="I am missing the permission(s) `"
|
|
+ ", ".join(error.missing_permissions)
|
|
+ "` to fully perform this command!",
|
|
color=0xE02B2B,
|
|
)
|
|
await context.send(embed=embed)
|
|
elif isinstance(error, commands.MissingRequiredArgument):
|
|
embed = discord.Embed(
|
|
title="Error!",
|
|
# We need to capitalize because the command arguments have no capital letter in the code and they are the first word in the error message.
|
|
description=str(error).capitalize(),
|
|
color=0xE02B2B,
|
|
)
|
|
await context.send(embed=embed)
|
|
else:
|
|
raise error
|
|
|
|
|
|
def signal_handler(signum, frame):
|
|
logger.info("Shutdown requested. Closing bot...")
|
|
if bot.loop and not bot.loop.is_closed():
|
|
asyncio.create_task(bot.close())
|
|
bot.loop.call_soon_threadsafe(bot.loop.stop)
|
|
|
|
|
|
bot = DiscordBot()
|
|
|
|
if __name__ == "__main__":
|
|
os.system('clear' if os.name == 'posix' else 'cls')
|
|
|
|
print(ascii)
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
try:
|
|
bot.run(os.getenv("TOKEN"))
|
|
except KeyboardInterrupt:
|
|
pass
|
|
except:
|
|
pass
|