# File: aaronAI/deprecated/migrate_to_graphiti.py (92 lines, 3.3 KiB, Python)
"""
Aaron AI — Migration: pgvector to Graphiti
One-time migration. Test with limit first: python3 migrate_to_graphiti.py 100
"""
import os, sys, json, time, requests, psycopg2
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
load_dotenv(Path.home() / "aaronai" / ".env")
GRAPHITI_URL = "http://localhost:8001"
PG_DSN = os.getenv("PG_DSN")
GROUP_ID = "aaron"
BATCH_PAUSE = 0.5
PROGRESS_FILE = Path.home() / "aaronai" / "migration_progress.json"
def load_progress():
    """Load the persisted migration checkpoint; start fresh if none exists.

    Returns a dict with "completed_ids" (list of migrated row ids) and
    "failed_ids" (list of {"id", "error"} records).
    """
    if not PROGRESS_FILE.exists():
        return {"completed_ids": [], "failed_ids": []}
    return json.loads(PROGRESS_FILE.read_text())
def save_progress(progress):
    """Persist the migration checkpoint to disk as pretty-printed JSON."""
    serialized = json.dumps(progress, indent=2)
    PROGRESS_FILE.write_text(serialized)
def migrate(limit=None):
    """Copy rows from the pgvector ``embeddings`` table into Graphiti episodes.

    Resumable: progress is checkpointed to PROGRESS_FILE every 10 successes
    and immediately after every failure, so a crashed run can be restarted
    without re-sending already-migrated rows.

    Args:
        limit: Optional row cap for a test run; None migrates everything.

    Side effects: network calls to the Graphiti sidecar, a read-only query
    against Postgres, progress-file writes, and sys.exit(1) if the sidecar
    is unreachable.
    """
    # Fail fast if the Graphiti sidecar isn't up — nothing else can work.
    try:
        resp = requests.get(f"{GRAPHITI_URL}/health", timeout=5)
        print(f"Graphiti: {resp.json()}")
    except Exception as e:
        print(f"ERROR: sidecar not reachable — {e}")
        sys.exit(1)

    progress = load_progress()
    completed_ids = set(progress["completed_ids"])  # set for O(1) skip checks
    failed_ids = progress["failed_ids"]
    if completed_ids:
        print(f"Resuming — {len(completed_ids)} done, {len(failed_ids)} failed")

    # Fetch everything up front; the connection is closed before the long
    # migration loop so we don't hold a Postgres connection open for hours.
    pg = psycopg2.connect(PG_DSN)
    try:
        cur = pg.cursor()
        query = "SELECT id, document, source, created_at FROM embeddings ORDER BY created_at ASC"
        if limit:
            # int() guarantees the interpolated value cannot alter the SQL.
            query += f" LIMIT {int(limit)}"
        cur.execute(query)
        rows = cur.fetchall()
        cur.close()
    finally:
        pg.close()

    pending = [r for r in rows if r[0] not in completed_ids]
    print(f"Total: {len(rows)} | Pending: {len(pending)}{' [TEST]' if limit else ''}\n")

    success = len(completed_ids)
    failed = len(failed_ids)
    start = time.time()
    for i, (row_id, document, source, created_at) in enumerate(pending):
        try:
            # Build a readable, URL-safe episode name from the source path.
            src = (source or "unknown").replace("/", "-").replace(" ", "-")[:80]
            name = f"{src}-{row_id[:8]}"
            # psycopg2 returns timestamp columns as datetime objects, which
            # are not JSON-serializable — convert to ISO 8601. (If the column
            # is already text, this leaves the original behavior unchanged.)
            if isinstance(created_at, datetime):
                timestamp = created_at.isoformat()
            else:
                timestamp = created_at or datetime.now().isoformat()
            requests.post(f"{GRAPHITI_URL}/episodes", json={
                "name": name,
                "content": document,
                "source_description": source or "nextcloud-corpus",
                "timestamp": timestamp,
                "group_id": GROUP_ID,
            }, timeout=120).raise_for_status()
            success += 1
            progress["completed_ids"].append(row_id)
            if success % 10 == 0:  # checkpoint every 10 successes
                save_progress(progress)
            if (i + 1) % 50 == 0:  # periodic progress / ETA report
                elapsed = time.time() - start
                rate = (i + 1) / elapsed
                remaining = (len(pending) - i - 1) / rate if rate > 0 else 0
                print(f" [{i+1}/{len(pending)}] {success} ok, {failed} failed | ~{remaining/60:.0f} min left")
            time.sleep(BATCH_PAUSE)  # be gentle with the sidecar
        except Exception as e:
            failed += 1
            progress["failed_ids"].append({"id": row_id, "error": str(e)})
            print(f" FAILED {row_id}: {e}")
            save_progress(progress)  # persist the failure immediately
            time.sleep(2)  # brief back-off after an error

    save_progress(progress)
    elapsed = time.time() - start
    print(f"\nDone — {success} ok, {failed} failed, {elapsed/60:.1f} min")
    if limit and len(pending) > 0:
        # Known size of the full embeddings table at migration time;
        # used only to extrapolate a full-run ETA from the test batch.
        full_corpus_rows = 12915
        est = (elapsed / len(pending)) * full_corpus_rows / 60
        print(f"Estimated full run: ~{est:.0f} min")
if __name__ == "__main__":
migrate(int(sys.argv[1]) if len(sys.argv) > 1 else None)