Add nightly conversation indexing: ingest Aaron AI conversations into pgvector at 2:30 AM
This commit is contained in:
@@ -0,0 +1,156 @@
|
||||
"""
|
||||
Nightly ingestion of Aaron AI conversations into pgvector.
|
||||
Chunks by exchange (user + assistant together), deduplicates by conversation_id + index.
|
||||
Only indexes conversations with at least MIN_EXCHANGES (3) exchanges.
|
||||
"""
|
||||
import sqlite3
|
||||
import psycopg2
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from dotenv import load_dotenv
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
# Load environment variables (e.g. PG_DSN) from the app's .env file.
load_dotenv(Path.home() / "aaronai" / ".env")

# Source SQLite database holding the raw Aaron AI conversations.
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
# Target Postgres/pgvector DSN.
# NOTE(review): a default password is hard-coded here — confirm PG_DSN is
# always supplied via the environment in production.
PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")
# Minimum number of user/assistant exchanges a conversation needs to be indexed.
MIN_EXCHANGES = 3

# Embedding model is loaded once at import time (slow first run: model download).
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
|
||||
|
||||
def get_conversations():
    """Return conversations that contain at least MIN_EXCHANGES exchanges.

    An exchange is one user message plus one assistant reply, hence the
    ``MIN_EXCHANGES * 2`` message-count threshold. Rows are ordered most
    recently updated first.
    """
    db = sqlite3.connect(CONVERSATIONS_DB)
    db.row_factory = sqlite3.Row
    cursor = db.cursor()
    query = """
        SELECT c.id, c.title, c.created_at, c.updated_at,
               COUNT(m.id) as message_count
        FROM conversations c
        JOIN messages m ON m.conversation_id = c.id
        GROUP BY c.id
        HAVING message_count >= ?
        ORDER BY c.updated_at DESC
    """
    rows = cursor.execute(query, (MIN_EXCHANGES * 2,)).fetchall()
    db.close()
    return rows
|
||||
|
||||
def get_messages(conversation_id):
    """Return every message of one conversation, oldest first."""
    db = sqlite3.connect(CONVERSATIONS_DB)
    db.row_factory = sqlite3.Row
    # Connection-level execute() is a shortcut that creates a cursor for us.
    rows = db.execute(
        """
        SELECT role, content, timestamp
        FROM messages
        WHERE conversation_id = ?
        ORDER BY timestamp ASC
        """,
        (conversation_id,),
    ).fetchall()
    db.close()
    return rows
|
||||
|
||||
def chunk_conversation(convo, messages):
    """Split one conversation into embeddable text chunks.

    Messages are paired into (user, assistant) exchanges, then grouped into
    non-overlapping batches of two exchanges per chunk. Each chunk gets a
    deterministic id (``aaronai_conv_<id>_<pair offset>``) so nightly re-runs
    can deduplicate against already-indexed chunks.

    Args:
        convo: mapping with 'id', 'title' and 'updated_at' keys
            (sqlite3.Row or plain dict).
        messages: ordered sequence of mappings with 'role' and 'content'.

    Returns:
        List of (chunk_id, chunk_text, metadata) tuples.
    """
    title = convo['title'] or 'Untitled'
    conv_id = convo['id']

    # Pair each user message with the assistant reply that directly follows it.
    pairs = []
    msgs = list(messages)
    i = 0
    while i < len(msgs):
        if msgs[i]['role'] == 'user':
            user_msg = msgs[i]['content']
            if i + 1 < len(msgs) and msgs[i + 1]['role'] == 'assistant':
                pairs.append((user_msg, msgs[i + 1]['content']))
                i += 2
            else:
                # BUG FIX: the old code advanced by 2 even when no assistant
                # reply followed, silently dropping the next message (e.g. a
                # second consecutive user message). Advance by 1 instead.
                pairs.append((user_msg, ''))
                i += 1
        else:
            i += 1

    chunks = []
    # Non-overlapping batches of 2 exchanges per chunk (not a sliding window —
    # the original comment was misleading).
    for idx in range(0, len(pairs), 2):
        window = pairs[idx:idx + 2]
        text_parts = []
        for user, asst in window:
            if user:
                text_parts.append(f"You: {user[:500]}")  # cap each side at 500 chars
            if asst:
                text_parts.append(f"Aaron AI: {asst[:500]}")

        if not text_parts:
            continue

        chunk_text = f"[Aaron AI conversation: {title}]\n\n" + "\n\n".join(text_parts)
        chunk_id = f"aaronai_conv_{conv_id}_{idx}"
        chunks.append((chunk_id, chunk_text, {
            "source": f"Aaron AI: {title}",
            "type": "aaronai_conversation",
            "created_at": convo['updated_at'],
        }))

    return chunks
|
||||
|
||||
def run():
    """Index all qualifying conversations into the pgvector `embeddings` table.

    For each conversation: chunk it, skip chunk ids already present in the
    table, embed the new chunks, and upsert them. Commits once per
    conversation so a crash mid-run loses at most one conversation's work.

    FIX: the Postgres connection is now closed in a ``finally`` block, so it
    no longer leaks when an embed/insert step raises.
    """
    pg = psycopg2.connect(PG_DSN)
    try:
        cur = pg.cursor()

        conversations = get_conversations()
        print(f"Found {len(conversations)} conversations with {MIN_EXCHANGES}+ exchanges")

        total_added = 0
        total_skipped = 0

        for convo in conversations:
            messages = get_messages(convo['id'])
            chunks = chunk_conversation(convo, messages)

            if not chunks:
                continue

            ids = [c[0] for c in chunks]

            # Deduplicate: find which chunk ids are already indexed.
            cur.execute("SELECT id FROM embeddings WHERE id = ANY(%s)", (ids,))
            existing = set(r[0] for r in cur.fetchall())
            new_chunks = [c for c in chunks if c[0] not in existing]

            if not new_chunks:
                total_skipped += len(chunks)
                continue

            # Embed all new chunk texts in one batch, then upsert row by row.
            texts = [c[1] for c in new_chunks]
            embeddings = embedder.encode(texts, show_progress_bar=False).tolist()

            for (chunk_id, chunk_text, meta), embedding in zip(new_chunks, embeddings):
                cur.execute("""
                    INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
                    VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE SET
                        document = EXCLUDED.document,
                        embedding = EXCLUDED.embedding,
                        source = EXCLUDED.source,
                        type = EXCLUDED.type,
                        created_at = EXCLUDED.created_at
                """, (
                    chunk_id, chunk_text, embedding,
                    meta['source'], meta['type'], meta['created_at'],
                    json.dumps(meta)
                ))

            pg.commit()
            total_added += len(new_chunks)
            total_skipped += len(existing)

        cur.execute("SELECT COUNT(*) FROM embeddings")
        total = cur.fetchone()[0]
    finally:
        # Guaranteed cleanup even if embedding or an insert fails mid-loop.
        pg.close()

    print(f"Done: {total_added} chunks added, {total_skipped} already indexed")
    print(f"Corpus total: {total} chunks")
|
||||
|
||||
# Allow running as a standalone script (presumably invoked nightly by a
# scheduler such as cron — confirm against the deployment config).
if __name__ == "__main__":
    run()
|
||||
Reference in New Issue
Block a user