# AI-Discord-Bot/src/bot.py

# bot.py
#
# Discord bot entry point: wires up environment config, the AI backend,
# per-user cooldowns, and all chat commands.

# Standard library
import asyncio
import os
import random
import time
from textwrap import wrap

# Third-party
import discord
import yaml
from discord.ext import commands
from discord.ext.commands import (
    cooldown,
    BucketType,
    CommandOnCooldown,
)
from dotenv import load_dotenv

# Local imports
from cooldown import CooldownManager
from scheduler import start_scheduler
from profilepic import set_avatar_from_bytes
from context import fetch_raw_context, format_context
from user_profiles import (
    load_user_profile,
    update_last_seen,
    increment_interactions,
    format_profile_for_block,
    set_pronouns,
    set_custom_prompt,
)
from user_profiles import format_profile_for_block as format_user_profile_block
from personality import apply_personality, set_persona, load_persona
from logger import setup_logger
from ai import (
    unload_model,
    load_model,
    get_current_model,
    get_ai_response,
    TAGS_ENDPOINT,
)
from ai import load_modelfile, unload_modelfile, get_modelfile_info
from time_logger import log_message_activity
from autochat import should_auto_reply, generate_auto_reply, update_reply_timer, maybe_react_to_message

# Setup logger and environment
logger = setup_logger("bot")
dotenv_path = os.path.join(os.path.dirname(__file__), '..', '.env')
load_dotenv(dotenv_path)

# Read DEBUG_MODE *after* load_dotenv so a value set in .env is honored.
# (The original read it before the .env file was loaded, so only a value
# already present in the process environment could enable debug mode.)
debug_mode = os.getenv("DEBUG_MODE", "false").lower() == "true"

# No hardcoded owner IDs; use discord.py's owner check and guild admin perms.
# Message-level guard for cooldown updates (avoid double-updating during dispatch)
_cooldown_updated = set()
# Message-level guard to avoid sending the same cooldown error multiple times
_cooldown_error_sent = set()
# Message-level guard so the cooldown timestamp is recorded once per message
_cooldown_recorded_for_msg = set()
# Message-level guard for generic one-shot sends (avoid duplicate command replies)
_message_sent_once = set()
# Load model settings
MODEL_NAME = os.getenv("MODEL_NAME", "llama3:latest")
logger.info(f"🔍 Loaded MODEL_NAME from .env: {MODEL_NAME}")
if debug_mode:
    # In debug mode, free VRAM first so the preload below starts clean.
    logger.info(f"🧹 Attempting to clear VRAM before loading {MODEL_NAME}...")
    unload_model(MODEL_NAME)
if load_model(MODEL_NAME):
    logger.info(f"🚀 Model `{MODEL_NAME}` preloaded on startup.")
else:
    logger.warning(f"⚠️ Failed to preload model `{MODEL_NAME}`.")
logger.info(f"✅ Final model in use: {MODEL_NAME}")

# Load YAML settings (cooldown durations and user-facing message templates)
base_dir = os.path.dirname(__file__)
settings_path = os.path.join(base_dir, "settings.yml")
with open(settings_path, "r", encoding="utf-8") as f:
    settings = yaml.safe_load(f)
ROAST_COOLDOWN_SECONDS = settings["cooldowns"]["roast"]
GLOBAL_COOLDOWN_SECONDS = settings["cooldowns"]["global"]
COOLDOWN_MSG_TEMPLATE = settings["messages"]["cooldown"]

# Configure Discord bot
TOKEN = os.getenv("DISCORD_TOKEN")
if not TOKEN:
    logger.error("❌ DISCORD_TOKEN not set in .env file.")
    raise SystemExit("DISCORD_TOKEN not set.")

intents = discord.Intents.default()
intents.message_content = True  # required to read message text for commands/mentions
bot = commands.Bot(command_prefix="!", intents=intents)
# Handle cooldown errors globally
@bot.event
async def on_command_error(ctx, error):
    """Global command error handler.

    Sends a single, human-friendly cooldown notice per triggering message
    and re-raises anything that is not a CommandOnCooldown.
    """
    if not isinstance(error, CommandOnCooldown):
        raise error
    retry_secs = round(error.retry_after, 1)
    # settings.yml may supply either one template string or a list of variants.
    template = random.choice(COOLDOWN_MSG_TEMPLATE) if isinstance(COOLDOWN_MSG_TEMPLATE, list) else COOLDOWN_MSG_TEMPLATE
    msg = template.replace("{seconds}", str(retry_secs))
    # Prevent duplicate cooldown messages for the same triggering message
    msg_id = getattr(getattr(ctx, 'message', None), 'id', None)
    if msg_id is not None and msg_id in _cooldown_error_sent:
        logger.debug(f"on_command_error: cooldown message already sent for msg={msg_id}")
        return
    logger.info(f"Command {ctx.command} on cooldown for user={getattr(ctx.author, 'id', None)}. Retry after {retry_secs} seconds.")
    try:
        await ctx.send(msg)
    except Exception:
        # Best-effort: ignore send failures (missing perms, deleted channel, ...)
        pass
    if msg_id is not None:
        _cooldown_error_sent.add(msg_id)

        async def _clear_cooldown_error(mid):
            # Drop the guard entry after a short grace window so the set
            # cannot grow without bound.
            try:
                await asyncio.sleep(5)
                _cooldown_error_sent.discard(mid)
            except Exception:
                pass

        try:
            # asyncio is imported at module top; the original used an
            # obfuscated __import__('asyncio') for no benefit.
            asyncio.create_task(_clear_cooldown_error(msg_id))
        except Exception:
            # Event loop may not be running (e.g. during shutdown); skip cleanup.
            pass
# Global cooldown manager (per-user)
_cooldown_mgr = CooldownManager()


@bot.check
async def global_command_cooldown(ctx):
    """Per-user global cooldown gate applied to every command.

    Owners and guild admins bypass the cooldown entirely.  This check only
    *peeks* at the remaining time; the timestamp itself is recorded once in
    the before_invoke handler so there is a single canonical writer.
    """
    # Allow the application owner to bypass cooldowns
    try:
        if await bot.is_owner(ctx.author):
            return True
    except Exception:
        pass
    # Allow guild administrators / users with Manage Guild to bypass cooldowns
    try:
        perms = getattr(ctx.author, 'guild_permissions', None)
        if perms and (perms.administrator or perms.manage_guild):
            return True
    except Exception:
        pass
    user_id = getattr(ctx.author, 'id', None)
    msg_id = getattr(getattr(ctx, 'message', None), 'id', None)
    logger.debug(f"global_command_cooldown: check user={user_id} msg={msg_id} command={getattr(ctx, 'command', None)}")
    # If we've already processed this message, allow immediately so repeated
    # checks during dispatch don't re-charge the cooldown.
    if msg_id is not None and msg_id in _cooldown_updated:
        logger.debug(f"global_command_cooldown: msg {msg_id} already updated, allow")
        return True
    # Peek only: inspect remaining time without mutating cooldown state. The
    # actual timestamp write happens in the before_invoke handler.
    retry = await _cooldown_mgr.peek('global', user_id, GLOBAL_COOLDOWN_SECONDS)
    if retry > 0.0:
        logger.info(f"global_command_cooldown: user={user_id} blocked, retry={retry}")
        raise CommandOnCooldown(commands.Cooldown(1, GLOBAL_COOLDOWN_SECONDS, BucketType.user), retry)
    # Mark this message as handled so repeated checks during dispatch skip out.
    if msg_id is not None:
        _cooldown_updated.add(msg_id)

        async def _remove_later(mid):
            # Drop the guard entry after a short grace window.
            try:
                await asyncio.sleep(5)
                _cooldown_updated.discard(mid)
            except Exception:
                pass

        try:
            # Direct asyncio call; original used __import__('asyncio').
            asyncio.create_task(_remove_later(msg_id))
        except Exception:
            # ignore if event loop not running
            pass
    return True
# Record cooldown when a command is about to execute. This centralizes the
# write side of the cooldown and prevents multiple check-and-update races.
@bot.before_invoke
async def record_global_cooldown(ctx):
    """Single writer for the global cooldown: record the timestamp once per message."""
    # bypass for owners/admins (mirrors the bypass in the check above)
    try:
        if await bot.is_owner(ctx.author):
            return
    except Exception:
        pass
    try:
        perms = getattr(ctx.author, 'guild_permissions', None)
        if perms and (perms.administrator or perms.manage_guild):
            return
    except Exception:
        pass
    user_id = getattr(ctx.author, 'id', None)
    msg_id = getattr(getattr(ctx, 'message', None), 'id', None)
    # If we've already recorded cooldown for this message, skip (idempotent)
    if msg_id is not None and msg_id in _cooldown_recorded_for_msg:
        logger.debug(f"record_global_cooldown: already recorded for msg={msg_id}")
        return
    try:
        # Single writer: record the timestamp so future peeks will see the
        # updated value.
        await _cooldown_mgr.record('global', user_id)
        logger.debug(f"record_global_cooldown: recorded for user={user_id}")
        if msg_id is not None:
            _cooldown_recorded_for_msg.add(msg_id)

            async def _clear_record(mid):
                # Drop the guard entry after a short grace window.
                try:
                    await asyncio.sleep(5)
                    _cooldown_recorded_for_msg.discard(mid)
                except Exception:
                    pass

            try:
                # Direct asyncio call; original used __import__('asyncio').
                asyncio.create_task(_clear_record(msg_id))
            except Exception:
                pass
    except Exception as e:
        logger.debug(f"record_global_cooldown: failed to record for user={user_id}: {e}")
# Handle direct bot mentions
@bot.event
async def on_message(message):
    """Main gateway message handler.

    Order of operations: ignore our own messages, maybe add a reaction,
    maybe send a passive auto-reply, answer direct mentions with the AI,
    then hand off to the command processor.
    """
    # If we observe our own outgoing messages from the gateway, log them.
    if message.author == bot.user:
        try:
            logger.debug(f"on_message: observed own message id={getattr(message,'id',None)} channel={getattr(getattr(message,'channel',None),'id',None)}")
        except Exception:
            pass
        return
    # NOTE: maybe_react_to_message, generate_auto_reply and load_persona are
    # already imported at module top; the original re-imported them locally
    # on every message for no benefit.
    # 👤 Load persona for reactions
    persona = load_persona()
    # 💬 React to message FIRST
    await maybe_react_to_message(message, persona)
    # 🤖 Passive reply logic
    reply = await generate_auto_reply(message, bot)
    if reply:
        await message.channel.send(reply)
    # 📣 Mention override (if bot is pinged)
    if bot.user.mentioned_in(message):
        prompt = message.content.replace(f"<@{bot.user.id}>", "").strip()
        if not prompt:
            # Bare ping with no text: nothing to answer, and we deliberately
            # skip process_commands as the original did.
            return
        user_id = str(message.author.id)
        update_last_seen(user_id)
        profile = load_user_profile(message.author)
        logger.info("=" * 60 + " AI Response " + "=" * 60)
        logger.info(f"🧠 Profile loaded for {profile['display_name']} (interactions: {profile['interactions']})")
        context_msgs = await fetch_raw_context(message.channel)
        formatted_context = format_context(context_msgs)
        logger.info(f"📚 Retrieved {len(context_msgs)} messages for context")
        async with message.channel.typing():
            # NOTE(review): get_ai_response is called synchronously here and
            # presumably blocks the event loop during generation — consider
            # asyncio.to_thread; confirm against ai.get_ai_response.
            reply = get_ai_response(prompt, context=formatted_context, user_profile=profile)
        await message.channel.send(reply)
    await bot.process_commands(message)
# Bot startup event
@bot.event
async def on_ready():
    """Confirm login, enforce the "Delta" nickname per guild, start the scheduler."""
    print(f"✅ Logged in as {bot.user.name}")
    logger.info(f"Logged in as {bot.user.name}")
    for guild in bot.guilds:
        member = guild.me
        if member.nick == "Delta":
            continue
        try:
            await member.edit(nick="Delta")
            logger.info(f"🔄 Renamed self to Delta in {guild.name}")
        except Exception as exc:
            logger.warning(f"⚠️ Failed to rename in {guild.name}: {exc}")
    bot.loop.create_task(start_scheduler(bot))
# Commands
@bot.command(name="setprompt")
async def set_prompt_cmd(ctx, *, prompt):
    """Store a per-user custom prompt override for the invoking user."""
    set_custom_prompt(ctx.author.id, prompt)
    await ctx.send("✅ Custom prompt saved.")
@bot.command(name="setpronouns")
async def set_pronouns_cmd(ctx, *, pronouns):
    """Update the invoking user's pronouns in their stored profile."""
    if set_pronouns(ctx.author, pronouns):
        await ctx.send(f"✅ Got it, {ctx.author.display_name}! Your pronouns have been updated.")
    else:
        await ctx.send("⚠️ Failed to update pronouns. Try interacting with Delta first to generate your profile.")
@bot.command()
async def ping(ctx):
    """Simple liveness check."""
    await ctx.send("🏓 Pong!")
@bot.command()
async def chat(ctx, *, prompt):
    """Forward a one-shot prompt to the model and reply in <=2000-char chunks."""
    await ctx.send("🤖 Thinking...")
    reply = get_ai_response(prompt)
    # Slice into 2000-char pieces (Discord's message length limit).
    # textwrap.wrap was wrong here: it collapses runs of whitespace and
    # drops newlines, corrupting formatted model output.
    for start in range(0, len(reply), 2000):
        await ctx.send(reply[start:start + 2000])
# Modelfile admin commands -------------------------------------------------
@bot.group(name="modfile")
@commands.is_owner()
async def modfile_group(ctx):
    """Manage modelfiles at runtime. Subcommands: reload, switch, disable, info"""
    if ctx.invoked_subcommand is not None:
        return
    await ctx.send("Available: `!modfile reload [path]`, `!modfile switch <path>`, `!modfile disable`, `!modfile info`")
@modfile_group.command(name="reload")
@commands.is_owner()
async def modfile_reload(ctx, *, path: str = None):
    """Reload the current modelfile or load from an optional new path."""
    await ctx.send("🔁 Reloading modelfile...")
    if path:
        ok = load_modelfile(path)
    else:
        ok = load_modelfile()
    await ctx.send("✅ Reloaded." if ok else "❌ Failed to reload modelfile. Check logs.")
@modfile_group.command(name="switch")
@commands.is_owner()
async def modfile_switch(ctx, *, path: str):
    """Switch to a different modelfile path and load it."""
    await ctx.send(f"🔁 Switching modelfile to `{path}`...")
    success = load_modelfile(path)
    if success:
        await ctx.send("✅ Switched and loaded.")
    else:
        await ctx.send("❌ Failed to switch modelfile. Check logs.")
@modfile_group.command(name="disable")
@commands.is_owner()
async def modfile_disable(ctx):
    """Disable the active modelfile and return to persona injection."""
    unload_modelfile()
    await ctx.send("✅ Modelfile disabled; falling back to persona injection.")
@modfile_group.command(name="info")
@commands.is_owner()
async def modfile_info(ctx):
    """Show details about the currently loaded modelfile.

    Uses a per-message idempotency guard to diagnose/avoid duplicate sends.
    """
    # Instrumentation: log invocation and message id to diagnose duplicate sends
    msg_id = getattr(getattr(ctx, 'message', None), 'id', None)
    logger.debug(f"modfile_info invoked: cmd={getattr(ctx, 'command', None)} user={getattr(ctx.author, 'id', None)} msg={msg_id}")
    info = get_modelfile_info()
    if not info:
        logger.debug(f"modfile_info: no modelfile, sending informational reply for msg={msg_id}")
        return await ctx.send(" No modelfile currently loaded.")
    system_preview = info.get('system_preview') or ''
    lines = [
        f"Source: `{info.get('_source_path')}`",
        f"Base model: `{info.get('base_model')}`",
        f"Params: `{info.get('params')}`",
        "System preview:",
        "```" + system_preview + "```",
    ]
    payload = "\n".join(lines)
    # Use per-message idempotent send to avoid duplicate replies.
    # (The original recomputed msg_id a second time here; once is enough.)
    if msg_id is not None:
        key = ("modfile_info", msg_id)
        if key in _message_sent_once:
            logger.debug(f"modfile_info: already sent for msg={msg_id} - skipping send")
            return
        logger.debug(f"modfile_info: preparing to send reply for msg={msg_id}")
        _message_sent_once.add(key)

        async def _clear_sent(k):
            # Drop the guard entry after a short grace window.
            try:
                await asyncio.sleep(5)
                _message_sent_once.discard(k)
            except Exception:
                pass

        try:
            # Direct asyncio call; original used __import__('asyncio').
            asyncio.create_task(_clear_sent(key))
        except Exception:
            pass
    try:
        sent = await ctx.send(payload)
        logger.debug(f"modfile_info: sent payload for msg={msg_id} -> sent_id={getattr(sent, 'id', None)} channel={getattr(getattr(sent, 'channel', None), 'id', None)}")
    except Exception as e:
        logger.debug(f"modfile_info: failed to send payload for msg={msg_id}: {e}")
@modfile_group.command(name="list")
@commands.is_owner()
async def modfile_list(ctx):
    """List available modelfiles in common locations (examples/, personas/, src/)."""
    base = os.path.dirname(os.path.dirname(__file__))
    search_dirs = [
        os.path.join(base, 'examples'),
        os.path.join(base, 'personas'),
        os.path.join(base, 'src'),
        base,
    ]
    # endswith accepts a tuple, so one call filters both extensions.
    candidates = [
        os.path.join(d, fname)
        for d in search_dirs
        if os.path.isdir(d)
        for fname in os.listdir(d)
        if fname.endswith(('.mod', '.json'))
    ]
    if not candidates:
        return await ctx.send("No modelfiles found in examples/, personas/, or src/.")
    lines = ["Available modelfiles:"]
    lines.extend(f"- `{p}`" for p in sorted(candidates))
    await ctx.send("\n".join(lines))
@bot.command()
async def setpersona(ctx, *, description):
    """Replace the bot's persona description used when generating replies."""
    set_persona(description)
    await ctx.send("✅ Persona updated! New style will be used in replies.")
@bot.command(name='roast')
@cooldown(rate=1, per=ROAST_COOLDOWN_SECONDS, type=BucketType.user)
async def roast(ctx):
    """Roast the first mentioned user, or the invoker if nobody is mentioned."""
    mentions = ctx.message.mentions
    target = mentions[0].mention if mentions else ctx.author.mention
    prompt = f"Roast {target}. Be dramatic, insulting, and sarcastic. Speak in your usual chaotic RGB catgirl personality."
    response = get_ai_response(prompt)
    await ctx.send(f"😼 {response}")
@bot.command(name="clearmodel")
async def clear_model(ctx):
    """Unload the currently active model from VRAM and report the result."""
    model = get_current_model()
    if unload_model(model):
        await ctx.send(f"✅ Unloaded model: `{model}`")
    else:
        await ctx.send(f"❌ Failed to unload model: `{model}`")
@bot.command(name="model")
async def current_model(ctx):
    """Report which model is currently active."""
    await ctx.send(f"📦 Current model: `{get_current_model()}`")
@bot.command(name="setmodel")
async def set_model(ctx, *, model_name):
    """Switch the active model: unload the old one, load the new one, persist to .env.

    Fixes vs original: the local variable no longer shadows the sibling
    `current_model` command function; the .env rewrite now appends a
    MODEL_NAME line when none existed (the original silently skipped
    persisting in that case); user-facing typo corrected.
    """
    current = get_current_model()
    if model_name == current:
        return await ctx.send(f"⚠️ `{model_name}` is already active.")
    await ctx.send(f"🔄 Switching from `{current}` to `{model_name}`…")
    if unload_model(current):
        await ctx.send(f"🧽 Unloaded `{current}` from VRAM.")
    else:
        await ctx.send(f"⚠️ Couldn't unload `{current}`.")
    if not load_model(model_name):
        return await ctx.send(f"❌ Failed to pull `{model_name}`.")
    os.environ["MODEL_NAME"] = model_name
    env_path = os.path.join(os.path.dirname(__file__), '..', '.env')
    lines = []
    replaced = False
    with open(env_path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.startswith("MODEL_NAME="):
                lines.append(f"MODEL_NAME={model_name}\n")
                replaced = True
            else:
                lines.append(line)
    if not replaced:
        lines.append(f"MODEL_NAME={model_name}\n")
    with open(env_path, 'w', encoding='utf-8') as f:
        f.writelines(lines)
    await ctx.send(f"✅ Model switched to `{model_name}` and `.env` updated.")
@bot.command(name="models")
async def list_models(ctx):
    """List the models reported by the backend's tags endpoint."""
    import requests  # third-party; imported lazily as in the original
    try:
        # timeout added: a hung backend would otherwise block this command forever.
        resp = requests.get(TAGS_ENDPOINT, timeout=10)
        models = [m["name"] for m in resp.json().get("models", [])]
        if models:
            await ctx.send("🧠 Available models:\n" + "\n".join(f"- `{m}`" for m in models))
        else:
            await ctx.send("❌ No models found.")
    except Exception as e:
        await ctx.send(f"❌ Failed to fetch models: {e}")
@bot.command(name="dryrun")
@commands.is_owner()
async def dryrun(ctx, *, prompt: str):
    """Build the prompt and payload without contacting the model.
    Usage: `!dryrun Your test prompt here`"""
    await ctx.send("🧪 Building dry-run payload...")
    from ai import build_dryrun_payload
    profile = load_user_profile(ctx.author)
    info = build_dryrun_payload(prompt, context=None, user_profile=profile)
    prompt_preview = info['prompt'][:1500]
    payload_preview = {k: v for k, v in info['payload'].items() if k != 'prompt'}
    sections = [
        "Prompt assembled:",
        "```",
        prompt_preview,
        "```",
        "Payload params:",
        "```",
        str(payload_preview),
        "```",
    ]
    await ctx.send("\n".join(sections))
@bot.command(name="setavatar")
@commands.is_owner()
async def set_avatar(ctx):
    """Owner-only: set the bot's avatar from an image attached to the message."""
    if not ctx.message.attachments:
        return await ctx.send("❌ Please attach an image (PNG) to use as the new avatar.")
    image = ctx.message.attachments[0]
    image_bytes = await image.read()
    token = os.getenv("DISCORD_TOKEN")
    if not token:
        return await ctx.send("❌ Bot token not found in environment.")
    success = set_avatar_from_bytes(image_bytes, token)
    await ctx.send("✅ Avatar updated successfully!" if success else "❌ Failed to update avatar.")
# Run bot (blocking call: connects to the Discord gateway and runs the event
# loop until the process is stopped).
bot.run(TOKEN)