diff --git a/scripts/ingest_conversations.py b/scripts/ingest_conversations.py
new file mode 100644
index 0000000..0e752f7
--- /dev/null
+++ b/scripts/ingest_conversations.py
@@ -0,0 +1,165 @@
+"""
+Nightly ingestion of Aaron AI conversations into pgvector.
+Chunks by exchange (user + assistant together), deduplicates by conversation_id + index.
+Only indexes conversations with at least 3 exchanges.
+"""
+import sqlite3
+import psycopg2
+import json
+import os
+from pathlib import Path
+from dotenv import load_dotenv
+from sentence_transformers import SentenceTransformer
+
+load_dotenv(Path.home() / "aaronai" / ".env")
+
+CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
+PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")
+MIN_EXCHANGES = 3
+
+print("Loading embedding model...")
+embedder = SentenceTransformer("all-MiniLM-L6-v2")
+
+def get_conversations():
+    # One exchange is one user message plus one assistant message,
+    # hence the MIN_EXCHANGES * 2 message threshold below.
+    conn = sqlite3.connect(CONVERSATIONS_DB)
+    conn.row_factory = sqlite3.Row
+    c = conn.cursor()
+    c.execute("""
+        SELECT c.id, c.title, c.created_at, c.updated_at,
+               COUNT(m.id) as message_count
+        FROM conversations c
+        JOIN messages m ON m.conversation_id = c.id
+        GROUP BY c.id
+        HAVING message_count >= ?
+        ORDER BY c.updated_at DESC
+    """, (MIN_EXCHANGES * 2,))
+    convos = c.fetchall()
+    conn.close()
+    return convos
+
+def get_messages(conversation_id):
+    conn = sqlite3.connect(CONVERSATIONS_DB)
+    conn.row_factory = sqlite3.Row
+    c = conn.cursor()
+    c.execute("""
+        SELECT role, content, timestamp
+        FROM messages
+        WHERE conversation_id = ?
+        ORDER BY timestamp ASC
+    """, (conversation_id,))
+    msgs = c.fetchall()
+    conn.close()
+    return msgs
+
+def chunk_conversation(convo, messages):
+    chunks = []
+    title = convo['title'] or 'Untitled'
+    conv_id = convo['id']
+
+    # Pair user + assistant exchanges. Only step past the next message when
+    # it really is the assistant reply, so consecutive user messages are not
+    # silently dropped.
+    pairs = []
+    i = 0
+    msgs = list(messages)
+    while i < len(msgs):
+        if msgs[i]['role'] == 'user':
+            if i + 1 < len(msgs) and msgs[i+1]['role'] == 'assistant':
+                pairs.append((msgs[i]['content'], msgs[i+1]['content']))
+                i += 2
+            else:
+                pairs.append((msgs[i]['content'], ''))
+                i += 1
+        else:
+            i += 1
+
+    # Non-overlapping windows of 2 exchanges per chunk
+    for idx in range(0, len(pairs), 2):
+        window = pairs[idx:idx+2]
+        text_parts = []
+        for user, asst in window:
+            if user:
+                text_parts.append(f"You: {user[:500]}")  # truncate long turns
+            if asst:
+                text_parts.append(f"Aaron AI: {asst[:500]}")
+
+        if not text_parts:
+            continue
+
+        chunk_text = f"[Aaron AI conversation: {title}]\n\n" + "\n\n".join(text_parts)
+        chunk_id = f"aaronai_conv_{conv_id}_{idx}"
+        chunks.append((chunk_id, chunk_text, {
+            "source": f"Aaron AI: {title}",
+            "type": "aaronai_conversation",
+            "created_at": convo['updated_at'],
+        }))
+
+    return chunks
+
+def run():
+    pg = psycopg2.connect(PG_DSN)
+    cur = pg.cursor()
+
+    conversations = get_conversations()
+    print(f"Found {len(conversations)} conversations with {MIN_EXCHANGES}+ exchanges")
+
+    total_added = 0
+    total_skipped = 0
+
+    for convo in conversations:
+        messages = get_messages(convo['id'])
+        chunks = chunk_conversation(convo, messages)
+
+        if not chunks:
+            continue
+
+        ids = [c[0] for c in chunks]
+
+        # Skip chunks that are already indexed so they aren't re-embedded;
+        # the ON CONFLICT clause below is only a safety net for races.
+        cur.execute("SELECT id FROM embeddings WHERE id = ANY(%s)", (ids,))
+        existing = set(r[0] for r in cur.fetchall())
+        new_chunks = [c for c in chunks if c[0] not in existing]
+
+        if not new_chunks:
+            total_skipped += len(chunks)
+            continue
+
+        # Embed and insert
+        texts = [c[1] for c in new_chunks]
+        embeddings = embedder.encode(texts, show_progress_bar=False).tolist()
+
+        for (chunk_id, chunk_text, meta), embedding in zip(new_chunks, embeddings):
+            # psycopg2 adapts the Python list to a SQL array; pgvector casts it to vector
+            cur.execute("""
+                INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
+                VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
+                ON CONFLICT (id) DO UPDATE SET
+                    document = EXCLUDED.document,
+                    embedding = EXCLUDED.embedding,
+                    source = EXCLUDED.source,
+                    type = EXCLUDED.type,
+                    created_at = EXCLUDED.created_at,
+                    metadata = EXCLUDED.metadata
+            """, (
+                chunk_id, chunk_text, embedding,
+                meta['source'], meta['type'], meta['created_at'],
+                json.dumps(meta)
+            ))
+
+        # Commit per conversation so a crash doesn't lose earlier work
+        pg.commit()
+        total_added += len(new_chunks)
+        total_skipped += len(existing)
+
+    cur.execute("SELECT COUNT(*) FROM embeddings")
+    total = cur.fetchone()[0]
+    pg.close()
+
+    print(f"Done: {total_added} chunks added, {total_skipped} already indexed")
+    print(f"Corpus total: {total} chunks")
+
+if __name__ == "__main__":
+    run()
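
For reference, a minimal retrieval sketch against the embeddings table this script populates. It is not part of the change: it assumes pgvector's cosine-distance operator (<=>) is available on the embedding column, reuses the same all-MiniLM-L6-v2 model, and the search_chunks helper name and example query are illustrative only.

    # Hypothetical companion sketch (not in this diff): nearest-neighbor lookup
    # over the chunks ingested above. Assumes pgvector's cosine distance
    # operator <=> works on the embeddings.embedding column.
    import os
    import psycopg2
    from sentence_transformers import SentenceTransformer

    PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai host=localhost")
    embedder = SentenceTransformer("all-MiniLM-L6-v2")

    def search_chunks(query, limit=5):
        vec = embedder.encode([query])[0].tolist()
        pg = psycopg2.connect(PG_DSN)
        cur = pg.cursor()
        # Same list-to-vector cast the ingester uses for inserts
        cur.execute("""
            SELECT id, document, embedding <=> %s::vector AS distance
            FROM embeddings
            WHERE type = 'aaronai_conversation'
            ORDER BY distance
            LIMIT %s
        """, (vec, limit))
        rows = cur.fetchall()
        pg.close()
        return rows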