#!/usr/bin/env python3
"""
corpus_integrity.py — BirdAI Corpus Integrity Check

Compares three sources of truth:
  1. Filesystem (Nextcloud) — what files exist
  2. pgvector (embeddings table) — what's been through Stage 1
  3. Graphiti (migration state + stage_3_queue) — what's been through Stage 3

Files are matched across the three sources by basename only, so two files
with the same name in different directories are treated as one source.

Usage:
  python3 corpus_integrity.py          # report only
  python3 corpus_integrity.py --fix    # report + auto-queue gaps for retry
  python3 corpus_integrity.py --json   # output JSON to stdout
"""

import os
import sys
import json
import argparse
from contextlib import closing
from pathlib import Path
from datetime import datetime

import psycopg2
from dotenv import load_dotenv

# Make the sibling ``encoding`` module importable when run as a script.
sys.path.insert(0, str(Path(__file__).parent))
from encoding import extract_text

load_dotenv(Path.home() / "aaronai" / ".env", override=True)

NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
MIGRATION_STATE = str(Path.home() / "aaronai" / "experiments" / "tier1_migration_state.json")
REPORT_PATH = str(Path.home() / "aaronai" / "corpus_integrity_report.json")
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
PG_DSN = os.getenv("PG_DSN")

# Error text stored in ingest_failures when a gap file yields no text.
_EMPTY_TEXT_ERROR = (
    "Empty text — likely scanned, encrypted, or corrupt. "
    "Requires manual review or OCR."
)


def get_pg():
    """Open and return a new psycopg2 connection using ``PG_DSN``."""
    return psycopg2.connect(PG_DSN)


def _fetch_first_column(sql):
    """Run ``sql`` and return the first column of every row as a set.

    The connection is always closed, even when the query raises.
    ``closing`` is used because psycopg2's own ``with connection:`` only
    ends the transaction and leaves the connection open.
    """
    with closing(get_pg()) as pg:
        cur = pg.cursor()
        cur.execute(sql)
        return {row[0] for row in cur.fetchall()}


def get_filesystem_files():
    """Walk ``NEXTCLOUD_PATH`` and describe every supported document.

    Skips directories, unsupported extensions, hidden files and Office
    lock files (``.``/``~$`` prefixes), anything under a backups
    directory, and anything under ``Journal/Media``.

    Returns:
        A list of dicts with keys ``source`` (basename), ``filepath``,
        ``size`` and ``mtime``.
    """
    files = []
    root = Path(NEXTCLOUD_PATH)
    for path in root.rglob("*"):
        # Cheap name-based filters first; they need no filesystem access.
        if path.suffix.lower() not in SUPPORTED:
            continue
        if path.name.startswith((".", "~$")):
            continue
        path_str = str(path)
        if "Admin/Backups" in path_str or "Backups" in path.parts:
            continue
        if "Journal/Media" in path_str:
            continue
        if path.is_dir():
            continue
        try:
            # One stat call supplies both size and mtime.
            info = path.stat()
        except OSError as e:
            # The file vanished mid-scan or is a broken symlink; skip it
            # rather than abort the whole integrity check.
            print(f"WARNING: cannot stat {path}: {e}", file=sys.stderr)
            continue
        files.append({
            "source": path.name,
            "filepath": path_str,
            "size": info.st_size,
            "mtime": info.st_mtime,
        })
    return files


def _load_pgvector_sources():
    """Return ``(sources, ok)``; ``ok`` is False when the query failed."""
    try:
        sources = _fetch_first_column(
            "SELECT DISTINCT source FROM embeddings WHERE source IS NOT NULL"
        )
        return sources, True
    except Exception as e:
        print(f"ERROR: pgvector: {e}", file=sys.stderr)
        return set(), False


def get_pgvector_sources():
    """Return the set of distinct non-NULL ``source`` values in ``embeddings``.

    Best-effort: on any database error the error is printed to stderr and
    an empty set is returned.
    """
    return _load_pgvector_sources()[0]


def _load_graphiti_sources():
    """Return ``(sources, ok)``; ``ok`` is False when either lookup failed.

    A missing migration-state file is not treated as a failure.
    """
    sources = set()
    ok = True

    try:
        state_path = Path(MIGRATION_STATE)
        if state_path.exists():
            state = json.loads(state_path.read_text(encoding="utf-8"))
            for filepath in state.get("ingested", []):
                sources.add(Path(filepath).name)
    except Exception as e:
        ok = False
        print(f"WARNING: migration state: {e}", file=sys.stderr)

    try:
        sources |= _fetch_first_column(
            "SELECT DISTINCT source FROM stage_3_queue WHERE completed_at IS NOT NULL"
        )
    except Exception as e:
        ok = False
        print(f"WARNING: stage_3_queue: {e}", file=sys.stderr)

    return sources, ok


def get_graphiti_sources():
    """Return the basenames recorded as ingested into Graphiti.

    Combines the ``ingested`` list of the migration-state JSON file
    (reduced to basenames) with the ``source`` values of completed
    ``stage_3_queue`` rows. Each lookup is best-effort: a failure is
    printed to stderr and that lookup contributes nothing.
    """
    return _load_graphiti_sources()[0]


def get_ingest_failures():
    """Return unresolved ``ingest_failures`` rows keyed by ``source``.

    Rows are read newest-failure first. Timestamps are stringified so the
    result is JSON-serialisable. Best-effort: on a database error a
    warning is printed to stderr and the rows read so far are returned.
    """
    failures = {}
    try:
        with closing(get_pg()) as pg:
            cur = pg.cursor()
            cur.execute("""
                SELECT source, filepath, error, retry_count,
                       first_failed_at, last_failed_at
                FROM ingest_failures
                WHERE resolved = FALSE
                ORDER BY last_failed_at DESC
            """)
            for row in cur.fetchall():
                failures[row[0]] = {
                    "source": row[0],
                    "filepath": row[1],
                    "error": row[2],
                    "retry_count": row[3],
                    "first_failed_at": str(row[4]),
                    "last_failed_at": str(row[5]),
                }
    except Exception as e:
        print(f"WARNING: ingest_failures: {e}", file=sys.stderr)
    return failures


def queue_for_retry(source, full_text, filepath):
    """Insert or reset the ``stage_2_queue`` row for ``source``.

    An existing row has its text replaced and its completion, failure and
    attempt fields reset so the document is processed again.

    Args:
        source: Basename identifying the document.
        full_text: Extracted document text.
        filepath: Unused; kept so existing callers keep working.

    Returns:
        True when the row was committed, False when the database
        operation failed (a warning is printed to stderr).
    """
    try:
        with closing(get_pg()) as pg:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                    full_text = EXCLUDED.full_text,
                    char_length = EXCLUDED.char_length,
                    enqueued_at = NOW(),
                    completed_at = NULL,
                    failed_at = NULL,
                    attempts = 0
            """, (source, full_text, len(full_text)))
            pg.commit()
        return True
    except Exception as e:
        print(f"WARNING: queue failed {source}: {e}", file=sys.stderr)
        return False


def _record_ingest_failure(finfo, error):
    """Upsert an ``ingest_failures`` row for ``finfo``; best-effort."""
    try:
        with closing(get_pg()) as pg:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO ingest_failures
                    (source, filepath, error, retry_count,
                     first_failed_at, last_failed_at)
                VALUES (%s, %s, %s, 0, NOW(), NOW())
                ON CONFLICT (source) DO UPDATE SET
                    error = EXCLUDED.error,
                    last_failed_at = NOW()
            """, (finfo["source"], finfo["filepath"], error))
            pg.commit()
    except Exception as e:
        print(f" WARNING: could not record failure: {e}", file=sys.stderr)


def run_reconciliation(fix=False, log_stream=None):
    """Compare filesystem, pgvector and Graphiti and write a JSON report.

    Args:
        fix: When True, queue every file found in neither store for
            retry, and record files with no extractable text as ingest
            failures. Queuing is skipped when a source query failed,
            because the gap list would then cover nearly every file and
            re-queuing resets each file's ``stage_2_queue`` row.
        log_stream: File-like object that receives progress text.
            Defaults to ``sys.stdout``.

    Returns:
        The report dict, which is also written to ``REPORT_PATH``.
    """
    stream = sys.stdout if log_stream is None else log_stream

    def say(*args):
        print(*args, file=stream)

    say(f"BirdAI Corpus Integrity Check — {datetime.now().isoformat()}")
    say()

    say("Scanning filesystem...")
    fs_files = get_filesystem_files()
    fs_sources = {f["source"]: f for f in fs_files}
    say(f" Filesystem: {len(fs_files)} files")
    shadowed = len(fs_files) - len(fs_sources)
    if shadowed:
        # Matching is by basename, so same-named files collapse into one.
        print(
            f"WARNING: {shadowed} file(s) share a basename with another file; "
            "only one path per name is reconciled",
            file=sys.stderr,
        )

    say("Querying pgvector...")
    pv_sources, pv_ok = _load_pgvector_sources()
    say(f" pgvector: {len(pv_sources)} distinct sources")

    say("Querying Graphiti...")
    gr_sources, gr_ok = _load_graphiti_sources()
    say(f" Graphiti: {len(gr_sources)} sources")

    say("Querying ingest failures...")
    failures = get_ingest_failures()
    say(f" Failures: {len(failures)} unresolved")
    say()

    both, pv_only, neither, gr_only = [], [], [], []
    for source, finfo in fs_sources.items():
        in_pv = source in pv_sources
        in_gr = source in gr_sources
        if in_pv and in_gr:
            both.append(finfo)
        elif in_pv:
            pv_only.append(finfo)
        elif in_gr:
            gr_only.append(finfo)
        else:
            neither.append(finfo)

    # Names known to a store but no longer present on disk.
    known_names = set(fs_sources)
    orphans_pv = pv_sources - known_names
    orphans_gr = gr_sources - known_names

    say("Results:")
    say(f" Both (pgvector + Graphiti): {len(both)}")
    say(f" pgvector only: {len(pv_only)}")
    say(f" Neither (corpus gap): {len(neither)}")
    say(f" Graphiti only: {len(gr_only)}")
    say(f" Ingest failures: {len(failures)}")
    say(f" pgvector orphans: {len(orphans_pv)}")
    say(f" Graphiti orphans: {len(orphans_gr)}")
    say()

    auto_queued = []
    if fix and neither:
        if not (pv_ok and gr_ok):
            print(
                "WARNING: skipping --fix because a source query failed; "
                "the gap list is unreliable",
                file=sys.stderr,
            )
        else:
            say(f"Auto-queuing {len(neither)} gap files...")
            for finfo in neither:
                error = _EMPTY_TEXT_ERROR
                try:
                    # ``or ""`` tolerates an extractor that returns None.
                    text = extract_text(Path(finfo["filepath"])) or ""
                except Exception as e:
                    # One unreadable file must not abort the whole run.
                    text = ""
                    error = f"Text extraction failed: {e}"
                if text.strip():
                    if queue_for_retry(finfo["source"], text, finfo["filepath"]):
                        auto_queued.append(finfo["source"])
                        say(f" Queued: {finfo['source']}")
                else:
                    say(f" Skipped (unreadable): {finfo['source']}")
                    _record_ingest_failure(finfo, error)
            say()

    report = {
        "timestamp": datetime.now().isoformat(),
        "summary": {
            "filesystem_total": len(fs_files),
            "pgvector_total": len(pv_sources),
            "graphiti_total": len(gr_sources),
            "both": len(both),
            "pgvector_only": len(pv_only),
            "neither": len(neither),
            "graphiti_only": len(gr_only),
            "failures": len(failures),
            "orphans_pgvector": len(orphans_pv),
            "orphans_graphiti": len(orphans_gr),
        },
        "gaps": [f["source"] for f in neither],
        "failures": list(failures.values()),
        "auto_queued": auto_queued,
        "pgvector_only_sample": [f["source"] for f in pv_only[:20]],
        "graphiti_only": list(gr_only),
    }

    Path(REPORT_PATH).write_text(json.dumps(report, indent=2))
    say(f"Report written to: {REPORT_PATH}")
    return report


def main():
    """Parse command-line flags and run the integrity check."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--fix", action="store_true")
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    # In JSON mode progress goes to stderr so stdout holds only the report
    # and can be piped straight into a JSON parser.
    report = run_reconciliation(
        fix=args.fix,
        log_stream=sys.stderr if args.json else None,
    )
    if args.json:
        print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()