Files
aaronAI/scripts/experiments/cascade_optimization_test.py
aaron 3f7fba7e0e scripts/: separate production from experimental and deprecated
Moves 28 experiment scripts to scripts/experiments/ (E1, E1.4, E1.6, E2,
base_class, cascade, cost_test, briefing, consistency, token series).
Moves 2 dissolved-layer scripts to scripts/deprecated/ (consolidator_v0_1.py,
tier1_migration.py — under the bespoke decision both target retired
substrate work).
Removes 19 .bak* files from disk (gitignored, never tracked; git history
is the durable record of every prior version).

The 11 production scripts remain in scripts/. All systemd ExecStart paths,
api.py subprocess calls, and cron jobs continue to resolve correctly —
verified by grep against /etc/systemd/system/aaronai-*.service, scripts/
references in api.py, and the user crontab.

Track 1 inventory cross-cutting finding: scripts/ mixed 11 production
files with 32 experimental scripts and ~20 .bak files. After this commit
a clean-room reader can identify the live workers from a directory listing
alone.

Found by Track 1 inventory 2026-05-02. See
~/aaronai/docs/scripts-reorg-plan-2026-05-02.md for full reasoning.

After commit, run:
1. git log --oneline -3 — show the new commit on top
2. git status — confirm clean working tree (modulo the docs/ untracked files which are intentional)
2026-05-02 23:28:24 +00:00

509 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Cascade Optimization Test — skip-small + compressed-draft
Tests whether two optimizations on the entity-drafter cascade meaningfully
improve the savings ceiling beyond the prior unoptimized cascade (12.66%).
Optimizations:
A — Skip-small-docs routing: docs <1000 chars bypass the local pass entirely
B — Compressed draft format: bare JSON array instead of markdown bullets
Conditions:
A — Baseline: single Claude Haiku call, full extraction (unchanged from prior)
B — Optimized cascade: skip-small + compressed draft, otherwise same cascade
Sample: 30 docs from briefing_test_v2_results.json:
- 10 small (<1000 chars) — should show 0% delta if skip-small works
- 12 medium (1000-5000 chars) — primary test bucket
- 8 large (5000-12000 chars, capped at 12K)
Mistral context: 12K (raised from 8K in prior run).
Outputs: ~/aaronai/experiments/cascade_optimization_results.json
"""
import json
import os
import re
import statistics
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import anthropic
import psycopg2
import requests
from dotenv import load_dotenv
load_dotenv(Path.home() / "aaronai" / ".env")
V2_FILE = Path.home() / "aaronai" / "briefing_test_v2_results.json"
OUTPUT_FILE = Path.home() / "aaronai" / "experiments" / "cascade_optimization_results.json"
HAIKU_MODEL = "claude-haiku-4-5-20251001"
HAIKU_MAX_TOKENS = 4096
HAIKU_TEMPERATURE = 0.0
OLLAMA_URL = "http://localhost:11434/api/generate"
LOCAL_MODEL = "mistral"
LOCAL_TIMEOUT = 180 # raised — 12K context can take longer
MAX_DOC_CHARS = 12000 # raised from 8K
SKIP_SMALL_THRESHOLD = 1000
HAIKU_IN_PER_M = 1.0
HAIKU_OUT_PER_M = 5.0
CONDITION_A_PROMPT = """Extract a knowledge graph from the document below.
Return ONLY valid JSON with this exact schema:
{
"entities": [
{"name": string, "type": string}
],
"edges": [
{"subject": string, "predicate": string, "object": string}
]
}
Entity types: use whatever fits the entity. Do not constrain yourself to a fixed list.
Edge predicates: natural language phrases that capture the actual relationship the document states or implies.
Extract every entity and every relationship the document states or strongly implies. Both subject and object in every edge must appear in entities. JSON only, no commentary, no markdown fences.
DOCUMENT:
"""
LOCAL_PROMPT = """List every named entity that appears in the document below — every person, organization, place, project, document, material, technique, date, event, or other named thing.
Return ONLY valid JSON:
{
"candidates": [string]
}
Just names. No types, no relationships. JSON only.
DOCUMENT:
"""
# Compressed draft format — bare JSON array, minimal preamble
CONDITION_B_API_PROMPT_COMPRESSED = """Extract a knowledge graph from the document below.
Local model entity candidates (hint, not authoritative — verify against the document, ignore false ones, add missed ones):
{local_draft_json}
Return ONLY valid JSON with this exact schema:
{
"entities": [
{"name": string, "type": string}
],
"edges": [
{"subject": string, "predicate": string, "object": string}
]
}
Entity types: use whatever fits. Edge predicates: natural language phrases capturing the actual relationship. Both subject and object in every edge must appear in entities. Extract every entity and every relationship the document states or strongly implies. JSON only, no commentary, no markdown fences.
DOCUMENT:
"""
def strip_json_fences(text):
if not text:
return ""
t = text.strip()
t = re.sub(r"^```(?:json)?\s*", "", t)
t = re.sub(r"\s*```$", "", t)
return t.strip()
def fetch_document_text(pg_conn, source):
cur = pg_conn.cursor()
cur.execute(
"SELECT document FROM embeddings WHERE source = %s ORDER BY id",
(source,),
)
rows = cur.fetchall()
cur.close()
if not rows:
return None, 0
full = "\n\n".join(r[0] for r in rows)
return full[:MAX_DOC_CHARS], len(full)
def call_haiku(client, prompt_text):
t0 = time.time()
resp = client.messages.create(
model=HAIKU_MODEL,
max_tokens=HAIKU_MAX_TOKENS,
temperature=HAIKU_TEMPERATURE,
messages=[{"role": "user", "content": prompt_text}],
)
return {
"input_tokens": resp.usage.input_tokens,
"output_tokens": resp.usage.output_tokens,
"latency_s": round(time.time() - t0, 2),
"response_text": resp.content[0].text if resp.content else "",
"stop_reason": resp.stop_reason,
}
def call_local(document_text):
t0 = time.time()
try:
resp = requests.post(
OLLAMA_URL,
json={
"model": LOCAL_MODEL,
"prompt": LOCAL_PROMPT + document_text,
"stream": False,
"format": "json",
"options": {"num_predict": 1024, "temperature": 0, "num_ctx": 12288},
},
timeout=LOCAL_TIMEOUT,
)
resp.raise_for_status()
return {
"response": resp.json().get("response", ""),
"latency_s": round(time.time() - t0, 2),
}
except Exception as e:
return {"error": str(e), "latency_s": round(time.time() - t0, 2)}
def parse_graph(raw):
cleaned = strip_json_fences(raw)
if not cleaned:
return None, None
try:
data = json.loads(cleaned)
except json.JSONDecodeError:
return None, None
if not isinstance(data, dict):
return None, None
ents = data.get("entities")
edges = data.get("edges")
if isinstance(ents, list) and isinstance(edges, list):
return len(ents), len(edges)
return None, None
def parse_candidates(raw):
cleaned = strip_json_fences(raw)
if not cleaned:
return None
try:
data = json.loads(cleaned)
except json.JSONDecodeError:
return None
if not isinstance(data, dict):
return None
cands = data.get("candidates")
if isinstance(cands, list):
return [str(c).strip() for c in cands if c]
return None
def stratify(docs):
"""Pick 10 small / 12 medium / 8 large by character length, in file order."""
sized = [(d, d["content_length"]) for d in docs]
small = [d for d, n in sized if n < 1000]
medium = [d for d, n in sized if 1000 <= n < 5000]
large = [d for d, n in sized if n >= 5000]
return small[:10] + medium[:12] + large[:8]
def main():
api_key = os.environ.get("ANTHROPIC_API_KEY")
pg_dsn = os.environ.get("PG_DSN")
if not api_key or not pg_dsn:
print("ERROR: ANTHROPIC_API_KEY or PG_DSN not set", file=sys.stderr)
sys.exit(1)
if not V2_FILE.exists():
print(f"ERROR: {V2_FILE} not found", file=sys.stderr)
sys.exit(1)
with open(V2_FILE) as f:
v2 = json.load(f)
docs_meta = [d for d in v2["documents"] if d.get("status") == "SUCCESS"]
sample = stratify(docs_meta)
print(f"Sample: {len(sample)} docs (10s/12m/8l, file order)")
print(f"Skip-small threshold: <{SKIP_SMALL_THRESHOLD} chars")
print(f"Mistral context: 12288 tokens, doc cap {MAX_DOC_CHARS} chars")
print(f"Haiku model: {HAIKU_MODEL} temp={HAIKU_TEMPERATURE} max_tokens={HAIKU_MAX_TOKENS}")
print()
client = anthropic.Anthropic(api_key=api_key)
pg_conn = psycopg2.connect(pg_dsn)
results = []
started_at = datetime.now(timezone.utc).isoformat()
t_total = time.time()
for i, doc_meta in enumerate(sample, 1):
source = doc_meta["source"]
doc_text, original_len = fetch_document_text(pg_conn, source)
if not doc_text:
print(f"[{i:02d}/{len(sample)}] {source[:55]} — SKIP (not in pgvector)")
results.append({"source": source, "skipped": "not_in_pgvector"})
continue
sent_len = len(doc_text)
truncated = original_len > sent_len
size_bucket = (
"small" if sent_len < 1000
else "medium" if sent_len < 5000
else "large"
)
skip_small_routed = sent_len < SKIP_SMALL_THRESHOLD
trunc_marker = "*" if truncated else " "
route_marker = "[skip-small]" if skip_small_routed else "[cascade] "
print(f"[{i:02d}/{len(sample)}] [{size_bucket:6s}] [{sent_len:>5}c{trunc_marker}] "
f"{route_marker} {source[:50]}", flush=True)
# Condition A — always runs
try:
a = call_haiku(client, CONDITION_A_PROMPT + doc_text)
a_ents, a_edges = parse_graph(a["response_text"])
print(f" A: in={a['input_tokens']} out={a['output_tokens']} "
f"ents={a_ents} edges={a_edges} stop={a['stop_reason']} t={a['latency_s']}s",
flush=True)
except Exception as e:
print(f" A FAILED: {e}", flush=True)
a = {"error": str(e)}
a_ents = a_edges = None
# Condition B
if skip_small_routed:
# Skip-small: B = A. Same call, no local pass.
print(f" B: routed to baseline (skip-small)", flush=True)
b = a
b_ents = a_ents
b_edges = a_edges
local_result = {"skipped": "skip_small_routed"}
local_candidates = []
local_raw = ""
else:
local_result = call_local(doc_text)
if "error" in local_result:
print(f" B local FAILED: {local_result['error']} — recording skip", flush=True)
results.append({
"source": source,
"size_bucket": size_bucket,
"doc_chars_original": original_len,
"doc_chars_sent": sent_len,
"truncated": truncated,
"skip_small_routed": False,
"condition_a": {
"input_tokens": a.get("input_tokens"),
"output_tokens": a.get("output_tokens"),
"latency_s": a.get("latency_s"),
"entity_count": a_ents,
"edge_count": a_edges,
"stop_reason": a.get("stop_reason"),
"response_text": a.get("response_text", "")[:4000],
"error": a.get("error"),
},
"condition_b": {
"skipped": "local_model_failed",
"local_error": local_result["error"],
"local_latency_s": local_result.get("latency_s"),
},
})
continue
local_raw = local_result["response"]
cands = parse_candidates(local_raw)
local_candidates = cands or []
print(f" B local: t={local_result['latency_s']}s candidates={len(local_candidates)}",
flush=True)
if not local_candidates:
print(f" B local: empty draft — skipping API call", flush=True)
results.append({
"source": source,
"size_bucket": size_bucket,
"doc_chars_original": original_len,
"doc_chars_sent": sent_len,
"truncated": truncated,
"skip_small_routed": False,
"condition_a": {
"input_tokens": a.get("input_tokens"),
"output_tokens": a.get("output_tokens"),
"latency_s": a.get("latency_s"),
"entity_count": a_ents,
"edge_count": a_edges,
"stop_reason": a.get("stop_reason"),
"response_text": a.get("response_text", "")[:4000],
"error": a.get("error"),
},
"condition_b": {
"skipped": "local_draft_empty",
"local_latency_s": local_result.get("latency_s"),
"local_raw": local_raw[:1000],
},
})
continue
# Compressed draft format — bare JSON array
local_draft_json = json.dumps(local_candidates, ensure_ascii=False)
b_prompt = CONDITION_B_API_PROMPT_COMPRESSED.replace("{local_draft_json}", local_draft_json) + doc_text
try:
b = call_haiku(client, b_prompt)
b_ents, b_edges = parse_graph(b["response_text"])
print(f" B api: in={b['input_tokens']} out={b['output_tokens']} "
f"ents={b_ents} edges={b_edges} stop={b['stop_reason']} t={b['latency_s']}s",
flush=True)
except Exception as e:
print(f" B api FAILED: {e}", flush=True)
b = {"error": str(e)}
b_ents = b_edges = None
if "input_tokens" in a and "input_tokens" in b:
in_pct = (b["input_tokens"] - a["input_tokens"]) / a["input_tokens"] * 100 if a["input_tokens"] else 0.0
out_pct = (b["output_tokens"] - a["output_tokens"]) / a["output_tokens"] * 100 if a["output_tokens"] else 0.0
edge_pct_str = "n/a"
if a_edges and b_edges is not None and a_edges > 0:
edge_pct_str = f"{(b_edges - a_edges) / a_edges * 100:+.1f}%"
print(f" Δ input={in_pct:+.1f}% output={out_pct:+.1f}% edges={edge_pct_str}", flush=True)
results.append({
"source": source,
"size_bucket": size_bucket,
"doc_chars_original": original_len,
"doc_chars_sent": sent_len,
"truncated": truncated,
"skip_small_routed": skip_small_routed,
"condition_a": {
"input_tokens": a.get("input_tokens"),
"output_tokens": a.get("output_tokens"),
"latency_s": a.get("latency_s"),
"entity_count": a_ents,
"edge_count": a_edges,
"stop_reason": a.get("stop_reason"),
"response_text": a.get("response_text", "")[:4000],
"error": a.get("error"),
},
"condition_b": {
"skip_small_routed": skip_small_routed,
"local_latency_s": local_result.get("latency_s"),
"local_candidates": local_candidates,
"local_raw": local_raw[:1000],
"api_input_tokens": b.get("input_tokens"),
"api_output_tokens": b.get("output_tokens"),
"api_latency_s": b.get("latency_s"),
"entity_count": b_ents,
"edge_count": b_edges,
"stop_reason": b.get("stop_reason"),
"response_text": b.get("response_text", "")[:4000],
"error": b.get("error"),
},
})
pg_conn.close()
total_elapsed = round(time.time() - t_total, 1)
valid = [r for r in results
if r.get("condition_a", {}).get("input_tokens") is not None
and r.get("condition_b", {}).get("api_input_tokens") is not None]
a_in = sum(r["condition_a"]["input_tokens"] for r in valid)
a_out = sum(r["condition_a"]["output_tokens"] for r in valid)
b_in = sum(r["condition_b"]["api_input_tokens"] for r in valid)
b_out = sum(r["condition_b"]["api_output_tokens"] for r in valid)
a_cost = (a_in * HAIKU_IN_PER_M + a_out * HAIKU_OUT_PER_M) / 1_000_000
b_cost = (b_in * HAIKU_IN_PER_M + b_out * HAIKU_OUT_PER_M) / 1_000_000
by_bucket = {}
for bucket in ("small", "medium", "large"):
rows = [r for r in valid if r["size_bucket"] == bucket]
if not rows:
by_bucket[bucket] = None
continue
ai = sum(r["condition_a"]["input_tokens"] for r in rows)
ao = sum(r["condition_a"]["output_tokens"] for r in rows)
bi = sum(r["condition_b"]["api_input_tokens"] for r in rows)
bo = sum(r["condition_b"]["api_output_tokens"] for r in rows)
ae = [r["condition_a"]["edge_count"] for r in rows if r["condition_a"]["edge_count"] is not None]
be = [r["condition_b"]["edge_count"] for r in rows if r["condition_b"]["edge_count"] is not None]
skip_count = sum(1 for r in rows if r.get("skip_small_routed"))
by_bucket[bucket] = {
"n": len(rows),
"n_skip_small_routed": skip_count,
"n_cascade": len(rows) - skip_count,
"a_input_tokens": ai,
"a_output_tokens": ao,
"b_input_tokens": bi,
"b_output_tokens": bo,
"input_delta_pct": round((bi - ai) / ai * 100, 2) if ai else None,
"output_delta_pct": round((bo - ao) / ao * 100, 2) if ao else None,
"a_avg_edges": round(statistics.mean(ae), 1) if ae else None,
"b_avg_edges": round(statistics.mean(be), 1) if be else None,
}
summary = {
"experiment": "cascade_optimization_test",
"title": "Cascade Optimization — skip-small + compressed-draft",
"started_at": started_at,
"completed_at": datetime.now(timezone.utc).isoformat(),
"haiku_model": HAIKU_MODEL,
"haiku_temperature": HAIKU_TEMPERATURE,
"haiku_max_tokens": HAIKU_MAX_TOKENS,
"local_model": LOCAL_MODEL,
"max_doc_chars": MAX_DOC_CHARS,
"skip_small_threshold": SKIP_SMALL_THRESHOLD,
"n_documents": len(sample),
"n_valid_pairs": len(valid),
"n_skipped": len(sample) - len(valid),
"total_elapsed_s": total_elapsed,
"totals": {
"a_input_tokens": a_in,
"a_output_tokens": a_out,
"b_input_tokens": b_in,
"b_output_tokens": b_out,
"a_cost_usd": round(a_cost, 4),
"b_cost_usd": round(b_cost, 4),
"cost_delta_usd": round(b_cost - a_cost, 4),
"cost_delta_pct": round((b_cost - a_cost) / a_cost * 100, 2) if a_cost else None,
"prior_unoptimized_cascade_pct": -12.66,
"note": "API cost only — local Mistral runtime on VPS not monetized",
},
"by_size_bucket": by_bucket,
"results": results,
}
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(OUTPUT_FILE, "w") as f:
json.dump(summary, f, indent=2)
print()
print("=" * 60)
print(f"DONE — {len(valid)}/{len(sample)} valid pairs in {total_elapsed}s")
print(f"A total cost: ${a_cost:.4f} (in={a_in} out={a_out})")
print(f"B total cost: ${b_cost:.4f} (in={b_in} out={b_out})")
delta_pct = summary['totals']['cost_delta_pct']
if delta_pct is not None:
verdict = "B cheaper" if delta_pct < 0 else "B more expensive"
print(f"Cost delta: {delta_pct:+.2f}% ({verdict})")
opt_delta = delta_pct - (-12.66)
print(f"Optimization delta vs prior cascade: {opt_delta:+.2f} points "
f"(prior was -12.66%)")
print()
print("By size bucket:")
for bucket, stats in by_bucket.items():
if stats:
print(f" {bucket:6s} (n={stats['n']}, skip={stats['n_skip_small_routed']}): "
f"in {stats['input_delta_pct']:+.1f}% "
f"out {stats['output_delta_pct']:+.1f}% "
f"edges A={stats['a_avg_edges']} B={stats['b_avg_edges']}")
print()
print("Results: " + str(OUTPUT_FILE))
if __name__ == "__main__":
main()