Files
aaronAI/scripts/experiments/cost_test_graphiti_bulk_retry.py
T
aaron 3f7fba7e0e scripts/: separate production from experimental and deprecated
Moves 28 experiment scripts to scripts/experiments/ (E1, E1.4, E1.6, E2,
base_class, cascade, cost_test, briefing, consistency, token series).
Moves 2 dissolved-layer scripts to scripts/deprecated/ (consolidator_v0_1.py,
tier1_migration.py — under the bespoke decision both target retired
substrate work).
Removes 19 .bak* files from disk (gitignored, never tracked; git history
is the durable record of every prior version).

The 11 production scripts remain in scripts/. All systemd ExecStart paths,
api.py subprocess calls, and cron jobs continue to resolve correctly —
verified by grep against /etc/systemd/system/aaronai-*.service, scripts/
references in api.py, and the user crontab.

Track 1 inventory cross-cutting finding: scripts/ mixed 11 production
files with 32 experimental scripts and ~20 .bak files. After this commit
a clean-room reader can identify the live workers from a directory listing
alone.

Found by Track 1 inventory 2026-05-02. See
~/aaronai/docs/scripts-reorg-plan-2026-05-02.md for full reasoning.

After commit, run:
1. git log --oneline -3 — show the new commit on top
2. git status — confirm clean working tree (modulo the docs/ untracked files which are intentional)
2026-05-02 23:28:24 +00:00

123 lines
4.1 KiB
Python

"""
Retest just the previously-failed batches after raising MAX_QUEUED_QUERIES.
Reads failed sources from graphiti_bulk_cost_test.json and resubmits.
"""
import json, os, time
from pathlib import Path
import psycopg2, requests
from dotenv import load_dotenv
# Load environment variables from ~/aaronai/.env before reading PG_DSN below.
load_dotenv(Path.home() / "aaronai" / ".env")
# Base URL of the Graphiti service; the bulk endpoint is "<url>/episodes/bulk".
GRAPHITI_URL = "http://localhost:8001"
# Connection string handed to psycopg2.connect(); a missing variable raises
# KeyError at import time.
PG_DSN = os.environ["PG_DSN"]
# Number of (source, document) pairs sent per bulk request.
BATCH_SIZE = 5
# JSON written by the earlier bulk cost test; its "results" entries are scanned
# for batches whose "error" is not None.
PRIOR_RESULTS = Path.home() / "aaronai" / "experiments" / "graphiti_bulk_cost_test.json"
# Destination for this retry run's JSON summary.
OUT = Path.home() / "aaronai" / "experiments" / "graphiti_bulk_retry.json"
def fetch_doc_for_source(cur, source):
    """Return the concatenated document text stored for *source*.

    Runs a single aggregate query against the ``embeddings`` table that joins
    every ``document`` value whose ``source`` column equals *source*, ordered
    by ``id`` and separated by a blank line.

    Args:
        cur: An open DB-API cursor (``execute`` / ``fetchone``).
        source: Value matched against the ``source`` column; passed as a bound
            parameter, never interpolated into the SQL text.

    Returns:
        The aggregated text, or ``None`` when ``fetchone`` yields no row or
        the aggregate itself is NULL.
    """
    query = (
        "\n"
        "SELECT STRING_AGG(document, E'\\n\\n' ORDER BY id)\n"
        "FROM embeddings WHERE source = %s\n"
    )
    cur.execute(query, (source,))
    record = cur.fetchone()
    if not record:
        return None
    return record[0]
def submit_bulk_batch(batch):
    """POST one batch of documents to the Graphiti bulk-episode endpoint.

    Args:
        batch: Sequence of ``(source, document)`` pairs. Each document is
            truncated to its first 12000 characters before being sent.

    Returns:
        A dict describing the attempt, with keys:

        * ``batch_size`` - number of pairs in *batch*.
        * ``status_code`` - HTTP status, or ``None`` if the request raised.
        * ``elapsed_s`` - wall-clock seconds for the attempt (2 decimals).
        * ``elapsed_per_episode_s`` - ``elapsed_s`` divided by the batch size,
          or ``None`` if the request raised or the batch is empty.
        * ``error`` - ``None`` on a successful (``r.ok``) response; otherwise
          a non-empty string of at most 500 characters.
        * ``sources`` - the source names in *batch*, in order.

    This function never raises for request failures: any exception from the
    HTTP call is captured in ``error`` so the caller can keep processing the
    remaining batches.
    """
    sources = [source for source, _ in batch]
    payload = {"episodes": [
        {"name": source, "content": doc[:12000],
         "source_description": "pgvector_migration_bulk_retry",
         "timestamp": "2026-04-28T00:00:00"}
        for source, doc in batch
    ]}
    t0 = time.time()
    try:
        r = requests.post(f"{GRAPHITI_URL}/episodes/bulk", json=payload, timeout=900)
        status_code = r.status_code
        # A failed response with an empty body must still produce a non-empty
        # error, otherwise truthiness checks downstream read it as success.
        error = None if r.ok else (r.text[:500] or f"HTTP {r.status_code}")
    except Exception as e:
        # Deliberately broad: a failed batch is recorded, not fatal to the run.
        status_code = None
        error = str(e)[:500]
    # Read the clock once so both elapsed figures describe the same interval.
    elapsed = time.time() - t0
    if status_code is not None and batch:
        per_episode = round(elapsed / len(batch), 2)
    else:
        per_episode = None
    return {
        "batch_size": len(batch),
        "status_code": status_code,
        "elapsed_s": round(elapsed, 2),
        "elapsed_per_episode_s": per_episode,
        "error": error,
        "sources": sources,
    }
def main():
    """Resubmit every source from previously failed batches and save a summary.

    Steps:
      1. Read ``PRIOR_RESULTS`` and collect the ``sources`` of every batch
         whose ``error`` is not ``None``.
      2. Load each source's document text from Postgres, warning about (and
         skipping) sources with no stored text.
      3. Submit the documents in groups of ``BATCH_SIZE`` via
         ``submit_bulk_batch``, printing one progress line per batch.
      4. Write a JSON summary to ``OUT`` and print the totals.
    """
    prior = json.loads(PRIOR_RESULTS.read_text())
    failed_sources = []
    for batch_result in prior["results"]:
        if batch_result["error"] is not None:
            failed_sources.extend(batch_result["sources"])
    print(f"Retrying {len(failed_sources)} previously-failed sources")

    sources_with_docs = []
    conn = psycopg2.connect(PG_DSN)
    try:
        # The cursor context manager closes the cursor; the finally clause
        # closes the connection even if a query raises.
        with conn.cursor() as cur:
            for s in failed_sources:
                doc = fetch_doc_for_source(cur, s)
                if doc:
                    sources_with_docs.append((s, doc))
                else:
                    print(f" WARN: could not find doc for source {s}")
    finally:
        conn.close()
    print(f"Loaded {len(sources_with_docs)} source docs")
    print()

    batches = [sources_with_docs[i:i+BATCH_SIZE]
               for i in range(0, len(sources_with_docs), BATCH_SIZE)]
    results = []
    total_start = time.time()
    for i, batch in enumerate(batches, start=1):
        avg = int(sum(len(d) for _, d in batch) / len(batch))
        # Print the batch header without a newline so the outcome lands on
        # the same line once the (potentially slow) request returns.
        print(f"[batch {i:2d}/{len(batches)}] n={len(batch)} avg_chars={avg:6d}",
              end=" ", flush=True)
        result = submit_bulk_batch(batch)
        results.append(result)
        # Use the same "is not None" test as the summary below, so a batch
        # that failed with an empty error string is not shown as a success.
        if result["error"] is not None:
            print(f" ERROR: {result['error'][:80]}")
        else:
            print(f" {result['status_code']} {result['elapsed_s']}s")
    total_elapsed = time.time() - total_start

    successful = [r for r in results if r["error"] is None]
    failed = [r for r in results if r["error"] is not None]
    summary = {
        "n_retry_sources": len(sources_with_docs),
        "n_batches": len(batches),
        "successful_batches": len(successful),
        "failed_batches": len(failed),
        "successful_episodes": sum(r["batch_size"] for r in successful),
        "failed_episodes": sum(r["batch_size"] for r in failed),
        "total_elapsed_s": round(total_elapsed, 1),
        "results": results,
    }
    OUT.write_text(json.dumps(summary, indent=2))

    print()
    print("=" * 60)
    print("RETRY RESULTS")
    print("=" * 60)
    print(f"Episodes: {summary['successful_episodes']}/{len(sources_with_docs)} succeeded")
    print(f"Batches: {summary['successful_batches']}/{summary['n_batches']} succeeded")
    print(f"Total elapsed: {summary['total_elapsed_s']}s")
    print()
    print(f"Full results: {OUT}")


if __name__ == "__main__":
    main()