diff --git a/scripts/api.py b/scripts/api.py index 7da8df4..45420ef 100644 --- a/scripts/api.py +++ b/scripts/api.py @@ -31,6 +31,9 @@ from fastapi.responses import StreamingResponse from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger +from encoding import extract_text as encoding_extract_text +from ingest import ingest_directory + load_dotenv(Path.home() / "aaronai" / ".env") MEMORY_PATH = Path.home() / "aaronai" / "memory.md" @@ -39,7 +42,6 @@ SETTINGS_PATH = Path.home() / "aaronai" / "settings.json" WATCHER_LOG = str(Path.home() / "aaronai" / "watcher.log") WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json") NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files" -INGEST_SCRIPT = str(Path.home() / "aaronai" / "scripts" / "ingest.py") PYTHON = str(Path.home() / "aaronai" / "venv" / "bin" / "python3") DEFAULT_SETTINGS = { @@ -908,13 +910,62 @@ async def list_captures(): except Exception as e: return JSONResponse({"captures": []}) -@app.post("/api/reindex") -async def trigger_reindex(auth: str = Depends(require_auth)): +REINDEX_STATUS_PATH = Path.home() / "aaronai" / "reindex_status.json" + + +def _read_reindex_status() -> dict: + if REINDEX_STATUS_PATH.exists(): + try: + return json.loads(REINDEX_STATUS_PATH.read_text()) + except Exception: + return {} + return {} + + +def _write_reindex_status(state: dict): + REINDEX_STATUS_PATH.write_text(json.dumps(state, indent=2)) + + +def _reindex_running() -> bool: + return _read_reindex_status().get("status") == "running" + + +def _run_reindex_background(): + """Background-thread entry: shares api.py's module-level embedder.""" + started = datetime.now().isoformat() + _write_reindex_status({"status": "running", "started_at": started}) try: - subprocess.Popen([PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH]) - return JSONResponse({"started": True, "message": "Re-indexing started in background"}) + result = ingest_directory(Path(NEXTCLOUD_PATH), embedder=embedder) + _write_reindex_status({ + "status": "complete", + "started_at": started, + "finished_at": datetime.now().isoformat(), + **result, + }) except Exception as e: - return JSONResponse({"started": False, "error": str(e)}) + _write_reindex_status({ + "status": "error", + "started_at": started, + "finished_at": datetime.now().isoformat(), + "error": str(e), + }) + + +@app.post("/api/reindex") +async def trigger_reindex(background_tasks: BackgroundTasks, + auth: str = Depends(require_auth)): + if _reindex_running(): + return JSONResponse( + {"started": False, "message": "reindex already running"}, + status_code=409, + ) + background_tasks.add_task(_run_reindex_background) + return JSONResponse({"started": True, "message": "Re-indexing started in background"}) + + +@app.get("/api/reindex/status") +async def reindex_status(auth: str = Depends(require_auth)): + return JSONResponse(_read_reindex_status()) @app.delete("/api/conversations") async def clear_all_conversations(auth: str = Depends(require_auth)): @@ -1042,22 +1093,8 @@ async def corpus_retry(request: Request, auth: str = Depends(require_auth)): filepath = Path(row[0]) if not filepath.exists(): return JSONResponse({"error": f"file not found: {filepath}"}, status_code=404) - suffix = filepath.suffix.lower() - text = "" try: - if suffix in {".txt", ".md"}: - text = filepath.read_text(encoding="utf-8", errors="ignore") - elif suffix == ".pdf": - from pypdf import PdfReader - text = "".join(p.extract_text() + "\n" for p in PdfReader(filepath).pages if p.extract_text()) - elif suffix == ".docx": - from docx import Document as DocxDocument - text = "\n".join(p.text for p in DocxDocument(filepath).paragraphs if p.text.strip()) - elif suffix == ".pptx": - from pptx import Presentation - prs = Presentation(filepath) - text = "\n".join(shape.text for slide in prs.slides for shape in slide.shapes - if hasattr(shape, "text") and shape.text.strip()) + text = encoding_extract_text(filepath) except Exception as e: return JSONResponse({"error": f"extraction failed: {e}"}, status_code=500) if not text.strip(): diff --git a/scripts/corpus_integrity.py b/scripts/corpus_integrity.py index 4208ba6..78035ae 100644 --- a/scripts/corpus_integrity.py +++ b/scripts/corpus_integrity.py @@ -23,6 +23,9 @@ from datetime import datetime import psycopg2 from dotenv import load_dotenv +sys.path.insert(0, str(Path(__file__).parent)) +from encoding import extract_text + load_dotenv(Path.home() / "aaronai" / ".env", override=True) NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files" @@ -103,28 +106,6 @@ def get_ingest_failures(): return failures -def extract_text_for_retry(filepath): - path = Path(filepath) - suffix = path.suffix.lower() - try: - if suffix == ".docx": - from docx import Document as D - return "\n".join(p.text for p in D(path).paragraphs if p.text.strip()) - elif suffix == ".pdf": - from pypdf import PdfReader - return "".join(p.extract_text() + "\n" for p in PdfReader(path).pages if p.extract_text()) - elif suffix == ".pptx": - from pptx import Presentation - prs = Presentation(path) - return "\n".join(shape.text for slide in prs.slides for shape in slide.shapes - if hasattr(shape, "text") and shape.text.strip()) - elif suffix in {".txt", ".md"}: - return path.read_text(encoding="utf-8", errors="ignore") - except Exception as e: - print(f"WARNING: extraction failed {path.name}: {e}", file=sys.stderr) - return "" - - def queue_for_retry(source, full_text, filepath): try: pg = get_pg() @@ -188,7 +169,7 @@ def run_reconciliation(fix=False): if fix and neither: print(f"Auto-queuing {len(neither)} gap files...") for finfo in neither: - text = extract_text_for_retry(finfo["filepath"]) + text = extract_text(Path(finfo["filepath"])) if text.strip(): if queue_for_retry(finfo["source"], text, finfo["filepath"]): auto_queued.append(finfo["source"]) diff --git a/scripts/encoding.py b/scripts/encoding.py new file mode 100644 index 0000000..7b91fa4 --- /dev/null +++ b/scripts/encoding.py @@ -0,0 +1,120 @@ +""" +Aaron AI Stage 1 encoding helpers — single canonical implementation of: + - extract_text(filepath) — four-extension text extraction + - chunk_text(text, chunk_size, overlap) — word-based chunking + - chunk_and_embed(text, source, embedder, filepath, folder) — produce ready-to-write rows + - write_embeddings_batch(conn, batch) — server-side NOW() canonical INSERT + +Used by watcher.py, ingest.py, corpus_integrity.py, and api.py /api/corpus/retry. +Replaces four separate extract reimplementations and two extract-chunk-embed paths. +""" + +import hashlib +import json +import logging +from pathlib import Path + +from docx import Document as DocxDocument +from pypdf import PdfReader +from pptx import Presentation + +log = logging.getLogger("encoding") + +SUPPORTED = {".docx", ".pdf", ".pptx", ".txt", ".md"} +DEFAULT_CHUNK_SIZE = 500 +DEFAULT_CHUNK_OVERLAP = 50 + + +def extract_text(filepath: Path) -> str: + """Return the text of a supported file. Returns "" on any failure or + unsupported extension. Does not write to ingest_failures — caller decides.""" + suffix = filepath.suffix.lower() + try: + if suffix == ".docx": + doc = DocxDocument(filepath) + return "\n".join(p.text for p in doc.paragraphs if p.text.strip()) + elif suffix == ".pdf": + reader = PdfReader(filepath) + return "".join( + page.extract_text() + "\n" + for page in reader.pages if page.extract_text() + ) + elif suffix == ".pptx": + prs = Presentation(filepath) + return "\n".join( + shape.text for slide in prs.slides + for shape in slide.shapes + if hasattr(shape, "text") and shape.text.strip() + ) + elif suffix in {".txt", ".md"}: + return filepath.read_text(encoding="utf-8", errors="ignore") + except Exception as e: + log.warning(f"Text extraction failed for {filepath.name}: {e}") + return "" + + +def chunk_text(text: str, + chunk_size: int = DEFAULT_CHUNK_SIZE, + overlap: int = DEFAULT_CHUNK_OVERLAP) -> list[str]: + """Word-based chunking. Empty chunks filtered.""" + words = text.split() + chunks = [] + start = 0 + while start < len(words): + chunk = " ".join(words[start:start + chunk_size]) + if chunk.strip(): + chunks.append(chunk) + start += chunk_size - overlap + return chunks + + +def _chunk_id(filepath, source: str, index: int) -> str: + basis = str(filepath) if filepath else source + return f"{hashlib.md5(basis.encode()).hexdigest()[:8]}_{index}" + + +def chunk_and_embed(text: str, + source: str, + embedder, + filepath=None, + folder=None) -> list[dict]: + """Chunk text, embed each chunk, return rows ready for write_embeddings_batch.""" + chunks = chunk_text(text) + if not chunks: + return [] + embeddings = embedder.encode(chunks).tolist() + rows = [] + for i, (chunk, emb) in enumerate(zip(chunks, embeddings)): + rows.append({ + "id": _chunk_id(filepath, source, i), + "document": chunk, + "embedding": emb, + "source": source, + "type": "document", + "metadata": { + "source": source, + "filepath": str(filepath) if filepath else source, + "folder": folder, + }, + }) + return rows + + +def write_embeddings_batch(conn, batch: list[dict]) -> int: + """Single canonical INSERT. Sets created_at = NOW() server-side. Commits.""" + if not batch: + return 0 + cur = conn.cursor() + for row in batch: + cur.execute(""" + INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) + VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s) + ON CONFLICT (id) DO UPDATE SET + document = EXCLUDED.document, + embedding = EXCLUDED.embedding, + source = EXCLUDED.source, + metadata = EXCLUDED.metadata + """, (row["id"], row["document"], row["embedding"], + row["source"], row["type"], json.dumps(row["metadata"]))) + conn.commit() + return len(batch) diff --git a/scripts/failures.py b/scripts/failures.py new file mode 100644 index 0000000..8bc13bf --- /dev/null +++ b/scripts/failures.py @@ -0,0 +1,30 @@ +""" +Aaron AI ingest_failures helpers — shared by watcher.py and ingest.py. + +Both modules write structured failure rows so the SettingsPanel "Ingest Health" +view sees the same shape regardless of ingest path. Functions take an explicit +conn parameter; the caller decides transaction boundaries and exception +handling. Both current callers wrap with their own log-and-swallow shims. +""" + + +def record_ingest_failure(conn, source: str, filepath, error: str) -> None: + """Insert or update an ingest_failures row. Commits.""" + cur = conn.cursor() + cur.execute(""" + INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at) + VALUES (%s, %s, %s, 0, NOW(), NOW()) + ON CONFLICT (source) DO UPDATE SET + error = EXCLUDED.error, + retry_count = ingest_failures.retry_count + 1, + last_failed_at = NOW(), + resolved = FALSE + """, (source, str(filepath), error[:1000])) + conn.commit() + + +def resolve_ingest_failure(conn, source: str) -> None: + """Mark a previously failed source as resolved. Commits.""" + cur = conn.cursor() + cur.execute("UPDATE ingest_failures SET resolved = TRUE WHERE source = %s", (source,)) + conn.commit() diff --git a/scripts/ingest.py b/scripts/ingest.py index 5d32d5b..9cbfa6d 100644 --- a/scripts/ingest.py +++ b/scripts/ingest.py @@ -1,70 +1,37 @@ +""" +Aaron AI bulk ingester. Two entry points: + - ingest_directory(folder, embedder=None) — programmatic; called from + api.py /api/reindex with the api process's shared embedder + - python3 scripts/ingest.py — CLI back-compat; loads its own embedder + +Stage 1 helpers (extract / chunk / embed / write) live in scripts/encoding.py. +Failure tracking SQL lives in scripts/failures.py. +""" + import os import sys -import hashlib from pathlib import Path from dotenv import load_dotenv import psycopg2 -import psycopg2.extras -import json from sentence_transformers import SentenceTransformer -from docx import Document -from pypdf import PdfReader -from pptx import Presentation + +from encoding import extract_text, chunk_and_embed, write_embeddings_batch, SUPPORTED +from failures import ( + record_ingest_failure as _record_failure_sql, + resolve_ingest_failure as _resolve_failure_sql, +) load_dotenv(Path.home() / "aaronai" / ".env", override=True) -print("Loading embedding model...") -embedder = SentenceTransformer("all-MiniLM-L6-v2") - PG_DSN = os.getenv("PG_DSN") + def get_pg(): return psycopg2.connect(PG_DSN) -def extract_text_from_docx(path): - doc = Document(path) - return "\n".join([para.text for para in doc.paragraphs if para.text.strip()]) - -def extract_text_from_pdf(path): - reader = PdfReader(path) - text = "" - for page in reader.pages: - extracted = page.extract_text() - if extracted: - text += extracted + "\n" - return text - -def extract_text_from_pptx(path): - prs = Presentation(path) - text = "" - for slide in prs.slides: - for shape in slide.shapes: - if hasattr(shape, "text") and shape.text.strip(): - text += shape.text + "\n" - return text - -def extract_text_from_txt(path): - with open(path, "r", encoding="utf-8", errors="ignore") as f: - return f.read() - -def chunk_text(text, chunk_size=500, overlap=50): - words = text.split() - chunks = [] - start = 0 - while start < len(words): - end = start + chunk_size - chunk = " ".join(words[start:end]) - if chunk.strip(): - chunks.append(chunk) - start += chunk_size - overlap - return chunks - -def make_id(filepath, chunk_index): - path_hash = hashlib.md5(str(filepath).encode()).hexdigest()[:8] - return f"{path_hash}_{chunk_index}" def enqueue_stage2(source, full_text): - """Enqueue document for Stage 2 (Mistral orientation) → Stage 3 (Graphiti ingest). + """Enqueue document for Stage 2 (Mistral orientation) -> Stage 3 (Graphiti ingest). TEMPORARY: this queue feed will be removed when pgvector is decommissioned and the watcher calls Stage 2 directly. """ @@ -87,94 +54,108 @@ def enqueue_stage2(source, full_text): except Exception as e: print(f" Stage 2 queue insert failed (non-fatal): {e}") -def ingest_file(filepath): - path = Path(filepath) - suffix = path.suffix.lower() - - if path.name.startswith("~$") or path.name.startswith("."): - return 0 +def _record_failure(filepath: Path, error: str) -> None: try: - if suffix == ".docx": - text = extract_text_from_docx(path) - elif suffix == ".pdf": - text = extract_text_from_pdf(path) - elif suffix == ".pptx": - text = extract_text_from_pptx(path) - elif suffix in [".txt", ".md"]: - text = extract_text_from_txt(path) - else: - return 0 - - if not text.strip(): - return 0 - - chunks = chunk_text(text) - if not chunks: - return 0 - - embeddings = embedder.encode(chunks).tolist() - ids = [make_id(path, i) for i in range(len(chunks))] - metadatas = [{ - "source": path.name, - "filepath": str(path), - "folder": str(path.parent.relative_to(Path(sys.argv[1]) if len(sys.argv) > 1 else path.parent)) - } for _ in chunks] - - # STAGE 1: Write to pgvector (TEMPORARY — remove when chat agent migrates to Graphiti) pg = get_pg() - cur = pg.cursor() - for chunk_id, chunk, embedding, meta in zip(ids, chunks, embeddings, metadatas): - cur.execute(""" - INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) - VALUES (%s, %s, %s::vector, %s, %s, %s, %s) - ON CONFLICT (id) DO UPDATE SET - document = EXCLUDED.document, - embedding = EXCLUDED.embedding, - source = EXCLUDED.source, - metadata = EXCLUDED.metadata - """, ( - chunk_id, chunk, embedding, - meta.get("source"), "document", None, - json.dumps(meta) - )) - pg.commit() - pg.close() - print(f" Indexed {len(chunks)} chunks: {path.name}") - - # Enqueue for Stage 2 → Stage 3 (Graphiti pipeline) - # SKIP_STAGE2_ENQUEUE env var set by migration scripts to prevent bulk enqueue - if not os.getenv("SKIP_STAGE2_ENQUEUE"): - enqueue_stage2(path.name, text) - - return len(chunks) - + try: + _record_failure_sql(pg, filepath.name, filepath, error) + finally: + pg.close() except Exception as e: - print(f" Error: {path.name}: {e}") + print(f" Could not record ingest failure (non-fatal): {e}") + + +def _resolve_failure(source: str) -> None: + try: + pg = get_pg() + try: + _resolve_failure_sql(pg, source) + finally: + pg.close() + except Exception as e: + print(f" Could not resolve ingest failure record (non-fatal): {e}") + + +def _ingest_one(filepath: Path, embedder, root: Path = None) -> int: + """Ingest a single file. Returns chunk count, 0 on skip/failure.""" + if filepath.name.startswith(("~$", ".")): return 0 + if filepath.suffix.lower() not in SUPPORTED: + return 0 + text = extract_text(filepath) + if not text.strip(): + _record_failure(filepath, "Text extraction failed or empty") + return 0 + folder_rel = None + if root is not None: + try: + folder_rel = str(filepath.parent.relative_to(root)) + except ValueError: + pass + try: + rows = chunk_and_embed(text, filepath.name, embedder, + filepath=filepath, folder=folder_rel) + except Exception as e: + _record_failure(filepath, f"Embedding failed: {e}") + return 0 + if not rows: + return 0 + try: + pg = get_pg() + try: + write_embeddings_batch(pg, rows) + finally: + pg.close() + except Exception as e: + _record_failure(filepath, f"pgvector write failed: {e}") + return 0 + print(f" Indexed {len(rows)} chunks: {filepath.name}") + _resolve_failure(filepath.name) + if not os.getenv("SKIP_STAGE2_ENQUEUE"): + enqueue_stage2(filepath.name, text) + return len(rows) + + +def ingest_directory(folder, embedder=None) -> dict: + """Programmatic entry point. Returns {scanned, ingested, failed, total_chunks}. + + If embedder is None, loads its own SentenceTransformer (CLI back-compat path). + Caller (e.g. api.py /api/reindex) should pass its module-level embedder so + the ~200MB model isn't reloaded per call. + """ + folder = Path(folder) + if not folder.exists(): + return {"scanned": 0, "ingested": 0, "failed": 0, "total_chunks": 0, + "error": f"folder not found: {folder}"} + + if embedder is None: + print("Loading embedding model...") + embedder = SentenceTransformer("all-MiniLM-L6-v2") + + files = [f for f in folder.rglob("*") + if f.suffix.lower() in SUPPORTED + and not f.name.startswith(("~$", "."))] + print(f"Found {len(files)} files to process") + + ingested = failed = total_chunks = 0 + for f in files: + n = _ingest_one(f, embedder, root=folder) + if n > 0: + ingested += 1 + total_chunks += n + else: + failed += 1 + return {"scanned": len(files), "ingested": ingested, "failed": failed, + "total_chunks": total_chunks} + def ingest_folder(folder_path): - folder = Path(folder_path) - if not folder.exists(): - print(f"Folder not found: {folder_path}") - sys.exit(1) + """CLI back-compat wrapper. Loads its own embedder.""" + result = ingest_directory(Path(folder_path)) + print(f"\nDone. {result['ingested']} files / {result['total_chunks']} chunks indexed; " + f"{result['failed']} failed.") - supported = [".docx", ".pdf", ".pptx", ".txt", ".md"] - files = [f for f in folder.rglob("*") - if f.suffix.lower() in supported - and not f.name.startswith("~$") - and not f.name.startswith(".")] - - if not files: - print("No supported files found.") - sys.exit(1) - - print(f"Found {len(files)} files to process\n") - total_chunks = 0 - for f in files: - total_chunks += ingest_file(f) - - print(f"\nDone. Total chunks indexed: {total_chunks}") if __name__ == "__main__": target = sys.argv[1] if len(sys.argv) > 1 else str(Path.home() / "aaronai" / "docs") diff --git a/scripts/watcher.py b/scripts/watcher.py index fe7852b..506a560 100644 --- a/scripts/watcher.py +++ b/scripts/watcher.py @@ -19,7 +19,6 @@ Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3 import os import time import json -import hashlib import logging import threading from pathlib import Path @@ -30,9 +29,11 @@ from sentence_transformers import SentenceTransformer from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler -from docx import Document as DocxDocument -from pypdf import PdfReader -from pptx import Presentation +from encoding import extract_text, chunk_and_embed, write_embeddings_batch, SUPPORTED +from failures import ( + record_ingest_failure as _record_failure_sql, + resolve_ingest_failure as _resolve_failure_sql, +) load_dotenv(Path.home() / "aaronai" / ".env", override=True) @@ -42,10 +43,7 @@ STATE_FILE = "/home/aaron/aaronai/watcher_state.json" STATUS_FILE = "/home/aaron/aaronai/watcher_status.json" HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat" -SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"} DEBOUNCE_SECONDS = 120 -CHUNK_SIZE = 500 -CHUNK_OVERLAP = 50 EMBED_MODEL = "all-MiniLM-L6-v2" PG_DSN = os.getenv("PG_DSN") @@ -76,49 +74,6 @@ def get_pg(): return psycopg2.connect(PG_DSN) -def extract_text(path: Path) -> str: - suffix = path.suffix.lower() - try: - if suffix == ".docx": - doc = DocxDocument(path) - return "\n".join(p.text for p in doc.paragraphs if p.text.strip()) - elif suffix == ".pdf": - reader = PdfReader(path) - return "".join( - page.extract_text() + "\n" - for page in reader.pages if page.extract_text() - ) - elif suffix == ".pptx": - prs = Presentation(path) - return "\n".join( - shape.text for slide in prs.slides - for shape in slide.shapes - if hasattr(shape, "text") and shape.text.strip() - ) - elif suffix in {".txt", ".md"}: - return path.read_text(encoding="utf-8", errors="ignore") - except Exception as e: - log.warning(f"Text extraction failed for {path.name}: {e}") - record_ingest_failure(path, f"Text extraction failed: {e}") - return "" - - -def chunk_text(text: str) -> list: - words = text.split() - chunks = [] - start = 0 - while start < len(words): - chunk = " ".join(words[start:start + CHUNK_SIZE]) - if chunk.strip(): - chunks.append(chunk) - start += CHUNK_SIZE - CHUNK_OVERLAP - return chunks - - -def make_chunk_id(filepath: Path, chunk_index: int) -> str: - return hashlib.md5(str(filepath).encode()).hexdigest()[:8] + f"_{chunk_index}" - - def enqueue_stage2(source: str, full_text: str): if os.getenv("SKIP_STAGE2_ENQUEUE"): return @@ -143,21 +98,15 @@ def enqueue_stage2(source: str, full_text: str): def record_ingest_failure(filepath: Path, error: str): - """Write extraction or ingest failure to ingest_failures table for UI visibility.""" + """Write extraction or ingest failure to ingest_failures table for UI visibility. + Local wrapper around failures.record_ingest_failure — opens conn, delegates, + logs non-fatal errors so the caller never has to handle them.""" try: pg = get_pg() - cur = pg.cursor() - cur.execute(""" - INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at) - VALUES (%s, %s, %s, 0, NOW(), NOW()) - ON CONFLICT (source) DO UPDATE SET - error = EXCLUDED.error, - retry_count = ingest_failures.retry_count + 1, - last_failed_at = NOW(), - resolved = FALSE - """, (filepath.name, str(filepath), error[:1000])) - pg.commit() - pg.close() + try: + _record_failure_sql(pg, filepath.name, filepath, error) + finally: + pg.close() except Exception as e: log.warning(f"Could not record ingest failure (non-fatal): {e}") @@ -166,10 +115,10 @@ def resolve_ingest_failure(source: str): """Mark a previously failed file as resolved after successful ingest.""" try: pg = get_pg() - cur = pg.cursor() - cur.execute("UPDATE ingest_failures SET resolved = TRUE WHERE source = %s", (source,)) - pg.commit() - pg.close() + try: + _resolve_failure_sql(pg, source) + finally: + pg.close() except Exception as e: log.warning(f"Could not resolve ingest failure record (non-fatal): {e}") @@ -181,42 +130,37 @@ def ingest_file(filepath: Path, embedder) -> int: return 0 text = extract_text(filepath) if not text.strip(): + record_ingest_failure(filepath, "Text extraction failed or empty") return 0 - chunks = chunk_text(text) - if not chunks: - return 0 + folder_rel = None try: - embeddings = embedder.encode(chunks).tolist() + folder_rel = str(filepath.parent.relative_to(NEXTCLOUD_PATH)) + except ValueError: + pass + try: + rows = chunk_and_embed(text, filepath.name, embedder, + filepath=filepath, folder=folder_rel) except Exception as e: log.error(f"Embedding failed for {filepath.name}: {e}") record_ingest_failure(filepath, f"Embedding failed: {e}") return 0 + if not rows: + return 0 source = filepath.name try: - pg = get_pg() - cur = pg.cursor() - for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)): - chunk_id = make_chunk_id(filepath, i) - cur.execute(""" - INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) - VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s) - ON CONFLICT (id) DO UPDATE SET - document = EXCLUDED.document, - embedding = EXCLUDED.embedding, - source = EXCLUDED.source, - metadata = EXCLUDED.metadata - """, (chunk_id, chunk, embedding, source, "document", - json.dumps({"source": source, "filepath": str(filepath)}))) - pg.commit() - pg.close() + pg = get_pg() + try: + write_embeddings_batch(pg, rows) + finally: + pg.close() except Exception as e: log.error(f"pgvector write failed for {filepath.name}: {e}") record_ingest_failure(filepath, f"pgvector write failed: {e}") return 0 - log.info(f"Indexed {len(chunks)} chunks: {filepath.name}") + log.info(f"Indexed {len(rows)} chunks: {filepath.name}") resolve_ingest_failure(source) enqueue_stage2(source, text) - return len(chunks) + return len(rows) def ingest_files(paths: list, embedder, state: dict) -> dict: