aaronAI/scripts/ingest_conversations.py
"""
Nightly ingestion of Aaron AI conversations into pgvector.
Chunks by exchange (user + assistant together), deduplicates by conversation_id + index.
Only indexes conversations with at least 3 exchanges.
"""
import sqlite3
import psycopg2
import json
import os
from pathlib import Path
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
load_dotenv(Path.home() / "aaronai" / ".env")
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
PG_DSN = os.getenv("PG_DSN")
MIN_EXCHANGES = 3
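# Assumed Postgres schema (illustrative; columns inferred from the INSERT
# statement in run(), vector width from the embedding model):
#
#   CREATE EXTENSION IF NOT EXISTS vector;
#   CREATE TABLE IF NOT EXISTS embeddings (
#       id         text PRIMARY KEY,
#       document   text,
#       embedding  vector(384),
#       source     text,
#       type       text,
#       created_at text,
#       metadata   jsonb
#   );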
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
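# all-MiniLM-L6-v2 produces 384-dimensional vectors; the pgvector column
# in Postgres must be declared with the same width.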
def get_conversations():
    conn = sqlite3.connect(CONVERSATIONS_DB)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    # An exchange is a user + assistant pair, so N exchanges ~= 2*N messages
    c.execute("""
        SELECT c.id, c.title, c.created_at, c.updated_at,
               COUNT(m.id) AS message_count
        FROM conversations c
        JOIN messages m ON m.conversation_id = c.id
        GROUP BY c.id
        HAVING message_count >= ?
        ORDER BY c.updated_at DESC
    """, (MIN_EXCHANGES * 2,))
    convos = c.fetchall()
    conn.close()
    return convos
def get_messages(conversation_id):
    conn = sqlite3.connect(CONVERSATIONS_DB)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    c.execute("""
        SELECT role, content, timestamp
        FROM messages
        WHERE conversation_id = ?
        ORDER BY timestamp ASC
    """, (conversation_id,))
    msgs = c.fetchall()
    conn.close()
    return msgs
def chunk_conversation(convo, messages):
    chunks = []
    title = convo['title'] or 'Untitled'
    conv_id = convo['id']
    # Pair each user message with the assistant reply that follows it
    pairs = []
    i = 0
    msgs = list(messages)
    while i < len(msgs):
        if msgs[i]['role'] == 'user':
            user_msg = msgs[i]['content']
            if i + 1 < len(msgs) and msgs[i+1]['role'] == 'assistant':
                asst_msg = msgs[i+1]['content']
                i += 2
            else:
                # No assistant reply (e.g. two user messages in a row):
                # keep the user turn and advance by one so nothing is dropped
                asst_msg = ''
                i += 1
            pairs.append((user_msg, asst_msg))
        else:
            i += 1
    # Non-overlapping windows of two exchanges per chunk
    for idx in range(0, len(pairs), 2):
        window = pairs[idx:idx+2]
        text_parts = []
        for user, asst in window:
            if user:
                text_parts.append(f"You: {user[:500]}")
            if asst:
                text_parts.append(f"Aaron AI: {asst[:500]}")
        if not text_parts:
            continue
        chunk_text = f"[Aaron AI conversation: {title}]\n\n" + "\n\n".join(text_parts)
        # Deterministic id (conversation id + window start) makes re-runs idempotent
        chunk_id = f"aaronai_conv_{conv_id}_{idx}"
        chunks.append((chunk_id, chunk_text, {
            "source": f"Aaron AI: {title}",
            "type": "aaronai_conversation",
            "created_at": convo['updated_at'],
        }))
    return chunks
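# Example chunk (illustrative values):
#   id:   aaronai_conv_42_0
#   text: [Aaron AI conversation: Trip planning]
#
#         You: What's the best time to visit Kyoto?
#
#         Aaron AI: Late March to early April...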
def run():
    pg = psycopg2.connect(PG_DSN)
    cur = pg.cursor()
    conversations = get_conversations()
    print(f"Found {len(conversations)} conversations with {MIN_EXCHANGES}+ exchanges")
    total_added = 0
    total_skipped = 0
    for convo in conversations:
        messages = get_messages(convo['id'])
        chunks = chunk_conversation(convo, messages)
        if not chunks:
            continue
        ids = [c[0] for c in chunks]
        # Check which chunk ids already exist so only new ones get embedded
        cur.execute("SELECT id FROM embeddings WHERE id = ANY(%s)", (ids,))
        existing = set(r[0] for r in cur.fetchall())
        new_chunks = [c for c in chunks if c[0] not in existing]
        if not new_chunks:
            total_skipped += len(chunks)
            continue
        # Embed and insert
        texts = [c[1] for c in new_chunks]
        embeddings = embedder.encode(texts, show_progress_bar=False).tolist()
        for (chunk_id, chunk_text, meta), embedding in zip(new_chunks, embeddings):
            # psycopg2 adapts the Python list to a Postgres array, which the
            # ::vector cast converts (newer pgvector releases support array casts)
            cur.execute("""
                INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
                VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                    document = EXCLUDED.document,
                    embedding = EXCLUDED.embedding,
                    source = EXCLUDED.source,
                    type = EXCLUDED.type,
                    created_at = EXCLUDED.created_at,
                    metadata = EXCLUDED.metadata
            """, (
                chunk_id, chunk_text, embedding,
                meta['source'], meta['type'], meta['created_at'],
                json.dumps(meta)
            ))
        pg.commit()
        total_added += len(new_chunks)
        total_skipped += len(existing)
    cur.execute("SELECT COUNT(*) FROM embeddings")
    total = cur.fetchone()[0]
    pg.close()
    print(f"Done: {total_added} chunks added, {total_skipped} already indexed")
    print(f"Corpus total: {total} chunks")
if __name__ == "__main__":
    run()
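
# The docstring says this runs nightly; an illustrative crontab entry, assuming
# the layout implied by CONVERSATIONS_DB (~/aaronai):
#   0 3 * * * cd ~/aaronai && python scripts/ingest_conversations.py >> ingest.log 2>&1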