"""Measure actual Graphiti episode-add cost on a stratified sample of pgvector sources.

Draws a length-stratified sample of documents (one concatenated document per
``source``) from the ``embeddings`` table, posts each one to the Graphiti
``/episodes`` endpoint, records per-episode latency, and extrapolates runtime
to the whole corpus. Dollar cost is read manually from the Anthropic usage
console before and after the run (the script prompts for this).

Results are written to ``OUT`` as JSON.
"""
import json
import os
import random
import time
from contextlib import closing
from pathlib import Path

import psycopg2
import requests
from dotenv import load_dotenv

load_dotenv(Path.home() / "aaronai" / ".env")

GRAPHITI_URL = "http://localhost:8001"
PG_DSN = os.environ["PG_DSN"]
SAMPLE_SIZE = 50
RANDOM_SEED = 42
OUT = Path.home() / "aaronai" / "experiments" / "graphiti_cost_test.json"
OUT.parent.mkdir(parents=True, exist_ok=True)

# Document-length buckets, in characters: short < 1000 <= medium < 5000 <= long.
SHORT_MAX_CHARS = 1000
MEDIUM_MAX_CHARS = 5000
BUCKET_NAMES = ("short", "medium", "long")
# Share of SAMPLE_SIZE drawn from the short and medium buckets; the long bucket
# gets the remainder. With SAMPLE_SIZE = 50 this is 15 / 25 / 10.
SHORT_SHARE = 0.3
MEDIUM_SHARE = 0.5
# Each episode's content is truncated to this many characters before sending.
MAX_CONTENT_CHARS = 12000


def _bucket_of(n_chars: int) -> str:
    """Return the length-bucket name ("short"/"medium"/"long") for *n_chars*."""
    if n_chars < SHORT_MAX_CHARS:
        return "short"
    if n_chars < MEDIUM_MAX_CHARS:
        return "medium"
    return "long"


def _bucket_quotas(total: int) -> dict:
    """Split *total* sample slots across the buckets by SHORT_SHARE/MEDIUM_SHARE."""
    short = round(total * SHORT_SHARE)
    medium = round(total * MEDIUM_SHARE)
    return {"short": short, "medium": medium, "long": total - short - medium}


def fetch_stratified_sample():
    """Return a reproducible, length-stratified sample of ``(source, document)`` pairs.

    Each source's chunks are concatenated (ordered by ``id``) into one document.
    Sources with a NULL name or an empty/NULL document are skipped. Up to the
    bucket quota is drawn from each length bucket; a bucket smaller than its
    quota contributes all of its members. The result is ordered short, then
    medium, then long.
    """
    with closing(psycopg2.connect(PG_DSN)) as conn, conn.cursor() as cur:
        # ORDER BY makes the pool order deterministic; without it the seeded
        # RNG would not reproduce the same sample across runs.
        cur.execute("""
            SELECT source, STRING_AGG(document, E'\\n\\n' ORDER BY id) AS full_doc
            FROM embeddings
            GROUP BY source
            ORDER BY source
        """)
        rows = cur.fetchall()

    pools = {name: [] for name in BUCKET_NAMES}
    for source, doc in rows:
        # A NULL source would break name-based output later and is not counted
        # by COUNT(DISTINCT source) either.
        if source is not None and doc:
            pools[_bucket_of(len(doc))].append((source, doc))

    print(f"Pool: short={len(pools['short'])} medium={len(pools['medium'])} long={len(pools['long'])}")

    # Local RNG: same draw sequence as seeding the global one, without side effects.
    rng = random.Random(RANDOM_SEED)
    quotas = _bucket_quotas(SAMPLE_SIZE)
    sample = []
    for name in BUCKET_NAMES:
        pool = pools[name]
        sample += rng.sample(pool, min(quotas[name], len(pool)))

    print(f"Sample: {len(sample)} sources")
    return sample


def submit_episode(source: str, document: str) -> dict:
    """POST one episode to Graphiti and return a timing/status record.

    The returned dict has keys ``source``, ``doc_chars`` (full length),
    ``doc_chars_sent`` (after truncation), ``status_code`` (None if the request
    raised), ``elapsed_s`` and ``error`` (None on success, otherwise a
    non-empty message truncated to 500 chars). Never raises for request
    failures: any exception is recorded in ``error``.

    NOTE(review): if the server acknowledges with 202 Accepted and ingests
    asynchronously, ``elapsed_s`` measures only request/enqueue time — confirm
    that ``/episodes`` blocks until ingestion completes.
    """
    payload = {
        "name": source,
        "content": document[:MAX_CONTENT_CHARS],
        "source_description": "pgvector_migration_cost_test",
        "timestamp": "2026-04-28T00:00:00",
    }
    record = {
        "source": source,
        "doc_chars": len(document),
        "doc_chars_sent": min(len(document), MAX_CONTENT_CHARS),
    }
    t0 = time.perf_counter()  # monotonic clock for durations
    try:
        r = requests.post(f"{GRAPHITI_URL}/episodes", json=payload, timeout=600)
    except Exception as e:  # deliberate best-effort: record and keep going
        return {
            **record,
            "status_code": None,
            "elapsed_s": round(time.perf_counter() - t0, 2),
            "error": str(e)[:500] or type(e).__name__,
        }
    if r.ok:
        error = None
    else:
        # An error response may have an empty body; keep ``error`` truthy so
        # the smoke test and progress output don't mistake it for success.
        error = r.text[:500] or f"HTTP {r.status_code}"
    return {
        **record,
        "status_code": r.status_code,
        "elapsed_s": round(time.perf_counter() - t0, 2),
        "error": error,
    }


def _count_corpus_sources():
    """Return the number of distinct non-NULL sources in ``embeddings``."""
    with closing(psycopg2.connect(PG_DSN)) as conn, conn.cursor() as cur:
        cur.execute("SELECT COUNT(DISTINCT source) FROM embeddings")
        return cur.fetchone()[0]


def _build_summary(sample, results, total_elapsed):
    """Aggregate per-episode *results* into the summary dict written to OUT."""
    successful = [r for r in results if r["error"] is None]
    failed = [r for r in results if r["error"] is not None]

    by_bucket = {}
    for name in BUCKET_NAMES:
        b = [r for r in successful if _bucket_of(r["doc_chars"]) == name]
        if b:
            by_bucket[name] = {
                "n": len(b),
                "mean_elapsed_s": round(sum(r["elapsed_s"] for r in b) / len(b), 2),
                "mean_chars": int(sum(r["doc_chars"] for r in b) / len(b)),
            }

    return {
        "sample_size": len(sample),
        "attempted": len(results),  # < sample_size if the run was interrupted
        "successful": len(successful),
        "failed": len(failed),
        "total_elapsed_s": round(total_elapsed, 1),
        "mean_elapsed_per_episode_s": round(
            sum(r["elapsed_s"] for r in successful) / max(len(successful), 1), 2
        ),
        "by_bucket": by_bucket,
        "results": results,
    }


def _print_report(summary):
    """Print the human-readable results and the manual cost-calculation steps."""
    print()
    print("=" * 60)
    print("RESULTS")
    print("=" * 60)
    print(f"Sample: {summary['successful']}/{summary['sample_size']} succeeded, {summary['failed']} failed")
    print(f"Total elapsed: {summary['total_elapsed_s']}s")
    print(f"Mean per episode: {summary['mean_elapsed_per_episode_s']}s")
    for bucket, stats in summary["by_bucket"].items():
        print(f" {bucket:6s} n={stats['n']:3d} chars~{stats['mean_chars']:6d} elapsed~{stats['mean_elapsed_s']}s")
    print()
    print(f"Total corpus sources: {summary['total_corpus_sources']}")
    print(f"Estimated migration runtime: {summary['estimated_migration_hours']} hours")
    print()
    print("AFTER:")
    print(" Wait 5 min; note new Anthropic spend; subtract.")
    print(f" test_cost / {summary['successful']} = per-episode cost")
    print(f" per-episode * {summary['total_corpus_sources']} = full migration estimate")
    print()
    print(f"Full results: {OUT}")


def main():
    """Run the interactive cost test: smoke test, bulk run, summary, report.

    Halts (after writing only the smoke-test record to OUT) if the first
    episode fails. A Ctrl-C during the bulk run stops submission but still
    summarizes and saves the episodes completed so far.
    """
    print("=" * 60)
    print("Graphiti Migration Cost Test (Haiku 4.5)")
    print("=" * 60)
    print()
    print("BEFORE running:")
    print(" 1. Open https://console.anthropic.com/settings/usage")
    print(" 2. Note current spend.")
    print()
    input("Press Enter when noted... ")
    print()

    sample = fetch_stratified_sample()
    if not sample:
        print("ERROR: empty sample")
        return

    # Start timing before the smoke test so total_elapsed_s covers every
    # episode that ends up in ``results``.
    total_start = time.perf_counter()

    # Smoke test
    print(f"Smoke test on first source ({sample[0][0][:50]}...):")
    smoke = submit_episode(*sample[0])
    print(f" status={smoke['status_code']} elapsed={smoke['elapsed_s']}s")
    if smoke["error"]:
        print(f" ERROR: {smoke['error']}")
        OUT.write_text(json.dumps({"smoke_test": smoke}, indent=2))
        print("Halted — fix smoke test before bulk run.")
        return
    print(f" OK. Proceeding with {len(sample)} sources.")
    print()

    results = [smoke]
    try:
        for i, (source, doc) in enumerate(sample[1:], start=2):
            bucket = _bucket_of(len(doc))
            print(
                f"[{i:2d}/{len(sample)}] [{bucket:6s}] [{len(doc):6d}c] {source[:50]:50s}",
                end=" ",
                flush=True,
            )
            result = submit_episode(source, doc)
            results.append(result)
            if result["error"]:
                print(f" ERROR: {result['error'][:80]}")
            else:
                print(f" {result['status_code']} {result['elapsed_s']}s")
    except KeyboardInterrupt:
        # Episodes already submitted have been paid for; keep their data.
        print()
        print(f"Interrupted — summarizing {len(results)} completed episodes.")
    total_elapsed = time.perf_counter() - total_start

    summary = _build_summary(sample, results, total_elapsed)
    total_sources = _count_corpus_sources()
    summary["total_corpus_sources"] = total_sources
    summary["estimated_migration_hours"] = round(
        total_sources * summary["mean_elapsed_per_episode_s"] / 3600, 1
    )
    OUT.write_text(json.dumps(summary, indent=2))
    _print_report(summary)


if __name__ == "__main__":
    main()