experiments/frame_distribution_report: Stage 2 frame analysis (Track 1 Improvement #3)

Read-only inspection of the frame data Mistral produces in Stage 2, in
service of Track 2 substrate design (Step 2.4 operation set spec).

Artifacts:
- New SQL view `stage2_frames_v` over `stage_3_queue.stage2_metadata`
  (CREATE OR REPLACE; idempotent; raw JSONB exposed alongside structured
  fields so worker-version drift is inspectable).
- Analysis script: frequency, label-hygiene collisions, per-doc count,
  co-occurrence (top-K), file-type \u00d7 frame cross-tab, worker-version split,
  data-gap accounting, corpus-wide coverage.
- JSON sidecar for diff-across-runs reproducibility.
- Markdown report with explicit Track 2 viability section.

Headline findings:
- Frames cluster meaningfully on the framed-doc subset (subject to
  validation on larger samples for the file-type cross-tab).
- Only 56% of corpus has frame coverage. 198 conversation sources bypass
  Stage 2 by design (`ingest_conversations.py` writes directly to
  embeddings); 339 short docs (<2000 chars) skip Mistral by char-gate;
  12 Stage 2 failures.
- All 14 voice notes and all 39 dream outputs are in the data gap.
  Primary capture and self-reflection channels are silent to the frame
  system. Dreamer cannot frame-condition on its own output.
- 54 normalized label collisions (`Professional Experience` vs
  `Professional_Experience`, etc.) — any router must normalize first.
- "Education" is a near-universal frame (36% of frame-extracted docs);
  cheap 20-doc hand-inspection diagnostic in report \u00a78 to distinguish
  prompt artifact from corpus shape.
- File-type \u00d7 frame stratification is concrete signal that ties to
  Improvement #2 (`embeddings.type` backfill); currently NULL for 71% of
  rows.

No production code touched. View is droppable; script is read-only.
This commit is contained in:
2026-05-03 20:32:37 +00:00
parent e5898f3019
commit ed2d090afc
3 changed files with 1458 additions and 0 deletions
@@ -0,0 +1,296 @@
"""Read-only analysis of Stage 2 frame data via stage2_frames_v.
Produces seven sections (frequency, hygiene, per-doc count, co-occurrence,
folder cross-tab, worker-version split, data-gap accounting) and writes a JSON
sidecar for diffing across runs.
Usage: venv/bin/python3 scripts/experiments/frame_distribution_report.py
"""
import os
import json
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
import psycopg2
from dotenv import load_dotenv
load_dotenv()
OUT_PATH = Path.home() / "aaronai" / "experiments" / f"frame_distribution_{datetime.now().strftime('%Y-%m-%d')}.json"
TOP_K = 20 # for co-occurrence; revisit after seeing the long tail
def normalize(label):
return re.sub(r"\s+", " ", label.strip().lower().replace("_", " "))
def folder_bin(source):
"""Classify source by type. stage_3_queue stores bare filenames, so we
bin by what kind of file it is, not where it lives in the tree."""
if not source:
return "unknown"
if re.match(r"^(Claude|ChatGPT|Aaron AI):", source):
return "conversation" # bypasses Stage 2/3, will not appear here
s = source.lower()
if re.search(r"\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-voice\.md$", s):
return "voice_note"
if re.search(r"\d{4}-\d{2}-\d{2}-(nrem|early-rem|late-rem|synthesis|lucid)", s):
return "dream_output"
if s.endswith(".md"):
return "markdown"
if s.endswith(".pdf"):
return "pdf"
if s.endswith(".docx") or s.endswith(".doc"):
return "docx"
if s.endswith(".pptx") or s.endswith(".ppt"):
return "pptx"
if s.endswith(".txt"):
return "txt"
return "other"
def fetch_rows(cur):
cur.execute("""
SELECT source, char_length, active_frames, worker_version, raw_metadata
FROM stage2_frames_v
""")
rows = []
for source, char_length, frames, worker_version, raw in cur.fetchall():
if not isinstance(frames, list):
continue
rows.append({
"source": source,
"char_length": char_length,
"frames": [str(f) for f in frames if f],
"worker_version": worker_version,
"raw_keys": sorted(raw.keys()) if isinstance(raw, dict) else [],
})
return rows
def section_frequency(rows):
counter = Counter()
for r in rows:
for f in r["frames"]:
counter[f] += 1
return counter
def section_hygiene(frequency):
"""Group raw labels by normalized form; flag collisions."""
groups = defaultdict(list)
for raw, count in frequency.items():
groups[normalize(raw)].append((raw, count))
collisions = {k: v for k, v in groups.items() if len(v) > 1}
return collisions
def section_per_doc_count(rows):
counts = Counter(len(r["frames"]) for r in rows)
return counts
def section_cooccurrence(rows, top_frames):
top_set = set(top_frames)
pair_counts = Counter()
for r in rows:
present = [f for f in r["frames"] if f in top_set]
for i in range(len(present)):
for j in range(i + 1, len(present)):
a, b = sorted([present[i], present[j]])
pair_counts[(a, b)] += 1
return pair_counts
def section_folder_crosstab(rows, top_frames):
top_set = set(top_frames)
table = defaultdict(Counter) # frame -> bin -> count
bin_totals = Counter()
for r in rows:
b = folder_bin(r["source"])
bin_totals[b] += 1
for f in r["frames"]:
if f in top_set:
table[f][b] += 1
return table, bin_totals
def section_worker_versions(rows):
counter = Counter(r["worker_version"] or "unknown" for r in rows)
raw_keys_by_version = defaultdict(Counter)
for r in rows:
v = r["worker_version"] or "unknown"
raw_keys_by_version[v][tuple(r["raw_keys"])] += 1
return counter, raw_keys_by_version
def section_data_gap(cur):
"""Docs that completed Stage 2 but never had frames extracted (<2000 chars)."""
cur.execute("""
SELECT source, char_length
FROM stage_2_queue
WHERE completed_at IS NOT NULL AND char_length < 2000
""")
missing = cur.fetchall()
by_bin = Counter(folder_bin(s) for s, _ in missing)
char_lengths = [c for _, c in missing]
return {
"count": len(missing),
"by_type_bin": dict(by_bin),
"char_length": {
"min": min(char_lengths) if char_lengths else None,
"max": max(char_lengths) if char_lengths else None,
"median": sorted(char_lengths)[len(char_lengths) // 2] if char_lengths else None,
},
"sample_sources": [s for s, _ in missing[:10]],
}
def section_corpus_coverage(cur):
"""How much of the embeddings corpus has frame coverage?"""
cur.execute("SELECT count(DISTINCT source) FROM embeddings")
total = cur.fetchone()[0]
cur.execute("""
SELECT count(DISTINCT source) FROM embeddings
WHERE source LIKE 'Claude:%' OR source LIKE 'ChatGPT:%'
OR source LIKE 'Aaron AI:%' OR type='aaronai_conversation'
""")
conversations = cur.fetchone()[0]
cur.execute("SELECT count(DISTINCT source) FROM stage_3_queue WHERE stage2_metadata IS NOT NULL")
with_frames = cur.fetchone()[0]
cur.execute("""
SELECT count(DISTINCT source) FROM stage_2_queue
WHERE completed_at IS NOT NULL AND char_length < 2000
""")
short_no_frames = cur.fetchone()[0]
cur.execute("""
SELECT count(DISTINCT source) FROM stage_2_queue
WHERE failed_at IS NOT NULL
""")
failed = cur.fetchone()[0]
return {
"total_distinct_sources_in_embeddings": total,
"conversations_no_frames_by_design": conversations,
"files_with_frames": with_frames,
"files_short_no_frames": short_no_frames,
"files_stage2_failed": failed,
"frame_coverage_pct": round(100.0 * with_frames / max(total, 1), 1),
}
def main():
conn = psycopg2.connect(os.environ["PG_DSN"])
cur = conn.cursor()
rows = fetch_rows(cur)
n_docs = len(rows)
print(f"=== Stage 2 frame distribution report ({n_docs} docs) ===\n")
# 1. Frequency
freq = section_frequency(rows)
print(f"--- 1. Frame frequency ({len(freq)} distinct labels) ---")
for label, count in freq.most_common(30):
print(f" {count:5d} {label}")
print()
# 2. Hygiene
collisions = section_hygiene(freq)
print(f"--- 2. Label hygiene (normalized collisions: {len(collisions)}) ---")
for norm, variants in sorted(collisions.items(), key=lambda kv: -sum(c for _, c in kv[1])):
variant_str = ", ".join(f"{r!r}:{c}" for r, c in sorted(variants, key=lambda x: -x[1]))
print(f" '{norm}': {variant_str}")
print()
# 3. Per-doc frame count
per_doc = section_per_doc_count(rows)
print("--- 3. Per-doc frame count ---")
for n in sorted(per_doc):
print(f" {n} frames: {per_doc[n]} docs")
print()
# 4. Co-occurrence (top-K)
top_frames = [f for f, _ in freq.most_common(TOP_K)]
pairs = section_cooccurrence(rows, top_frames)
print(f"--- 4. Co-occurrence (top-{TOP_K} frames, top-30 pairs) ---")
for (a, b), count in pairs.most_common(30):
print(f" {count:4d} {a} × {b}")
print()
# 5. Folder cross-tab
crosstab, bin_totals = section_folder_crosstab(rows, top_frames)
print(f"--- 5. Frame × folder cross-tab (top-{TOP_K} frames) ---")
bins_sorted = [b for b, _ in bin_totals.most_common()]
print(f" bins (with totals): " + ", ".join(f"{b}({n})" for b, n in bin_totals.most_common(10)))
for f in top_frames:
row_data = crosstab[f]
if not row_data:
continue
cells = ", ".join(f"{b}={c}" for b, c in row_data.most_common(5))
print(f" {f}: {cells}")
print()
# 6. Worker versions
versions, keys_by_version = section_worker_versions(rows)
print("--- 6. Worker version split ---")
for v, count in versions.most_common():
print(f" v{v}: {count} docs")
top_shapes = keys_by_version[v].most_common(3)
for keys, kcount in top_shapes:
print(f" {kcount} docs with keys={list(keys)}")
print()
# 7. Data gap
gap = section_data_gap(cur)
print("--- 7. Data-gap accounting (Stage 2 docs <2000 chars; never frame-extracted) ---")
print(f" count: {gap['count']}")
print(f" char_length: min={gap['char_length']['min']}, median={gap['char_length']['median']}, max={gap['char_length']['max']}")
print(f" by type bin: {gap['by_type_bin']}")
print(f" sample sources: {gap['sample_sources']}")
print()
# 8. Corpus coverage
coverage = section_corpus_coverage(cur)
print("--- 8. Corpus-wide frame coverage ---")
print(f" total distinct sources in embeddings: {coverage['total_distinct_sources_in_embeddings']}")
print(f" conversations (no frames by design): {coverage['conversations_no_frames_by_design']}")
print(f" files with frames: {coverage['files_with_frames']}")
print(f" files short, no frames: {coverage['files_short_no_frames']}")
print(f" files Stage 2 failed: {coverage['files_stage2_failed']}")
print(f" frame coverage: {coverage['frame_coverage_pct']}% of corpus")
print()
# JSON sidecar
OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
sidecar = {
"generated_at": datetime.now().isoformat(),
"n_docs_with_frames": n_docs,
"n_distinct_labels": len(freq),
"top_30_frames": freq.most_common(30),
"label_collisions": {
k: [(r, c) for r, c in v] for k, v in collisions.items()
},
"per_doc_frame_count": dict(per_doc),
"top_30_pairs": [
{"a": a, "b": b, "count": c}
for (a, b), c in pairs.most_common(30)
],
"folder_crosstab": {
f: dict(crosstab[f]) for f in top_frames if crosstab[f]
},
"bin_totals": dict(bin_totals),
"worker_versions": dict(versions),
"data_gap": gap,
"corpus_coverage": coverage,
}
OUT_PATH.write_text(json.dumps(sidecar, indent=2, default=str))
print(f"JSON sidecar written: {OUT_PATH}")
cur.close()
conn.close()
if __name__ == "__main__":
main()