"""
|
|
Nightly ingestion of Aaron AI conversations into pgvector.
|
|
Chunks by exchange (user + assistant together), deduplicates by conversation_id + index.
|
|
Only indexes conversations with more than 3 exchanges.
|
|
"""
import sqlite3
import psycopg2
import json
import os
from pathlib import Path
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer

load_dotenv(Path.home() / "aaronai" / ".env")

CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
PG_DSN = os.getenv("PG_DSN")
MIN_EXCHANGES = 3  # one exchange = a user message plus its assistant reply

print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
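
# The embeddings table is not created here. From the INSERT in run() and
# the 384-dim output of all-MiniLM-L6-v2, its assumed shape is roughly:
#
#   CREATE EXTENSION IF NOT EXISTS vector;
#   CREATE TABLE IF NOT EXISTS embeddings (
#       id         TEXT PRIMARY KEY,
#       document   TEXT,
#       embedding  VECTOR(384),
#       source     TEXT,
#       type       TEXT,
#       created_at TEXT,
#       metadata   JSONB
#   );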


def get_conversations():
    """Conversations with at least MIN_EXCHANGES exchanges, newest first."""
    conn = sqlite3.connect(CONVERSATIONS_DB)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    # One exchange is roughly two messages, hence MIN_EXCHANGES * 2
    c.execute("""
        SELECT c.id, c.title, c.created_at, c.updated_at,
               COUNT(m.id) AS message_count
        FROM conversations c
        JOIN messages m ON m.conversation_id = c.id
        GROUP BY c.id
        HAVING message_count >= ?
        ORDER BY c.updated_at DESC
    """, (MIN_EXCHANGES * 2,))
    convos = c.fetchall()
    conn.close()
    return convos


def get_messages(conversation_id):
    """All messages for a conversation, oldest first."""
    conn = sqlite3.connect(CONVERSATIONS_DB)
    conn.row_factory = sqlite3.Row
    c = conn.cursor()
    c.execute("""
        SELECT role, content, timestamp
        FROM messages
        WHERE conversation_id = ?
        ORDER BY timestamp ASC
    """, (conversation_id,))
    msgs = c.fetchall()
    conn.close()
    return msgs


def chunk_conversation(convo, messages):
    """Split a conversation into windowed chunks of paired exchanges."""
    chunks = []
    title = convo['title'] or 'Untitled'
    conv_id = convo['id']

    # Pair each user message with the assistant reply that follows it
    pairs = []
    i = 0
    msgs = list(messages)
    while i < len(msgs):
        if msgs[i]['role'] == 'user':
            user_msg = msgs[i]['content']
            if i + 1 < len(msgs) and msgs[i + 1]['role'] == 'assistant':
                pairs.append((user_msg, msgs[i + 1]['content']))
                i += 2
            else:
                # No assistant reply (e.g. consecutive user messages):
                # keep the lone user message and advance by one so the
                # following message is not skipped
                pairs.append((user_msg, ''))
                i += 1
        else:
            i += 1

    # Non-overlapping windows of 2 exchanges (range step == window size)
    for idx in range(0, len(pairs), 2):
        window = pairs[idx:idx + 2]
        text_parts = []
        for user, asst in window:
            if user:
                text_parts.append(f"You: {user[:500]}")
            if asst:
                text_parts.append(f"Aaron AI: {asst[:500]}")

        if not text_parts:
            continue

        chunk_text = f"[Aaron AI conversation: {title}]\n\n" + "\n\n".join(text_parts)
        # Deterministic id (conversation id + window index) is what makes
        # the dedup check in run() possible
        chunk_id = f"aaronai_conv_{conv_id}_{idx}"
        chunks.append((chunk_id, chunk_text, {
            "source": f"Aaron AI: {title}",
            "type": "aaronai_conversation",
            "created_at": convo['updated_at'],
        }))

    return chunks
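
# Illustration of chunk_conversation's pairing step on a hypothetical
# message sequence:
#   [user "hi", assistant "hello", user "a", user "b", assistant "c"]
# pairs up as [("hi", "hello"), ("a", ""), ("b", "c")] -- a user message
# with no reply is kept alone instead of swallowing the message after it.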


def run():
    if not PG_DSN:
        raise SystemExit("PG_DSN is not set; check ~/aaronai/.env")

    pg = psycopg2.connect(PG_DSN)
    cur = pg.cursor()

    conversations = get_conversations()
    print(f"Found {len(conversations)} conversations with {MIN_EXCHANGES}+ exchanges")

    total_added = 0
    total_skipped = 0

    for convo in conversations:
        messages = get_messages(convo['id'])
        chunks = chunk_conversation(convo, messages)

        if not chunks:
            continue

        ids = [c[0] for c in chunks]

        # Skip chunks whose deterministic ids are already indexed
        cur.execute("SELECT id FROM embeddings WHERE id = ANY(%s)", (ids,))
        existing = set(r[0] for r in cur.fetchall())
        new_chunks = [c for c in chunks if c[0] not in existing]

        if not new_chunks:
            total_skipped += len(chunks)
            continue

        # Embed and insert; psycopg2 adapts each Python list to a Postgres
        # array, which the ::vector cast converts for pgvector
        texts = [c[1] for c in new_chunks]
        embeddings = embedder.encode(texts, show_progress_bar=False).tolist()

        for (chunk_id, chunk_text, meta), embedding in zip(new_chunks, embeddings):
            cur.execute("""
                INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
                VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                    document = EXCLUDED.document,
                    embedding = EXCLUDED.embedding,
                    source = EXCLUDED.source,
                    type = EXCLUDED.type,
                    created_at = EXCLUDED.created_at,
                    metadata = EXCLUDED.metadata
            """, (
                chunk_id, chunk_text, embedding,
                meta['source'], meta['type'], meta['created_at'],
                json.dumps(meta)
            ))

        pg.commit()
        total_added += len(new_chunks)
        total_skipped += len(existing)

    cur.execute("SELECT COUNT(*) FROM embeddings")
    total = cur.fetchone()[0]
    pg.close()

    print(f"Done: {total_added} chunks added, {total_skipped} already indexed")
    print(f"Corpus total: {total} chunks")
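

# Retrieval sketch: one way a consumer might query the chunks this job
# writes. Not called by the nightly run; assumes pgvector's cosine-distance
# operator (<=>) and the same embedder used for ingestion.
def search(query, k=5):
    vec = embedder.encode([query]).tolist()[0]
    pg = psycopg2.connect(PG_DSN)
    cur = pg.cursor()
    cur.execute(
        "SELECT id, document FROM embeddings ORDER BY embedding <=> %s::vector LIMIT %s",
        (vec, k),
    )
    rows = cur.fetchall()
    pg.close()
    return rows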
if __name__ == "__main__":
|
|
run()
|
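
# Intended to run nightly, e.g. from cron (filename and schedule here are
# illustrative, not from the source):
#   0 3 * * * /usr/bin/python3 $HOME/aaronai/ingest_conversations.py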