From f78b83042bf2bb3d95c3604ee5d4431e76b103df Mon Sep 17 00:00:00 2001
From: Aaron Nelson
Date: Sun, 26 Apr 2026 21:16:04 +0000
Subject: [PATCH] =?UTF-8?q?Migrate=20to=20pgvector=20=E2=80=94=20remove=20?=
 =?UTF-8?q?ChromaDB=20from=20api.py,=20ingest=20scripts,=20dream.py?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 scripts/api.py                 |  70 ++++++++++++-------
 scripts/dream.py               |  69 +++++++++----------
 scripts/ingest.py              |  38 ++++++----
 scripts/ingest_chatgpt.py      |   4 +-
 scripts/ingest_claude.py       |  36 ++++++++--
 scripts/migrate_to_pgvector.py | 122 +++++++++++++++++++++++++++++++++
 6 files changed, 256 insertions(+), 83 deletions(-)
 create mode 100644 scripts/migrate_to_pgvector.py

diff --git a/scripts/api.py b/scripts/api.py
index 59a38dd..3ce744f 100644
--- a/scripts/api.py
+++ b/scripts/api.py
@@ -6,10 +6,11 @@ import hashlib
 from pathlib import Path
 from datetime import datetime
 from dotenv import load_dotenv
-import chromadb
 from sentence_transformers import SentenceTransformer
 import anthropic
 from fastapi import FastAPI, Request, Response, Depends, HTTPException
+import psycopg2
+import psycopg2.extras
 from fastapi import UploadFile, File
 import tempfile
 import os
@@ -46,6 +47,10 @@ DEFAULT_SETTINGS = {
 }
 
 print("Loading Aaron AI...")
+PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")
+
+def get_pg():
+    return psycopg2.connect(PG_DSN)
 WHISPER_PROMPT = (
     "Grasshopper, Rhino, PolyJet, SLA, FDM, DMLS, ChromaDB, "
     "HVAMC, FWN3D, Mossygear, Nextcloud, Gitea, computational design, "
@@ -59,11 +64,7 @@ if HAS_WHISPER:
     except Exception as e:
         print(f"Whisper not available: {e}")
 embedder = SentenceTransformer("all-MiniLM-L6-v2")
-chroma_client = chromadb.PersistentClient(path=DB_PATH)
-collection = chroma_client.get_or_create_collection(
-    name="aaronai",
-    metadata={"hnsw:space": "cosine", "hnsw:allow_replace_deleted": True}
-)
+# ChromaDB removed — using pgvector
 anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
 
 SYSTEM_PROMPT = """You are the personal AI assistant of Aaron Nelson — computational
@@ -210,11 +211,17 @@ def remove_from_memory(item):
 
 def get_pinned_cv_context():
     try:
-        results = collection.get(
-            where={"source": {"$in": CV_SOURCES}},
-            include=["documents", "metadatas"]
+        pg = get_pg()
+        cur = pg.cursor()
+        cur.execute(
+            "SELECT document, source FROM embeddings WHERE source = ANY(%s)",
+            (CV_SOURCES,)
         )
-        return results["documents"], results["metadatas"]
+        rows = cur.fetchall()
+        pg.close()
+        docs = [r[0] for r in rows]
+        metas = [{"source": r[1]} for r in rows]
+        return docs, metas
     except:
         return [], []
@@ -227,12 +234,7 @@ def is_professional_query(query):
     return any(k in query.lower() for k in keywords)
 
 def retrieve_context(query, n_results=8):
-    query_embedding = embedder.encode([query]).tolist()
-    results = collection.query(
-        query_embeddings=query_embedding,
-        n_results=n_results,
-        include=["documents", "metadatas", "distances"]
-    )
+    query_embedding = embedder.encode([query]).tolist()[0]
     context_pieces = []
     sources = []
     if is_professional_query(query):
@@ -240,15 +242,24 @@ def retrieve_context(query, n_results=8):
         for doc, meta in zip(cv_docs, cv_metas):
             context_pieces.append(f"[CV] {doc}")
             sources.append(meta.get("source", "CV"))
-    for doc, meta, dist in zip(
-        results["documents"][0],
-        results["metadatas"][0],
-        results["distances"][0]
-    ):
-        relevance = 1 - dist
-        if relevance > 0.3 and meta.get("source") not in CV_SOURCES:
-            context_pieces.append(doc)
-            sources.append(meta.get("source", "unknown"))
+    try:
+        pg = get_pg()
+        cur = pg.cursor()
+        cur.execute("""
+            SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
+            FROM embeddings
+            WHERE source NOT IN %s
+            ORDER BY embedding <=> %s::vector
+            LIMIT %s
+        """, (query_embedding, tuple(CV_SOURCES) if CV_SOURCES else ('__none__',),
+              query_embedding, n_results))
+        for doc, source, similarity in cur.fetchall():
+            if similarity > 0.3:
+                context_pieces.append(doc)
+                sources.append(source or "unknown")
+        pg.close()
+    except Exception as e:
+        print(f"pgvector retrieval error: {e}")
     return context_pieces, sources
 
 def get_conversation_history(conversation_id, limit=20):
@@ -519,7 +530,14 @@ async def update_memory(request: Request, auth: str = Depends(require_auth)):
 
 @app.get("/api/status")
 async def get_status(auth: str = Depends(require_auth)):
-    chunk_count = collection.count()
+    try:
+        pg = get_pg()
+        cur = pg.cursor()
+        cur.execute("SELECT COUNT(*) FROM embeddings")
+        chunk_count = cur.fetchone()[0]
+        pg.close()
+    except Exception:
+        chunk_count = 0
 
     # Watcher status
     watcher_running = False
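
Note: every pgvector query in this patch assumes an embeddings table that no script here creates. A minimal sketch of the schema the column lists above imply (assumptions: vector(384) matches the output size of all-MiniLM-L6-v2; the HNSW cosine index mirrors the old collection's {"hnsw:space": "cosine"} setting; created_at is text because the ingest scripts pass strings or None):

    -- Sketch only; table and index names are inferred, not part of the patch.
    CREATE EXTENSION IF NOT EXISTS vector;

    CREATE TABLE IF NOT EXISTS embeddings (
        id         text PRIMARY KEY,
        document   text NOT NULL,
        embedding  vector(384) NOT NULL,
        source     text,
        type       text,
        created_at text,
        metadata   jsonb
    );

    -- Supports the ORDER BY embedding <=> ... (cosine distance) queries.
    CREATE INDEX IF NOT EXISTS embeddings_embedding_idx
        ON embeddings USING hnsw (embedding vector_cosine_ops);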
diff --git a/scripts/dream.py b/scripts/dream.py
index 6d08d7d..9261965 100644
--- a/scripts/dream.py
+++ b/scripts/dream.py
@@ -10,8 +10,13 @@ import argparse
 from pathlib import Path
 from datetime import datetime, timedelta
 from dotenv import load_dotenv
+import psycopg2
 
 load_dotenv(Path.home() / "aaronai" / ".env")
+PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")
+
+def get_pg():
+    return psycopg2.connect(PG_DSN)
 
 # ─── Paths ──────────────────────────────────────────────────────────────────
 DB_PATH = str(Path.home() / "aaronai" / "db")
@@ -115,16 +120,9 @@ def check_recent_journal(days=3):
 
 # ─── Stage 3: Retrieve ──────────────────────────────────────────────────────
 def retrieve(mode, task=None, project=None, n_results=8):
-    import chromadb
     from sentence_transformers import SentenceTransformer
-
-    embedder = SentenceTransformer("all-MiniLM-L6-v2")
-    client = chromadb.PersistentClient(path=DB_PATH)
-    collection = client.get_or_create_collection(
-        name="aaronai",
-        metadata={"hnsw:space": "cosine", "hnsw:allow_replace_deleted": True}
-    )
-
+    embedder = SentenceTransformer("all-MiniLM-L6-v2")
     low, high = MODE_RANGES[mode]
 
     if task:
@@ -138,38 +136,37 @@ def retrieve(mode, task=None, project=None, n_results=8):
     else:
         query = "research fabrication teaching practice recent work"
 
-    embedding = embedder.encode([query]).tolist()
-    results = collection.query(
-        query_embeddings=embedding,
-        n_results=n_results * 3,
-        include=["documents", "metadatas", "distances"]
-    )
+    embedding = embedder.encode([query]).tolist()[0]
 
-    chunks = []
+    chunks = []
     seen_sources = set()
-    for doc, meta, dist in zip(
-        results["documents"][0],
-        results["metadatas"][0],
-        results["distances"][0]
-    ):
-        relevance = 1 - dist
-        source = meta.get("source", "unknown")
+    try:
+        pg = get_pg()
+        cur = pg.cursor()
+        cur.execute("""
+            SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
+            FROM embeddings
+            ORDER BY embedding <=> %s::vector
+            LIMIT %s
+        """, (embedding, embedding, n_results * 3))
 
-        if not (low <= relevance <= high):
-            continue
-        if source in seen_sources:
-            continue
-
-        chunks.append({
-            "source": source,
-            "content": doc,
-            "relevance": relevance,
-        })
-        seen_sources.add(source)
-
-        if len(chunks) >= n_results:
-            break
+        for doc, source, similarity in cur.fetchall():
+            if not (low <= similarity <= high):
+                continue
+            if source in seen_sources:
+                continue
+            chunks.append({
+                "source": source or "unknown",
+                "content": doc,
+                "relevance": similarity,
+            })
+            seen_sources.add(source)
+            if len(chunks) >= n_results:
+                break
+        pg.close()
+    except Exception as e:
+        print(f"pgvector retrieval error: {e}")
 
     return chunks
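
Note: retrieve() above, like retrieve_context() in api.py, binds a plain Python list to the %s::vector placeholders. psycopg2 renders a list as an ARRAY[...] literal, so this leans on pgvector's array-to-vector cast; if the installed pgvector lacks that cast, the text input format always works. A sketch of the version-independent variant (to_vector_literal is a hypothetical helper; same table and DSN assumed):

    import psycopg2
    from sentence_transformers import SentenceTransformer

    PG_DSN = "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost"
    embedder = SentenceTransformer("all-MiniLM-L6-v2")

    def to_vector_literal(values):
        # pgvector's text input format: '[0.1,0.2,...]'
        return "[" + ",".join(str(x) for x in values) + "]"

    emb = to_vector_literal(embedder.encode(["recent fabrication work"])[0])

    pg = psycopg2.connect(PG_DSN)
    cur = pg.cursor()
    cur.execute(
        """
        SELECT document, source, 1 - (embedding <=> %s::vector) AS similarity
        FROM embeddings
        ORDER BY embedding <=> %s::vector
        LIMIT %s
        """,
        (emb, emb, 8),
    )
    print(cur.fetchall())
    pg.close()

The pgvector Python package gives the same result without manual serialization: register_vector(conn) from pgvector.psycopg2 lets numpy arrays bind directly.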
diff --git a/scripts/ingest.py b/scripts/ingest.py
index ac87feb..e984bae 100644
--- a/scripts/ingest.py
+++ b/scripts/ingest.py
@@ -3,7 +3,9 @@ import sys
 import hashlib
 from pathlib import Path
 from dotenv import load_dotenv
-import chromadb
+import psycopg2
+import psycopg2.extras
+import json
 from sentence_transformers import SentenceTransformer
 from docx import Document
 from pypdf import PdfReader
@@ -14,12 +16,10 @@ load_dotenv(Path.home() / "aaronai" / ".env")
 
 print("Loading embedding model...")
 embedder = SentenceTransformer("all-MiniLM-L6-v2")
-db_path = str(Path.home() / "aaronai" / "db")
-client = chromadb.PersistentClient(path=db_path)
-collection = client.get_or_create_collection(
-    name="aaronai",
-    metadata={"hnsw:space": "cosine", "hnsw:allow_replace_deleted": True}
-)
+PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")
+
+def get_pg():
+    return psycopg2.connect(PG_DSN)
 
 def extract_text_from_docx(path):
     doc = Document(path)
@@ -98,12 +98,24 @@ def ingest_file(filepath):
         "folder": str(path.parent.relative_to(Path(sys.argv[1]) if len(sys.argv) > 1 else path.parent))
     } for _ in chunks]
 
-    collection.upsert(
-        documents=chunks,
-        embeddings=embeddings,
-        ids=ids,
-        metadatas=metadatas
-    )
+    pg = get_pg()
+    cur = pg.cursor()
+    for chunk_id, chunk, embedding, meta in zip(ids, chunks, embeddings, metadatas):
+        cur.execute("""
+            INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
+            VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
+            ON CONFLICT (id) DO UPDATE SET
+                document = EXCLUDED.document,
+                embedding = EXCLUDED.embedding,
+                source = EXCLUDED.source,
+                metadata = EXCLUDED.metadata
+        """, (
+            chunk_id, chunk, embedding,
+            meta.get('source'), 'document', None,
+            json.dumps(meta)
+        ))
+    pg.commit()
+    pg.close()
 
     print(f"  Indexed {len(chunks)} chunks: {path.name}")
     return len(chunks)

diff --git a/scripts/ingest_chatgpt.py b/scripts/ingest_chatgpt.py
index 2bb6406..cd7e3ef 100644
--- a/scripts/ingest_chatgpt.py
+++ b/scripts/ingest_chatgpt.py
@@ -3,7 +3,9 @@ import sys
 from pathlib import Path
 from datetime import datetime
 from sentence_transformers import SentenceTransformer
-import chromadb
+import psycopg2
+import psycopg2.extras
+import json as json_module
 
 # Paths
 db_path = str(Path.home() / "aaronai" / "db")

diff --git a/scripts/ingest_claude.py b/scripts/ingest_claude.py
index 977c93d..98de6ba 100644
--- a/scripts/ingest_claude.py
+++ b/scripts/ingest_claude.py
@@ -2,7 +2,15 @@ import json
 import sys
 from pathlib import Path
 from sentence_transformers import SentenceTransformer
-import chromadb
+import os
+import psycopg2
+import psycopg2.extras
+import json as json_module
+
+PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")
+
+def get_pg():
+    return psycopg2.connect(PG_DSN)
 
 # Paths
 db_path = str(Path.home() / "aaronai" / "db")
@@ -102,12 +110,26 @@ def ingest_conversations(path):
             continue
 
         embeddings = embedder.encode([n[1] for n in new]).tolist()
-        collection.upsert(
-            ids=[n[0] for n in new],
-            documents=[n[1] for n in new],
-            metadatas=[n[2] for n in new],
-            embeddings=embeddings,
-        )
+        pg = get_pg()
+        cur = pg.cursor()
+        for (chunk_id, chunk_text, meta), embedding in zip(new, embeddings):
+            cur.execute("""
+                INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
+                VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
+                ON CONFLICT (id) DO UPDATE SET
+                    document = EXCLUDED.document,
+                    embedding = EXCLUDED.embedding,
+                    source = EXCLUDED.source,
+                    type = EXCLUDED.type,
+                    created_at = EXCLUDED.created_at,
+                    metadata = EXCLUDED.metadata
+            """, (
+                chunk_id, chunk_text, embedding,
+                meta.get('source'), meta.get('type'), meta.get('created_at'),
+                json_module.dumps(meta)
+            ))
+        pg.commit()
+        pg.close()
 
         total += len(new)
 
     print(f"Conversations: {total} chunks added, {skipped} skipped")
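
Note: all three ingest scripts import psycopg2.extras but then execute one INSERT per chunk. The same upsert can be sent in batches with execute_values, which matters once exports reach tens of thousands of chunks. A sketch (upsert_chunks is a hypothetical helper; same schema assumed):

    import json
    import psycopg2
    import psycopg2.extras

    def upsert_chunks(pg, rows):
        # rows: iterable of (id, document, embedding, source, type, created_at, meta_dict)
        cur = pg.cursor()
        psycopg2.extras.execute_values(
            cur,
            """
            INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
            VALUES %s
            ON CONFLICT (id) DO UPDATE SET
                document = EXCLUDED.document,
                embedding = EXCLUDED.embedding,
                source = EXCLUDED.source,
                type = EXCLUDED.type,
                created_at = EXCLUDED.created_at,
                metadata = EXCLUDED.metadata
            """,
            [(i, d, e, s, t, c, json.dumps(m)) for i, d, e, s, t, c, m in rows],
            template="(%s, %s, %s::vector, %s, %s, %s, %s)",
            page_size=200,
        )
        pg.commit()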
diff --git a/scripts/migrate_to_pgvector.py b/scripts/migrate_to_pgvector.py
new file mode 100644
index 0000000..2f570f6
--- /dev/null
+++ b/scripts/migrate_to_pgvector.py
@@ -0,0 +1,122 @@
+"""
+Migration: ChromaDB → pgvector
+Re-embeds all documents from ChromaDB SQLite into PostgreSQL with pgvector.
+Keeps ChromaDB intact as backup until migration is verified.
+"""
+import sqlite3
+import psycopg2
+import json
+from pathlib import Path
+from sentence_transformers import SentenceTransformer
+
+CHROMA_SQLITE = str(Path.home() / "aaronai" / "db" / "chroma.sqlite3")
+PG_DSN = "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost"
+
+print("Loading embedding model...")
+embedder = SentenceTransformer("all-MiniLM-L6-v2")
+
+print("Connecting to databases...")
+chroma = sqlite3.connect(CHROMA_SQLITE)
+chroma.row_factory = sqlite3.Row
+c = chroma.cursor()
+
+pg = psycopg2.connect(PG_DSN)
+pg_cur = pg.cursor()
+
+# Get all documents with their metadata from ChromaDB
+print("Reading documents from ChromaDB...")
+c.execute("""
+    SELECT
+        e.id as row_id,
+        e.embedding_id,
+        MAX(CASE WHEN em.key = 'chroma:document' THEN em.string_value END) as document,
+        MAX(CASE WHEN em.key = 'source' THEN em.string_value END) as source,
+        MAX(CASE WHEN em.key = 'type' THEN em.string_value END) as type,
+        MAX(CASE WHEN em.key = 'created_at' THEN em.string_value END) as created_at
+    FROM embeddings e
+    LEFT JOIN embedding_metadata em ON e.id = em.id
+    GROUP BY e.id, e.embedding_id
+    HAVING document IS NOT NULL
+    ORDER BY e.id
+""")
+
+rows = c.fetchall()
+print(f"Found {len(rows)} documents to migrate")
+
+# Check existing in PostgreSQL
+pg_cur.execute("SELECT id FROM embeddings")
+existing_ids = set(r[0] for r in pg_cur.fetchall())
+print(f"Already in PostgreSQL: {len(existing_ids)}")
+
+# Filter to only new ones
+to_migrate = [r for r in rows if r['embedding_id'] not in existing_ids]
+print(f"Need to migrate: {len(to_migrate)}")
+
+if not to_migrate:
+    print("Nothing to migrate — already complete")
+    chroma.close()
+    pg.close()
+    exit(0)
+
+# Migrate in batches
+batch_size = 200
+migrated = 0
+errors = 0
+
+for i in range(0, len(to_migrate), batch_size):
+    batch = to_migrate[i:i+batch_size]
+
+    # Generate embeddings
+    texts = [r['document'] for r in batch]
+    try:
+        embeddings = embedder.encode(texts, show_progress_bar=False).tolist()
+    except Exception as e:
+        print(f"Embedding error at batch {i}: {e}")
+        errors += len(batch)
+        continue
+
+    # Insert into PostgreSQL
+    for row, embedding in zip(batch, embeddings):
+        try:
+            pg_cur.execute("""
+                INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
+                VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
+                ON CONFLICT (id) DO UPDATE SET
+                    document = EXCLUDED.document,
+                    embedding = EXCLUDED.embedding,
+                    source = EXCLUDED.source,
+                    type = EXCLUDED.type,
+                    created_at = EXCLUDED.created_at,
+                    metadata = EXCLUDED.metadata
+            """, (
+                row['embedding_id'],
+                row['document'],
+                embedding,
+                row['source'],
+                row['type'],
+                row['created_at'],
+                json.dumps({
+                    'source': row['source'],
+                    'type': row['type'],
+                    'created_at': row['created_at'],
+                })
+            ))
+            migrated += 1
+        except Exception as e:
+            print(f"Insert error for {row['embedding_id']}: {e}")
+            errors += 1
+
+    pg.commit()
+    print(f"Progress: {min(i+batch_size, len(to_migrate))}/{len(to_migrate)} ({errors} errors)")
+
+# Final count
+pg_cur.execute("SELECT COUNT(*) FROM embeddings")
+final_count = pg_cur.fetchone()[0]
+
+chroma.close()
+pg.close()
+
+print(f"\nMigration complete:")
+print(f"  Migrated: {migrated}")
+print(f"  Errors: {errors}")
+print(f"  PostgreSQL total: {final_count}")
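
Note: before retiring the ChromaDB directory, it is worth checking more than the final row count. A quick sanity check, assuming the same DSN: print the per-source breakdown, then confirm a migrated row is its own nearest neighbor under <=> (distance near zero, since the text was re-embedded with the same model):

    import psycopg2

    pg = psycopg2.connect("dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")
    cur = pg.cursor()

    cur.execute("SELECT source, COUNT(*) FROM embeddings GROUP BY source ORDER BY 2 DESC")
    for source, n in cur.fetchall():
        print(f"{n:6d}  {source}")

    # Round-trip check: a row's own vector should retrieve that row first.
    cur.execute("SELECT id, embedding FROM embeddings LIMIT 1")
    doc_id, emb = cur.fetchone()  # emb arrives as pgvector's text form, e.g. '[0.1,...]'
    cur.execute("SELECT id FROM embeddings ORDER BY embedding <=> %s::vector LIMIT 1", (emb,))
    print("self-match ok:", cur.fetchone()[0] == doc_id)
    pg.close()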