"""
Aaron AI bulk ingester.

Two entry points:
  - ingest_directory(folder, embedder=None) — programmatic; called from api.py
    /api/reindex with the api process's shared embedder
  - python3 scripts/ingest.py — CLI back-compat; loads its own embedder

Stage 1 helpers (extract / chunk / embed / write) live in scripts/encoding.py.
Failure tracking SQL lives in scripts/failures.py.
"""
import os
import sys
from pathlib import Path
from typing import Optional

from dotenv import load_dotenv
import psycopg2
from sentence_transformers import SentenceTransformer

from encoding import extract_blocks, chunk_and_embed, write_embeddings_batch, SUPPORTED
from failures import (
    record_ingest_failure as _record_failure_sql,
    resolve_ingest_failure as _resolve_failure_sql,
)

load_dotenv(Path.home() / "aaronai" / ".env", override=True)

PG_DSN = os.getenv("PG_DSN")


def get_pg():
    """Open a new psycopg2 connection using PG_DSN. The caller must close it."""
    return psycopg2.connect(PG_DSN)


def enqueue_stage2(source, full_text):
    """Enqueue document for Stage 2 (Mistral orientation) -> Stage 3 (Graphiti ingest).

    Upserts on ``source``: re-enqueueing an existing document replaces its text
    and resets its completion/failure state and attempt counter.

    Failures are printed and swallowed (non-fatal): Stage 1 has already
    succeeded by the time this is called.

    TEMPORARY: this queue feed will be removed when pgvector is decommissioned
    and the watcher calls Stage 2 directly.
    """
    try:
        pg = get_pg()
        try:
            with pg.cursor() as cur:
                cur.execute("""
            INSERT INTO stage_2_queue (source, full_text, char_length)
            VALUES (%s, %s, %s)
            ON CONFLICT (source) DO UPDATE SET
                full_text = EXCLUDED.full_text,
                char_length = EXCLUDED.char_length,
                enqueued_at = NOW(),
                completed_at = NULL,
                failed_at = NULL,
                attempts = 0
        """, (source, full_text, len(full_text)))
            pg.commit()
        finally:
            # Always release the connection, even if execute/commit raised.
            pg.close()
    except Exception as e:
        print(f"  Stage 2 queue insert failed (non-fatal): {e}")


def _record_failure(filepath: Path, error: str) -> None:
    """Record an ingest failure for ``filepath`` (keyed by file name); non-fatal."""
    try:
        pg = get_pg()
        try:
            _record_failure_sql(pg, filepath.name, filepath, error)
        finally:
            pg.close()
    except Exception as e:
        print(f"  Could not record ingest failure (non-fatal): {e}")


def _resolve_failure(source: str) -> None:
    """Clear any recorded ingest failure for ``source``; non-fatal."""
    try:
        pg = get_pg()
        try:
            _resolve_failure_sql(pg, source)
        finally:
            pg.close()
    except Exception as e:
        print(f"  Could not resolve ingest failure record (non-fatal): {e}")


# Top-level subfolders (relative to the ingest root) that are never indexed.
IGNORED_TOP_FOLDERS = {"Drafts"}


def _is_candidate(filepath: Path) -> bool:
    """True if the file name/suffix is eligible (not a temp/hidden file, supported type)."""
    return (not filepath.name.startswith(("~$", "."))
            and filepath.suffix.lower() in SUPPORTED)


def _relative_folder(filepath: Path, root: Optional[Path]) -> Optional[Path]:
    """Return filepath's parent relative to ``root``; None if no root or outside it."""
    if root is None:
        return None
    try:
        return filepath.parent.relative_to(root)
    except ValueError:
        return None


def _is_ignored(filepath: Path, root: Optional[Path] = None) -> bool:
    """True if ``filepath`` lives under one of IGNORED_TOP_FOLDERS beneath ``root``."""
    rel = _relative_folder(filepath, root)
    return rel is not None and bool(rel.parts) and rel.parts[0] in IGNORED_TOP_FOLDERS


def _has_content(blocks) -> bool:
    """True if at least one extracted block has non-blank text or heading."""
    return bool(blocks) and any(
        (b.get("text") or "").strip() or (b.get("heading") or "").strip()
        for b in blocks
    )


def _full_text(blocks) -> str:
    """Join blocks into one document string ("heading\\ntext" when a heading exists).

    ``text``/``heading`` may be missing or None; both are treated as "" so the
    join never raises after the pgvector write has already succeeded.
    """
    parts = []
    for b in blocks:
        heading = b.get("heading") or ""
        text = b.get("text") or ""
        parts.append(f"{heading}\n{text}" if heading else text)
    return "\n".join(parts)


def _ingest_one(filepath: Path, embedder, root: Optional[Path] = None) -> int:
    """Ingest a single file through Stage 1 and enqueue it for Stage 2.

    Args:
        filepath: File to ingest.
        embedder: Embedding model passed through to ``chunk_and_embed``.
        root: Ingest root; used to apply IGNORED_TOP_FOLDERS and to compute
            the stored folder path. If None, neither applies.

    Returns:
        Number of chunks written; 0 on skip or failure. Extraction, embedding
        and write failures are recorded via ``_record_failure``; a run that
        produces no chunks returns 0 without recording a failure.
    """
    if not _is_candidate(filepath) or _is_ignored(filepath, root):
        return 0

    try:
        blocks = extract_blocks(filepath)
    except Exception as e:
        # One unreadable file must not abort a whole directory run.
        _record_failure(filepath, f"Text extraction failed: {e}")
        return 0
    if not _has_content(blocks):
        _record_failure(filepath, "Text extraction failed or empty")
        return 0

    rel = _relative_folder(filepath, root)
    folder_rel = str(rel) if rel is not None else None

    try:
        rows = chunk_and_embed(blocks, filepath.name, embedder,
                               filepath=filepath, folder=folder_rel)
    except Exception as e:
        _record_failure(filepath, f"Embedding failed: {e}")
        return 0
    if not rows:
        return 0

    try:
        pg = get_pg()
        try:
            write_embeddings_batch(pg, rows)
        finally:
            pg.close()
    except Exception as e:
        _record_failure(filepath, f"pgvector write failed: {e}")
        return 0

    print(f"  Indexed {len(rows)} chunks: {filepath.name}")
    _resolve_failure(filepath.name)

    if not os.getenv("SKIP_STAGE2_ENQUEUE"):
        enqueue_stage2(filepath.name, _full_text(blocks))

    return len(rows)


def ingest_directory(folder, embedder=None) -> dict:
    """Programmatic entry point: ingest every eligible file under ``folder``.

    Returns {scanned, ingested, failed, skipped, total_chunks}, plus ``error``
    when the folder does not exist. ``scanned`` counts every eligible file
    found; ``skipped`` counts those under IGNORED_TOP_FOLDERS; ``failed``
    counts files that yielded no chunks.

    If embedder is None, loads its own SentenceTransformer (CLI back-compat
    path). Caller (e.g. api.py /api/reindex) should pass its module-level
    embedder so the ~200MB model isn't reloaded per call.
    """
    folder = Path(folder)
    if not folder.exists():
        return {"scanned": 0, "ingested": 0, "failed": 0, "skipped": 0,
                "total_chunks": 0, "error": f"folder not found: {folder}"}

    if embedder is None:
        print("Loading embedding model...")
        embedder = SentenceTransformer("all-MiniLM-L6-v2")

    # rglob("*") also yields directories (a directory named e.g. "notes.md"
    # would pass the suffix check), so keep regular files only. Sorted for a
    # deterministic processing order.
    files = sorted(f for f in folder.rglob("*") if f.is_file() and _is_candidate(f))
    print(f"Found {len(files)} files to process")

    ingested = failed = skipped = total_chunks = 0
    for f in files:
        # Ignored folders are deliberate skips, not failures.
        if _is_ignored(f, folder):
            skipped += 1
            continue
        n = _ingest_one(f, embedder, root=folder)
        if n > 0:
            ingested += 1
            total_chunks += n
        else:
            failed += 1

    return {"scanned": len(files), "ingested": ingested, "failed": failed,
            "skipped": skipped, "total_chunks": total_chunks}


def ingest_folder(folder_path):
    """CLI back-compat wrapper. Loads its own embedder, prints a summary,
    and returns the ``ingest_directory`` result dict."""
    result = ingest_directory(Path(folder_path))
    if "error" in result:
        print(f"Error: {result['error']}")
        return result
    print(f"\nDone. {result['ingested']} files / {result['total_chunks']} chunks indexed; "
          f"{result['failed']} failed, {result['skipped']} skipped.")
    return result


if __name__ == "__main__":
    target = sys.argv[1] if len(sys.argv) > 1 else str(Path.home() / "aaronai" / "docs")
    print(f"Ingesting from: {target}\n")
    summary = ingest_folder(target)
    if "error" in summary:
        sys.exit(1)