#!/usr/bin/env python3
"""
Audit Expansion Pack Generator — type-aware stratified draw of 12 documents
from base_class_validation_results.json for n=20 audit expansion.

Per audit-expansion-protocol.md amendment 2026-04-28:
The seed=43 length-only random draw concentrated on course modules in the
small and medium buckets, missing voice captures, syllabi, and conversational
documents present in the candidate distribution. This script implements
type-aware stratification within each length bucket to produce a sample
representative of BirdAI's document-type mix.

Targets (12 total):
    small  (4): 2 course_module + 2 voice_capture
    medium (4): 2 course_module + 1 syllabus + 1 other
    large  (4): 1 course_ppt + 1 syllabus + 1 faculty_report + 1 conversational

Output: ~/aaronai/experiments/audit_expansion_pack.json

Usage:
    python3 ~/aaronai/scripts/audit_expansion_draw.py
    python3 ~/aaronai/scripts/audit_expansion_draw.py --dry-run
"""

import argparse
import json
import random
import re
import sys
import time
from pathlib import Path

EXPERIMENTS = Path.home() / "aaronai" / "experiments"
VALIDATION_RESULTS = EXPERIMENTS / "base_class_validation_results.json"
EXISTING_AUDIT_PACK = EXPERIMENTS / "base_class_audit_pack.json"
OUTPUT_FILE = EXPERIMENTS / "audit_expansion_pack.json"

SEED = 43

# Type-aware targets per bucket
TYPE_TARGETS = {
    "small": {"course_module": 2, "voice_capture": 2},
    "medium": {"course_module": 2, "syllabus": 1, "other": 1},
    "large": {"course_ppt": 1, "syllabus": 1, "faculty_report": 1, "conversational": 1},
}

# Filename patterns, compiled once at import time. All are applied with
# .match(), i.e. anchored at the start of the string.
_VOICE_CAPTURE_RE = re.compile(r"\d{4}-\d{2}-\d{2}-\d{2}-\d{2}-voice\.md$")
_COURSE_PPT_PREFIX_RE = re.compile(r"mod\d+_")
_COURSE_MODULE_RE = re.compile(r"^\d{2}_")


def classify(source, bucket):
    """Map a source filename to a document type.

    The result is scoped to ``bucket`` where type categories overlap
    (e.g. 'course_module' vs 'course_ppt'). Rules are tried in order and the
    first match wins.

    Args:
        source: Source filename / title of the document (a string).
        bucket: Size bucket of the document ("small", "medium" or "large").

    Returns:
        One of "voice_capture", "conversational", "syllabus",
        "faculty_report", "course_ppt", "course_module" or "other".
    """
    s = source.lower()

    # Voice captures — pattern: YYYY-MM-DD-HH-MM-voice.md
    if _VOICE_CAPTURE_RE.match(source):
        return "voice_capture"

    # Conversational exports — pattern: "Claude: ..." or "ChatGPT: ..."
    if source.startswith(("Claude:", "ChatGPT:")):
        return "conversational"

    # Syllabus — must contain "syllabus" in the name
    if "syllabus" in s:
        return "syllabus"

    # Faculty / annual reports
    if "faculty report" in s or "annual report" in s:
        return "faculty_report"

    # Course PPTs (large bucket only) — ".pptx", "_PPT_" or a "ModN_" prefix
    if bucket == "large" and (
        ".pptx" in s or "_ppt_" in s or _COURSE_PPT_PREFIX_RE.match(s)
    ):
        return "course_ppt"

    # Course modules — two-digit numeric prefix such as "0N_*.docx"
    if _COURSE_MODULE_RE.match(source):
        return "course_module"

    # Everything else falls into 'other' (only targeted in the medium bucket)
    return "other"


def extract_excluded_sources(existing):
    """Collect the 'source' values recorded in an already-issued audit pack.

    Args:
        existing: Parsed JSON of the audit pack. Either a dict holding the
            entries under "pairs" (preferred) or "results", or a bare list of
            entries.

    Returns:
        Set of non-empty source strings found on the entries.

    Raises:
        ValueError: If no list of entries can be located. Failing loudly
            matters here: silently excluding nothing would let already
            audited documents be drawn again.
    """
    if isinstance(existing, dict):
        if "pairs" in existing:
            entries = existing["pairs"]
        elif "results" in existing:
            entries = existing["results"]
        else:
            raise ValueError("audit pack has neither 'pairs' nor 'results'")
    else:
        entries = existing

    if not isinstance(entries, list):
        raise ValueError("audit pack entries are not a list")

    sources = set()
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        src = entry.get("source")
        if isinstance(src, str) and src:
            sources.add(src)
    return sources


def select_candidates(all_docs, excluded_sources):
    """Filter validation results down to documents eligible for the draw.

    A document is kept when it has a non-empty string 'source' that is not
    excluded, has truthy 'condition_a' and 'condition_b', and sits in a size
    bucket listed in TYPE_TARGETS.

    Args:
        all_docs: List of result dicts from the validation file.
        excluded_sources: Set of sources that must not be drawn again.

    Returns:
        List of shallow copies of the eligible documents, in input order,
        each with an added "_type" key from classify(). The input dicts are
        left untouched.
    """
    candidates = []
    for doc in all_docs:
        src = doc.get("source")
        # A document without a usable source cannot be classified or
        # identified for rating, so it is skipped instead of crashing.
        if not isinstance(src, str) or not src:
            continue
        if src in excluded_sources:
            continue
        if not doc.get("condition_a") or not doc.get("condition_b"):
            continue
        bucket = doc.get("size_bucket")
        if bucket not in TYPE_TARGETS:
            continue
        candidates.append({**doc, "_type": classify(src, bucket)})
    return candidates


def stratified_draw(valid_docs, rng, type_targets=None):
    """Draw documents per (bucket, type) according to the target counts.

    Buckets and types are visited in the insertion order of ``type_targets``
    and candidates keep their input order, so the result is reproducible for
    a given ``rng`` state.

    Args:
        valid_docs: Candidate dicts carrying "size_bucket" and "_type".
        rng: A random.Random instance used for sampling.
        type_targets: Mapping bucket -> {type: count}; defaults to
            TYPE_TARGETS.

    Returns:
        Tuple (drawn, warnings): the sampled documents, and one warning
        string per (bucket, type) that had fewer candidates than its target
        (in which case every available candidate is drawn).
    """
    if type_targets is None:
        type_targets = TYPE_TARGETS

    drawn = []
    warnings = []
    for bucket, targets in type_targets.items():
        bucket_docs = [d for d in valid_docs if d["size_bucket"] == bucket]
        for doc_type, target in targets.items():
            type_docs = [d for d in bucket_docs if d["_type"] == doc_type]
            if len(type_docs) < target:
                warnings.append(
                    f"WARNING: bucket={bucket} type={doc_type} "
                    f"available={len(type_docs)} target={target}"
                )
            n_to_draw = min(target, len(type_docs))
            drawn.extend(rng.sample(type_docs, n_to_draw))
    return drawn, warnings


def main():
    """Run the draw, print a report and (unless --dry-run) write the pack."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()

    if not VALIDATION_RESULTS.exists():
        print(f"ERROR: {VALIDATION_RESULTS} not found", file=sys.stderr)
        sys.exit(1)

    # Explicit UTF-8 so decoding does not depend on the platform locale.
    with open(VALIDATION_RESULTS, encoding="utf-8") as f:
        validation = json.load(f)

    all_docs = validation["results"]
    print(f"Loaded {len(all_docs)} documents from validation results")
    print(f"Experiment: {validation.get('title', 'unknown')}")

    # Load existing audit pack to exclude its sources (audit pack uses 'pairs')
    excluded_sources = set()
    if EXISTING_AUDIT_PACK.exists():
        with open(EXISTING_AUDIT_PACK, encoding="utf-8") as f:
            existing = json.load(f)
        try:
            excluded_sources = extract_excluded_sources(existing)
        except ValueError as err:
            print(f"ERROR: {EXISTING_AUDIT_PACK}: {err}", file=sys.stderr)
            sys.exit(1)
        print(f"Excluding {len(excluded_sources)} sources already in audit pack")

    # Filter to valid candidates
    valid_docs = select_candidates(all_docs, excluded_sources)
    print(f"Valid candidate documents: {len(valid_docs)}")

    # Print what's available per (bucket, type) before drawing
    print("\nCandidates by (bucket, type):")
    for bucket in TYPE_TARGETS:
        types_in_bucket = {}
        for d in valid_docs:
            if d["size_bucket"] == bucket:
                types_in_bucket.setdefault(d["_type"], []).append(d)
        print(f" {bucket}:")
        for t in sorted(types_in_bucket):
            target = TYPE_TARGETS[bucket].get(t, "—")
            print(f" {t:>16}: {len(types_in_bucket[t])} avail, target {target}")

    # Stratified type-aware draw. A private generator seeded with SEED yields
    # the same sequence as seeding the module-level one, without touching
    # global random state.
    drawn, warnings = stratified_draw(valid_docs, random.Random(SEED))
    for msg in warnings:
        print(msg, file=sys.stderr)

    # Report draw
    print(f"\nDrew {len(drawn)} documents:")
    for d in drawn:
        src = d.get("source", "")
        chars = d.get("doc_chars_original", 0)
        bucket = d.get("size_bucket", "?")
        doc_type = d.get("_type", "?")
        truncated = " (TRUNCATED)" if d.get("truncated") else ""
        print(f" [{bucket:>6}/{doc_type:>16}] {chars:>6}c {src}{truncated}")

    # Bucket-level summary; keys come from TYPE_TARGETS so the two cannot drift
    bucket_counts = {bucket: 0 for bucket in TYPE_TARGETS}
    for d in drawn:
        bucket_counts[d["size_bucket"]] += 1
    print(f"\nBucket totals: {bucket_counts}")

    if args.dry_run:
        print("\n--dry-run set, not writing output file")
        return

    output = {
        "metadata": {
            "generated_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
            "source_validation_file": str(VALIDATION_RESULTS),
            "seed": SEED,
            "stratification": "type-aware within length bucket",
            "type_targets": TYPE_TARGETS,
            "bucket_counts": bucket_counts,
            "excluded_count": len(excluded_sources),
            "warnings": warnings,
            "purpose": "n=20 audit expansion per audit-expansion-protocol.md (type-aware amendment)",
        },
        "results": drawn,
    }

    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2, default=str)

    print(f"\nWrote {OUTPUT_FILE}")
    print(f" {len(drawn)} documents ready for rating")


if __name__ == "__main__":
    main()