add experiment scripts and results; watcher.py latest changes

This commit is contained in:
2026-04-30 18:06:03 +00:00
parent 1cf26df450
commit f11cacd9c9
55 changed files with 23594 additions and 726 deletions
+190
View File
@@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""
E1.8 Phase 2 — Evaluate
Pulls predicate counts from FalkorDB for each group_id and compares.
Run after e1_8_taxfree_cascade.py completes.
"""
import json, subprocess
from pathlib import Path
# Input: per-source records read by run(). Output: evaluation file written by run().
RESULTS_PATH = Path.home() / "aaronai" / "experiments" / "e1_8_results.json"
EVAL_PATH = Path.home() / "aaronai" / "experiments" / "e1_8_eval.json"
# Graph names handed to GRAPH.QUERY by query(); run() labels these "tf", "base" and "std".
GROUP_TAXFREE = "aaron_e18_taxfree"
GROUP_BASELINE = "aaron_e18_baseline"
GROUP_STANDARD = "aaron_e18_standard"
# Comparison graphs for Sub-sample A; run() labels these "prod" and "e14".
GROUP_PROD = "aaron"
GROUP_E14 = "aaron_cascade_e14"
def query(group_id, cypher):
    """Run a Cypher statement against one graph and return the raw CLI output.

    The statement is executed as ``redis-cli GRAPH.QUERY <group_id> <cypher>``
    inside the ``falkordb`` Docker container.

    Args:
        group_id: Name of the graph to query.
        cypher: Cypher statement, passed as a single argument.

    Returns:
        The captured standard output as text. The exit status and standard
        error are not inspected.
    """
    command = ["docker", "exec", "falkordb", "redis-cli", "GRAPH.QUERY", group_id, cypher]
    completed = subprocess.run(command, capture_output=True, text=True)
    return completed.stdout
def get_episode_uuid(group_id, episode_name):
    """Look up the UUID of the Episodic node named *episode_name*.

    Args:
        group_id: Graph to search.
        episode_name: Exact value of the node's ``name`` property.

    Returns:
        The first output line that looks like a UUID (36 characters with
        four hyphens), or ``None`` when no such line is present.
    """
    # Escape backslashes first, then single quotes, so the name is safe inside
    # a single-quoted Cypher literal. The earlier replace("'", "\'") did
    # nothing: in Python source "\'" is just a single quote character.
    safe = episode_name.replace("\\", "\\\\").replace("'", "\\'")
    cypher = f"MATCH (e:Episodic) WHERE e.name = '{safe}' RETURN e.uuid LIMIT 1"
    output = query(group_id, cypher)
    for line in output.split("\n"):
        line = line.strip()
        # Shape check only; header and statistics lines do not match it.
        if len(line) == 36 and line.count("-") == 4:
            return line
    return None
def count_preds(group_id, uuid):
    """Count distinct predicate names on RELATES_TO edges that cite an episode.

    Args:
        group_id: Graph to query.
        uuid: Episode UUID expected to appear in each edge's ``episodes`` list.

    Returns:
        The first all-digit line of the query output as an int, or 0 when
        the output contains no such line.
    """
    cypher = f"MATCH ()-[r:RELATES_TO]->() WHERE '{uuid}' IN r.episodes RETURN count(distinct r.name) AS p"
    tokens = (raw.strip() for raw in query(group_id, cypher).split("\n"))
    # The first purely numeric line is taken to be the count.
    return next((int(token) for token in tokens if token.isdigit()), 0)
def count_edges(group_id, uuid):
    """Count RELATES_TO edges whose ``episodes`` list contains an episode UUID.

    Args:
        group_id: Graph to query.
        uuid: Episode UUID to look for.

    Returns:
        The first all-digit line of the query output as an int, or 0 when
        the output contains no such line.
    """
    cypher = f"MATCH ()-[r:RELATES_TO]->() WHERE '{uuid}' IN r.episodes RETURN count(r) AS n"
    numeric_lines = [
        candidate
        for candidate in (raw.strip() for raw in query(group_id, cypher).split("\n"))
        if candidate.isdigit()
    ]
    if not numeric_lines:
        return 0
    return int(numeric_lines[0])
def eval_source(name, groups):
    """Collect predicate and edge counts for one source across several graphs.

    Args:
        name: Episode name to look up in every graph.
        groups: Mapping of a short label to a graph name.

    Returns:
        A dict with ``"name"`` plus ``"<label>_preds"`` and
        ``"<label>_edges"`` for each label. Both are ``None`` when the
        episode is not found in that graph.
    """
    summary = {"name": name}
    for label, group_id in groups.items():
        episode_uuid = get_episode_uuid(group_id, name)
        if not episode_uuid:
            summary[f"{label}_preds"] = None
            summary[f"{label}_edges"] = None
            continue
        summary[f"{label}_preds"] = count_preds(group_id, episode_uuid)
        summary[f"{label}_edges"] = count_edges(group_id, episode_uuid)
    return summary
def run():
    """Evaluate E1.8: compare predicate counts across graphs and save the result.

    Reads the ingest records from ``RESULTS_PATH``, looks up predicate and
    edge counts for every source in the relevant graphs, prints per-source
    and aggregate tables, prints the decision-rule verdict, and writes all
    per-source records to ``EVAL_PATH``.

    The decision rule compares the Sub-sample A means only. The A and B
    aggregates are therefore kept in separately named variables; sharing
    one name let the Sub-sample B aggregate overwrite the A value before
    the decision rule read it.
    """
    print("E1.8 — Evaluation phase")
    print("=" * 60)
    results = json.loads(RESULTS_PATH.read_text())
    eval_results = {"subsample_a": [], "subsample_b": []}

    # Sub-sample A — compare taxfree vs prod (baseline) vs e14 cascade
    print("\nSub-sample A")
    print(f"{'Source':<55} {'prod':>5} {'e14c':>5} {'tf':>5} {'e14Δ':>6} {'tfΔ':>6}")
    print("-" * 90)
    a_records = []
    for item in results["subsample_a"]:
        name = item["name"]
        r = eval_source(name, {
            "prod": GROUP_PROD,
            "e14": GROUP_E14,
            "tf": GROUP_TAXFREE,
        })
        r["bucket"] = item["bucket"]
        r["taxfree_metadata"] = item.get("taxfree_metadata")
        r["e14_delta_preds"] = item.get("e14_delta_preds")
        # Missing counts (None) are shown and computed as 0.
        prod = r.get("prod_preds") or 0
        e14 = r.get("e14_preds") or 0
        tf = r.get("tf_preds") or 0
        e14_delta = ((e14 - prod) / prod * 100) if prod > 0 else 0
        tf_delta = ((tf - prod) / prod * 100) if prod > 0 else 0
        display = name[:53] + ".." if len(name) > 55 else name
        print(f"{display:<55} {prod:>5} {e14:>5} {tf:>5} {e14_delta:>+5.0f}% {tf_delta:>+5.0f}%")
        r["tf_delta_vs_prod"] = tf_delta
        r["e14_delta_vs_prod"] = e14_delta
        a_records.append(r)
        eval_results["subsample_a"].append(r)

    # Aggregate Sub-sample A (only sources with non-zero prod and taxfree counts)
    valid = [r for r in a_records if r.get("prod_preds") and r.get("tf_preds")]
    a_mean_e14_delta = 0.0
    a_mean_tf_delta = 0.0
    if valid:
        a_mean_e14_delta = sum(r["e14_delta_vs_prod"] for r in valid) / len(valid)
        a_mean_tf_delta = sum(r["tf_delta_vs_prod"] for r in valid) / len(valid)
        print(f"\nAggregate Sub-sample A (n={len(valid)}):")
        print(f" E1.4 cascade mean delta vs prod: {a_mean_e14_delta:+.1f}%")
        print(f" Taxonomy-free mean delta vs prod: {a_mean_tf_delta:+.1f}%")
        print(f" Taxonomy-free vs E1.4 cascade: {a_mean_tf_delta - a_mean_e14_delta:+.1f}pp")

    # Sub-sample B — all three conditions
    print("\n\nSub-sample B")
    print(f"{'Source':<55} {'base':>5} {'std':>5} {'tf':>5} {'stdΔ':>6} {'tfΔ':>6}")
    print("-" * 90)
    b_records = []
    for item in results["subsample_b"]:
        name = item["name"]
        r = eval_source(name, {
            "base": GROUP_BASELINE,
            "std": GROUP_STANDARD,
            "tf": GROUP_TAXFREE,
        })
        r["bucket"] = item["bucket"]
        r["taxfree_metadata"] = item.get("taxfree_metadata")
        r["standard_metadata"] = item.get("standard_metadata")
        base = r.get("base_preds") or 0
        std = r.get("std_preds") or 0
        tf = r.get("tf_preds") or 0
        std_delta = ((std - base) / base * 100) if base > 0 else 0
        tf_delta = ((tf - base) / base * 100) if base > 0 else 0
        display = name[:53] + ".." if len(name) > 55 else name
        print(f"{display:<55} {base:>5} {std:>5} {tf:>5} {std_delta:>+5.0f}% {tf_delta:>+5.0f}%")
        r["std_delta_vs_base"] = std_delta
        r["tf_delta_vs_base"] = tf_delta
        b_records.append(r)
        eval_results["subsample_b"].append(r)

    # Aggregate Sub-sample B
    valid_b = [r for r in b_records if r.get("base_preds") and r.get("tf_preds")]
    if valid_b:
        b_mean_std_delta = sum(r["std_delta_vs_base"] for r in valid_b) / len(valid_b)
        b_mean_tf_delta = sum(r["tf_delta_vs_base"] for r in valid_b) / len(valid_b)
        print(f"\nAggregate Sub-sample B (n={len(valid_b)}):")
        print(f" Standard cascade mean delta vs baseline: {b_mean_std_delta:+.1f}%")
        print(f" Taxonomy-free mean delta vs baseline: {b_mean_tf_delta:+.1f}%")
        # By bucket
        print("\nPer-bucket (Sub-sample B):")
        for bucket in ["high", "mid", "document"]:
            br = [r for r in valid_b if r["bucket"] == bucket]
            if not br:
                continue
            m_std = sum(r["std_delta_vs_base"] for r in br) / len(br)
            m_tf = sum(r["tf_delta_vs_base"] for r in br) / len(br)
            print(f" [{bucket:>8}] n={len(br)} std={m_std:+.0f}% tf={m_tf:+.0f}%")

    # Decision rule evaluation — based on Sub-sample A aggregates only.
    print("\n" + "=" * 60)
    print("DECISION RULE:")
    if valid:
        improvement = a_mean_tf_delta - a_mean_e14_delta
        if improvement >= 20:
            print(f" ✓ STRONG RECOVERY (+{improvement:.1f}pp) — Stage 3.1 ships as taxonomy-free")
        elif improvement >= 5:
            print(f" ~ PARTIAL RECOVERY (+{improvement:.1f}pp) — orientation helps, needs refinement")
        elif improvement >= 0:
            print(f" ~ MARGINAL (+{improvement:.1f}pp) — consider API extractor prompt redesign (E1.9)")
        else:
            print(f" ✗ NEGATIVE ({improvement:.1f}pp) — taxonomy-free introduces more noise than standard")
    else:
        print(" (no Sub-sample A source has both prod and taxonomy-free counts — no verdict)")

    EVAL_PATH.write_text(json.dumps(eval_results, indent=2))
    print(f"\nEval saved to {EVAL_PATH}")


if __name__ == "__main__":
    run()
+285
View File
@@ -0,0 +1,285 @@
#!/usr/bin/env python3
"""
E1.8 Phase 1 — Ingest
Runs taxonomy-free and standard cascade ingestion for Sub-samples A and B.
Run this first, then run e1_8_eval.py to pull predicate counts.
"""
import os, json, time, psycopg2, requests
from pathlib import Path
from dotenv import load_dotenv
# Load ~/aaronai/.env at import time (override=True is passed to load_dotenv).
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# Connection string handed to psycopg2.connect(); None if the variable is unset.
PG_DSN = os.getenv("PG_DSN")
# Base URL of the service that receives POST /episodes/bulk in ingest().
GRAPHITI_URL = "http://localhost:8001"
# Resume file: read at start-up if present and rewritten by save().
RESULTS_PATH = Path.home() / "aaronai" / "experiments" / "e1_8_results.json"
# Target group_id values, one per experimental condition.
GROUP_TAXFREE = "aaron_e18_taxfree"
GROUP_BASELINE = "aaron_e18_baseline"
GROUP_STANDARD = "aaron_e18_standard"
# Prompt prefix for the taxonomy-free condition. run_mistral() appends the
# document text directly after the trailing "Document:" line, and
# build_taxfree_orientation() reads the four schema keys from the reply.
TAXFREE_PROMPT = """You are a metadata extraction system. Given a document, describe its content shape for use as orientation context in a knowledge graph extraction pass.
Do not summarize content. Do not extract entities. Do not assign a single category label.
Instead, describe:
- What domains or frames are active in this content (there may be several simultaneously)
- How those frames relate to each other in this specific document
- What kind of relational content a knowledge graph extractor should look for
Output JSON only. No prose, no explanation, no markdown.
Schema:
{
"active_frames": ["<frame 1>", "<frame 2>", ...],
"frame_relationships": "<one sentence describing how the frames interact in this document>",
"extraction_orientation": "<one sentence orienting the extractor toward the most relationship-rich content>",
"one_sentence_summary": "<one sentence describing what the document is about>"
}
Document:
"""
# Prompt prefix for the standard cascade condition. run_mistral() appends the
# document text directly after the trailing "Document:" line. Of the schema
# below, build_standard_orientation() uses only domain_class, primary_format,
# one_sentence_summary and two of the content_signals flags.
STANDARD_PROMPT = """You are a metadata extraction system. Given a document, produce structural and content metadata in strict JSON format.
Do not summarize the content beyond the one-sentence summary field. Do not extract entities or relationships. Do not interpret meaning. Produce only the metadata schema below.
Output JSON only. No prose, no explanation, no markdown code fences.
Schema:
{
"language": "<ISO 639-1 code>",
"char_length": <integer>,
"primary_format": "<prose|slides|code|structured|mixed>",
"structural_signals": {
"has_headings": <boolean>,
"has_bullet_lists": <boolean>,
"has_numbered_lists": <boolean>,
"has_tables": <boolean>,
"has_code_blocks": <boolean>,
"has_dates": <boolean>
},
"content_signals": {
"has_named_people": <boolean>,
"has_institutional_language": <boolean>,
"has_technical_terminology": <boolean>,
"has_first_person": <boolean>,
"has_quotations": <boolean>
},
"domain_class": "<technical|administrative|educational|personal|conversational>",
"one_sentence_summary": "<one sentence describing what the document is about>"
}
Document:
"""
# Sub-sample A: sources that run() ingests under the taxonomy-free condition
# only. "name" is the embeddings.source value used to fetch the text and also
# the episode name; run() additionally looks each name up in
# e14_per_source_comparison.json. "bucket" is copied into the results as-is.
SUBSAMPLE_A = [
{"name": "Claude: Lubbock on everything album lyrics", "bucket": "high"},
{"name": "ChatGPT: Tulsa Concept Album Guide", "bucket": "high"},
{"name": "ChatGPT: Rhino 3D object flow", "bucket": "high"},
{"name": "Claude: SUNY faculty conflict of interest policies", "bucket": "mid"},
{"name": "Claude: Interview presentation research and preparation", "bucket": "mid"},
{"name": "Claude: Research Statement Restructure", "bucket": "mid"},
{"name": "ChatGPT: Respect Individual Interests for Christmas", "bucket": "low"},
{"name": "University of North Texas Cover letter.pdf", "bucket": "document"},
{"name": "Claude: Finding ideal rural housing near University of Utah", "bucket": "high"},
{"name": "ChatGPT: SEC coaches with OSU ties", "bucket": "high"},
{"name": "Claude: Bonding ASA 3D printed parts", "bucket": "mid"},
{"name": "ChatGPT: Title: User request summary.", "bucket": "low"},
{"name": "ChatGPT: Scholarship Recommendation Letter Tips", "bucket": "low"},
]
# Sub-sample B: sources that run() ingests under all three conditions
# (baseline, standard, taxonomy-free). "name" is the embeddings.source value
# used to fetch the text and also the episode name; "bucket" is copied into
# the results as-is.
SUBSAMPLE_B = [
{"name": "ChatGPT: Job application comparison", "bucket": "high"},
{"name": "ChatGPT: External review for tenure", "bucket": "high"},
{"name": "Claude: University of Utah interview teaching example", "bucket": "high"},
{"name": "ChatGPT: Starting Dropship Gun Business", "bucket": "high"},
{"name": "ChatGPT: Analyze business plan", "bucket": "high"},
{"name": "ChatGPT: Outdoor Layering Explained", "bucket": "mid"},
{"name": "ChatGPT: Limits in Calculus.", "bucket": "mid"},
{"name": "ChatGPT: Academic Program Director Role", "bucket": "mid"},
{"name": "ChatGPT: Lonely Island Poop Skit", "bucket": "mid"},
{"name": "ChatGPT: Parse Tidal playlist", "bucket": "mid"},
{"name": "NO thesis proposal.pdf", "bucket": "document"},
{"name": "PWM.pdf", "bucket": "document"},
{"name": "Will_It_Print.pdf", "bucket": "document"},
{"name": "Kim Kedem Ind Study F2025 Syllabus.docx", "bucket": "document"},
{"name": "Aaron Nelson Graduate Transcript.pdf", "bucket": "document"},
]
def get_pg():
    """Open and return a new psycopg2 connection built from ``PG_DSN``.

    The caller owns the connection and is responsible for closing it.
    """
    connection = psycopg2.connect(PG_DSN)
    return connection
def get_document_text(source_name):
    """Fetch up to 12,000 characters of stored text for one source.

    Reads the ``document`` column of at most 20 ``embeddings`` rows whose
    ``source`` equals *source_name*, ordered by ``id``, and joins them with
    single spaces.

    Args:
        source_name: Value matched against ``embeddings.source``.

    Returns:
        The joined text truncated to 12,000 characters, or an empty string
        when no rows match.
    """
    pg = get_pg()
    try:
        cur = pg.cursor()
        try:
            cur.execute("SELECT document FROM embeddings WHERE source = %s ORDER BY id LIMIT 20", (source_name,))
            rows = cur.fetchall()
        finally:
            cur.close()
    finally:
        # Close even when the query raises, so a failed lookup does not leak
        # a connection for every remaining source in the run.
        pg.close()
    # Skip NULL documents rather than letting str.join raise on None.
    return " ".join(r[0] for r in rows if r[0] is not None)[:12000]
def run_mistral(prompt_prefix, doc_text, label=""):
    """Ask the local ``mistral:latest`` model for JSON metadata about a document.

    Posts ``prompt_prefix + doc_text`` to the generate endpoint on
    ``localhost:11434`` with streaming disabled and ``format`` set to
    ``"json"``, then parses the ``response`` field of the reply.

    Args:
        prompt_prefix: Instruction text placed before the document.
        doc_text: Document text appended to the prompt.
        label: Short tag used only in the progress messages.

    Returns:
        The parsed JSON object as a dict. If the reply is not valid JSON, or
        is valid JSON but not an object, returns
        ``{"error": "parse_failed", "raw": <first 200 characters>}`` so that
        callers can always use ``.get`` on the result.

    Raises:
        requests.HTTPError: Propagated from ``raise_for_status`` on a
            non-success HTTP status. Connection and timeout errors from
            ``requests.post`` also propagate.
    """
    print(f" → Mistral {label} running...", flush=True)
    payload = {"model": "mistral:latest", "prompt": prompt_prefix + doc_text, "stream": False, "format": "json"}
    resp = requests.post("http://localhost:11434/api/generate", json=payload, timeout=300)
    resp.raise_for_status()
    raw = resp.json().get("response", "{}")
    print(f" → Mistral {label} done ({len(raw)} chars)", flush=True)
    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError):
        # json.JSONDecodeError is a ValueError; TypeError covers a non-text payload.
        return {"error": "parse_failed", "raw": raw[:200]}
    if not isinstance(parsed, dict):
        # A bare list, string or null is valid JSON but unusable as metadata.
        return {"error": "parse_failed", "raw": raw[:200]}
    return parsed
def build_taxfree_orientation(meta):
    """Render taxonomy-free metadata as a one-line orientation string.

    Args:
        meta: Dict parsed from the model reply. The keys read are
            ``active_frames``, ``frame_relationships``,
            ``extraction_orientation`` and ``one_sentence_summary``; any of
            them may be missing or null.

    Returns:
        ``"Active frames: ... Frame relationships: ... Extraction focus: ...
        Summary: ..."`` with missing or null fields rendered as empty text.
    """
    frames_value = meta.get("active_frames") or []
    # The model does not always honour the schema. A bare string would be
    # joined character by character, and non-string items would make
    # str.join raise, so normalise to a list of strings first.
    if not isinstance(frames_value, (list, tuple)):
        frames_value = [frames_value]
    frames = ", ".join(str(frame) for frame in frames_value)
    # "or ''" keeps an explicit null from being rendered as the text "None".
    rel = meta.get("frame_relationships") or ""
    orient = meta.get("extraction_orientation") or ""
    summary = meta.get("one_sentence_summary") or ""
    return f"Active frames: {frames}. Frame relationships: {rel} Extraction focus: {orient} Summary: {summary}"
def build_standard_orientation(meta):
    """Render standard-cascade metadata as a multi-line orientation string.

    Args:
        meta: Dict parsed from the model reply. The keys read are
            ``domain_class``, ``primary_format``, ``one_sentence_summary``
            and ``content_signals`` (itself expected to be a dict).

    Returns:
        Five ``key: value`` lines joined by newlines. Missing or null
        ``domain_class`` / ``primary_format`` become ``"unknown"``, a missing
        or null summary becomes empty text, and missing flags become ``False``.
    """
    # "or" also covers an explicit null, which would otherwise print as "None".
    dc = meta.get("domain_class") or "unknown"
    pf = meta.get("primary_format") or "unknown"
    summary = meta.get("one_sentence_summary") or ""
    cs = meta.get("content_signals")
    if not isinstance(cs, dict):
        # A null or wrongly typed value from the model would make .get() raise.
        cs = {}
    return (f"domain_class: {dc}\nprimary_format: {pf}\none_sentence_summary: {summary}\n"
            f"has_named_people: {cs.get('has_named_people', False)}\n"
            f"has_technical_terminology: {cs.get('has_technical_terminology', False)}")
def ingest(source_name, doc_text, orientation, group_id):
    """Send one document to the bulk-episode endpoint as a single episode.

    Args:
        source_name: Used as the episode name.
        doc_text: Episode content; only the first 12,000 characters are sent.
        orientation: Text sent as the episode's ``source_description``.
        group_id: Target group for the episode.

    Raises:
        requests.HTTPError: Propagated from ``raise_for_status`` when the
            service answers with a non-success status.
    """
    episode = {
        "name": source_name,
        "content": doc_text[:12000],
        "source_description": orientation,
        # Every episode is sent with this same fixed timestamp.
        "timestamp": "2026-04-28T00:00:00",
    }
    body = {"episodes": [episode], "group_id": group_id}
    response = requests.post(f"{GRAPHITI_URL}/episodes/bulk", json=body, timeout=300)
    response.raise_for_status()
def save(results):
    """Overwrite ``RESULTS_PATH`` with *results* serialised as indented JSON."""
    serialised = json.dumps(results, indent=2)
    RESULTS_PATH.write_text(serialised)
def run():
    """Ingest both E1.8 sub-samples and record per-source metadata.

    Sub-sample A sources are ingested into the taxonomy-free group only and
    paired with figures looked up by name in ``e14_per_source_comparison.json``.
    Sub-sample B sources are ingested three times: baseline (empty
    orientation), standard cascade and taxonomy-free. Results are written to
    ``RESULTS_PATH`` after every source, and sources already present in
    that file are skipped, so an interrupted run can be resumed.

    Metadata extraction and ingestion share one ``try`` per condition. A
    model or ingest failure is reported for that condition and the run moves
    on, instead of aborting mid-source and leaving an ingested but unrecorded
    episode that a resume would ingest a second time.
    """
    print("E1.8 — Ingest phase")
    print("=" * 60)

    # Load existing results if resuming
    if RESULTS_PATH.exists():
        results = json.loads(RESULTS_PATH.read_text())
        # setdefault: a file lacking one section must still accept appends below.
        done_a = {r["name"] for r in results.setdefault("subsample_a", [])}
        done_b = {r["name"] for r in results.setdefault("subsample_b", [])}
        print(f"Resuming: {len(done_a)} A done, {len(done_b)} B done")
    else:
        results = {"subsample_a": [], "subsample_b": []}
        done_a, done_b = set(), set()

    e14_data = json.loads((Path.home() / "aaronai" / "experiments" / "e14_per_source_comparison.json").read_text())
    e14_by_name = {s["name"]: s for s in e14_data}

    # Sub-sample A — taxonomy-free only (baseline + standard from E1.4)
    print("\nSub-sample A — taxonomy-free ingestion only")
    for item in SUBSAMPLE_A:
        name = item["name"]
        if name in done_a:
            print(f" SKIP (done): {name}")
            continue
        print(f"\n {name}")
        doc_text = get_document_text(name)
        if not doc_text:
            print(f" SKIP — no text")
            continue
        try:
            tf_meta = run_mistral(TAXFREE_PROMPT, doc_text, "taxfree")
            print(f" frames: {tf_meta.get('active_frames', 'ERROR')}")
            orientation = build_taxfree_orientation(tf_meta)
            ingest(name, doc_text, orientation, GROUP_TAXFREE)
            time.sleep(3)
            print(f" ingested to {GROUP_TAXFREE}")
        except Exception as e:
            # Not recorded, so the source is retried on the next run.
            print(f" ingest failed: {e}")
            continue
        e14 = e14_by_name.get(name, {})
        results["subsample_a"].append({
            "name": name,
            "bucket": item["bucket"],
            "taxfree_metadata": tf_meta,
            "taxfree_orientation": orientation,
            "e14_prod_preds": e14.get("prod_preds"),
            "e14_cascade_preds": e14.get("cascade_preds"),
            "e14_delta_preds": e14.get("delta_preds"),
            "e14_prod_edges": e14.get("prod_edges"),
            "e14_cascade_edges": e14.get("cascade_edges"),
            "e14_delta_edges": e14.get("delta_edges"),
        })
        save(results)

    # Sub-sample B — all three conditions
    print("\nSub-sample B — all three conditions")
    for item in SUBSAMPLE_B:
        name = item["name"]
        if name in done_b:
            print(f" SKIP (done): {name}")
            continue
        print(f"\n {name} ({item['bucket']})")
        doc_text = get_document_text(name)
        if not doc_text:
            print(f" SKIP — no text")
            continue
        entry = {"name": name, "bucket": item["bucket"],
                 "taxfree_metadata": None, "standard_metadata": None}

        # Baseline
        try:
            ingest(name, doc_text, "", GROUP_BASELINE)
            time.sleep(3)
            print(f" baseline ingested")
        except Exception as e:
            print(f" baseline failed: {e}")

        # Standard
        try:
            std_meta = run_mistral(STANDARD_PROMPT, doc_text, "standard")
            entry["standard_metadata"] = std_meta
            ingest(name, doc_text, build_standard_orientation(std_meta), GROUP_STANDARD)
            time.sleep(3)
            print(f" standard ingested, domain_class={std_meta.get('domain_class','?')}")
        except Exception as e:
            print(f" standard failed: {e}")

        # Taxonomy-free
        try:
            tf_meta = run_mistral(TAXFREE_PROMPT, doc_text, "taxfree")
            entry["taxfree_metadata"] = tf_meta
            print(f" frames: {tf_meta.get('active_frames', 'ERROR')}")
            ingest(name, doc_text, build_taxfree_orientation(tf_meta), GROUP_TAXFREE)
            time.sleep(3)
            print(f" taxfree ingested")
        except Exception as e:
            print(f" taxfree failed: {e}")

        # Recorded even when a condition failed, so a resume does not
        # re-ingest the conditions that did succeed.
        results["subsample_b"].append(entry)
        save(results)

    print("\n" + "=" * 60)
    print(f"Ingest complete. Results at {RESULTS_PATH}")
    print("Now run: python3 ~/aaronai/scripts/experiments/e1_8_eval.py")


if __name__ == "__main__":
    run()
+204
View File
@@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
E1.9 Phase 1 — Retroactive validation
For each E1.8 source, query the production graph with frame_relationships
to get a coverage score, then check whether the routing tier prediction
matches the actual best-performing condition from E1.8.
No API spend required — uses existing E1.8 data and Graphiti search only.
"""
import json, requests
from pathlib import Path
# Base URL of the service queried via GET /search in get_coverage_score().
GRAPHITI_URL = "http://localhost:8001"
# Inputs: per-source counts (E18_PATH) and ingest metadata (E18_INGEST_PATH).
E18_PATH = Path.home() / "aaronai" / "experiments" / "e1_8_eval.json"
E18_INGEST_PATH = Path.home() / "aaronai" / "experiments" / "e1_8_results.json"
# Output written by run().
RESULTS_PATH = Path.home() / "aaronai" / "experiments" / "e1_9_retroactive.json"
# Routing thresholds applied by assign_tier()
HIGH_THRESHOLD = 0.70 # coverage >= this routes to "baseline"
LOW_THRESHOLD = 0.40 # coverage below this routes to "taxfree"; in between is "standard"
def get_coverage_score(query, group_id="aaron"):
    """Score how well a graph already covers *query*, from the number of hits.

    At most three results are requested from the search endpoint and the
    score is ``hits / 3`` capped at 1.0 (so 0, 1, 2 and 3 hits give 0.0,
    about 0.33, about 0.67 and 1.0). The original author's note says the
    count is used because the search reports a score of 0 for every hit;
    that cannot be confirmed from this file.

    Args:
        query: Search text. ``None``, empty or whitespace-only text scores
            0.0 without any request being made.
        group_id: Group to search.

    Returns:
        A float between 0.0 and 1.0. Any error during the request or while
        reading the reply is printed and scored as 0.0.
    """
    if not (query and query.strip()):
        return 0.0
    search_params = {"query": query, "limit": 3, "group_id": group_id}
    try:
        reply = requests.get(f"{GRAPHITI_URL}/search", params=search_params, timeout=30)
        reply.raise_for_status()
        hits = reply.json().get("results", [])
        return min(len(hits) / 3.0, 1.0)
    except Exception as e:
        print(f" Search error: {e}")
        return 0.0
def assign_tier(coverage_score):
    """Map a coverage score to a routing tier name.

    Returns ``"baseline"`` at or above ``HIGH_THRESHOLD``, ``"standard"`` at
    or above ``LOW_THRESHOLD``, and ``"taxfree"`` otherwise.
    """
    if coverage_score >= HIGH_THRESHOLD:
        return "baseline"
    return "standard" if coverage_score >= LOW_THRESHOLD else "taxfree"
def best_condition_from_e18(record, subsample):
    """Name the condition with the highest predicate count for one source.

    For ``subsample == "a"`` the baseline and standard counts come from
    ``prod_preds`` and ``e14_preds``; for any other value they come from
    ``base_preds`` and ``std_preds``. The taxonomy-free count is always
    ``tf_preds``. Missing or null counts are treated as 0.

    Returns:
        ``"taxfree"``, ``"standard"`` or ``"baseline"``; ties resolve in that
        order. ``"unknown"`` when the highest count is 0.
    """
    if subsample == "a":
        baseline_key, standard_key = "prod_preds", "e14_preds"
    else:
        baseline_key, standard_key = "base_preds", "std_preds"
    baseline_count = record.get(baseline_key) or 0
    standard_count = record.get(standard_key) or 0
    taxfree_count = record.get("tf_preds") or 0
    top = max(baseline_count, standard_count, taxfree_count)
    if top == 0:
        return "unknown"
    # Checked in tie-break order; baseline is what remains.
    for label, count in (("taxfree", taxfree_count), ("standard", standard_count)):
        if count == top:
            return label
    return "baseline"
def run():
    """Check whether coverage-based routing predicts the best E1.8 condition.

    For every source in the E1.8 evaluation file, the ``frame_relationships``
    sentence from the ingest metadata is used as a search query, the
    coverage score is mapped to a tier, and the tier is compared with the
    condition that had the highest predicate count. Prints per-source
    tables, the validation rate, the mismatches and the score distribution,
    then writes everything to ``RESULTS_PATH``.

    Sources whose best condition is ``"unknown"`` are listed but do not count
    towards the validation rate.
    """
    print("E1.9 Phase 1 — Retroactive validation")
    print("=" * 60)
    e18_eval = json.loads(E18_PATH.read_text())
    e18_ingest = json.loads(E18_INGEST_PATH.read_text())

    # Build frame_relationships lookup from ingest results
    fr_lookup = {}
    for section in ("subsample_a", "subsample_b"):
        for item in e18_ingest.get(section, []):
            meta = item.get("taxfree_metadata", {})
            if meta:
                fr_lookup[item["name"]] = meta.get("frame_relationships", "")

    results = []
    correct = 0
    total = 0

    # Both sub-samples get the same table; only the comparison keys differ.
    for key, title in (("a", "Sub-sample A"), ("b", "Sub-sample B")):
        print(f"\n{title}")
        print(f"{'Source':<50} {'cov':>5} {'tier':<10} {'predicted':<10} {'actual':<10} {'match'}")
        print("-" * 95)
        for record in e18_eval[f"subsample_{key}"]:
            name = record["name"]
            fr = fr_lookup.get(name, "")
            coverage = get_coverage_score(fr)
            tier = assign_tier(coverage)
            actual_best = best_condition_from_e18(record, key)
            is_match = tier == actual_best
            # Both branches used to be empty strings, leaving the column
            # blank; use the same marks as the summary lines below.
            match = "✓" if is_match else "✗"
            if actual_best != "unknown":
                total += 1
                if is_match:
                    correct += 1
            display = name[:48] + ".." if len(name) > 50 else name
            print(f"{display:<50} {coverage:>5.2f} {tier:<10} {tier:<10} {actual_best:<10} {match}")
            results.append({
                "name": name, "subsample": key, "bucket": record.get("bucket"),
                "frame_relationships": fr, "coverage_score": coverage,
                "predicted_tier": tier, "actual_best": actual_best, "match": is_match,
            })

    # Summary
    rate = correct / total * 100 if total > 0 else 0
    print(f"\n{'=' * 60}")
    print(f"Validation rate: {correct}/{total} ({rate:.1f}%)")
    print()
    if rate >= 70:
        print("✓ SIGNAL VALIDATED — coverage score predicts best condition")
        print(" Proceed to Phase 2 (new ingestion with routing)")
    elif rate >= 50:
        print("~ MARGINAL — adjust thresholds before Phase 2")
        print(" Review mismatch patterns below")
    else:
        print("✗ SIGNAL NOT PREDICTIVE — frame_relationships coverage")
        print(" may not be the right signal. Consider active_frames fallback.")

    # Mismatch analysis
    mismatches = [r for r in results if not r["match"] and r["actual_best"] != "unknown"]
    if mismatches:
        print(f"\nMismatches ({len(mismatches)}):")
        for r in mismatches:
            print(f" [{r['bucket']:<8}] cov={r['coverage_score']:.2f} predicted={r['predicted_tier']} actual={r['actual_best']} | {r['name'][:50]}")

    # Coverage score distribution
    scores = [r["coverage_score"] for r in results]
    print(f"\nCoverage score distribution:")
    if scores:
        print(f" Mean: {sum(scores)/len(scores):.2f}")
        print(f" Min: {min(scores):.2f}")
        print(f" Max: {max(scores):.2f}")
    else:
        # With no records the mean would divide by zero and min()/max() would raise.
        print(" (no sources evaluated)")
    high = sum(1 for s in scores if s >= HIGH_THRESHOLD)
    mid = sum(1 for s in scores if LOW_THRESHOLD <= s < HIGH_THRESHOLD)
    low = sum(1 for s in scores if s < LOW_THRESHOLD)
    print(f" Tier distribution: baseline={high} standard={mid} taxfree={low}")

    # Save
    output = {
        "validation_rate": rate,
        "correct": correct,
        "total": total,
        "thresholds": {"high": HIGH_THRESHOLD, "low": LOW_THRESHOLD},
        "results": results,
    }
    RESULTS_PATH.write_text(json.dumps(output, indent=2))
    print(f"\nSaved to {RESULTS_PATH}")


if __name__ == "__main__":
    run()