AI-Discord-Bot/src/bot.py
milo 5f8c93ff69 🗄️ Add SQLite database system with JSON fallback and memory controls
Implement configurable database backends (SQLite/JSON) with unified memory
management, automated migration, Docker support, and privacy controls.
Maintains full backward compatibility while enabling future PostgreSQL/ChromaDB.
2025-10-10 13:04:48 -04:00

588 lines
21 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# bot.py
import time
import asyncio
import os
import discord
import yaml
import random
from dotenv import load_dotenv
from textwrap import wrap
from discord.ext import commands
from discord.ext.commands import (
cooldown,
BucketType,
CommandOnCooldown
)
from cooldown import CooldownManager
# Local imports
from scheduler import start_scheduler
from profilepic import set_avatar_from_bytes
from context import fetch_raw_context, format_context
from user_profiles import (
load_user_profile,
update_last_seen,
increment_interactions,
format_profile_for_block,
set_pronouns,
set_custom_prompt
)
from personality import apply_personality, set_persona, load_persona
from logger import setup_logger
from ai import (
unload_model,
load_model,
get_current_model,
get_ai_response,
TAGS_ENDPOINT
)
from enhanced_ai import get_ai_response_with_memory, analyze_user_message_for_memory
from ai import load_modelfile, unload_modelfile, get_modelfile_info
from time_logger import log_message_activity
from autochat import should_auto_reply, generate_auto_reply, update_reply_timer, maybe_react_to_message
debug_mode = os.getenv("DEBUG_MODE", "false").lower() == "true"
from user_profiles import format_profile_for_block as format_user_profile_block
# Setup logger and environment
logger = setup_logger("bot")
dotenv_path = os.path.join(os.path.dirname(__file__), '..', '.env')
load_dotenv(dotenv_path)
# DEBUG_MODE is first read while the imports are still being processed, i.e.
# before the .env file has been loaded. Re-read it here so a value that only
# exists in .env is honoured as well.
debug_mode = os.getenv("DEBUG_MODE", "false").lower() == "true"

# No hardcoded owner IDs; use discord.py's owner check and guild admin perms.

# Per-message guard sets. Each one holds identifiers derived from the Discord
# message that triggered a command; the code that adds an entry also schedules
# its removal a few seconds later.

# Messages that already passed the global cooldown check.
_cooldown_updated = set()
# Messages for which a cooldown notice has already been sent.
_cooldown_error_sent = set()
# Messages whose cooldown timestamp has already been recorded.
_cooldown_recorded_for_msg = set()
# Keys of one-shot command replies that have already been sent.
_message_sent_once = set()
# Load model settings
# MODEL_NAME comes from the process environment and falls back to
# "llama3:latest" when the variable is not set.
MODEL_NAME = os.getenv("MODEL_NAME", "llama3:latest")
logger.info(f"🔍 Loaded MODEL_NAME from .env: {MODEL_NAME}")
# NOTE(review): indentation was lost in this copy of the file, so it is not
# clear which of the following statements belong to the `if debug_mode:`
# branch. Presumably only the log line does and the unload/load sequence
# always runs - confirm against the original file.
if debug_mode:
logger.info(f"🧹 Attempting to clear VRAM before loading {MODEL_NAME}...")
# Unload first, then load again; the outcome of the load is only logged.
unload_model(MODEL_NAME)
if load_model(MODEL_NAME):
logger.info(f"🚀 Model `{MODEL_NAME}` preloaded on startup.")
else:
logger.warning(f"⚠️ Failed to preload model `{MODEL_NAME}`.")
logger.info(f"✅ Final model in use: {MODEL_NAME}")
# Load YAML settings
# settings.yml sits next to this module; it provides the cooldown durations
# and the cooldown message template(s) used below.
base_dir = os.path.dirname(__file__)
settings_path = os.path.join(base_dir, "settings.yml")
with open(settings_path, "r", encoding="utf-8") as settings_file:
    settings = yaml.safe_load(settings_file)

_cooldown_settings = settings["cooldowns"]
ROAST_COOLDOWN_SECONDS = _cooldown_settings["roast"]
GLOBAL_COOLDOWN_SECONDS = _cooldown_settings["global"]
COOLDOWN_MSG_TEMPLATE = settings["messages"]["cooldown"]
# Configure Discord bot
# The token is mandatory: without it the module logs an error and exits.
TOKEN = os.getenv("DISCORD_TOKEN")
if not TOKEN:
    logger.error("❌ DISCORD_TOKEN not set in .env file.")
    raise SystemExit("DISCORD_TOKEN not set.")

# Default intents plus message content, which the handlers below read.
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(intents=intents, command_prefix="!")
# Handle cooldown errors globally
@bot.event
async def on_command_error(ctx, error):
    """Tell the user how long to wait when a command is on cooldown.

    Only ``CommandOnCooldown`` is handled here; every other error is re-raised
    unchanged. At most one notice is sent per triggering message: the message
    id is remembered for five seconds and repeated errors for it are ignored.

    Args:
        ctx: Invocation context of the failed command.
        error: The exception raised while checking or running the command.
    """
    if not isinstance(error, CommandOnCooldown):
        raise error

    retry_secs = round(error.retry_after, 1)
    # The configured template may be a single string or a list to pick from.
    if isinstance(COOLDOWN_MSG_TEMPLATE, list):
        template = random.choice(COOLDOWN_MSG_TEMPLATE)
    else:
        template = COOLDOWN_MSG_TEMPLATE
    msg = template.replace("{seconds}", str(retry_secs))

    # Prevent duplicate cooldown messages for the same triggering message
    msg_id = getattr(getattr(ctx, 'message', None), 'id', None)
    if msg_id is not None:
        if msg_id in _cooldown_error_sent:
            logger.debug(f"on_command_error: cooldown message already sent for msg={msg_id}")
            return
        # Claim the message *before* awaiting the send; marking it afterwards
        # would let a second handler slip through while the first is sending.
        _cooldown_error_sent.add(msg_id)
        # A plain timer callback needs no task object that could be garbage
        # collected before it runs.
        asyncio.get_running_loop().call_later(5, _cooldown_error_sent.discard, msg_id)

    logger.info(f"Command {ctx.command} on cooldown for user={getattr(ctx.author, 'id', None)}. Retry after {retry_secs} seconds.")
    try:
        await ctx.send(msg)
    except Exception as exc:
        # Sending the notice is best effort, but leave a trace of the failure.
        logger.debug(f"on_command_error: failed to send cooldown message for msg={msg_id}: {exc}")
# Global cooldown manager (per-user)
_cooldown_mgr = CooldownManager()


@bot.check
async def global_command_cooldown(ctx):
    """Global check that rejects commands from users who are still cooling down.

    The bot owner and members with the Administrator or Manage Guild
    permission bypass the cooldown. This check only inspects the remaining
    time via ``peek``; it does not record a new timestamp itself.

    Args:
        ctx: Invocation context of the command being checked.

    Returns:
        True when the command may run.

    Raises:
        CommandOnCooldown: The user still has cooldown time remaining.
    """
    # Allow the application owner to bypass cooldowns
    try:
        if await bot.is_owner(ctx.author):
            return True
    except Exception as exc:
        logger.debug(f"global_command_cooldown: owner check failed: {exc}")

    # Allow guild administrators / users with Manage Guild to bypass cooldowns
    try:
        perms = getattr(ctx.author, 'guild_permissions', None)
        if perms and (perms.administrator or perms.manage_guild):
            return True
    except Exception as exc:
        logger.debug(f"global_command_cooldown: permission check failed: {exc}")

    user_id = getattr(ctx.author, 'id', None)
    msg_id = getattr(getattr(ctx, 'message', None), 'id', None)
    logger.debug(f"global_command_cooldown: check user={user_id} msg={msg_id} command={getattr(ctx, 'command', None)}")

    # A message that already passed this check is allowed immediately, so
    # repeated checks during dispatch of the same message agree with each other.
    if msg_id is not None and msg_id in _cooldown_updated:
        logger.debug(f"global_command_cooldown: msg {msg_id} already updated, allow")
        return True

    # Use peek to inspect remaining time without updating state. The actual
    # recording of the timestamp happens once the command starts (see the
    # `before_invoke` handler) so there's a single canonical writer.
    retry = await _cooldown_mgr.peek('global', user_id, GLOBAL_COOLDOWN_SECONDS)
    if retry > 0.0:
        logger.info(f"global_command_cooldown: user={user_id} blocked, retry={retry}")
        # discord.py 2.x signatures: Cooldown(rate, per) and
        # CommandOnCooldown(cooldown, retry_after, type). The 1.x argument
        # layout raises TypeError here instead of the intended cooldown error.
        raise CommandOnCooldown(
            commands.Cooldown(1, GLOBAL_COOLDOWN_SECONDS),
            retry,
            BucketType.user,
        )

    # Remember that this message passed, and forget it after a short grace window.
    if msg_id is not None:
        _cooldown_updated.add(msg_id)
        asyncio.get_running_loop().call_later(5, _cooldown_updated.discard, msg_id)
    return True
# Record cooldown when a command is about to execute. This centralizes the
# write side of the cooldown and prevents multiple check-and-update races.
@bot.before_invoke
async def record_global_cooldown(ctx):
    """Record the global cooldown timestamp for the invoking user.

    Runs before every command. The bot owner and members with the
    Administrator or Manage Guild permission are skipped. The timestamp is
    written at most once per triggering message; the message id is remembered
    for five seconds to make repeated calls a no-op.

    Args:
        ctx: Invocation context of the command about to run.
    """
    # bypass for owners/admins
    try:
        if await bot.is_owner(ctx.author):
            return
    except Exception as exc:
        logger.debug(f"record_global_cooldown: owner check failed: {exc}")
    try:
        perms = getattr(ctx.author, 'guild_permissions', None)
        if perms and (perms.administrator or perms.manage_guild):
            return
    except Exception as exc:
        logger.debug(f"record_global_cooldown: permission check failed: {exc}")

    user_id = getattr(ctx.author, 'id', None)
    msg_id = getattr(getattr(ctx, 'message', None), 'id', None)

    # If we've already recorded cooldown for this message, skip (idempotent)
    if msg_id is not None and msg_id in _cooldown_recorded_for_msg:
        logger.debug(f"record_global_cooldown: already recorded for msg={msg_id}")
        return

    # Single writer: record the timestamp so future peeks will see the
    # updated value.
    try:
        await _cooldown_mgr.record('global', user_id)
    except Exception as e:
        # The command still runs, but the cooldown is not enforced for this
        # call, so make the failure visible above debug level.
        logger.warning(f"record_global_cooldown: failed to record for user={user_id}: {e}")
        return

    logger.debug(f"record_global_cooldown: recorded for user={user_id}")
    if msg_id is not None:
        _cooldown_recorded_for_msg.add(msg_id)
        # Timer callback instead of an unreferenced background task.
        asyncio.get_running_loop().call_later(5, _cooldown_recorded_for_msg.discard, msg_id)
# Handle direct bot mentions
@bot.event
async def on_message(message):
    """React to, auto-reply to and answer incoming messages, then run commands.

    Order of operations for every message not written by the bot itself:
    an optional reaction, an optional passive auto-reply, an AI answer when
    the bot is mentioned, and finally regular command processing.

    Args:
        message: The message received from the gateway.
    """
    # If we observe our own outgoing messages from the gateway, log them.
    if message.author == bot.user:
        logger.debug(f"on_message: observed own message id={getattr(message,'id',None)} channel={getattr(getattr(message,'channel',None),'id',None)}")
        return

    # 👤 Load persona for reactions
    persona = load_persona()
    # 💬 React to message FIRST
    await maybe_react_to_message(message, persona)

    # 🤖 Passive reply logic
    reply = await generate_auto_reply(message, bot)
    if reply:
        await message.channel.send(reply)

    # 📣 Mention override (if bot is pinged)
    if bot.user.mentioned_in(message):
        # A mention can arrive as <@id> or as the nickname form <@!id>;
        # strip both so the marker never leaks into the prompt.
        prompt = message.content
        for marker in (f"<@{bot.user.id}>", f"<@!{bot.user.id}>"):
            prompt = prompt.replace(marker, "")
        prompt = prompt.strip()
        if not prompt:
            return

        user_id = str(message.author.id)
        update_last_seen(user_id)
        profile = load_user_profile(message.author)
        logger.info("=" * 60 + " AI Response " + "=" * 60)
        logger.info(f"🧠 Profile loaded for {profile['display_name']} (interactions: {profile['interactions']})")
        context_msgs = await fetch_raw_context(message.channel)
        logger.info(f"📚 Retrieved {len(context_msgs)} messages for context")

        async with message.channel.typing():
            # Use memory-enhanced AI response
            # NOTE(review): this is a synchronous call inside the event loop;
            # if it performs network or database I/O it stalls the bot while
            # it runs - confirm before moving it to a worker thread.
            reply = get_ai_response_with_memory(
                prompt,
                context=context_msgs,  # Pass raw messages for better context
                user_profile=profile,
                message=message
            )

        text = str(reply).strip() if reply else ""
        if not text:
            # Discord rejects empty messages; skip the send instead of failing.
            logger.warning("on_message: AI returned an empty reply; nothing sent")
        else:
            # Discord caps a message at 2000 characters, so send long replies
            # in slices. Slicing keeps newlines and code blocks intact.
            limit = 2000
            for start in range(0, len(text), limit):
                chunk = text[start:start + limit]
                if chunk.strip():
                    await message.channel.send(chunk)

    await bot.process_commands(message)
# Bot startup event
# Handle of the scheduler task; stays None until the first on_ready.
_scheduler_task = None


@bot.event
async def on_ready():
    """Log the login, enforce the "Delta" nickname and start the scheduler once.

    discord.py may dispatch ``on_ready`` more than once (for example after a
    reconnect), so the scheduler is only started on the first call.
    """
    global _scheduler_task

    print(f"✅ Logged in as {bot.user.name}")
    logger.info(f"Logged in as {bot.user.name}")

    for guild in bot.guilds:
        me = guild.me
        if me.nick != "Delta":
            try:
                await me.edit(nick="Delta")
                logger.info(f"🔄 Renamed self to Delta in {guild.name}")
            except Exception as e:
                logger.warning(f"⚠️ Failed to rename in {guild.name}: {e}")

    # Keep a reference to the task: it guards against a second scheduler on
    # later on_ready events and keeps the task from being garbage collected.
    if _scheduler_task is None:
        _scheduler_task = bot.loop.create_task(start_scheduler(bot))
# Commands
@bot.command(name="setprompt")
async def set_prompt_cmd(ctx, *, prompt):
    # Store the caller's custom prompt under their user id, then confirm.
    author_id = ctx.author.id
    set_custom_prompt(author_id, prompt)
    confirmation = "✅ Custom prompt saved."
    await ctx.send(confirmation)
@bot.command(name="setpronouns")
async def set_pronouns_cmd(ctx, *, pronouns):
    # Update the caller's pronouns and report whether the update succeeded.
    if set_pronouns(ctx.author, pronouns):
        reply = f"✅ Got it, {ctx.author.display_name}! Your pronouns have been updated."
    else:
        reply = "⚠️ Failed to update pronouns. Try interacting with Delta first to generate your profile."
    await ctx.send(reply)
@bot.command()
async def ping(ctx):
    # Liveness check: answer with a fixed message.
    pong = "🏓 Pong!"
    await ctx.send(pong)
@bot.command()
async def chat(ctx, *, prompt):
    """Ask the AI model a question and post its answer."""
    await ctx.send("🤖 Thinking...")
    # NOTE(review): get_ai_response is called synchronously inside the event
    # loop; if it blocks on network I/O the bot is unresponsive meanwhile.
    reply = get_ai_response(prompt)

    text = str(reply).strip() if reply else ""
    if not text:
        # Previously an empty reply produced no message at all.
        await ctx.send("⚠️ The model returned an empty response.")
        return

    # Discord caps a message at 2000 characters. Plain slicing is used instead
    # of textwrap.wrap because wrap() collapses newlines and runs of
    # whitespace, which destroys lists and code blocks in the model output.
    limit = 2000
    for start in range(0, len(text), limit):
        chunk = text[start:start + limit]
        if chunk.strip():
            await ctx.send(chunk)
# Modelfile admin commands -------------------------------------------------
@bot.group(name="modfile")
@commands.is_owner()
async def modfile_group(ctx):
    """Manage modelfiles at runtime. Subcommands: reload, switch, disable, info"""
    # Nothing to do here when a subcommand was matched.
    if ctx.invoked_subcommand is not None:
        return
    usage = "Available: `!modfile reload [path]`, `!modfile switch <path>`, `!modfile disable`, `!modfile info`"
    await ctx.send(usage)
@modfile_group.command(name="reload")
@commands.is_owner()
async def modfile_reload(ctx, *, path: str = None):
    """Reload the current modelfile or load from an optional new path."""
    await ctx.send("🔁 Reloading modelfile...")
    # Without a path the loader is called with no arguments.
    if path:
        ok = load_modelfile(path)
    else:
        ok = load_modelfile()
    if ok:
        outcome = "✅ Reloaded."
    else:
        outcome = "❌ Failed to reload modelfile. Check logs."
    await ctx.send(outcome)
@modfile_group.command(name="switch")
@commands.is_owner()
async def modfile_switch(ctx, *, path: str):
    """Switch to a different modelfile path and load it."""
    await ctx.send(f"🔁 Switching modelfile to `{path}`...")
    # Report the loader's verdict back to the channel.
    if load_modelfile(path):
        outcome = "✅ Switched and loaded."
    else:
        outcome = "❌ Failed to switch modelfile. Check logs."
    await ctx.send(outcome)
@modfile_group.command(name="disable")
@commands.is_owner()
async def modfile_disable(ctx):
    """Disable the active modelfile and return to persona injection."""
    # Unload first, then confirm in the channel.
    unload_modelfile()
    notice = "✅ Modelfile disabled; falling back to persona injection."
    await ctx.send(notice)
@modfile_group.command(name="info")
@commands.is_owner()
async def modfile_info(ctx):
    """Show source, base model, params and system preview of the active modelfile."""
    # Instrumentation: log invocation and message id to diagnose duplicate sends
    msg_id = getattr(getattr(ctx, 'message', None), 'id', None)
    logger.debug(f"modfile_info invoked: cmd={getattr(ctx, 'command', None)} user={getattr(ctx.author, 'id', None)} msg={msg_id}")

    info = get_modelfile_info()
    if not info:
        logger.debug(f"modfile_info: no modelfile, sending informational reply for msg={msg_id}")
        return await ctx.send(" No modelfile currently loaded.")

    limit = 2000  # Discord's per-message character cap

    def _render(preview):
        # Build the reply text for a given system preview.
        return "\n".join([
            f"Source: `{info.get('_source_path')}`",
            f"Base model: `{info.get('base_model')}`",
            f"Params: `{info.get('params')}`",
            "System preview:",
            "```" + preview + "```",
        ])

    system_preview = info.get('system_preview') or ''
    payload = _render(system_preview)
    overflow = len(payload) - limit
    if overflow > 0:
        # Shorten the preview so the whole reply fits into one message.
        keep = max(len(system_preview) - overflow - 1, 0)
        payload = _render(system_preview[:keep] + "…")
    # Last resort when the fixed part alone is already too long.
    payload = payload[:limit]

    # Use per-message idempotent send to avoid duplicate replies
    if msg_id is not None:
        key = ("modfile_info", msg_id)
        if key in _message_sent_once:
            logger.debug(f"modfile_info: already sent for msg={msg_id} - skipping send")
            return
        logger.debug(f"modfile_info: preparing to send reply for msg={msg_id}")
        _message_sent_once.add(key)
        # Forget the key after five seconds; a timer callback needs no task
        # object that could be collected early.
        asyncio.get_running_loop().call_later(5, _message_sent_once.discard, key)

    try:
        sent = await ctx.send(payload)
    except Exception as e:
        logger.warning(f"modfile_info: failed to send payload for msg={msg_id}: {e}")
        return
    sent_id = getattr(sent, 'id', None)
    chan = getattr(getattr(sent, 'channel', None), 'id', None)
    logger.debug(f"modfile_info: sent payload for msg={msg_id} -> sent_id={sent_id} channel={chan}")
@modfile_group.command(name="list")
@commands.is_owner()
async def modfile_list(ctx):
    """List available modelfiles in common locations (examples/, personas/, src/)."""
    # Search root: the parent of the directory that contains this module.
    base = os.path.dirname(os.path.dirname(__file__))
    search_dirs = [os.path.join(base, sub) for sub in ('examples', 'personas', 'src')]
    search_dirs.append(base)

    # Collect every *.mod / *.json entry from the directories that exist.
    candidates = [
        os.path.join(folder, entry)
        for folder in search_dirs
        if os.path.isdir(folder)
        for entry in os.listdir(folder)
        if entry.endswith(('.mod', '.json'))
    ]
    if not candidates:
        return await ctx.send("No modelfiles found in examples/, personas/, or src/.")

    listing = ["Available modelfiles:", *(f"- `{found}`" for found in sorted(candidates))]
    await ctx.send("\n".join(listing))
@bot.command()
async def setpersona(ctx, *, description):
    # Replace the active persona description, then confirm.
    set_persona(description)
    confirmation = "✅ Persona updated! New style will be used in replies."
    await ctx.send(confirmation)
@bot.command(name='roast')
@cooldown(rate=1, per=ROAST_COOLDOWN_SECONDS, type=BucketType.user)
async def roast(ctx):
    # Roast the first mentioned user, or the caller when nobody is mentioned.
    mentioned = ctx.message.mentions
    if mentioned:
        target = mentioned[0].mention
    else:
        target = ctx.author.mention
    prompt = f"Roast {target}. Be dramatic, insulting, and sarcastic. Speak in your usual chaotic RGB catgirl personality."
    response = get_ai_response(prompt)
    await ctx.send(f"😼 {response}")
@bot.command(name="clearmodel")
async def clear_model(ctx):
    # Unload whatever model is currently active and report the outcome.
    model = get_current_model()
    if unload_model(model):
        msg = f"✅ Unloaded model: `{model}`"
    else:
        msg = f"❌ Failed to unload model: `{model}`"
    await ctx.send(msg)
@bot.command(name="model")
async def current_model(ctx):
    # Report the name of the model currently in use.
    active = get_current_model()
    await ctx.send(f"📦 Current model: `{active}`")
@bot.command(name="setmodel")
async def set_model(ctx, *, model_name):
    """Switch the active model and persist the choice to `.env`."""
    # NOTE(review): unlike the modfile commands this one has no owner check,
    # so any user can switch models and rewrite `.env` - confirm this is
    # intended.

    # The name ends up on a line of the .env file, so whitespace (in
    # particular a newline) must not get through: it would allow writing
    # extra variables into the file.
    model_name = model_name.strip()
    if not model_name or any(ch.isspace() for ch in model_name):
        return await ctx.send("❌ Invalid model name.")

    previous_model = get_current_model()
    if model_name == previous_model:
        return await ctx.send(f"⚠️ `{model_name}` is already active.")

    await ctx.send(f"🔄 Switching from `{previous_model}` to `{model_name}`…")
    if unload_model(previous_model):
        await ctx.send(f"🧽 Unloaded `{previous_model}` from VRAM.")
    else:
        await ctx.send(f"⚠️ Couldnt unload `{previous_model}`.")

    if not load_model(model_name):
        return await ctx.send(f"❌ Failed to pull `{model_name}`.")

    os.environ["MODEL_NAME"] = model_name

    # Persist the choice: replace the MODEL_NAME line, or append one when the
    # file has none (otherwise nothing would be saved despite the success
    # message below).
    env_path = os.path.join(os.path.dirname(__file__), '..', '.env')
    new_entry = f"MODEL_NAME={model_name}\n"
    try:
        with open(env_path, 'r', encoding='utf-8') as f:
            existing = f.readlines()
    except FileNotFoundError:
        existing = []

    updated = []
    replaced = False
    for line in existing:
        if line.startswith("MODEL_NAME="):
            updated.append(new_entry)
            replaced = True
        else:
            updated.append(line)
    if not replaced:
        # Make sure the appended entry starts on its own line.
        if updated and not updated[-1].endswith("\n"):
            updated[-1] += "\n"
        updated.append(new_entry)

    with open(env_path, 'w', encoding='utf-8') as f:
        f.writelines(updated)

    await ctx.send(f"✅ Model switched to `{model_name}` and `.env` updated.")
@bot.command(name="models")
async def list_models(ctx):
    """List the models reported by the model server."""
    import functools
    import requests

    try:
        # requests is blocking: run it in the default executor so the event
        # loop keeps running, and bound the wait so an unresponsive server
        # cannot hang the command forever.
        fetch = functools.partial(requests.get, TAGS_ENDPOINT, timeout=10)
        resp = await asyncio.get_running_loop().run_in_executor(None, fetch)
        # Turn HTTP error statuses into an exception instead of parsing an
        # error body as if it were a model list.
        resp.raise_for_status()
        models = [m["name"] for m in resp.json().get("models", [])]
    except Exception as e:
        await ctx.send(f"❌ Failed to fetch models: {e}")
        return

    if models:
        await ctx.send("🧠 Available models:\n" + "\n".join(f"- `{m}`" for m in models))
    else:
        await ctx.send("❌ No models found.")
@bot.command(name="memory")
@commands.is_owner()
async def memory_cmd(ctx, action: str = "info", *, target: str = None):
    """Memory management: !memory info [@user], !memory cleanup, !memory summary"""
    from enhanced_ai import get_user_memory_summary
    from memory_manager import memory_manager

    # info: memory summary of the first mentioned user, or of the caller.
    if action == "info":
        mentioned = ctx.message.mentions
        subject = mentioned[0] if mentioned else ctx.author
        summary = get_user_memory_summary(str(subject.id))
        await ctx.send(f"```\n{summary}\n```")
        return

    # cleanup: drop memories older than 30 days.
    if action == "cleanup":
        memory_manager.cleanup_old_memories(days=30)
        await ctx.send("🧹 Cleaned up old memories (30+ days)")
        return

    # Anything other than "summary" from here on is unknown: show the usage.
    if action != "summary":
        await ctx.send("Usage: `!memory info [@user]`, `!memory cleanup`, `!memory summary`")
        return

    # summary: first five memories of this channel from the last 48 hours.
    memories = memory_manager.get_conversation_context(str(ctx.channel.id), hours=48)
    if not memories:
        await ctx.send("No recent memories for this channel.")
        return

    summary_lines = [f"Recent channel memories ({len(memories)} total):"]
    for position, memory in enumerate(memories[:5], start=1):
        timestamp = memory['timestamp'][:16].replace('T', ' ')
        content = memory['content'][:100]
        summary_lines.append(f"{position}. {timestamp}: {content}")
    await ctx.send("```\n" + "\n".join(summary_lines) + "\n```")
@bot.command(name="dryrun")
@commands.is_owner()
async def dryrun(ctx, *, prompt: str):
    """Build the prompt and payload without contacting the model.
    Usage: `!dryrun Your test prompt here`"""
    await ctx.send("🧪 Building dry-run payload...")
    from ai import build_dryrun_payload

    profile = load_user_profile(ctx.author)
    info = build_dryrun_payload(prompt, context=None, user_profile=profile)

    # Show at most the first 1500 characters of the prompt, and every payload
    # entry except the prompt itself.
    prompt_preview = info['prompt'][:1500]
    payload = info['payload']
    payload_preview = {key: value for key, value in payload.items() if key != 'prompt'}

    report = "\n".join((
        "Prompt assembled:",
        "```",
        prompt_preview,
        "```",
        "Payload params:",
        "```",
        str(payload_preview),
        "```",
    ))
    await ctx.send(report)
@bot.command(name="setavatar")
@commands.is_owner()
async def set_avatar(ctx):
    # Use the first attachment of the invoking message as the new avatar.
    attachments = ctx.message.attachments
    if not attachments:
        return await ctx.send("❌ Please attach an image (PNG) to use as the new avatar.")
    image_bytes = await attachments[0].read()

    # The helper needs the bot token, which is read from the environment.
    token = os.getenv("DISCORD_TOKEN")
    if not token:
        return await ctx.send("❌ Bot token not found in environment.")

    if set_avatar_from_bytes(image_bytes, token):
        outcome = "✅ Avatar updated successfully!"
    else:
        outcome = "❌ Failed to update avatar."
    await ctx.send(outcome)
# Run bot
# Starts the client with the token checked earlier in this module.
# NOTE(review): there is no `if __name__ == "__main__":` guard, so importing
# this module also starts the bot - confirm no other module imports it.
bot.run(TOKEN)