# Email-Agent/Obsolete/gmail_to_db_test.py
#
# 136 lines, 4.4 KiB, Python
# 2025-05-06 11:13:15 -04:00
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
import base64
import mysql.connector
import os
import yaml
import datetime
from initialize_db import initialize_database
# OAuth scope requested from Google: read-only access to the Gmail mailbox.
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

# === Database connection settings ===
# Every value is taken from the environment when the variable is set and
# falls back to the literal default otherwise.
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = int(os.environ.get("DB_PORT", 3306))
DB_USER = os.environ.get("DB_USER", "emailuser")
# NOTE(review): this fallback is a real-looking password committed to source;
# consider rotating it and requiring DB_PASSWORD to be set explicitly.
DB_PASSWORD = os.environ.get("DB_PASSWORD", "miguel33020")
DB_NAME = os.environ.get("DB_NAME", "emailassistant")
def authenticate_gmail():
    """Run the installed-app OAuth flow and return a Gmail API client.

    The OAuth client configuration is read from ``credentials.json``
    (a relative path, so it is resolved against the current working
    directory) and the scopes come from the module-level ``SCOPES``.

    Returns:
        The object produced by ``build("gmail", "v1", ...)`` with the
        freshly obtained credentials attached.
    """
    oauth_flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
    # port=0 is passed through to the flow's local redirect server; credentials
    # are not cached here, so every call goes through the consent flow again.
    credentials = oauth_flow.run_local_server(port=0)
    gmail_client = build("gmail", "v1", credentials=credentials)
    return gmail_client
def get_header(headers, name):
    """Look up a header value by name, ignoring case.

    Args:
        headers: Iterable of mappings, each with a ``"name"`` and a
            ``"value"`` key.
        name: Header name to search for.

    Returns:
        The ``"value"`` of the first entry whose name matches, or ``None``
        when no entry matches.
    """
    matching_values = (
        entry["value"]
        for entry in headers
        if entry["name"].lower() == name.lower()
    )
    # next() with a default stops at the first match, like an early return.
    return next(matching_values, None)
def _b64url_to_text(data):
    """Decode a base64url payload to text, tolerating missing ``=`` padding."""
    if isinstance(data, str):
        data = data.encode("ascii")
    # base64 input must be a multiple of 4 characters; restore any padding
    # that was stripped so the decoder does not raise "Incorrect padding".
    padded = data + b"=" * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode("utf-8", errors="ignore")


def _find_plain_text(parts):
    """Return the decoded first ``text/plain`` part under *parts*, or None."""
    # Direct children are checked first so results are unchanged for flat
    # messages that were already handled before nested support was added.
    for part in parts:
        part_body = part.get("body") or {}
        if part.get("mimeType") == "text/plain" and "data" in part_body:
            return _b64url_to_text(part_body["data"])
    # Otherwise descend into container parts (e.g. multipart/alternative
    # wrapped inside multipart/mixed when the mail carries attachments).
    for part in parts:
        nested_parts = part.get("parts")
        if nested_parts:
            text = _find_plain_text(nested_parts)
            if text is not None:
                return text
    return None


def decode_body(payload):
    """Extract the text body from a message payload mapping.

    If the payload's own ``body`` carries ``data`` it is decoded directly.
    Otherwise the ``parts`` tree is searched for a ``text/plain`` part,
    looking at direct children first and then at nested multipart
    containers.

    Args:
        payload: Mapping with optional ``"body"`` (mapping with optional
            base64url ``"data"``) and optional ``"parts"`` (list of
            mappings of the same shape, each with a ``"mimeType"``).

    Returns:
        The decoded text (UTF-8, undecodable bytes dropped), or ``""``
        when no suitable part is found.
    """
    body = payload.get("body") or {}
    if "data" in body:
        return _b64url_to_text(body["data"])
    text = _find_plain_text(payload.get("parts") or [])
    return "" if text is None else text
def insert_into_db(email_data):
    """Insert one email record into the ``emails`` table (best effort).

    A new MySQL connection is opened with the module-level ``DB_*``
    settings for every call. The row is written with ``INSERT IGNORE``;
    the AI columns and ``attachment_path`` are stored as NULL, ``is_read``
    and ``downloaded`` as False, ``processing_status`` as "unprocessed"
    and ``sync_status`` as "synced".

    Args:
        email_data: Mapping describing the message. Required keys:
            ``message_id``, ``thread_id``, ``sender``, ``cc``, ``subject``,
            ``body``, ``received_at``, ``attachments``, ``labels``.
            Optional keys with defaults: ``user``, ``account``,
            ``account_id``, ``links``, ``folder``.

    Errors raised while building the parameters or executing the statement
    are printed and swallowed so a single bad message does not abort the
    caller's loop. Failures to connect or to commit still propagate.
    """
    query = """
        INSERT IGNORE INTO emails (
            user, account, message_id, thread_id, account_id, sender, cc, subject, body, links,
            received_at, folder, attachments, is_read, labels,
            ai_category, ai_confidence, ai_summary,
            processing_status, sync_status, attachment_path, downloaded
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                  %s, %s, %s, %s, %s,
                  %s, %s, %s,
                  %s, %s, %s, %s)
    """
    conn = mysql.connector.connect(
        host=DB_HOST,
        port=DB_PORT,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME,
    )
    # try/finally guarantees the connection and cursor are released even
    # when cursor creation or the commit raises.
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(query, (
                email_data.get("user", "default_user"),
                email_data.get("account", "main_account"),
                email_data["message_id"],
                email_data["thread_id"],
                email_data.get("account_id", ""),
                email_data["sender"],
                email_data["cc"],
                email_data["subject"],
                email_data["body"],
                email_data.get("links", ""),
                email_data["received_at"],
                email_data.get("folder", "inbox"),
                email_data["attachments"],
                False,           # is_read
                email_data["labels"],
                None, None, None,  # ai_category, ai_confidence, ai_summary
                "unprocessed",   # processing_status
                "synced",        # sync_status
                None,            # attachment_path
                False,           # downloaded
            ))
        except Exception as e:
            # Deliberate best effort: report and carry on with the next email.
            print("❌ Error inserting into DB:", e)
        else:
            conn.commit()
            # The subject is None for mail without a Subject header; slicing
            # it directly used to raise and mislabel a successful insert as
            # a DB error.
            subject = str(email_data.get("subject") or "")
            print(f"✅ Stored: {subject[:60]}...")
        finally:
            cursor.close()
    finally:
        conn.close()
def _parse_received_at(date_str):
    """Turn a ``Date`` header into a datetime, falling back to now (UTC)."""
    # Function-scope import keeps this block self-contained.
    from email.utils import parsedate_to_datetime

    if date_str:
        try:
            # Handles the RFC 2822 variants a fixed strptime pattern rejects
            # (missing weekday, "GMT"-style zones, trailing "(UTC)" comments).
            return parsedate_to_datetime(date_str)
        except (TypeError, ValueError):
            # Unparseable header: use the fallback below.
            pass
    # Missing or malformed Date header: record the time of import instead.
    return datetime.datetime.now(datetime.timezone.utc)


def fetch_and_store_emails(service):
    """Fetch messages from Gmail and store each one via ``insert_into_db``.

    Lists up to 500 message ids for the authenticated user, downloads each
    message, extracts the From/Cc/Subject/Date headers and the text body,
    and hands the resulting record to ``insert_into_db``.

    Args:
        service: Gmail API client as returned by ``authenticate_gmail``.

    NOTE(review): only the first page of the listing is processed; a
    ``nextPageToken`` in the response is not followed.
    """
    listing = service.users().messages().list(userId="me", maxResults=500).execute()
    for msg in listing.get("messages", []):
        msg_data = service.users().messages().get(userId="me", id=msg["id"]).execute()
        payload = msg_data.get("payload", {})
        headers = payload.get("headers", [])

        email_data = {
            "user": "default_user",
            "account": "main_account",
            "message_id": msg_data["id"],
            "thread_id": msg_data.get("threadId"),
            "account_id": "",
            "sender": get_header(headers, "From"),
            "cc": get_header(headers, "Cc"),
            "subject": get_header(headers, "Subject"),
            "body": decode_body(payload),
            "links": "",  # Placeholder, will be populated by AI
            "received_at": _parse_received_at(get_header(headers, "Date")),
            "folder": "inbox",
            # The raw part list and label ids are stored as their str() form.
            "attachments": str(payload.get("parts", [])),
            "labels": str(msg_data.get("labelIds", [])),
        }
        insert_into_db(email_data)
if __name__ == "__main__":
    # Script entry point: prepare the database (initialize_database is
    # imported from initialize_db), then authenticate and import the mail.
    initialize_database()
    fetch_and_store_emails(authenticate_gmail())