# bot.py
"""Discord bot entry point.

Preloads the configured AI model, reads cooldown settings from
``settings.yml``, registers the event handlers and commands, and finally
starts the Discord client with the token found in the environment.
"""

import asyncio
import functools
import os
import random
from textwrap import wrap

import discord
import yaml
from discord.ext import commands
from discord.ext.commands import (
    BucketType,
    CommandOnCooldown,
    CooldownMapping,
    cooldown,
)
from dotenv import load_dotenv

from scheduler import start_scheduler
from profilepic import set_avatar_from_bytes
from context import fetch_recent_context, format_context
from logger import setup_logger

logger = setup_logger("bot")

# NOTE: the project imports below intentionally stay interleaved with the
# startup statements. They keep their original position relative to
# load_dotenv() and the model preload in case those modules read the
# environment at import time.
from ai import unload_model, load_model, get_current_model, get_ai_response  # noqa: E402

dotenv_path = os.path.join(os.path.dirname(__file__), '..', '.env')
load_dotenv(dotenv_path)
logger.info(f"🔍 Loaded MODEL_NAME from .env: {os.getenv('MODEL_NAME')}")

MODEL_NAME = os.getenv("MODEL_NAME", "llama3:latest")
logger.info(f"🔍 Loaded MODEL_NAME from .env: {MODEL_NAME}")

# Try to unload any currently loaded model before loading the target one.
logger.info(f"🧹 Attempting to clear VRAM before loading {MODEL_NAME}...")
unload_model(MODEL_NAME)

# Load the target model named in .env (or the default).
if load_model(MODEL_NAME):
    logger.info(f"🚀 Model `{MODEL_NAME}` preloaded on startup.")
else:
    logger.warning(f"⚠️ Failed to preload model `{MODEL_NAME}`.")

logger.info(f"✅ Final model in use: {MODEL_NAME}")

from personality import apply_personality, set_persona  # noqa: E402

base_dir = os.path.dirname(__file__)
settings_path = os.path.join(base_dir, "settings.yml")

with open(settings_path, "r", encoding="utf-8") as f:
    settings = yaml.safe_load(f)

ROAST_COOLDOWN_SECONDS = settings["cooldowns"]["roast"]
GLOBAL_COOLDOWN_SECONDS = settings["cooldowns"]["global"]
COOLDOWN_MSG_TEMPLATE = settings["messages"]["cooldown"]

# Discord rejects messages longer than this many characters.
MAX_DISCORD_MESSAGE_LENGTH = 2000
# Sent instead of an empty message, which Discord would reject.
EMPTY_REPLY_MESSAGE = "⚠️ I couldn't come up with a response."
# Nickname the bot tries to give itself in every guild.
BOT_NICKNAME = "Delta"

TOKEN = os.getenv("DISCORD_TOKEN")
if not TOKEN:
    logger.error("❌ DISCORD_TOKEN not set in .env file.")
    raise SystemExit("DISCORD_TOKEN not set.")

intents = discord.Intents.default()
intents.message_content = True

bot = commands.Bot(command_prefix="!", intents=intents)

# on_ready may fire more than once per process (e.g. after a reconnect);
# this flag makes sure the scheduler task is only created the first time.
_scheduler_started = False


def _split_message(text, limit=MAX_DISCORD_MESSAGE_LENGTH):
    """Split *text* into chunks of at most *limit* characters.

    Unlike ``textwrap.wrap`` this keeps newlines and repeated spaces intact.
    A chunk is cut at the last newline (preferred) or space that fits,
    falling back to a hard cut when no separator is available.
    Whitespace-only chunks are dropped because they cannot be sent.

    Returns a list of strings; the list is empty for ``None`` or blank text.
    """
    remaining = "" if text is None else str(text)
    chunks = []
    while len(remaining) > limit:
        cut = remaining.rfind("\n", 0, limit + 1)
        if cut <= 0:
            cut = remaining.rfind(" ", 0, limit + 1)
        if cut <= 0:
            # No usable separator: hard cut, nothing is skipped.
            chunks.append(remaining[:limit])
            remaining = remaining[limit:]
        else:
            # Cut at the separator and drop the separator itself.
            chunks.append(remaining[:cut])
            remaining = remaining[cut + 1:]
    chunks.append(remaining)
    return [chunk for chunk in chunks if chunk.strip()]


async def _send_chunked(destination, text):
    """Send *text* to *destination* in pieces that fit Discord's size limit.

    *destination* is anything with an async ``send`` method (a context or a
    channel). A fallback notice is sent when there is nothing to send.
    """
    chunks = _split_message(text)
    if not chunks:
        await destination.send(EMPTY_REPLY_MESSAGE)
        return

    # Log only if the response is actually being chunked.
    if len(chunks) > 1:
        logger.warning(f"💬 Splitting response into {len(chunks)} chunks due to length.")

    for chunk in chunks:
        await destination.send(chunk)


async def _generate_reply(prompt, context=None):
    """Run ``get_ai_response`` in a worker thread and return its result.

    ``get_ai_response`` is called synchronously; running it in the default
    executor keeps the event loop free while the reply is produced. The
    ``context`` keyword is only passed along when one was given.
    """
    if context is None:
        call = functools.partial(get_ai_response, prompt)
    else:
        call = functools.partial(get_ai_response, prompt, context=context)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, call)


def _update_env_file(env_path, key, value):
    """Set ``key=value`` in the dotenv file at *env_path*.

    Every existing ``key=`` line is rewritten; if there is none (or the file
    does not exist yet) the assignment is appended. Raises ``OSError`` when
    the file cannot be read or written.
    """
    prefix = f"{key}="
    new_line = f"{prefix}{value}\n"

    try:
        with open(env_path, "r", encoding="utf-8") as env_file:
            lines = env_file.readlines()
    except FileNotFoundError:
        lines = []

    updated = [new_line if line.startswith(prefix) else line for line in lines]

    if not any(line.startswith(prefix) for line in lines):
        # Make sure the appended entry starts on its own line.
        if updated and not updated[-1].endswith("\n"):
            updated[-1] += "\n"
        updated.append(new_line)

    with open(env_path, "w", encoding="utf-8") as env_file:
        env_file.writelines(updated)


@bot.event
async def on_command_error(ctx, error):
    """Tell the user how long to wait on cooldown; re-raise everything else."""
    if isinstance(error, CommandOnCooldown):
        retry_secs = round(error.retry_after, 1)
        # The configured template may be a single string or a list to pick from.
        if isinstance(COOLDOWN_MSG_TEMPLATE, list):
            template = random.choice(COOLDOWN_MSG_TEMPLATE)
        else:
            template = COOLDOWN_MSG_TEMPLATE
        msg = template.replace("{seconds}", str(retry_secs))

        logger.info(f"Command {ctx.command} on cooldown. Retry after {retry_secs} seconds.")
        await ctx.send(msg)
    else:
        raise error


# Global cooldown bucket: one command per user per configured interval.
global_cooldown = CooldownMapping.from_cooldown(1, GLOBAL_COOLDOWN_SECONDS, BucketType.user)


@bot.check
async def global_command_cooldown(ctx):
    """Apply the per-user global cooldown to every command invocation."""
    bucket = global_cooldown.get_bucket(ctx.message)
    retry_after = bucket.update_rate_limit()
    if retry_after:
        raise CommandOnCooldown(bucket, retry_after, BucketType.user)
    return True


@bot.event
async def on_message(message):
    """Answer mentions with a context-aware AI reply, then process commands."""
    if message.author == bot.user:
        return

    if bot.user.mentioned_in(message):
        # Strip both mention forms (<@id> and the nickname form <@!id>).
        prompt = message.content
        for mention in (f"<@{bot.user.id}>", f"<@!{bot.user.id}>"):
            prompt = prompt.replace(mention, "")
        prompt = prompt.strip()

        context_msgs = await fetch_recent_context(message.channel)
        formatted_context = format_context(context_msgs)
        logger.info("🧠 Injected context block:\n" + formatted_context)

        # Exactly one reply per mention, generated with the channel context.
        async with message.channel.typing():
            reply = await _generate_reply(prompt, context=formatted_context)
        await _send_chunked(message.channel, reply)

    await bot.process_commands(message)


@bot.event
async def on_ready():
    """Log the login, set the bot's nickname, and start the scheduler once."""
    global _scheduler_started

    print(f"✅ Logged in as {bot.user.name}")
    logger.info(f"Logged in as {bot.user.name}")

    # Optional: rename itself in servers (if it has permission).
    for guild in bot.guilds:
        me = guild.me
        if me.nick != BOT_NICKNAME:
            try:
                await me.edit(nick=BOT_NICKNAME)
                logger.info(f"🔄 Renamed self to Delta in {guild.name}")
            except Exception as e:
                # Best effort only: missing permissions must not stop startup.
                logger.warning(f"⚠️ Failed to rename in {guild.name}: {e}")

    if not _scheduler_started:
        _scheduler_started = True
        bot.loop.create_task(start_scheduler(bot))


@bot.command()
async def ping(ctx):
    """Reply with a fixed message to show the bot is responsive."""
    await ctx.send("🏓 Pong!")


@bot.command()
async def chat(ctx, *, prompt):
    """Send *prompt* to the AI model and post its reply."""
    await ctx.send("🤖 Thinking...")
    reply = await _generate_reply(prompt)
    # Long replies are split into chunks that fit Discord's limit.
    await _send_chunked(ctx, reply)


@bot.command()
async def setpersona(ctx, *, description):
    """Replace the active persona description."""
    set_persona(description)
    await ctx.send("✅ Persona updated! New style will be used in replies.")


@bot.command(name='roast')
@cooldown(rate=1, per=ROAST_COOLDOWN_SECONDS, type=BucketType.user)
async def roast(ctx):
    """Roast the first mentioned user, or the author when nobody is mentioned."""
    mentions = ctx.message.mentions
    target = mentions[0].mention if mentions else ctx.author.mention

    prompt = f"Roast {target}. Be dramatic, insulting, and sarcastic. Speak in your usual chaotic RGB catgirl personality."

    response = await _generate_reply(prompt)
    await _send_chunked(ctx, f"😼 {response}")


@bot.command(name="clearmodel")
async def clear_model(ctx):
    """Unload the currently active model and report the outcome."""
    model = get_current_model()
    success = unload_model(model)
    msg = f"✅ Unloaded model: `{model}`" if success else f"❌ Failed to unload model: `{model}`"
    await ctx.send(msg)


@bot.command(name="model")
async def current_model(ctx):
    """Show the name of the currently active model."""
    model = get_current_model()
    await ctx.send(f"📦 Current model: `{model}`")


@bot.command(name="setmodel")
async def set_model(ctx, *, model_name):
    """Switch to *model_name* and persist the choice in ``.env``."""
    # The name is written into .env; a line break would inject extra entries.
    if "\n" in model_name or "\r" in model_name:
        return await ctx.send("❌ Model names cannot contain line breaks.")

    active_model = get_current_model()
    if model_name == active_model:
        return await ctx.send(f"⚠️ `{model_name}` is already active.")

    await ctx.send(f"🔄 Switching from `{active_model}` to `{model_name}`…")

    # 1) Soft-unload the old model from VRAM only.
    if unload_model(active_model):
        await ctx.send(f"🧽 Unloaded `{active_model}` from VRAM.")
    else:
        await ctx.send(f"⚠️ Couldn’t unload `{active_model}` (it may not have been loaded).")

    # 2) Load the new one.
    if not load_model(model_name):
        return await ctx.send(f"❌ Failed to pull `{model_name}`. Make sure it’s in `ollama list`.")

    # 3) Update the runtime environment AND .env on disk.
    os.environ["MODEL_NAME"] = model_name
    env_path = os.path.join(os.path.dirname(__file__), '..', '.env')
    try:
        _update_env_file(env_path, "MODEL_NAME", model_name)
    except OSError as e:
        logger.warning(f"⚠️ Failed to update .env with MODEL_NAME={model_name}: {e}")
        return await ctx.send(
            f"⚠️ Model switched to `{model_name}`, but `.env` could not be updated: {e}"
        )

    await ctx.send(f"✅ Model switched to `{model_name}` and `.env` updated.")


@bot.command(name="models")
async def list_models(ctx):
    """List the model names reported by the tags endpoint."""
    import requests
    from ai import TAGS_ENDPOINT

    try:
        # Blocking HTTP call: run it off the event loop and bound its duration.
        loop = asyncio.get_running_loop()
        resp = await loop.run_in_executor(
            None, functools.partial(requests.get, TAGS_ENDPOINT, timeout=10)
        )
        resp.raise_for_status()
        models = [m["name"] for m in resp.json().get("models", [])]
    except Exception as e:
        logger.warning(f"⚠️ Failed to fetch models: {e}")
        await ctx.send(f"❌ Failed to fetch models: {e}")
        return

    if models:
        listing = "\n".join(f"- `{m}`" for m in models)
        await _send_chunked(ctx, "🧠 Available models:\n" + listing)
    else:
        await ctx.send("❌ No models found.")


@bot.command(name="setavatar")
@commands.is_owner()  # Only the bot owner can run this
async def set_avatar(ctx):
    """Use the first attachment of the invoking message as the new avatar."""
    if not ctx.message.attachments:
        return await ctx.send("❌ Please attach an image (PNG) to use as the new avatar.")

    image = ctx.message.attachments[0]
    image_bytes = await image.read()

    token = os.getenv("DISCORD_TOKEN")
    if not token:
        return await ctx.send("❌ Bot token not found in environment.")

    success = set_avatar_from_bytes(image_bytes, token)
    if success:
        await ctx.send("✅ Avatar updated successfully!")
    else:
        await ctx.send("❌ Failed to update avatar.")


bot.run(TOKEN)