"""
Migration: ChromaDB → pgvector

Re-embeds all documents from ChromaDB SQLite into PostgreSQL with pgvector.
Keeps ChromaDB intact as backup until migration is verified.

Behaviour:
  * Reads the PostgreSQL connection string from the ``PG_DSN`` environment
    variable and raises ``RuntimeError`` if it is not set.
  * Reads every ChromaDB row that has a ``chroma:document`` metadata value,
    skips the ones whose ``embedding_id`` is already present in the
    PostgreSQL ``embeddings`` table, and upserts the rest in batches.
  * The ChromaDB SQLite file is only read from, never written to.
  * Safe to re-run: already-migrated ids are filtered out, and the INSERT
    is an upsert on ``id``.
"""
import json
import os
import sqlite3
import sys
from contextlib import closing
from pathlib import Path

import psycopg2
from sentence_transformers import SentenceTransformer

CHROMA_SQLITE = str(Path.home() / "aaronai" / "db" / "chroma.sqlite3")

PG_DSN = os.getenv("PG_DSN")
if not PG_DSN:
    raise RuntimeError("PG_DSN environment variable not set")

print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")

print("Connecting to databases...")
# closing() guarantees both connections are released even if the migration
# aborts with an unexpected exception or exits early via sys.exit().
# (psycopg2's own ``with connection`` only ends the transaction; it does not
# close the connection, hence closing().)
with closing(sqlite3.connect(CHROMA_SQLITE)) as chroma, \
        closing(psycopg2.connect(PG_DSN)) as pg:
    chroma.row_factory = sqlite3.Row
    c = chroma.cursor()
    pg_cur = pg.cursor()

    # Get all documents with their metadata from ChromaDB.
    # embedding_metadata is a key/value table, so the MAX(CASE ...) columns
    # pivot the keys of interest into one row per embedding.
    print("Reading documents from ChromaDB...")
    c.execute("""
        SELECT
            e.id as row_id,
            e.embedding_id,
            MAX(CASE WHEN em.key = 'chroma:document' THEN em.string_value END) as document,
            MAX(CASE WHEN em.key = 'source' THEN em.string_value END) as source,
            MAX(CASE WHEN em.key = 'type' THEN em.string_value END) as type,
            MAX(CASE WHEN em.key = 'created_at' THEN em.string_value END) as created_at
        FROM embeddings e
        LEFT JOIN embedding_metadata em ON e.id = em.id
        GROUP BY e.id, e.embedding_id
        HAVING document IS NOT NULL
        ORDER BY e.id
    """)
    rows = c.fetchall()
    print(f"Found {len(rows)} documents to migrate")

    # Check existing in PostgreSQL
    pg_cur.execute("SELECT id FROM embeddings")
    existing_ids = {r[0] for r in pg_cur.fetchall()}
    print(f"Already in PostgreSQL: {len(existing_ids)}")

    # Filter to only new ones
    to_migrate = [r for r in rows if r['embedding_id'] not in existing_ids]
    print(f"Need to migrate: {len(to_migrate)}")

    if not to_migrate:
        print("Nothing to migrate — already complete")
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under ``python -S``).
        sys.exit(0)

    # Migrate in batches
    batch_size = 200
    migrated = 0
    errors = 0

    for i in range(0, len(to_migrate), batch_size):
        batch = to_migrate[i:i + batch_size]

        # Generate embeddings
        texts = [r['document'] for r in batch]
        try:
            embeddings = embedder.encode(texts, show_progress_bar=False).tolist()
        except Exception as e:
            # Best-effort: skip the whole batch and keep going.
            print(f"Embedding error at batch {i}: {e}")
            errors += len(batch)
            continue

        # Insert into PostgreSQL
        for row, embedding in zip(batch, embeddings):
            # A failed statement aborts the surrounding PostgreSQL
            # transaction: without a savepoint, every later insert in this
            # batch would fail too and the commit below would roll back
            # rows already counted as migrated. The savepoint confines the
            # damage to the single offending row.
            pg_cur.execute("SAVEPOINT migrate_row")
            try:
                pg_cur.execute("""
                    INSERT INTO embeddings
                        (id, document, embedding, source, type, created_at, metadata)
                    VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE SET
                        document = EXCLUDED.document,
                        embedding = EXCLUDED.embedding,
                        source = EXCLUDED.source,
                        type = EXCLUDED.type,
                        created_at = EXCLUDED.created_at,
                        metadata = EXCLUDED.metadata
                """, (
                    row['embedding_id'],
                    row['document'],
                    embedding,
                    row['source'],
                    row['type'],
                    row['created_at'],
                    json.dumps({
                        'source': row['source'],
                        'type': row['type'],
                        'created_at': row['created_at'],
                    }),
                ))
            except Exception as e:
                # Undo only this row, then drop the savepoint so they do not
                # pile up within the batch transaction.
                pg_cur.execute("ROLLBACK TO SAVEPOINT migrate_row")
                pg_cur.execute("RELEASE SAVEPOINT migrate_row")
                print(f"Insert error for {row['embedding_id']}: {e}")
                errors += 1
            else:
                pg_cur.execute("RELEASE SAVEPOINT migrate_row")
                migrated += 1

        # One commit per batch: an interrupted run keeps all finished batches.
        pg.commit()
        print(f"Progress: {min(i + batch_size, len(to_migrate))}/{len(to_migrate)} ({errors} errors)")

    # Final count
    pg_cur.execute("SELECT COUNT(*) FROM embeddings")
    final_count = pg_cur.fetchone()[0]

print("\nMigration complete:")
print(f"  Migrated: {migrated}")
print(f"  Errors: {errors}")
print(f"  PostgreSQL total: {final_count}")