Migrate to pgvector — remove ChromaDB from api.py, ingest scripts, dream.py

2026-04-26 21:16:04 +00:00
parent d2eed98906
commit f78b83042b
6 changed files with 250 additions and 83 deletions
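The new psycopg2 code in this commit reads and writes an embeddings table and casts parameters with ::vector, so it assumes a database where the pgvector extension and that table already exist; the setup itself is not part of the diff. Below is a minimal setup sketch: table and column names are taken from the INSERT and SELECT statements in the diff, the 384 dimension comes from all-MiniLM-L6-v2, and the column types and HNSW cosine index are assumptions (cosine mirrors the old ChromaDB hnsw:space setting).

# One-time pgvector setup assumed by the code in this commit (not part of the diff).
# Column types are assumptions inferred from the values the scripts insert;
# the HNSW index needs pgvector 0.5+, and CREATE EXTENSION needs sufficient privileges.
import os
import psycopg2

PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")

SETUP_SQL = """
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS embeddings (
    id         TEXT PRIMARY KEY,          -- needed for ON CONFLICT (id) upserts
    document   TEXT NOT NULL,
    embedding  vector(384) NOT NULL,      -- all-MiniLM-L6-v2 output size
    source     TEXT,
    type       TEXT,
    created_at TEXT,
    metadata   JSONB
);

CREATE INDEX IF NOT EXISTS embeddings_embedding_idx
    ON embeddings USING hnsw (embedding vector_cosine_ops);
"""

pg = psycopg2.connect(PG_DSN)
cur = pg.cursor()
cur.execute(SETUP_SQL)
pg.commit()
pg.close()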
+44 -26
@@ -6,10 +6,11 @@ import hashlib
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
import chromadb
from sentence_transformers import SentenceTransformer
import anthropic
from fastapi import FastAPI, Request, Response, Depends, HTTPException
import psycopg2
import psycopg2.extras
from fastapi import UploadFile, File
import tempfile
import os
@@ -46,6 +47,10 @@ DEFAULT_SETTINGS = {
}
print("Loading Aaron AI...")
PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")
def get_pg():
return psycopg2.connect(PG_DSN)
WHISPER_PROMPT = (
"Grasshopper, Rhino, PolyJet, SLA, FDM, DMLS, ChromaDB, "
"HVAMC, FWN3D, Mossygear, Nextcloud, Gitea, computational design, "
@@ -59,11 +64,7 @@ if HAS_WHISPER:
except Exception as e:
print(f"Whisper not available: {e}")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
chroma_client = chromadb.PersistentClient(path=DB_PATH)
collection = chroma_client.get_or_create_collection(
name="aaronai",
metadata={"hnsw:space": "cosine", "hnsw:allow_replace_deleted": True}
)
# ChromaDB removed — using pgvector
anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
SYSTEM_PROMPT = """You are the personal AI assistant of Aaron Nelson — computational
@@ -210,11 +211,17 @@ def remove_from_memory(item):
def get_pinned_cv_context():
try:
results = collection.get(
where={"source": {"$in": CV_SOURCES}},
include=["documents", "metadatas"]
pg = get_pg()
cur = pg.cursor()
cur.execute(
"SELECT document, source FROM embeddings WHERE source = ANY(%s)",
(CV_SOURCES,)
)
return results["documents"], results["metadatas"]
rows = cur.fetchall()
pg.close()
docs = [r[0] for r in rows]
metas = [{"source": r[1]} for r in rows]
return docs, metas
except:
return [], []
@@ -227,12 +234,7 @@ def is_professional_query(query):
return any(k in query.lower() for k in keywords)
def retrieve_context(query, n_results=8):
query_embedding = embedder.encode([query]).tolist()
results = collection.query(
query_embeddings=query_embedding,
n_results=n_results,
include=["documents", "metadatas", "distances"]
)
query_embedding = embedder.encode([query]).tolist()[0]
context_pieces = []
sources = []
if is_professional_query(query):
@@ -240,15 +242,24 @@ def retrieve_context(query, n_results=8):
for doc, meta in zip(cv_docs, cv_metas):
context_pieces.append(f"[CV] {doc}")
sources.append(meta.get("source", "CV"))
for doc, meta, dist in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0]
):
relevance = 1 - dist
if relevance > 0.3 and meta.get("source") not in CV_SOURCES:
context_pieces.append(doc)
sources.append(meta.get("source", "unknown"))
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("""
SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
FROM embeddings
WHERE source NOT IN %s
ORDER BY embedding <=> %s::vector
LIMIT %s
""", (query_embedding, tuple(CV_SOURCES) if CV_SOURCES else ('__none__',),
query_embedding, n_results))
for doc, source, similarity in cur.fetchall():
if similarity > 0.3:
context_pieces.append(doc)
sources.append(source or "unknown")
pg.close()
except Exception as e:
print(f"pgvector retrieval error: {e}")
return context_pieces, sources
def get_conversation_history(conversation_id, limit=20):
@@ -519,7 +530,14 @@ async def update_memory(request: Request, auth: str = Depends(require_auth)):
@app.get("/api/status")
async def get_status(auth: str = Depends(require_auth)):
chunk_count = collection.count()
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("SELECT COUNT(*) FROM embeddings")
chunk_count = cur.fetchone()[0]
pg.close()
except:
chunk_count = 0
# Watcher status
watcher_running = False
+33 -36
@@ -10,8 +10,13 @@ import argparse
from pathlib import Path
from datetime import datetime, timedelta
from dotenv import load_dotenv
import psycopg2
load_dotenv(Path.home() / "aaronai" / ".env")
PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")
def get_pg():
return psycopg2.connect(PG_DSN)
# ─── Paths ──────────────────────────────────────────────────────────────────
DB_PATH = str(Path.home() / "aaronai" / "db")
@@ -115,16 +120,9 @@ def check_recent_journal(days=3):
# ─── Stage 3: Retrieve ──────────────────────────────────────────────────────
def retrieve(mode, task=None, project=None, n_results=8):
import chromadb
from sentence_transformers import SentenceTransformer
embedder = SentenceTransformer("all-MiniLM-L6-v2")
client = chromadb.PersistentClient(path=DB_PATH)
collection = client.get_or_create_collection(
name="aaronai",
metadata={"hnsw:space": "cosine", "hnsw:allow_replace_deleted": True}
)
embedder = SentenceTransformer("all-MiniLM-L6-v2")
low, high = MODE_RANGES[mode]
if task:
@@ -138,38 +136,37 @@ def retrieve(mode, task=None, project=None, n_results=8):
else:
query = "research fabrication teaching practice recent work"
embedding = embedder.encode([query]).tolist()
results = collection.query(
query_embeddings=embedding,
n_results=n_results * 3,
include=["documents", "metadatas", "distances"]
)
embedding = embedder.encode([query]).tolist()[0]
chunks = []
chunks = []
seen_sources = set()
for doc, meta, dist in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0]
):
relevance = 1 - dist
source = meta.get("source", "unknown")
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("""
SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
FROM embeddings
ORDER BY embedding <=> %s::vector
LIMIT %s
""", (embedding, embedding, n_results * 3))
if not (low <= relevance <= high):
continue
if source in seen_sources:
continue
chunks.append({
"source": source,
"content": doc,
"relevance": relevance,
})
seen_sources.add(source)
if len(chunks) >= n_results:
break
for doc, source, similarity in cur.fetchall():
if not (low <= similarity <= high):
continue
if source in seen_sources:
continue
chunks.append({
"source": source or "unknown",
"content": doc,
"relevance": similarity,
})
seen_sources.add(source)
if len(chunks) >= n_results:
break
pg.close()
except Exception as e:
print(f"pgvector retrieval error: {e}")
return chunks
+25 -13
@@ -3,7 +3,9 @@ import sys
import hashlib
from pathlib import Path
from dotenv import load_dotenv
import chromadb
import psycopg2
import psycopg2.extras
import json
from sentence_transformers import SentenceTransformer
from docx import Document
from pypdf import PdfReader
@@ -14,12 +16,10 @@ load_dotenv(Path.home() / "aaronai" / ".env")
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
db_path = str(Path.home() / "aaronai" / "db")
client = chromadb.PersistentClient(path=db_path)
collection = client.get_or_create_collection(
name="aaronai",
metadata={"hnsw:space": "cosine", "hnsw:allow_replace_deleted": True}
)
PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")
def get_pg():
return psycopg2.connect(PG_DSN)
def extract_text_from_docx(path):
doc = Document(path)
@@ -98,12 +98,24 @@ def ingest_file(filepath):
"folder": str(path.parent.relative_to(Path(sys.argv[1]) if len(sys.argv) > 1 else path.parent))
} for _ in chunks]
collection.upsert(
documents=chunks,
embeddings=embeddings,
ids=ids,
metadatas=metadatas
)
pg = get_pg()
cur = pg.cursor()
for chunk_id, chunk, embedding, meta in zip(ids, chunks, embeddings, metadatas):
cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
metadata = EXCLUDED.metadata
""", (
chunk_id, chunk, embedding,
meta.get('source'), 'document', None,
json.dumps(meta)
))
pg.commit()
pg.close()
print(f" Indexed {len(chunks)} chunks: {path.name}")
return len(chunks)
+3 -1
@@ -3,7 +3,9 @@ import sys
from pathlib import Path
from datetime import datetime
from sentence_transformers import SentenceTransformer
import chromadb
import psycopg2
import psycopg2.extras
import json as json_module
# Paths
db_path = str(Path.home() / "aaronai" / "db")
+23 -7
@@ -2,7 +2,9 @@ import json
import sys
from pathlib import Path
from sentence_transformers import SentenceTransformer
import chromadb
import psycopg2
import psycopg2.extras
import json as json_module
# Paths
db_path = str(Path.home() / "aaronai" / "db")
@@ -102,12 +104,26 @@ def ingest_conversations(path):
continue
embeddings = embedder.encode([n[1] for n in new]).tolist()
collection.upsert(
ids=[n[0] for n in new],
documents=[n[1] for n in new],
metadatas=[n[2] for n in new],
embeddings=embeddings,
)
pg = get_pg()
cur = pg.cursor()
for (chunk_id, chunk_text, meta), embedding in zip(new, embeddings):
cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
type = EXCLUDED.type,
created_at = EXCLUDED.created_at,
metadata = EXCLUDED.metadata
""", (
chunk_id, chunk_text, embedding,
meta.get('source'), meta.get('type'), meta.get('created_at'),
json_module.dumps(meta)
))
pg.commit()
pg.close()
total += len(new)
print(f"Conversations: {total} chunks added, {skipped} skipped")
+122
@@ -0,0 +1,122 @@
"""
Migration: ChromaDB → pgvector
Re-embeds all documents from ChromaDB SQLite into PostgreSQL with pgvector.
Keeps ChromaDB intact as backup until migration is verified.
"""
import sqlite3
import psycopg2
import json
from pathlib import Path
from sentence_transformers import SentenceTransformer
CHROMA_SQLITE = str(Path.home() / "aaronai" / "db" / "chroma.sqlite3")
PG_DSN = "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost"
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
print("Connecting to databases...")
chroma = sqlite3.connect(CHROMA_SQLITE)
chroma.row_factory = sqlite3.Row
c = chroma.cursor()
pg = psycopg2.connect(PG_DSN)
pg_cur = pg.cursor()
# Get all documents with their metadata from ChromaDB
print("Reading documents from ChromaDB...")
c.execute("""
SELECT
e.id as row_id,
e.embedding_id,
MAX(CASE WHEN em.key = 'chroma:document' THEN em.string_value END) as document,
MAX(CASE WHEN em.key = 'source' THEN em.string_value END) as source,
MAX(CASE WHEN em.key = 'type' THEN em.string_value END) as type,
MAX(CASE WHEN em.key = 'created_at' THEN em.string_value END) as created_at
FROM embeddings e
LEFT JOIN embedding_metadata em ON e.id = em.id
GROUP BY e.id, e.embedding_id
HAVING document IS NOT NULL
ORDER BY e.id
""")
rows = c.fetchall()
print(f"Found {len(rows)} documents to migrate")
# Check existing in PostgreSQL
pg_cur.execute("SELECT id FROM embeddings")
existing_ids = set(r[0] for r in pg_cur.fetchall())
print(f"Already in PostgreSQL: {len(existing_ids)}")
# Filter to only new ones
to_migrate = [r for r in rows if r['embedding_id'] not in existing_ids]
print(f"Need to migrate: {len(to_migrate)}")
if not to_migrate:
print("Nothing to migrate — already complete")
chroma.close()
pg.close()
exit(0)
# Migrate in batches
batch_size = 200
migrated = 0
errors = 0
for i in range(0, len(to_migrate), batch_size):
batch = to_migrate[i:i+batch_size]
# Generate embeddings
texts = [r['document'] for r in batch]
try:
embeddings = embedder.encode(texts, show_progress_bar=False).tolist()
except Exception as e:
print(f"Embedding error at batch {i}: {e}")
errors += len(batch)
continue
# Insert into PostgreSQL
for row, embedding in zip(batch, embeddings):
try:
pg_cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
type = EXCLUDED.type,
created_at = EXCLUDED.created_at,
metadata = EXCLUDED.metadata
""", (
row['embedding_id'],
row['document'],
embedding,
row['source'],
row['type'],
row['created_at'],
json.dumps({
'source': row['source'],
'type': row['type'],
'created_at': row['created_at'],
})
))
migrated += 1
except Exception as e:
print(f"Insert error for {row['embedding_id']}: {e}")
errors += 1
pg.commit()
print(f"Progress: {min(i+batch_size, len(to_migrate))}/{len(to_migrate)} ({errors} errors)")
# Final count
pg_cur.execute("SELECT COUNT(*) FROM embeddings")
final_count = pg_cur.fetchone()[0]
chroma.close()
pg.close()
print(f"\nMigration complete:")
print(f" Migrated: {migrated}")
print(f" Errors: {errors}")
print(f" PostgreSQL total: {final_count}")