92 lines
3.3 KiB
Python
92 lines
3.3 KiB
Python
"""
|
|
Aaron AI — Migration: pgvector to Graphiti
|
|
One-time migration. Test with limit first: python3 migrate_to_graphiti.py 100
|
|
"""
|
|
import os, sys, json, time, requests, psycopg2
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv(Path.home() / "aaronai" / ".env")
|
|
|
|
GRAPHITI_URL = "http://localhost:8001"
|
|
PG_DSN = os.getenv("PG_DSN")
|
|
GROUP_ID = "aaron"
|
|
BATCH_PAUSE = 0.5
|
|
PROGRESS_FILE = Path.home() / "aaronai" / "migration_progress.json"
|
|
|
|
def load_progress():
    """Return the migration checkpoint from PROGRESS_FILE, or a fresh one.

    The checkpoint is a dict with two lists:
      - "completed_ids": ids of rows already posted to Graphiti
      - "failed_ids": {"id": ..., "error": ...} records for failed rows

    Keys missing from the saved file (e.g. an older or hand-edited
    checkpoint) are backfilled with empty lists, so callers can always
    index both keys.
    """
    progress = {"completed_ids": [], "failed_ids": []}
    if PROGRESS_FILE.exists():
        progress.update(json.loads(PROGRESS_FILE.read_text()))
    return progress
|
|
|
|
def save_progress(progress):
    """Persist the migration checkpoint to PROGRESS_FILE as indented JSON.

    The JSON is written to a sibling ".tmp" file which is then renamed over
    PROGRESS_FILE. The rename replaces the file in one step, so an
    interruption mid-write cannot leave a truncated checkpoint that
    load_progress() would fail to parse.

    Args:
        progress: JSON-serializable checkpoint dict.
    """
    tmp = PROGRESS_FILE.with_name(PROGRESS_FILE.name + ".tmp")
    tmp.write_text(json.dumps(progress, indent=2))
    tmp.replace(PROGRESS_FILE)
|
|
|
|
def _check_sidecar():
    """Exit with status 1 unless the Graphiti sidecar's /health check succeeds."""
    try:
        resp = requests.get(f"{GRAPHITI_URL}/health", timeout=5)
        # An HTTP error status means the sidecar is up but unhealthy; treat it
        # like unreachable instead of proceeding with the migration.
        resp.raise_for_status()
        print(f"Graphiti: {resp.json()}")
    except (requests.RequestException, ValueError) as e:
        print(f"ERROR: sidecar not reachable — {e}")
        sys.exit(1)


def _fetch_rows(limit):
    """Return (rows, total): rows to migrate oldest-first, and the table's full row count."""
    pg = psycopg2.connect(PG_DSN)
    try:
        cur = pg.cursor()
        cur.execute("SELECT COUNT(*) FROM embeddings")
        total = cur.fetchone()[0]
        query = "SELECT id, document, source, created_at FROM embeddings ORDER BY created_at ASC"
        if limit is not None:
            # Parameterized rather than interpolated into the SQL text.
            cur.execute(query + " LIMIT %s", (limit,))
        else:
            cur.execute(query)
        return cur.fetchall(), total
    finally:
        # Close even if a query fails, so the connection is not leaked.
        pg.close()


def _episode_payload(row_id, document, source, created_at):
    """Build the JSON body POSTed to Graphiti's /episodes for one embeddings row."""
    src = (source or "unknown").replace("/", "-").replace(" ", "-")[:80]
    # requests' JSON encoder cannot serialize datetime objects (which psycopg2
    # returns for timestamp columns), so send ISO-8601 text instead.
    if not created_at:
        timestamp = datetime.now().isoformat()
    elif hasattr(created_at, "isoformat"):
        timestamp = created_at.isoformat()
    else:
        timestamp = created_at
    return {
        # str() so non-string ids (int, UUID) can be sliced.
        "name": f"{src}-{str(row_id)[:8]}",
        "content": document,
        "source_description": source or "nextcloud-corpus",
        "timestamp": timestamp,
        "group_id": GROUP_ID,
    }


def migrate(limit=None):
    """Copy rows from the pgvector `embeddings` table into Graphiti episodes.

    Rows are read oldest-first (by created_at) and posted one at a time to
    the sidecar's /episodes endpoint. Progress is checkpointed to
    PROGRESS_FILE, so an interrupted run resumes where it stopped:
    completed ids are skipped and previously failed ids are retried.

    Args:
        limit: if not None, only the first `limit` rows are processed. This is
            a trial run, and its timing is extrapolated to the full table.

    Exits the process with status 1 if PG_DSN is unset or the sidecar
    health check fails.
    """
    _check_sidecar()
    if not PG_DSN:
        print("ERROR: PG_DSN is not set")
        sys.exit(1)

    progress = load_progress()
    # Ids are compared as strings so checkpoints survive the JSON round-trip
    # whatever Python type the id column has (int, str, UUID).
    completed_ids = {str(x) for x in progress["completed_ids"]}
    # Keyed by id: a retried row replaces its old failure record, or drops
    # it on success, instead of accumulating duplicates across runs.
    failures = {str(f["id"]): f for f in progress["failed_ids"]}
    progress["failed_ids"] = list(failures.values())
    if completed_ids or failures:
        print(f"Resuming — {len(completed_ids)} done, {len(failures)} failed")

    rows, total = _fetch_rows(limit)
    pending = [r for r in rows if str(r[0]) not in completed_ids]
    print(f"Total: {len(rows)} | Pending: {len(pending)}{' [TEST]' if limit is not None else ''}\n")

    success = len(completed_ids)
    start = time.time()

    for i, (row_id, document, source, created_at) in enumerate(pending):
        key = str(row_id)
        try:
            payload = _episode_payload(row_id, document, source, created_at)
            requests.post(f"{GRAPHITI_URL}/episodes", json=payload, timeout=120).raise_for_status()
        except Exception as e:
            # Per-row boundary: record the failure and move on; the row stays
            # out of completed_ids and is retried on the next run.
            failures[key] = {"id": key, "error": str(e)}
            progress["failed_ids"] = list(failures.values())
            print(f" FAILED {row_id}: {e}")
            save_progress(progress)
            time.sleep(2)
            continue

        success += 1
        completed_ids.add(key)
        progress["completed_ids"].append(key)
        if failures.pop(key, None) is not None:
            progress["failed_ids"] = list(failures.values())
        # Checkpoint every 10 successes to bound rework after a crash.
        if success % 10 == 0:
            save_progress(progress)
        if (i + 1) % 50 == 0:
            elapsed = time.time() - start
            rate = (i + 1) / elapsed if elapsed > 0 else 0
            remaining = (len(pending) - i - 1) / rate if rate > 0 else 0
            print(f" [{i+1}/{len(pending)}] {success} ok, {len(failures)} failed | ~{remaining/60:.0f} min left")
        time.sleep(BATCH_PAUSE)

    save_progress(progress)
    elapsed = time.time() - start
    print(f"\nDone — {success} ok, {len(failures)} failed, {elapsed/60:.1f} min")
    if limit is not None and pending:
        # Extrapolate from this trial's per-row time to the whole table,
        # using the live row count rather than a stale hard-coded total.
        est = (elapsed / len(pending)) * total / 60
        print(f"Estimated full run: ~{est:.0f} min")
|
|
|
|
if __name__ == "__main__":
|
|
migrate(int(sys.argv[1]) if len(sys.argv) > 1 else None)
|