Files
aaronAI/deprecated/migrate_to_pgvector.py

126 lines
3.8 KiB
Python

"""
Migration: ChromaDB → pgvector
Re-embeds all documents from ChromaDB SQLite into PostgreSQL with pgvector.
Keeps ChromaDB intact as backup until migration is verified.
"""
import sqlite3
import psycopg2
import json
from pathlib import Path
from sentence_transformers import SentenceTransformer
CHROMA_SQLITE = str(Path.home() / "aaronai" / "db" / "chroma.sqlite3")
import os
PG_DSN = os.getenv("PG_DSN")
if not PG_DSN:
raise RuntimeError("PG_DSN environment variable not set")
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
print("Connecting to databases...")
chroma = sqlite3.connect(CHROMA_SQLITE)
chroma.row_factory = sqlite3.Row
c = chroma.cursor()
pg = psycopg2.connect(PG_DSN)
pg_cur = pg.cursor()
# Get all documents with their metadata from ChromaDB
print("Reading documents from ChromaDB...")
c.execute("""
SELECT
e.id as row_id,
e.embedding_id,
MAX(CASE WHEN em.key = 'chroma:document' THEN em.string_value END) as document,
MAX(CASE WHEN em.key = 'source' THEN em.string_value END) as source,
MAX(CASE WHEN em.key = 'type' THEN em.string_value END) as type,
MAX(CASE WHEN em.key = 'created_at' THEN em.string_value END) as created_at
FROM embeddings e
LEFT JOIN embedding_metadata em ON e.id = em.id
GROUP BY e.id, e.embedding_id
HAVING document IS NOT NULL
ORDER BY e.id
""")
rows = c.fetchall()
print(f"Found {len(rows)} documents to migrate")
# Check existing in PostgreSQL
pg_cur.execute("SELECT id FROM embeddings")
existing_ids = set(r[0] for r in pg_cur.fetchall())
print(f"Already in PostgreSQL: {len(existing_ids)}")
# Filter to only new ones
to_migrate = [r for r in rows if r['embedding_id'] not in existing_ids]
print(f"Need to migrate: {len(to_migrate)}")
if not to_migrate:
print("Nothing to migrate — already complete")
chroma.close()
pg.close()
exit(0)
# Migrate in batches
batch_size = 200
migrated = 0
errors = 0
for i in range(0, len(to_migrate), batch_size):
batch = to_migrate[i:i+batch_size]
# Generate embeddings
texts = [r['document'] for r in batch]
try:
embeddings = embedder.encode(texts, show_progress_bar=False).tolist()
except Exception as e:
print(f"Embedding error at batch {i}: {e}")
errors += len(batch)
continue
# Insert into PostgreSQL
for row, embedding in zip(batch, embeddings):
try:
pg_cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
type = EXCLUDED.type,
created_at = EXCLUDED.created_at,
metadata = EXCLUDED.metadata
""", (
row['embedding_id'],
row['document'],
embedding,
row['source'],
row['type'],
row['created_at'],
json.dumps({
'source': row['source'],
'type': row['type'],
'created_at': row['created_at'],
})
))
migrated += 1
except Exception as e:
print(f"Insert error for {row['embedding_id']}: {e}")
errors += 1
pg.commit()
print(f"Progress: {min(i+batch_size, len(to_migrate))}/{len(to_migrate)} ({errors} errors)")
# Final count
pg_cur.execute("SELECT COUNT(*) FROM embeddings")
final_count = pg_cur.fetchone()[0]
chroma.close()
pg.close()
print(f"\nMigration complete:")
print(f" Migrated: {migrated}")
print(f" Errors: {errors}")
print(f" PostgreSQL total: {final_count}")