ed2d090afc
Read-only inspection of the frame data Mistral produces in Stage 2, in service of Track 2 substrate design (Step 2.4 operation set spec). Artifacts: - New SQL view `stage2_frames_v` over `stage_3_queue.stage2_metadata` (CREATE OR REPLACE; idempotent; raw JSONB exposed alongside structured fields so worker-version drift is inspectable). - Analysis script: frequency, label-hygiene collisions, per-doc count, co-occurrence (top-K), file-type × frame cross-tab, worker-version split, data-gap accounting, corpus-wide coverage. - JSON sidecar for diff-across-runs reproducibility. - Markdown report with explicit Track 2 viability section. Headline findings: - Frames cluster meaningfully on the framed-doc subset (subject to validation on larger samples for the file-type cross-tab). - Only 56% of the corpus has frame coverage. 198 conversation sources bypass Stage 2 by design (`ingest_conversations.py` writes directly to embeddings); 339 short docs (<2000 chars) skip Mistral by char-gate; 12 Stage 2 failures. - All 14 voice notes and all 39 dream outputs are in the data gap. Primary capture and self-reflection channels are silent to the frame system. Dreamer cannot frame-condition on its own output. - 54 normalized label collisions (`Professional Experience` vs `Professional_Experience`, etc.) — any router must normalize first. - "Education" is a near-universal frame (36% of frame-extracted docs); cheap 20-doc hand-inspection diagnostic in report §8 to distinguish prompt artifact from corpus shape. - File-type × frame stratification is a concrete signal that ties to Improvement #2 (`embeddings.type` backfill); currently NULL for 71% of rows. No production code touched. View is droppable; script is read-only.
297 lines
10 KiB
Python
297 lines
10 KiB
Python
"""Read-only analysis of Stage 2 frame data via stage2_frames_v.

Produces eight sections (frequency, label hygiene, per-doc count,
co-occurrence, file-type cross-tab, worker-version split, data-gap accounting,
corpus-wide coverage) and writes a JSON sidecar for diffing across runs.

Usage: venv/bin/python3 scripts/experiments/frame_distribution_report.py
"""
|
||
import json
import os
import re
import statistics
import sys
from collections import Counter, defaultdict
from datetime import datetime
from itertools import combinations
from pathlib import Path

import psycopg2
from dotenv import load_dotenv
|
||
|
||
# Load environment variables from a .env file (if present) before main()
# reads PG_DSN.
load_dotenv()

# One sidecar per calendar day; the date is taken once, at import time, so
# runs on different days can be diffed against each other.
OUT_PATH = Path.home().joinpath(
    "aaronai",
    "experiments",
    "frame_distribution_{}.json".format(datetime.now().strftime("%Y-%m-%d")),
)

# How many of the most frequent frames feed the co-occurrence and type-bin
# cross-tab sections; revisit after seeing the long tail.
TOP_K = 20
|
||
|
||
|
||
def normalize(label):
    """Return the canonical form of a frame label for collision detection.

    Underscores count as spaces. Runs of whitespace/underscores collapse to a
    single space, and the result is trimmed and case-folded. So
    ``"Professional_Experience"``, ``"professional experience"`` and
    ``" Professional  Experience_"`` all map to ``"professional experience"``.
    """
    # Collapse separators *before* trimming. Otherwise a leading/trailing
    # underscore becomes a stray space after the strip ("Education_" ->
    # "education ") and hides a genuine collision.
    return re.sub(r"[\s_]+", " ", label).strip().casefold()
|
||
|
||
|
||
def folder_bin(source):
    """Return a coarse type label ("bin") for a source name.

    stage_3_queue stores bare filenames rather than paths. The bin is
    therefore derived from the name itself (prefix, date-stamped naming
    pattern, or extension), not from a directory. Empty/None sources map to
    "unknown"; anything unrecognised maps to "other".
    """
    if not source:
        return "unknown"

    # Conversation sources carry an assistant prefix, checked case-sensitively
    # on the raw name. They bypass Stage 2/3, so they should not appear here.
    if re.match(r"^(Claude|ChatGPT|Aaron AI):", source):
        return "conversation"

    name = source.lower()

    # Date-stamped naming patterns take precedence over the plain extension.
    if re.search(r"\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-voice\.md$", name):
        return "voice_note"
    if re.search(r"\d{4}-\d{2}-\d{2}-(nrem|early-rem|late-rem|synthesis|lucid)", name):
        return "dream_output"

    # Extension -> bin. Legacy Office extensions share a bin with their
    # modern counterparts.
    by_extension = (
        ((".md",), "markdown"),
        ((".pdf",), "pdf"),
        ((".docx", ".doc"), "docx"),
        ((".pptx", ".ppt"), "pptx"),
        ((".txt",), "txt"),
    )
    for extensions, label in by_extension:
        if name.endswith(extensions):
            return label
    return "other"
|
||
|
||
|
||
def fetch_rows(cur):
    """Load one record per document from ``stage2_frames_v``.

    Args:
        cur: an open DB-API cursor.

    Returns:
        A list of dicts, one per row, with these keys:

        - ``source``
        - ``char_length``
        - ``frames``: truthy labels coerced to ``str``, in stored order,
          duplicates kept.
        - ``worker_version``
        - ``raw_keys``: sorted top-level keys of ``raw_metadata``, or ``[]``
          when it is not a dict.

    Rows whose ``active_frames`` is not a list are skipped. The number
    skipped is reported on stderr, so the report's doc count is never
    silently smaller than the view.
    """
    cur.execute("""
        SELECT source, char_length, active_frames, worker_version, raw_metadata
        FROM stage2_frames_v
    """)
    rows = []
    skipped = 0
    for source, char_length, frames, worker_version, raw in cur.fetchall():
        # NOTE(review): unclear which writers produce a non-list
        # active_frames (NULL?) — confirm against the view definition.
        if not isinstance(frames, list):
            skipped += 1
            continue
        rows.append({
            "source": source,
            "char_length": char_length,
            "frames": [str(f) for f in frames if f],
            "worker_version": worker_version,
            "raw_keys": sorted(raw.keys()) if isinstance(raw, dict) else [],
        })
    if skipped:
        print(
            f"warning: skipped {skipped} stage2_frames_v row(s) whose "
            f"active_frames is not a list",
            file=sys.stderr,
        )
    return rows
|
||
|
||
|
||
def section_frequency(rows):
    """Count how many times each raw frame label occurs across all rows.

    Labels are counted exactly as stored (no normalization). A label repeated
    within one row is counted once per occurrence.
    """
    return Counter(label for row in rows for label in row["frames"])
|
||
|
||
|
||
def section_hygiene(frequency):
    """Return label-hygiene collisions: normalized forms with >1 raw spelling.

    Args:
        frequency: mapping of raw label -> occurrence count.

    Returns:
        dict of normalized label -> list of ``(raw_label, count)`` tuples, in
        ``frequency`` iteration order. Only normalized forms reached by at
        least two distinct raw labels are included.
    """
    by_normalized = {}
    for raw_label, count in frequency.items():
        by_normalized.setdefault(normalize(raw_label), []).append((raw_label, count))
    return {norm: variants for norm, variants in by_normalized.items() if len(variants) >= 2}
|
||
|
||
|
||
def section_per_doc_count(rows):
    """Histogram of frames per document: frame count -> number of docs."""
    histogram = Counter()
    for row in rows:
        histogram[len(row["frames"])] += 1
    return histogram
|
||
|
||
|
||
def section_cooccurrence(rows, top_frames):
    """Count unordered co-occurring pairs among the top frames.

    Labels are de-duplicated per document before pairing. Each document
    therefore contributes at most once to any pair, and a frame emitted twice
    never pairs with itself.

    Args:
        rows: records from ``fetch_rows``; only ``frames`` is read.
        top_frames: labels to consider; all other labels are ignored.

    Returns:
        Counter mapping ``(a, b)`` with ``a < b`` to the number of documents
        containing both labels.
    """
    top_set = set(top_frames)
    pair_counts = Counter()
    for r in rows:
        # Sorting the de-duplicated labels makes each emitted pair already
        # ordered (a < b), so keys are canonical.
        present = sorted(top_set.intersection(r["frames"]))
        pair_counts.update(combinations(present, 2))
    return pair_counts
|
||
|
||
|
||
def section_folder_crosstab(rows, top_frames):
    """Cross-tabulate the top frames against each source's type bin.

    Bins come from ``folder_bin(source)``.

    Returns:
        ``(table, bin_totals)``:

        - ``table[frame][bin]``: number of documents in ``bin`` carrying
          ``frame``.
        - ``bin_totals[bin]``: number of documents in ``bin``.

        A frame repeated within one document counts once, so every cell is
        at most its bin total.
    """
    top_set = set(top_frames)
    table = defaultdict(Counter)  # frame -> bin -> doc count
    bin_totals = Counter()
    for r in rows:
        b = folder_bin(r["source"])
        bin_totals[b] += 1
        # dict.fromkeys: order-preserving de-duplication of this doc's labels.
        for f in dict.fromkeys(r["frames"]):
            if f in top_set:
                table[f][b] += 1
    return table, bin_totals
|
||
|
||
|
||
def section_worker_versions(rows):
    """Split rows by worker version and by raw-metadata key shape.

    Missing or falsy versions are reported as "unknown".

    Returns:
        ``(docs_per_version, shapes_per_version)``:

        - ``docs_per_version``: Counter of version -> docs.
        - ``shapes_per_version``: mapping of version -> Counter of
          sorted-key tuple -> docs.
    """
    docs_per_version = Counter()
    shapes_per_version = defaultdict(Counter)
    for row in rows:
        version = row["worker_version"] or "unknown"
        docs_per_version[version] += 1
        shapes_per_version[version][tuple(row["raw_keys"])] += 1
    return docs_per_version, shapes_per_version
|
||
|
||
|
||
def section_data_gap(cur):
    """Summarize completed Stage 2 docs that fall under the 2000-char gate.

    Selects ``stage_2_queue`` rows with ``completed_at`` set and
    ``char_length < 2000``. The report treats these as never frame-extracted.
    NOTE(review): this relies on the char gate alone and does not cross-check
    stage_3_queue — confirm the gate is the only reason they lack frames.

    Returns:
        dict with these keys:

        - ``count``
        - ``by_type_bin``: ``folder_bin`` -> count.
        - ``char_length``: min / max / median, each None when there are no
          rows.
        - ``sample_sources``: first 10 sources in name order.
    """
    # ORDER BY keeps sample_sources stable across runs, so the JSON sidecar
    # diffs cleanly.
    cur.execute("""
        SELECT source, char_length
        FROM stage_2_queue
        WHERE completed_at IS NOT NULL AND char_length < 2000
        ORDER BY source
    """)
    missing = cur.fetchall()
    by_bin = Counter(folder_bin(s) for s, _ in missing)
    char_lengths = [c for _, c in missing]
    return {
        "count": len(missing),
        "by_type_bin": dict(by_bin),
        "char_length": {
            "min": min(char_lengths, default=None),
            "max": max(char_lengths, default=None),
            # True median: mean of the two middle values for an even count.
            "median": statistics.median(char_lengths) if char_lengths else None,
        },
        "sample_sources": [s for s, _ in missing[:10]],
    }
|
||
|
||
|
||
def section_corpus_coverage(cur):
    """Account for frame coverage across the whole embeddings corpus.

    Runs five scalar ``count(DISTINCT source)`` queries:

    - all sources in ``embeddings``;
    - conversation sources (assistant-prefixed names or
      ``type='aaronai_conversation'``);
    - ``stage_3_queue`` sources with non-NULL ``stage2_metadata``;
    - completed Stage 2 sources under 2000 chars;
    - Stage 2 sources with ``failed_at`` set.

    Returns:
        dict of those counts plus ``frame_coverage_pct``: files with frames
        over total embeddings sources, as a percentage rounded to one
        decimal. A total of 0 is treated as 1 to avoid dividing by zero.
    """
    def scalar(sql):
        # Every query here yields exactly one row with one column.
        cur.execute(sql)
        return cur.fetchone()[0]

    total = scalar("SELECT count(DISTINCT source) FROM embeddings")
    conversations = scalar("""
        SELECT count(DISTINCT source) FROM embeddings
        WHERE source LIKE 'Claude:%' OR source LIKE 'ChatGPT:%'
        OR source LIKE 'Aaron AI:%' OR type='aaronai_conversation'
    """)
    with_frames = scalar(
        "SELECT count(DISTINCT source) FROM stage_3_queue WHERE stage2_metadata IS NOT NULL"
    )
    short_no_frames = scalar("""
        SELECT count(DISTINCT source) FROM stage_2_queue
        WHERE completed_at IS NOT NULL AND char_length < 2000
    """)
    failed = scalar("""
        SELECT count(DISTINCT source) FROM stage_2_queue
        WHERE failed_at IS NOT NULL
    """)

    denominator = max(total, 1)
    coverage_pct = round(100.0 * with_frames / denominator, 1)
    return {
        "total_distinct_sources_in_embeddings": total,
        "conversations_no_frames_by_design": conversations,
        "files_with_frames": with_frames,
        "files_short_no_frames": short_no_frames,
        "files_stage2_failed": failed,
        "frame_coverage_pct": coverage_pct,
    }
|
||
|
||
|
||
def main():
    """Entry point: connect via ``PG_DSN``, run the report, always clean up.

    The session is set read-only so the analysis cannot write to the
    database. The cursor and connection are closed even when a report
    section raises.
    """
    conn = psycopg2.connect(os.environ["PG_DSN"])
    try:
        # Must be set before any statement opens a transaction.
        conn.set_session(readonly=True)
        cur = conn.cursor()
        try:
            _run_report(cur)
        finally:
            cur.close()
    finally:
        conn.close()


def _run_report(cur):
    """Print report sections 1-8 using ``cur``, then write the JSON sidecar."""
    rows = fetch_rows(cur)
    n_docs = len(rows)
    print(f"=== Stage 2 frame distribution report ({n_docs} docs) ===\n")

    # 1. Frequency
    freq = section_frequency(rows)
    print(f"--- 1. Frame frequency ({len(freq)} distinct labels) ---")
    for label, count in freq.most_common(30):
        print(f" {count:5d} {label}")
    print()

    # 2. Hygiene: raw spellings that collapse to the same normalized label,
    # ordered by combined occurrence count (largest first).
    collisions = section_hygiene(freq)
    print(f"--- 2. Label hygiene (normalized collisions: {len(collisions)}) ---")
    for norm, variants in sorted(collisions.items(), key=lambda kv: -sum(c for _, c in kv[1])):
        variant_str = ", ".join(f"{r!r}:{c}" for r, c in sorted(variants, key=lambda x: -x[1]))
        print(f" '{norm}': {variant_str}")
    print()

    # 3. Per-doc frame count
    per_doc = section_per_doc_count(rows)
    print("--- 3. Per-doc frame count ---")
    for n in sorted(per_doc):
        print(f" {n} frames: {per_doc[n]} docs")
    print()

    # 4. Co-occurrence, restricted to the TOP_K most frequent frames.
    top_frames = [f for f, _ in freq.most_common(TOP_K)]
    pairs = section_cooccurrence(rows, top_frames)
    print(f"--- 4. Co-occurrence (top-{TOP_K} frames, top-30 pairs) ---")
    for (a, b), count in pairs.most_common(30):
        print(f" {count:4d} {a} × {b}")
    print()

    # 5. Frame x source-type-bin cross-tab (bins come from folder_bin).
    crosstab, bin_totals = section_folder_crosstab(rows, top_frames)
    print(f"--- 5. Frame × folder cross-tab (top-{TOP_K} frames) ---")
    print(" bins (with totals): " + ", ".join(f"{b}({n})" for b, n in bin_totals.most_common(10)))
    for f in top_frames:
        row_data = crosstab[f]
        if not row_data:
            continue
        cells = ", ".join(f"{b}={c}" for b, c in row_data.most_common(5))
        print(f" {f}: {cells}")
    print()

    # 6. Worker versions, each with its three most common raw-metadata key
    # shapes so schema drift between versions is visible.
    versions, keys_by_version = section_worker_versions(rows)
    print("--- 6. Worker version split ---")
    for v, count in versions.most_common():
        print(f" v{v}: {count} docs")
        for keys, kcount in keys_by_version[v].most_common(3):
            print(f" {kcount} docs with keys={list(keys)}")
    print()

    # 7. Data gap
    gap = section_data_gap(cur)
    print("--- 7. Data-gap accounting (Stage 2 docs <2000 chars; never frame-extracted) ---")
    print(f" count: {gap['count']}")
    print(f" char_length: min={gap['char_length']['min']}, median={gap['char_length']['median']}, max={gap['char_length']['max']}")
    print(f" by type bin: {gap['by_type_bin']}")
    print(f" sample sources: {gap['sample_sources']}")
    print()

    # 8. Corpus coverage
    coverage = section_corpus_coverage(cur)
    print("--- 8. Corpus-wide frame coverage ---")
    print(f" total distinct sources in embeddings: {coverage['total_distinct_sources_in_embeddings']}")
    print(f" conversations (no frames by design): {coverage['conversations_no_frames_by_design']}")
    print(f" files with frames: {coverage['files_with_frames']}")
    print(f" files short, no frames: {coverage['files_short_no_frames']}")
    print(f" files Stage 2 failed: {coverage['files_stage2_failed']}")
    print(f" frame coverage: {coverage['frame_coverage_pct']}% of corpus")
    print()

    # JSON sidecar
    OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    sidecar = {
        "generated_at": datetime.now().isoformat(),
        "n_docs_with_frames": n_docs,
        "n_distinct_labels": len(freq),
        "top_30_frames": freq.most_common(30),
        "label_collisions": {k: list(v) for k, v in collisions.items()},
        "per_doc_frame_count": dict(per_doc),
        "top_30_pairs": [
            {"a": a, "b": b, "count": c}
            for (a, b), c in pairs.most_common(30)
        ],
        "folder_crosstab": {
            f: dict(crosstab[f]) for f in top_frames if crosstab[f]
        },
        "bin_totals": dict(bin_totals),
        "worker_versions": dict(versions),
        # Raw-metadata key shapes per version, so drift is diffable across
        # runs. Tuples cannot be JSON keys, so each shape is a list entry.
        "worker_version_key_shapes": {
            v: [
                {"keys": list(keys), "count": c}
                for keys, c in keys_by_version[v].most_common()
            ]
            for v in versions
        },
        "data_gap": gap,
        "corpus_coverage": coverage,
    }
    OUT_PATH.write_text(json.dumps(sidecar, indent=2, default=str), encoding="utf-8")
    print(f"JSON sidecar written: {OUT_PATH}")


if __name__ == "__main__":
    main()
|