94 lines
3.1 KiB
Python
94 lines
3.1 KiB
Python
"""Retry attempt #2 — for sources that timed out after MAX_QUEUED_QUERIES bump."""
|
|
import json, os, time
|
|
from pathlib import Path
|
|
import psycopg2, requests
|
|
from dotenv import load_dotenv
|
|
# Pull environment overrides (e.g. PG_DSN) from the project's .env file
# before any of the settings below are read.
load_dotenv(Path.home().joinpath("aaronai", ".env"))

# Graphiti service that receives the bulk episode uploads.
GRAPHITI_URL = "http://localhost:8001"
# Postgres connection string; required (KeyError if absent from env/.env).
PG_DSN = os.environ["PG_DSN"]
# Episodes per bulk request; kept deliberately small because larger
# batches timed out.
BATCH_SIZE = 3

# Shared output directory for the retry runs.
_EXPERIMENTS_DIR = Path.home().joinpath("aaronai", "experiments")
# Summary written by the previous retry run (read as input here).
PRIOR = _EXPERIMENTS_DIR / "graphiti_bulk_retry.json"
# Summary written by this run.
OUT = _EXPERIMENTS_DIR / "graphiti_bulk_retry2.json"
|
|
|
|
|
|
def fetch_doc(cur, source):
    """Return all stored chunks for ``source`` joined into a single document.

    Chunks from the ``embeddings`` table are concatenated in ``id`` order,
    separated by a blank line.

    Args:
        cur: an open DB-API cursor (psycopg2 in this script).
        source: value matched against the ``source`` column.

    Returns:
        The joined text, or ``None`` when the query returns no row or the
        aggregate is NULL (no chunks exist for ``source``).
    """
    query = (
        "SELECT STRING_AGG(document, E'\\n\\n' ORDER BY id) "
        "FROM embeddings WHERE source = %s"
    )
    cur.execute(query, (source,))
    first_row = cur.fetchone()
    if not first_row:
        return None
    return first_row[0]
|
|
|
|
|
|
def submit_batch(batch):
    """POST one batch of documents to Graphiti's bulk-episode endpoint.

    Args:
        batch: sequence of ``(source, document)`` pairs. Each document is
            truncated to its first 12,000 characters before upload.

    Returns:
        A JSON-serialisable dict describing the attempt:
        ``batch_size``; ``status_code`` (``None`` if no HTTP response was
        received); ``elapsed_s``; ``error`` (``None`` when the response is
        ``ok``, otherwise a non-empty message of at most 500 characters);
        and ``sources``.

    Request failures are never raised. Any exception is captured in
    ``error`` so the caller can carry on with the next batch.
    """
    sources = [source for source, _ in batch]
    payload = {"episodes": [
        {"name": source, "content": doc[:12000],
         "source_description": "pgvector_migration_bulk_retry2",
         # Fixed timestamp applied to every episode this script submits.
         "timestamp": "2026-04-28T00:00:00"}
        for source, doc in batch
    ]}

    # monotonic() measures durations correctly even if the wall clock is adjusted.
    started = time.monotonic()
    status_code = None
    try:
        resp = requests.post(f"{GRAPHITI_URL}/episodes/bulk", json=payload, timeout=900)
    except Exception as exc:  # best-effort: record the failure, don't abort the run
        # str() of some exceptions is empty; prefix the type so the error is never blank.
        error = f"{type(exc).__name__}: {exc}"[:500]
    else:
        status_code = resp.status_code
        # A failing response may have no body; fall back to the status code.
        error = None if resp.ok else (resp.text[:500] or f"HTTP {resp.status_code}")

    return {
        "batch_size": len(batch),
        "status_code": status_code,
        "elapsed_s": round(time.monotonic() - started, 2),
        "error": error,
        "sources": sources,
    }
|
|
|
|
|
|
def _load_failed_sources(path):
    """Return unique sources from failed batches in the summary at ``path``, in first-seen order."""
    prior = json.loads(path.read_text())
    failed = (
        source
        for result in prior["results"]
        if result["error"] is not None
        for source in result["sources"]
    )
    # dict.fromkeys de-duplicates while preserving order, so no source is submitted twice.
    return list(dict.fromkeys(failed))


def _fetch_documents(sources):
    """Fetch each source's document; return ``(found (source, doc) pairs, sources with no text)``."""
    found, missing = [], []
    conn = psycopg2.connect(PG_DSN)
    try:
        cur = conn.cursor()
        try:
            for source in sources:
                doc = fetch_doc(cur, source)
                if doc:
                    found.append((source, doc))
                else:
                    missing.append(source)
        finally:
            cur.close()
    finally:
        # Close the connection even if a query fails part-way through.
        conn.close()
    return found, missing


def _summarize(sources, missing, results):
    """Build the JSON-serialisable run summary from the per-batch results."""
    succ = [r for r in results if r["error"] is None]
    fail = [r for r in results if r["error"] is not None]
    return {
        "n_sources": len(sources),
        "successful_batches": len(succ),
        "failed_batches": len(fail),
        "successful_episodes": sum(r["batch_size"] for r in succ),
        "failed_episodes": sum(r["batch_size"] for r in fail),
        # Sources that had no stored document and so were never submitted.
        "missing_sources": missing,
        "results": results,
    }


def main():
    """Re-submit every source whose batch failed in the previous retry run.

    Reads failed batches from ``PRIOR`` and pulls each source's document
    from Postgres. The documents are submitted to Graphiti in batches of
    ``BATCH_SIZE``. A summary is written to ``OUT`` after every batch, so an
    interrupted run still leaves its partial results on disk.
    """
    failed = _load_failed_sources(PRIOR)
    print(f"Retry #2: {len(failed)} sources still failing")

    sources, missing = _fetch_documents(failed)
    if missing:
        print(f"Skipping {len(missing)} sources with no stored document")

    batches = [sources[i:i + BATCH_SIZE] for i in range(0, len(sources), BATCH_SIZE)]
    print(f"Submitting {len(batches)} batches of up to {BATCH_SIZE}\n")

    results = []
    for i, batch in enumerate(batches, 1):
        avg = sum(len(d) for _, d in batch) // len(batch)
        print(f"[batch {i}/{len(batches)}] n={len(batch)} avg_chars={avg:6d}", end=" ", flush=True)
        r = submit_batch(batch)
        results.append(r)
        # Test against None, as the summary does, so an empty error string
        # is still reported as a failure instead of printed as a success.
        if r["error"] is not None:
            print(f" ERROR: {r['error'][:80]}")
        else:
            print(f" {r['status_code']} {r['elapsed_s']}s")
        # Checkpoint so an interrupted run keeps the results gathered so far.
        OUT.write_text(json.dumps(_summarize(sources, missing, results), indent=2))

    # Final write also covers the case where there were no batches at all.
    summary = _summarize(sources, missing, results)
    OUT.write_text(json.dumps(summary, indent=2))
    print()
    print(f"Episodes: {summary['successful_episodes']}/{len(sources)} succeeded")
    print(f"Full results: {OUT}")


if __name__ == "__main__":
    main()
|