From 74e2c34f439b323694784fb59b883d304513b2b7 Mon Sep 17 00:00:00 2001 From: Aaron Nelson Date: Thu, 30 Apr 2026 21:54:39 +0000 Subject: [PATCH] corpus integrity: ingest_failures tracking in watcher, reconciliation script, corpus status/retry/reconcile endpoints --- scripts/api.py | 173 +++++++++++++++++++++++++++ scripts/corpus_integrity.py | 230 ++++++++++++++++++++++++++++++++++++ scripts/watcher.py | 36 ++++++ 3 files changed, 439 insertions(+) create mode 100644 scripts/corpus_integrity.py diff --git a/scripts/api.py b/scripts/api.py index bcfd5b2..85b5336 100644 --- a/scripts/api.py +++ b/scripts/api.py @@ -957,6 +957,179 @@ async def clear_all_conversations(auth: str = Depends(require_auth)): return JSONResponse({"cleared": True}) + +# ─── Corpus Integrity Endpoints ───────────────────────────────────────────── + +CORPUS_INTEGRITY_SCRIPT = str(Path.home() / "aaronai" / "scripts" / "corpus_integrity.py") +CORPUS_REPORT_PATH = Path.home() / "aaronai" / "corpus_integrity_report.json" +SUPPORTED_EXTS = {".pdf", ".docx", ".pptx", ".txt", ".md"} +MIGRATION_STATE_PATH = Path.home() / "aaronai" / "experiments" / "tier1_migration_state.json" + + +def get_corpus_status_data(): + fs_count = 0 + try: + root = Path(NEXTCLOUD_PATH) + for path in root.rglob("*"): + if path.is_file() and path.suffix.lower() in SUPPORTED_EXTS: + if path.name.startswith((".", "~$")): continue + if "Admin/Backups" in str(path) or "Backups" in path.parts: continue + if "Journal/Media" in str(path): continue + fs_count += 1 + except Exception: + pass + + pv_count = 0 + try: + pg = get_pg() + cur = pg.cursor() + cur.execute("SELECT COUNT(DISTINCT source) FROM embeddings WHERE source IS NOT NULL") + pv_count = cur.fetchone()[0] + pg.close() + except Exception: + pass + + gr_sources = set() + try: + if MIGRATION_STATE_PATH.exists(): + state = json.loads(MIGRATION_STATE_PATH.read_text()) + for fp in state.get("ingested", []): + gr_sources.add(Path(fp).name) + except Exception: + pass + try: + pg = get_pg() + cur = pg.cursor() + cur.execute("SELECT DISTINCT source FROM stage_3_queue WHERE completed_at IS NOT NULL") + for row in cur.fetchall(): gr_sources.add(row[0]) + pg.close() + except Exception: + pass + + failures = [] + try: + pg = get_pg() + cur = pg.cursor() + cur.execute(""" + SELECT source, filepath, error, retry_count, first_failed_at, last_failed_at + FROM ingest_failures WHERE resolved = FALSE + ORDER BY last_failed_at DESC LIMIT 50 + """) + for row in cur.fetchall(): + failures.append({ + "source": row[0], "filepath": row[1], "error": row[2], + "retry_count": row[3], "first_failed_at": str(row[4]), + "last_failed_at": str(row[5]), + }) + pg.close() + except Exception: + pass + + last_report = None + try: + if CORPUS_REPORT_PATH.exists(): + report = json.loads(CORPUS_REPORT_PATH.read_text()) + last_report = { + "timestamp": report.get("timestamp"), + "gaps": report.get("summary", {}).get("neither", 0), + "auto_queued": len(report.get("auto_queued", [])), + } + except Exception: + pass + + return { + "filesystem": fs_count, + "pgvector": pv_count, + "graphiti": len(gr_sources), + "failures": failures, + "failure_count": len(failures), + "last_reconciliation": last_report, + } + + +@app.get("/api/corpus/status") +async def corpus_status(auth: str = Depends(require_auth)): + try: + return JSONResponse(get_corpus_status_data()) + except Exception as e: + return JSONResponse({"error": str(e)}, status_code=500) + + +@app.post("/api/corpus/retry") +async def corpus_retry(request: Request, auth: str = Depends(require_auth)): + try: + body = await request.json() + source = body.get("source", "") + if not source: + return JSONResponse({"error": "source required"}, status_code=400) + pg = get_pg() + cur = pg.cursor() + cur.execute("SELECT filepath FROM ingest_failures WHERE source = %s", (source,)) + row = cur.fetchone() + pg.close() + if not row: + return JSONResponse({"error": "source not found in failures"}, status_code=404) + filepath = Path(row[0]) + if not filepath.exists(): + return JSONResponse({"error": f"file not found: {filepath}"}, status_code=404) + suffix = filepath.suffix.lower() + text = "" + try: + if suffix in {".txt", ".md"}: + text = filepath.read_text(encoding="utf-8", errors="ignore") + elif suffix == ".pdf": + from pypdf import PdfReader + text = "".join(p.extract_text() + "\n" for p in PdfReader(filepath).pages if p.extract_text()) + elif suffix == ".docx": + from docx import Document as DocxDocument + text = "\n".join(p.text for p in DocxDocument(filepath).paragraphs if p.text.strip()) + elif suffix == ".pptx": + from pptx import Presentation + prs = Presentation(filepath) + text = "\n".join(shape.text for slide in prs.slides for shape in slide.shapes + if hasattr(shape, "text") and shape.text.strip()) + except Exception as e: + return JSONResponse({"error": f"extraction failed: {e}"}, status_code=500) + if not text.strip(): + return JSONResponse({"error": "file produces empty text — may be corrupt"}, status_code=422) + pg = get_pg() + cur = pg.cursor() + cur.execute(""" + INSERT INTO stage_2_queue (source, full_text, char_length) + VALUES (%s, %s, %s) + ON CONFLICT (source) DO UPDATE SET + full_text = EXCLUDED.full_text, char_length = EXCLUDED.char_length, + enqueued_at = NOW(), completed_at = NULL, failed_at = NULL, attempts = 0 + """, (source, text[:50000], len(text))) + cur.execute(""" + UPDATE ingest_failures SET retry_count = retry_count + 1, last_failed_at = NOW() + WHERE source = %s + """, (source,)) + pg.commit() + pg.close() + return JSONResponse({"queued": True, "source": source}) + except Exception as e: + return JSONResponse({"error": str(e)}, status_code=500) + + +@app.post("/api/corpus/reconcile") +async def corpus_reconcile(request: Request, background_tasks: BackgroundTasks, auth: str = Depends(require_auth)): + try: + body = await request.json() + fix = body.get("fix", True) + except Exception: + fix = True + def run_reconcile(): + try: + cmd = [PYTHON, CORPUS_INTEGRITY_SCRIPT] + if fix: + cmd.append("--fix") + subprocess.run(cmd, cwd=str(Path.home() / "aaronai"), timeout=300) + except Exception as e: + print(f"Reconciliation failed: {e}") + background_tasks.add_task(run_reconcile) + return JSONResponse({"started": True, "fix": fix}) + # ─── Scheduler ────────────────────────────────────────────────────────────── scheduler = BackgroundScheduler() diff --git a/scripts/corpus_integrity.py b/scripts/corpus_integrity.py new file mode 100644 index 0000000..6130a95 --- /dev/null +++ b/scripts/corpus_integrity.py @@ -0,0 +1,230 @@ +#!/usr/bin/env python3 +""" +corpus_integrity.py — BirdAI Corpus Integrity Check + +Compares three sources of truth: + 1. Filesystem (Nextcloud) — what files exist + 2. pgvector (embeddings table) — what's been through Stage 1 + 3. Graphiti (migration state + stage_3_queue) — what's been through Stage 3 + +Usage: + python3 corpus_integrity.py # report only + python3 corpus_integrity.py --fix # report + auto-queue gaps for retry + python3 corpus_integrity.py --json # output JSON to stdout +""" + +import os +import sys +import json +import argparse +from pathlib import Path +from datetime import datetime + +import psycopg2 +from dotenv import load_dotenv + +load_dotenv(Path.home() / "aaronai" / ".env", override=True) + +NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files" +MIGRATION_STATE = str(Path.home() / "aaronai" / "experiments" / "tier1_migration_state.json") +REPORT_PATH = str(Path.home() / "aaronai" / "corpus_integrity_report.json") +SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"} +PG_DSN = os.getenv("PG_DSN") + + +def get_pg(): + return psycopg2.connect(PG_DSN) + + +def get_filesystem_files(): + files = [] + root = Path(NEXTCLOUD_PATH) + for path in root.rglob("*"): + if path.is_dir(): continue + if path.suffix.lower() not in SUPPORTED: continue + if path.name.startswith((".", "~$")): continue + if "Admin/Backups" in str(path) or "Backups" in path.parts: continue + if "Journal/Media" in str(path): continue + files.append({"source": path.name, "filepath": str(path), + "size": path.stat().st_size, "mtime": path.stat().st_mtime}) + return files + + +def get_pgvector_sources(): + try: + pg = get_pg() + cur = pg.cursor() + cur.execute("SELECT DISTINCT source FROM embeddings WHERE source IS NOT NULL") + sources = {row[0] for row in cur.fetchall()} + pg.close() + return sources + except Exception as e: + print(f"ERROR: pgvector: {e}", file=sys.stderr) + return set() + + +def get_graphiti_sources(): + sources = set() + try: + state_path = Path(MIGRATION_STATE) + if state_path.exists(): + state = json.loads(state_path.read_text()) + for filepath in state.get("ingested", []): + sources.add(Path(filepath).name) + except Exception as e: + print(f"WARNING: migration state: {e}", file=sys.stderr) + try: + pg = get_pg() + cur = pg.cursor() + cur.execute("SELECT DISTINCT source FROM stage_3_queue WHERE completed_at IS NOT NULL") + for row in cur.fetchall(): sources.add(row[0]) + pg.close() + except Exception as e: + print(f"WARNING: stage_3_queue: {e}", file=sys.stderr) + return sources + + +def get_ingest_failures(): + failures = {} + try: + pg = get_pg() + cur = pg.cursor() + cur.execute(""" + SELECT source, filepath, error, retry_count, first_failed_at, last_failed_at + FROM ingest_failures WHERE resolved = FALSE ORDER BY last_failed_at DESC + """) + for row in cur.fetchall(): + failures[row[0]] = {"source": row[0], "filepath": row[1], "error": row[2], + "retry_count": row[3], "first_failed_at": str(row[4]), + "last_failed_at": str(row[5])} + pg.close() + except Exception as e: + print(f"WARNING: ingest_failures: {e}", file=sys.stderr) + return failures + + +def extract_text_for_retry(filepath): + path = Path(filepath) + suffix = path.suffix.lower() + try: + if suffix == ".docx": + from docx import Document as D + return "\n".join(p.text for p in D(path).paragraphs if p.text.strip()) + elif suffix == ".pdf": + from pypdf import PdfReader + return "".join(p.extract_text() + "\n" for p in PdfReader(path).pages if p.extract_text()) + elif suffix == ".pptx": + from pptx import Presentation + prs = Presentation(path) + return "\n".join(shape.text for slide in prs.slides for shape in slide.shapes + if hasattr(shape, "text") and shape.text.strip()) + elif suffix in {".txt", ".md"}: + return path.read_text(encoding="utf-8", errors="ignore") + except Exception as e: + print(f"WARNING: extraction failed {path.name}: {e}", file=sys.stderr) + return "" + + +def queue_for_retry(source, full_text, filepath): + try: + pg = get_pg() + cur = pg.cursor() + cur.execute(""" + INSERT INTO stage_2_queue (source, full_text, char_length) + VALUES (%s, %s, %s) + ON CONFLICT (source) DO UPDATE SET + full_text = EXCLUDED.full_text, char_length = EXCLUDED.char_length, + enqueued_at = NOW(), completed_at = NULL, failed_at = NULL, attempts = 0 + """, (source, full_text[:50000], len(full_text))) + pg.commit() + pg.close() + return True + except Exception as e: + print(f"WARNING: queue failed {source}: {e}", file=sys.stderr) + return False + + +def run_reconciliation(fix=False): + print(f"BirdAI Corpus Integrity Check — {datetime.now().isoformat()}") + print() + print("Scanning filesystem...") + fs_files = get_filesystem_files() + fs_sources = {f["source"]: f for f in fs_files} + print(f" Filesystem: {len(fs_files)} files") + print("Querying pgvector...") + pv_sources = get_pgvector_sources() + print(f" pgvector: {len(pv_sources)} distinct sources") + print("Querying Graphiti...") + gr_sources = get_graphiti_sources() + print(f" Graphiti: {len(gr_sources)} sources") + print("Querying ingest failures...") + failures = get_ingest_failures() + print(f" Failures: {len(failures)} unresolved") + print() + + both, pv_only, neither, gr_only = [], [], [], [] + for source, finfo in fs_sources.items(): + in_pv = source in pv_sources + in_gr = source in gr_sources + if in_pv and in_gr: both.append(finfo) + elif in_pv: pv_only.append(finfo) + elif in_gr: gr_only.append(finfo) + else: neither.append(finfo) + + orphans_pv = pv_sources - set(fs_sources.keys()) + orphans_gr = gr_sources - set(fs_sources.keys()) + + print(f"Results:") + print(f" Both (pgvector + Graphiti): {len(both)}") + print(f" pgvector only: {len(pv_only)}") + print(f" Neither (corpus gap): {len(neither)}") + print(f" Graphiti only: {len(gr_only)}") + print(f" Ingest failures: {len(failures)}") + print(f" pgvector orphans: {len(orphans_pv)}") + print(f" Graphiti orphans: {len(orphans_gr)}") + print() + + auto_queued = [] + if fix and neither: + print(f"Auto-queuing {len(neither)} gap files...") + for finfo in neither: + text = extract_text_for_retry(finfo["filepath"]) + if text.strip(): + if queue_for_retry(finfo["source"], text, finfo["filepath"]): + auto_queued.append(finfo["source"]) + print(f" Queued: {finfo['source']}") + else: + print(f" Skipped (unreadable): {finfo['source']}") + print() + + report = { + "timestamp": datetime.now().isoformat(), + "summary": { + "filesystem_total": len(fs_files), "pgvector_total": len(pv_sources), + "graphiti_total": len(gr_sources), "both": len(both), + "pgvector_only": len(pv_only), "neither": len(neither), + "graphiti_only": len(gr_only), "failures": len(failures), + "orphans_pgvector": len(orphans_pv), "orphans_graphiti": len(orphans_gr), + }, + "gaps": [f["source"] for f in neither], + "failures": list(failures.values()), + "auto_queued": auto_queued, + "pgvector_only_sample": [f["source"] for f in pv_only[:20]], + "graphiti_only": list(gr_only), + } + Path(REPORT_PATH).write_text(json.dumps(report, indent=2)) + print(f"Report written to: {REPORT_PATH}") + return report + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--fix", action="store_true") + parser.add_argument("--json", action="store_true") + args = parser.parse_args() + report = run_reconciliation(fix=args.fix) + if args.json: + print(json.dumps(report, indent=2)) + +if __name__ == "__main__": + main() diff --git a/scripts/watcher.py b/scripts/watcher.py index 32de0d7..dfedf34 100644 --- a/scripts/watcher.py +++ b/scripts/watcher.py @@ -99,6 +99,7 @@ def extract_text(path: Path) -> str: return path.read_text(encoding="utf-8", errors="ignore") except Exception as e: log.warning(f"Text extraction failed for {path.name}: {e}") + record_ingest_failure(path, f"Text extraction failed: {e}") return "" @@ -141,6 +142,38 @@ def enqueue_stage2(source: str, full_text: str): log.warning(f"Stage 2 enqueue failed (non-fatal): {e}") +def record_ingest_failure(filepath: Path, error: str): + """Write extraction or ingest failure to ingest_failures table for UI visibility.""" + try: + pg = get_pg() + cur = pg.cursor() + cur.execute(""" + INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at) + VALUES (%s, %s, %s, 0, NOW(), NOW()) + ON CONFLICT (source) DO UPDATE SET + error = EXCLUDED.error, + retry_count = ingest_failures.retry_count + 1, + last_failed_at = NOW(), + resolved = FALSE + """, (filepath.name, str(filepath), error[:1000])) + pg.commit() + pg.close() + except Exception as e: + log.warning(f"Could not record ingest failure (non-fatal): {e}") + + +def resolve_ingest_failure(source: str): + """Mark a previously failed file as resolved after successful ingest.""" + try: + pg = get_pg() + cur = pg.cursor() + cur.execute("UPDATE ingest_failures SET resolved = TRUE WHERE source = %s", (source,)) + pg.commit() + pg.close() + except Exception as e: + log.warning(f"Could not resolve ingest failure record (non-fatal): {e}") + + def ingest_file(filepath: Path, embedder) -> int: if filepath.name.startswith(("~$", ".")): return 0 @@ -156,6 +189,7 @@ def ingest_file(filepath: Path, embedder) -> int: embeddings = embedder.encode(chunks).tolist() except Exception as e: log.error(f"Embedding failed for {filepath.name}: {e}") + record_ingest_failure(filepath, f"Embedding failed: {e}") return 0 source = filepath.name try: @@ -177,8 +211,10 @@ def ingest_file(filepath: Path, embedder) -> int: pg.close() except Exception as e: log.error(f"pgvector write failed for {filepath.name}: {e}") + record_ingest_failure(filepath, f"pgvector write failed: {e}") return 0 log.info(f"Indexed {len(chunks)} chunks: {filepath.name}") + resolve_ingest_failure(source) enqueue_stage2(source, text) return len(chunks)