""" Nightly ingestion of Aaron AI conversations into pgvector. Chunks by exchange (user + assistant together), deduplicates by conversation_id + index. Only indexes conversations with more than 3 exchanges. """ import sqlite3 import psycopg2 import json import os from pathlib import Path from datetime import datetime from dotenv import load_dotenv from sentence_transformers import SentenceTransformer load_dotenv(Path.home() / "aaronai" / ".env") CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db") PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost") MIN_EXCHANGES = 3 print("Loading embedding model...") embedder = SentenceTransformer("all-MiniLM-L6-v2") def get_conversations(): conn = sqlite3.connect(CONVERSATIONS_DB) conn.row_factory = sqlite3.Row c = conn.cursor() c.execute(""" SELECT c.id, c.title, c.created_at, c.updated_at, COUNT(m.id) as message_count FROM conversations c JOIN messages m ON m.conversation_id = c.id GROUP BY c.id HAVING message_count >= ? ORDER BY c.updated_at DESC """, (MIN_EXCHANGES * 2,)) convos = c.fetchall() conn.close() return convos def get_messages(conversation_id): conn = sqlite3.connect(CONVERSATIONS_DB) conn.row_factory = sqlite3.Row c = conn.cursor() c.execute(""" SELECT role, content, timestamp FROM messages WHERE conversation_id = ? ORDER BY timestamp ASC """, (conversation_id,)) msgs = c.fetchall() conn.close() return msgs def chunk_conversation(convo, messages): chunks = [] title = convo['title'] or 'Untitled' conv_id = convo['id'] # Pair user + assistant exchanges pairs = [] i = 0 msgs = list(messages) while i < len(msgs): if msgs[i]['role'] == 'user': user_msg = msgs[i]['content'] asst_msg = msgs[i+1]['content'] if i+1 < len(msgs) and msgs[i+1]['role'] == 'assistant' else '' pairs.append((user_msg, asst_msg)) i += 2 else: i += 1 # Sliding window of 2 exchanges for idx in range(0, len(pairs), 2): window = pairs[idx:idx+2] text_parts = [] for user, asst in window: if user: text_parts.append(f"You: {user[:500]}") if asst: text_parts.append(f"Aaron AI: {asst[:500]}") if not text_parts: continue chunk_text = f"[Aaron AI conversation: {title}]\n\n" + "\n\n".join(text_parts) chunk_id = f"aaronai_conv_{conv_id}_{idx}" chunks.append((chunk_id, chunk_text, { "source": f"Aaron AI: {title}", "type": "aaronai_conversation", "created_at": convo['updated_at'], })) return chunks def run(): pg = psycopg2.connect(PG_DSN) cur = pg.cursor() conversations = get_conversations() print(f"Found {len(conversations)} conversations with {MIN_EXCHANGES}+ exchanges") total_added = 0 total_skipped = 0 for convo in conversations: messages = get_messages(convo['id']) chunks = chunk_conversation(convo, messages) if not chunks: continue ids = [c[0] for c in chunks] # Check which already exist cur.execute("SELECT id FROM embeddings WHERE id = ANY(%s)", (ids,)) existing = set(r[0] for r in cur.fetchall()) new_chunks = [c for c in chunks if c[0] not in existing] if not new_chunks: total_skipped += len(chunks) continue # Embed and insert texts = [c[1] for c in new_chunks] embeddings = embedder.encode(texts, show_progress_bar=False).tolist() for (chunk_id, chunk_text, meta), embedding in zip(new_chunks, embeddings): cur.execute(""" INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) VALUES (%s, %s, %s::vector, %s, %s, %s, %s) ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document, embedding = EXCLUDED.embedding, source = EXCLUDED.source, type = EXCLUDED.type, created_at = 
EXCLUDED.created_at """, ( chunk_id, chunk_text, embedding, meta['source'], meta['type'], meta['created_at'], json.dumps(meta) )) pg.commit() total_added += len(new_chunks) total_skipped += len(existing) cur.execute("SELECT COUNT(*) FROM embeddings") total = cur.fetchone()[0] pg.close() print(f"Done: {total_added} chunks added, {total_skipped} already indexed") print(f"Corpus total: {total} chunks") if __name__ == "__main__": run()
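
# ----------------------------------------------------------------------
# Assumed target schema (a sketch, not defined by this script): the
# column list is inferred from the INSERT above, and vector(384) matches
# the output dimension of all-MiniLM-L6-v2. The created_at/metadata
# types are guesses; adjust them to your actual table.
#
#   CREATE EXTENSION IF NOT EXISTS vector;
#   CREATE TABLE IF NOT EXISTS embeddings (
#       id         TEXT PRIMARY KEY,
#       document   TEXT NOT NULL,
#       embedding  vector(384),
#       source     TEXT,
#       type       TEXT,
#       created_at TEXT,
#       metadata   JSONB
#   );
# ----------------------------------------------------------------------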