""" Retest just the previously-failed batches after raising MAX_QUEUED_QUERIES. Reads failed sources from graphiti_bulk_cost_test.json and resubmits. """ import json, os, time from pathlib import Path import psycopg2, requests from dotenv import load_dotenv load_dotenv(Path.home() / "aaronai" / ".env") GRAPHITI_URL = "http://localhost:8001" PG_DSN = os.environ["PG_DSN"] BATCH_SIZE = 5 PRIOR_RESULTS = Path.home() / "aaronai" / "experiments" / "graphiti_bulk_cost_test.json" OUT = Path.home() / "aaronai" / "experiments" / "graphiti_bulk_retry.json" def fetch_doc_for_source(cur, source): cur.execute(""" SELECT STRING_AGG(document, E'\\n\\n' ORDER BY id) FROM embeddings WHERE source = %s """, (source,)) row = cur.fetchone() return row[0] if row else None def submit_bulk_batch(batch): payload = {"episodes": [ {"name": s, "content": d[:12000], "source_description": "pgvector_migration_bulk_retry", "timestamp": "2026-04-28T00:00:00"} for s, d in batch ]} t0 = time.time() try: r = requests.post(f"{GRAPHITI_URL}/episodes/bulk", json=payload, timeout=900) return { "batch_size": len(batch), "status_code": r.status_code, "elapsed_s": round(time.time() - t0, 2), "elapsed_per_episode_s": round((time.time() - t0) / len(batch), 2), "error": None if r.ok else r.text[:500], "sources": [s for s, _ in batch], } except Exception as e: return { "batch_size": len(batch), "status_code": None, "elapsed_s": round(time.time() - t0, 2), "elapsed_per_episode_s": None, "error": str(e)[:500], "sources": [s for s, _ in batch], } def main(): prior = json.loads(PRIOR_RESULTS.read_text()) failed_sources = [] for batch_result in prior["results"]: if batch_result["error"] is not None: failed_sources.extend(batch_result["sources"]) print(f"Retrying {len(failed_sources)} previously-failed sources") conn = psycopg2.connect(PG_DSN) cur = conn.cursor() sources_with_docs = [] for s in failed_sources: doc = fetch_doc_for_source(cur, s) if doc: sources_with_docs.append((s, doc)) else: print(f" WARN: could not find doc for source {s}") cur.close(); conn.close() print(f"Loaded {len(sources_with_docs)} source docs") print() batches = [sources_with_docs[i:i+BATCH_SIZE] for i in range(0, len(sources_with_docs), BATCH_SIZE)] results = [] total_start = time.time() for i, batch in enumerate(batches, start=1): avg = int(sum(len(d) for _, d in batch) / len(batch)) print(f"[batch {i:2d}/{len(batches)}] n={len(batch)} avg_chars={avg:6d}", end=" ", flush=True) result = submit_bulk_batch(batch) results.append(result) if result["error"]: print(f" ERROR: {result['error'][:80]}") else: print(f" {result['status_code']} {result['elapsed_s']}s") total_elapsed = time.time() - total_start successful = [r for r in results if r["error"] is None] failed = [r for r in results if r["error"] is not None] summary = { "n_retry_sources": len(sources_with_docs), "n_batches": len(batches), "successful_batches": len(successful), "failed_batches": len(failed), "successful_episodes": sum(r["batch_size"] for r in successful), "failed_episodes": sum(r["batch_size"] for r in failed), "total_elapsed_s": round(total_elapsed, 1), "results": results, } OUT.write_text(json.dumps(summary, indent=2)) print() print("=" * 60) print("RETRY RESULTS") print("=" * 60) print(f"Episodes: {summary['successful_episodes']}/{len(sources_with_docs)} succeeded") print(f"Batches: {summary['successful_batches']}/{summary['n_batches']} succeeded") print(f"Total elapsed: {summary['total_elapsed_s']}s") print() print(f"Full results: {OUT}") if __name__ == "__main__": main()