corpus integrity: ingest_failures tracking in watcher, reconciliation script, corpus status/retry/reconcile endpoints

This commit is contained in:
2026-04-30 21:54:39 +00:00
parent 655dea6ae5
commit 74e2c34f43
3 changed files with 439 additions and 0 deletions
+230
View File
@@ -0,0 +1,230 @@
#!/usr/bin/env python3
"""
corpus_integrity.py — BirdAI Corpus Integrity Check
Compares three sources of truth:
1. Filesystem (Nextcloud) — what files exist
2. pgvector (embeddings table) — what's been through Stage 1
3. Graphiti (migration state + stage_3_queue) — what's been through Stage 3
Usage:
python3 corpus_integrity.py # report only
python3 corpus_integrity.py --fix # report + auto-queue gaps for retry
python3 corpus_integrity.py --json # output JSON to stdout
"""
import os
import sys
import json
import argparse
from pathlib import Path
from datetime import datetime
import psycopg2
from dotenv import load_dotenv
# Load configuration (notably PG_DSN) from the project's .env file.
# override=True lets values from the file replace variables that are already
# set in the process environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# Root directory that get_filesystem_files() walks recursively.
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
# JSON state file; get_graphiti_sources() reads its "ingested" list of file paths.
MIGRATION_STATE = str(Path.home() / "aaronai" / "experiments" / "tier1_migration_state.json")
# Destination of the JSON report written by run_reconciliation().
REPORT_PATH = str(Path.home() / "aaronai" / "corpus_integrity_report.json")
# Lower-cased file suffixes that count as corpus documents during the scan.
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
# Connection string handed to psycopg2.connect(); None when PG_DSN is not set.
PG_DSN = os.getenv("PG_DSN")
def get_pg():
    """Open and return a new psycopg2 connection built from PG_DSN.

    Each call creates a fresh connection; the caller is expected to close it.
    """
    connection = psycopg2.connect(PG_DSN)
    return connection
def get_filesystem_files():
    """Collect metadata for every corpus document under NEXTCLOUD_PATH.

    A path is included when it is not a directory, its lower-cased suffix is
    in SUPPORTED, its name does not start with "." or "~$", and it is not
    located under a backup or "Journal/Media" location.

    Returns:
        A list of dicts with keys "source" (file name), "filepath" (full path
        as a string), "size" (bytes) and "mtime" (modification timestamp).
    """
    files = []
    root = Path(NEXTCLOUD_PATH)
    for path in root.rglob("*"):
        if path.is_dir():
            continue
        if path.suffix.lower() not in SUPPORTED:
            continue
        if path.name.startswith((".", "~$")):
            continue
        path_str = str(path)
        if "Admin/Backups" in path_str or "Backups" in path.parts:
            continue
        if "Journal/Media" in path_str:
            continue
        # Stat once, and tolerate entries that cannot be read (broken symlink,
        # file removed while the scan is running) instead of aborting the scan.
        try:
            info = path.stat()
        except OSError as e:
            print(f"WARNING: stat failed {path}: {e}", file=sys.stderr)
            continue
        files.append({
            "source": path.name,
            "filepath": path_str,
            "size": info.st_size,
            "mtime": info.st_mtime,
        })
    return files
def get_pgvector_sources():
    """Return the set of distinct non-NULL `source` values in `embeddings`.

    Best-effort: on any error the problem is reported on stderr and an empty
    set is returned.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("SELECT DISTINCT source FROM embeddings WHERE source IS NOT NULL")
            sources = {row[0] for row in cur.fetchall()}
        finally:
            # Close even when the query fails so the connection is not leaked.
            pg.close()
        return sources
    except Exception as e:
        print(f"ERROR: pgvector: {e}", file=sys.stderr)
        return set()
def get_graphiti_sources():
    """Return the set of source names recorded as processed for Graphiti.

    Two inputs are merged:
      * the file names of the paths listed under "ingested" in the
        MIGRATION_STATE JSON file (if that file exists), and
      * the `source` values of `stage_3_queue` rows whose `completed_at`
        is set.

    Each input is best-effort: a failure is reported on stderr and the other
    input is still used.
    """
    sources = set()
    try:
        state_path = Path(MIGRATION_STATE)
        if state_path.exists():
            state = json.loads(state_path.read_text())
            sources.update(Path(filepath).name for filepath in state.get("ingested", []))
    except Exception as e:
        print(f"WARNING: migration state: {e}", file=sys.stderr)
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("SELECT DISTINCT source FROM stage_3_queue WHERE completed_at IS NOT NULL")
            # A NULL source cannot match any file name; skipping it keeps the
            # totals and orphan counts from being inflated by a None entry.
            sources.update(row[0] for row in cur.fetchall() if row[0] is not None)
        finally:
            # Close even when the query fails so the connection is not leaked.
            pg.close()
    except Exception as e:
        print(f"WARNING: stage_3_queue: {e}", file=sys.stderr)
    return sources
def get_ingest_failures():
    """Return unresolved rows of `ingest_failures`, keyed by source.

    Returns:
        A dict mapping source -> {"source", "filepath", "error",
        "retry_count", "first_failed_at", "last_failed_at"}; the two
        timestamps are converted with str(). On any error a warning is
        printed to stderr and the rows collected so far (possibly none) are
        returned.
    """
    failures = {}
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                SELECT source, filepath, error, retry_count, first_failed_at, last_failed_at
                FROM ingest_failures WHERE resolved = FALSE ORDER BY last_failed_at DESC
            """)
            # NOTE(review): if a source can appear in several unresolved rows,
            # later rows (older failures, given the DESC order) overwrite
            # earlier ones here — confirm `source` is unique in this table.
            for row in cur.fetchall():
                failures[row[0]] = {
                    "source": row[0],
                    "filepath": row[1],
                    "error": row[2],
                    "retry_count": row[3],
                    "first_failed_at": str(row[4]),
                    "last_failed_at": str(row[5]),
                }
        finally:
            # Close even when the query fails so the connection is not leaked.
            pg.close()
    except Exception as e:
        print(f"WARNING: ingest_failures: {e}", file=sys.stderr)
    return failures
def extract_text_for_retry(filepath):
    """Extract plain text from a document so it can be re-queued.

    The format is chosen from the lower-cased file suffix:
      * .docx  — non-blank paragraphs joined with newlines
      * .pdf   — text of every page that yields any, each followed by "\\n"
      * .pptx  — non-blank text of every shape that has a `text` attribute
      * .txt / .md — file content decoded as UTF-8, undecodable bytes dropped

    Args:
        filepath: Path (str or Path) of the file to read.

    Returns:
        The extracted text, or "" when the suffix is not handled or when
        extraction raises (the error is reported on stderr).
    """
    path = Path(filepath)
    suffix = path.suffix.lower()
    try:
        if suffix == ".docx":
            from docx import Document as D
            return "\n".join(p.text for p in D(path).paragraphs if p.text.strip())
        elif suffix == ".pdf":
            from pypdf import PdfReader
            # Extract each page exactly once; running the extractor a second
            # time just to test for emptiness doubles the parsing work.
            page_texts = (page.extract_text() for page in PdfReader(path).pages)
            return "".join(text + "\n" for text in page_texts if text)
        elif suffix == ".pptx":
            from pptx import Presentation
            prs = Presentation(path)
            return "\n".join(
                shape.text
                for slide in prs.slides
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            )
        elif suffix in {".txt", ".md"}:
            return path.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        print(f"WARNING: extraction failed {path.name}: {e}", file=sys.stderr)
    return ""
def queue_for_retry(source, full_text, filepath):
    """Insert or reset a `stage_2_queue` row so `source` is processed again.

    The stored text is truncated to the first 50000 characters, while
    `char_length` records the length of the untruncated text. If a row for
    `source` already exists it is overwritten and its enqueue/completion/
    failure state and attempt counter are reset.

    Args:
        source: Queue key (the file name).
        full_text: Extracted document text.
        filepath: Accepted for call-site compatibility; not used here.

    Returns:
        True when the row was committed, False on any error (reported on
        stderr).
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                full_text = EXCLUDED.full_text, char_length = EXCLUDED.char_length,
                enqueued_at = NOW(), completed_at = NULL, failed_at = NULL, attempts = 0
            """, (source, full_text[:50000], len(full_text)))
            pg.commit()
        finally:
            # Close even when the statement fails so the connection is not leaked.
            pg.close()
        return True
    except Exception as e:
        print(f"WARNING: queue failed {source}: {e}", file=sys.stderr)
        return False
def run_reconciliation(fix=False):
    """Compare filesystem, pgvector and Graphiti and write a JSON report.

    Every scanned file is classified by whether its name is present in the
    pgvector sources and/or the Graphiti sources. Names known to either store
    but absent from the filesystem scan are counted as orphans. Progress and
    the summary are printed, and the report is written to REPORT_PATH.

    Args:
        fix: When true, files found in neither store have their text
            extracted and are queued for retry; files yielding no text are
            reported as skipped.

    Returns:
        The report dict that was written to REPORT_PATH.
    """
    print(f"BirdAI Corpus Integrity Check — {datetime.now().isoformat()}")
    print()

    print("Scanning filesystem...")
    fs_files = get_filesystem_files()
    # Keyed by file name, so files sharing a name collapse to the last one seen.
    fs_sources = {entry["source"]: entry for entry in fs_files}
    print(f" Filesystem: {len(fs_files)} files")

    print("Querying pgvector...")
    pv_sources = get_pgvector_sources()
    print(f" pgvector: {len(pv_sources)} distinct sources")

    print("Querying Graphiti...")
    gr_sources = get_graphiti_sources()
    print(f" Graphiti: {len(gr_sources)} sources")

    print("Querying ingest failures...")
    failures = get_ingest_failures()
    print(f" Failures: {len(failures)} unresolved")
    print()

    # Route each file into a bucket selected by (in pgvector, in Graphiti).
    both, pv_only, neither, gr_only = [], [], [], []
    buckets = {
        (True, True): both,
        (True, False): pv_only,
        (False, True): gr_only,
        (False, False): neither,
    }
    for name, entry in fs_sources.items():
        buckets[(name in pv_sources, name in gr_sources)].append(entry)

    known_names = set(fs_sources)
    orphans_pv = pv_sources - known_names
    orphans_gr = gr_sources - known_names

    print("Results:")
    print(f" Both (pgvector + Graphiti): {len(both)}")
    print(f" pgvector only: {len(pv_only)}")
    print(f" Neither (corpus gap): {len(neither)}")
    print(f" Graphiti only: {len(gr_only)}")
    print(f" Ingest failures: {len(failures)}")
    print(f" pgvector orphans: {len(orphans_pv)}")
    print(f" Graphiti orphans: {len(orphans_gr)}")
    print()

    auto_queued = []
    if fix and neither:
        print(f"Auto-queuing {len(neither)} gap files...")
        for entry in neither:
            name = entry["source"]
            location = entry["filepath"]
            text = extract_text_for_retry(location)
            if not text.strip():
                print(f" Skipped (unreadable): {name}")
                continue
            if queue_for_retry(name, text, location):
                auto_queued.append(name)
                print(f" Queued: {name}")
        print()

    report = {
        "timestamp": datetime.now().isoformat(),
        "summary": {
            "filesystem_total": len(fs_files),
            "pgvector_total": len(pv_sources),
            "graphiti_total": len(gr_sources),
            "both": len(both),
            "pgvector_only": len(pv_only),
            "neither": len(neither),
            "graphiti_only": len(gr_only),
            "failures": len(failures),
            "orphans_pgvector": len(orphans_pv),
            "orphans_graphiti": len(orphans_gr),
        },
        "gaps": [entry["source"] for entry in neither],
        "failures": list(failures.values()),
        "auto_queued": auto_queued,
        "pgvector_only_sample": [entry["source"] for entry in pv_only[:20]],
        # Unlike the name lists above, this carries the full per-file dicts.
        "graphiti_only": list(gr_only),
    }
    serialized = json.dumps(report, indent=2)
    Path(REPORT_PATH).write_text(serialized)
    print(f"Report written to: {REPORT_PATH}")
    return report
def main():
    """Command-line entry point: parse flags and run the reconciliation.

    With --json, the report is printed to stdout as a single JSON document
    and the human-readable progress output is sent to stderr, so stdout can
    be parsed directly. Without --json, progress goes to stdout as usual.
    """
    import contextlib

    parser = argparse.ArgumentParser()
    parser.add_argument("--fix", action="store_true",
                        help="queue files missing from both stores for retry")
    parser.add_argument("--json", action="store_true",
                        help="print the report as JSON on stdout")
    args = parser.parse_args()
    if args.json:
        # Keep stdout clean for the JSON document; progress lines go to stderr.
        with contextlib.redirect_stdout(sys.stderr):
            report = run_reconciliation(fix=args.fix)
        print(json.dumps(report, indent=2))
    else:
        run_reconciliation(fix=args.fix)


if __name__ == "__main__":
    main()