scripts/encoding.py: Stage 1 dual-implementation consolidation (Track 1 Finding 11)

Consolidates four extract paths and two extract-chunk-embed-write pipelines
into a single shared encoding module. Fixes the embedder lifecycle
divergence between watcher and /api/reindex (no more 200MB reload per
reindex click) and unifies failure tracking so /api/reindex failures now
surface in SettingsPanel "Ingest Health".

New files:
- scripts/encoding.py — extract_text, chunk_text, chunk_and_embed,
  write_embeddings_batch
- scripts/failures.py — record_ingest_failure, resolve_ingest_failure
  (shared by watcher.py and ingest.py)

Refactored:
- scripts/watcher.py — drops local extract/chunk/embed implementations
  and CHUNK_SIZE/CHUNK_OVERLAP/SUPPORTED constants; imports from encoding
  and failures. Now writes ingest_failures row on empty-text-extract
  (was silent return 0).
- scripts/ingest.py — substantial rewrite. Exposes ingest_directory(folder,
  embedder=None) for in-process invocation; CLI back-compat preserved via
  ingest_folder wrapper. Module-level SentenceTransformer load removed.
- scripts/corpus_integrity.py — imports extract_text from encoding;
  extract_text_for_retry function removed.
- scripts/api.py — /api/reindex rewritten with BackgroundTasks (uses
  module-level embedder; no subprocess); new /api/reindex/status endpoint
  reading ~/aaronai/reindex_status.json; /api/corpus/retry imports
  extract_text from encoding; INGEST_SCRIPT constant removed (dead after
  this refactor); 409 reentrance guard prevents double-click stomping.

Behavior changes:
- /api/reindex no longer subprocess.Popens; runs in FastAPI BackgroundTasks
  threadpool, doesn't block API thread.
- /api/reindex no longer reloads SentenceTransformer on each click.
- /api/reindex failures newly write to ingest_failures (visible in
  SettingsPanel "Ingest Health" — badge will jump on first reindex).
- New embeddings rows always have created_at = NOW() (canonical, server-side).
- New embeddings rows always include metadata.folder field (None when not
  derivable).
- /api/reindex returns 409 on second click while a job is running.
- New /api/reindex/status endpoint for polling.

Existing 9,815 NULL created_at rows remain unchanged; backfill is a
separate decision if desired.

199 insertions, 256 deletions across 6 files (codebase shrinks net).

Found by Track 1 inventory 2026-05-02 (Finding 11 / cross-cutting F11).
Pre-commit verification: BackgroundTasks already imported, sys.path
resolves correctly via script-path semantics, static import clean.
This commit is contained in:
2026-05-03 01:40:47 +00:00
parent a317df66f8
commit 1101bef226
6 changed files with 357 additions and 264 deletions
+58 -21
View File
@@ -31,6 +31,9 @@ from fastapi.responses import StreamingResponse
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.cron import CronTrigger
from encoding import extract_text as encoding_extract_text
from ingest import ingest_directory
load_dotenv(Path.home() / "aaronai" / ".env") load_dotenv(Path.home() / "aaronai" / ".env")
MEMORY_PATH = Path.home() / "aaronai" / "memory.md" MEMORY_PATH = Path.home() / "aaronai" / "memory.md"
@@ -39,7 +42,6 @@ SETTINGS_PATH = Path.home() / "aaronai" / "settings.json"
WATCHER_LOG = str(Path.home() / "aaronai" / "watcher.log") WATCHER_LOG = str(Path.home() / "aaronai" / "watcher.log")
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json") WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files" NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
INGEST_SCRIPT = str(Path.home() / "aaronai" / "scripts" / "ingest.py")
PYTHON = str(Path.home() / "aaronai" / "venv" / "bin" / "python3") PYTHON = str(Path.home() / "aaronai" / "venv" / "bin" / "python3")
DEFAULT_SETTINGS = { DEFAULT_SETTINGS = {
@@ -908,13 +910,62 @@ async def list_captures():
except Exception as e: except Exception as e:
return JSONResponse({"captures": []}) return JSONResponse({"captures": []})
@app.post("/api/reindex") REINDEX_STATUS_PATH = Path.home() / "aaronai" / "reindex_status.json"
async def trigger_reindex(auth: str = Depends(require_auth)):
def _read_reindex_status() -> dict:
if REINDEX_STATUS_PATH.exists():
try: try:
subprocess.Popen([PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH]) return json.loads(REINDEX_STATUS_PATH.read_text())
return JSONResponse({"started": True, "message": "Re-indexing started in background"}) except Exception:
return {}
return {}
def _write_reindex_status(state: dict):
REINDEX_STATUS_PATH.write_text(json.dumps(state, indent=2))
def _reindex_running() -> bool:
return _read_reindex_status().get("status") == "running"
def _run_reindex_background():
"""Background-thread entry: shares api.py's module-level embedder."""
started = datetime.now().isoformat()
_write_reindex_status({"status": "running", "started_at": started})
try:
result = ingest_directory(Path(NEXTCLOUD_PATH), embedder=embedder)
_write_reindex_status({
"status": "complete",
"started_at": started,
"finished_at": datetime.now().isoformat(),
**result,
})
except Exception as e: except Exception as e:
return JSONResponse({"started": False, "error": str(e)}) _write_reindex_status({
"status": "error",
"started_at": started,
"finished_at": datetime.now().isoformat(),
"error": str(e),
})
@app.post("/api/reindex")
async def trigger_reindex(background_tasks: BackgroundTasks,
auth: str = Depends(require_auth)):
if _reindex_running():
return JSONResponse(
{"started": False, "message": "reindex already running"},
status_code=409,
)
background_tasks.add_task(_run_reindex_background)
return JSONResponse({"started": True, "message": "Re-indexing started in background"})
@app.get("/api/reindex/status")
async def reindex_status(auth: str = Depends(require_auth)):
return JSONResponse(_read_reindex_status())
@app.delete("/api/conversations") @app.delete("/api/conversations")
async def clear_all_conversations(auth: str = Depends(require_auth)): async def clear_all_conversations(auth: str = Depends(require_auth)):
@@ -1042,22 +1093,8 @@ async def corpus_retry(request: Request, auth: str = Depends(require_auth)):
filepath = Path(row[0]) filepath = Path(row[0])
if not filepath.exists(): if not filepath.exists():
return JSONResponse({"error": f"file not found: {filepath}"}, status_code=404) return JSONResponse({"error": f"file not found: {filepath}"}, status_code=404)
suffix = filepath.suffix.lower()
text = ""
try: try:
if suffix in {".txt", ".md"}: text = encoding_extract_text(filepath)
text = filepath.read_text(encoding="utf-8", errors="ignore")
elif suffix == ".pdf":
from pypdf import PdfReader
text = "".join(p.extract_text() + "\n" for p in PdfReader(filepath).pages if p.extract_text())
elif suffix == ".docx":
from docx import Document as DocxDocument
text = "\n".join(p.text for p in DocxDocument(filepath).paragraphs if p.text.strip())
elif suffix == ".pptx":
from pptx import Presentation
prs = Presentation(filepath)
text = "\n".join(shape.text for slide in prs.slides for shape in slide.shapes
if hasattr(shape, "text") and shape.text.strip())
except Exception as e: except Exception as e:
return JSONResponse({"error": f"extraction failed: {e}"}, status_code=500) return JSONResponse({"error": f"extraction failed: {e}"}, status_code=500)
if not text.strip(): if not text.strip():
+4 -23
View File
@@ -23,6 +23,9 @@ from datetime import datetime
import psycopg2 import psycopg2
from dotenv import load_dotenv from dotenv import load_dotenv
sys.path.insert(0, str(Path(__file__).parent))
from encoding import extract_text
load_dotenv(Path.home() / "aaronai" / ".env", override=True) load_dotenv(Path.home() / "aaronai" / ".env", override=True)
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files" NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
@@ -103,28 +106,6 @@ def get_ingest_failures():
return failures return failures
def extract_text_for_retry(filepath):
path = Path(filepath)
suffix = path.suffix.lower()
try:
if suffix == ".docx":
from docx import Document as D
return "\n".join(p.text for p in D(path).paragraphs if p.text.strip())
elif suffix == ".pdf":
from pypdf import PdfReader
return "".join(p.extract_text() + "\n" for p in PdfReader(path).pages if p.extract_text())
elif suffix == ".pptx":
from pptx import Presentation
prs = Presentation(path)
return "\n".join(shape.text for slide in prs.slides for shape in slide.shapes
if hasattr(shape, "text") and shape.text.strip())
elif suffix in {".txt", ".md"}:
return path.read_text(encoding="utf-8", errors="ignore")
except Exception as e:
print(f"WARNING: extraction failed {path.name}: {e}", file=sys.stderr)
return ""
def queue_for_retry(source, full_text, filepath): def queue_for_retry(source, full_text, filepath):
try: try:
pg = get_pg() pg = get_pg()
@@ -188,7 +169,7 @@ def run_reconciliation(fix=False):
if fix and neither: if fix and neither:
print(f"Auto-queuing {len(neither)} gap files...") print(f"Auto-queuing {len(neither)} gap files...")
for finfo in neither: for finfo in neither:
text = extract_text_for_retry(finfo["filepath"]) text = extract_text(Path(finfo["filepath"]))
if text.strip(): if text.strip():
if queue_for_retry(finfo["source"], text, finfo["filepath"]): if queue_for_retry(finfo["source"], text, finfo["filepath"]):
auto_queued.append(finfo["source"]) auto_queued.append(finfo["source"])
+120
View File
@@ -0,0 +1,120 @@
"""
Aaron AI Stage 1 encoding helpers — single canonical implementation of:
- extract_text(filepath) — four-extension text extraction
- chunk_text(text, chunk_size, overlap) — word-based chunking
- chunk_and_embed(text, source, embedder, filepath, folder) — produce ready-to-write rows
- write_embeddings_batch(conn, batch) — server-side NOW() canonical INSERT
Used by watcher.py, ingest.py, corpus_integrity.py, and api.py /api/corpus/retry.
Replaces four separate extract reimplementations and two extract-chunk-embed paths.
"""
import hashlib
import json
import logging
from pathlib import Path
from docx import Document as DocxDocument
from pypdf import PdfReader
from pptx import Presentation
log = logging.getLogger("encoding")
SUPPORTED = {".docx", ".pdf", ".pptx", ".txt", ".md"}
DEFAULT_CHUNK_SIZE = 500
DEFAULT_CHUNK_OVERLAP = 50
def extract_text(filepath: Path) -> str:
"""Return the text of a supported file. Returns "" on any failure or
unsupported extension. Does not write to ingest_failures — caller decides."""
suffix = filepath.suffix.lower()
try:
if suffix == ".docx":
doc = DocxDocument(filepath)
return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
elif suffix == ".pdf":
reader = PdfReader(filepath)
return "".join(
page.extract_text() + "\n"
for page in reader.pages if page.extract_text()
)
elif suffix == ".pptx":
prs = Presentation(filepath)
return "\n".join(
shape.text for slide in prs.slides
for shape in slide.shapes
if hasattr(shape, "text") and shape.text.strip()
)
elif suffix in {".txt", ".md"}:
return filepath.read_text(encoding="utf-8", errors="ignore")
except Exception as e:
log.warning(f"Text extraction failed for {filepath.name}: {e}")
return ""
def chunk_text(text: str,
chunk_size: int = DEFAULT_CHUNK_SIZE,
overlap: int = DEFAULT_CHUNK_OVERLAP) -> list[str]:
"""Word-based chunking. Empty chunks filtered."""
words = text.split()
chunks = []
start = 0
while start < len(words):
chunk = " ".join(words[start:start + chunk_size])
if chunk.strip():
chunks.append(chunk)
start += chunk_size - overlap
return chunks
def _chunk_id(filepath, source: str, index: int) -> str:
basis = str(filepath) if filepath else source
return f"{hashlib.md5(basis.encode()).hexdigest()[:8]}_{index}"
def chunk_and_embed(text: str,
source: str,
embedder,
filepath=None,
folder=None) -> list[dict]:
"""Chunk text, embed each chunk, return rows ready for write_embeddings_batch."""
chunks = chunk_text(text)
if not chunks:
return []
embeddings = embedder.encode(chunks).tolist()
rows = []
for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
rows.append({
"id": _chunk_id(filepath, source, i),
"document": chunk,
"embedding": emb,
"source": source,
"type": "document",
"metadata": {
"source": source,
"filepath": str(filepath) if filepath else source,
"folder": folder,
},
})
return rows
def write_embeddings_batch(conn, batch: list[dict]) -> int:
"""Single canonical INSERT. Sets created_at = NOW() server-side. Commits."""
if not batch:
return 0
cur = conn.cursor()
for row in batch:
cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
metadata = EXCLUDED.metadata
""", (row["id"], row["document"], row["embedding"],
row["source"], row["type"], json.dumps(row["metadata"])))
conn.commit()
return len(batch)
+30
View File
@@ -0,0 +1,30 @@
"""
Aaron AI ingest_failures helpers — shared by watcher.py and ingest.py.
Both modules write structured failure rows so the SettingsPanel "Ingest Health"
view sees the same shape regardless of ingest path. Functions take an explicit
conn parameter; the caller decides transaction boundaries and exception
handling. Both current callers wrap with their own log-and-swallow shims.
"""
def record_ingest_failure(conn, source: str, filepath, error: str) -> None:
"""Insert or update an ingest_failures row. Commits."""
cur = conn.cursor()
cur.execute("""
INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at)
VALUES (%s, %s, %s, 0, NOW(), NOW())
ON CONFLICT (source) DO UPDATE SET
error = EXCLUDED.error,
retry_count = ingest_failures.retry_count + 1,
last_failed_at = NOW(),
resolved = FALSE
""", (source, str(filepath), error[:1000]))
conn.commit()
def resolve_ingest_failure(conn, source: str) -> None:
"""Mark a previously failed source as resolved. Commits."""
cur = conn.cursor()
cur.execute("UPDATE ingest_failures SET resolved = TRUE WHERE source = %s", (source,))
conn.commit()
+111 -130
View File
@@ -1,70 +1,37 @@
"""
Aaron AI bulk ingester. Two entry points:
- ingest_directory(folder, embedder=None) — programmatic; called from
api.py /api/reindex with the api process's shared embedder
- python3 scripts/ingest.py <folder> — CLI back-compat; loads its own embedder
Stage 1 helpers (extract / chunk / embed / write) live in scripts/encoding.py.
Failure tracking SQL lives in scripts/failures.py.
"""
import os import os
import sys import sys
import hashlib
from pathlib import Path from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
import psycopg2 import psycopg2
import psycopg2.extras
import json
from sentence_transformers import SentenceTransformer from sentence_transformers import SentenceTransformer
from docx import Document
from pypdf import PdfReader from encoding import extract_text, chunk_and_embed, write_embeddings_batch, SUPPORTED
from pptx import Presentation from failures import (
record_ingest_failure as _record_failure_sql,
resolve_ingest_failure as _resolve_failure_sql,
)
load_dotenv(Path.home() / "aaronai" / ".env", override=True) load_dotenv(Path.home() / "aaronai" / ".env", override=True)
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
PG_DSN = os.getenv("PG_DSN") PG_DSN = os.getenv("PG_DSN")
def get_pg(): def get_pg():
return psycopg2.connect(PG_DSN) return psycopg2.connect(PG_DSN)
def extract_text_from_docx(path):
doc = Document(path)
return "\n".join([para.text for para in doc.paragraphs if para.text.strip()])
def extract_text_from_pdf(path):
reader = PdfReader(path)
text = ""
for page in reader.pages:
extracted = page.extract_text()
if extracted:
text += extracted + "\n"
return text
def extract_text_from_pptx(path):
prs = Presentation(path)
text = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip():
text += shape.text + "\n"
return text
def extract_text_from_txt(path):
with open(path, "r", encoding="utf-8", errors="ignore") as f:
return f.read()
def chunk_text(text, chunk_size=500, overlap=50):
words = text.split()
chunks = []
start = 0
while start < len(words):
end = start + chunk_size
chunk = " ".join(words[start:end])
if chunk.strip():
chunks.append(chunk)
start += chunk_size - overlap
return chunks
def make_id(filepath, chunk_index):
path_hash = hashlib.md5(str(filepath).encode()).hexdigest()[:8]
return f"{path_hash}_{chunk_index}"
def enqueue_stage2(source, full_text): def enqueue_stage2(source, full_text):
"""Enqueue document for Stage 2 (Mistral orientation) Stage 3 (Graphiti ingest). """Enqueue document for Stage 2 (Mistral orientation) -> Stage 3 (Graphiti ingest).
TEMPORARY: this queue feed will be removed when pgvector is decommissioned TEMPORARY: this queue feed will be removed when pgvector is decommissioned
and the watcher calls Stage 2 directly. and the watcher calls Stage 2 directly.
""" """
@@ -87,94 +54,108 @@ def enqueue_stage2(source, full_text):
except Exception as e: except Exception as e:
print(f" Stage 2 queue insert failed (non-fatal): {e}") print(f" Stage 2 queue insert failed (non-fatal): {e}")
def ingest_file(filepath):
path = Path(filepath)
suffix = path.suffix.lower()
if path.name.startswith("~$") or path.name.startswith("."):
return 0
def _record_failure(filepath: Path, error: str) -> None:
try: try:
if suffix == ".docx":
text = extract_text_from_docx(path)
elif suffix == ".pdf":
text = extract_text_from_pdf(path)
elif suffix == ".pptx":
text = extract_text_from_pptx(path)
elif suffix in [".txt", ".md"]:
text = extract_text_from_txt(path)
else:
return 0
if not text.strip():
return 0
chunks = chunk_text(text)
if not chunks:
return 0
embeddings = embedder.encode(chunks).tolist()
ids = [make_id(path, i) for i in range(len(chunks))]
metadatas = [{
"source": path.name,
"filepath": str(path),
"folder": str(path.parent.relative_to(Path(sys.argv[1]) if len(sys.argv) > 1 else path.parent))
} for _ in chunks]
# STAGE 1: Write to pgvector (TEMPORARY — remove when chat agent migrates to Graphiti)
pg = get_pg() pg = get_pg()
cur = pg.cursor() try:
for chunk_id, chunk, embedding, meta in zip(ids, chunks, embeddings, metadatas): _record_failure_sql(pg, filepath.name, filepath, error)
cur.execute(""" finally:
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
metadata = EXCLUDED.metadata
""", (
chunk_id, chunk, embedding,
meta.get("source"), "document", None,
json.dumps(meta)
))
pg.commit()
pg.close() pg.close()
print(f" Indexed {len(chunks)} chunks: {path.name}")
# Enqueue for Stage 2 → Stage 3 (Graphiti pipeline)
# SKIP_STAGE2_ENQUEUE env var set by migration scripts to prevent bulk enqueue
if not os.getenv("SKIP_STAGE2_ENQUEUE"):
enqueue_stage2(path.name, text)
return len(chunks)
except Exception as e: except Exception as e:
print(f" Error: {path.name}: {e}") print(f" Could not record ingest failure (non-fatal): {e}")
def _resolve_failure(source: str) -> None:
try:
pg = get_pg()
try:
_resolve_failure_sql(pg, source)
finally:
pg.close()
except Exception as e:
print(f" Could not resolve ingest failure record (non-fatal): {e}")
def _ingest_one(filepath: Path, embedder, root: Path = None) -> int:
"""Ingest a single file. Returns chunk count, 0 on skip/failure."""
if filepath.name.startswith(("~$", ".")):
return 0 return 0
if filepath.suffix.lower() not in SUPPORTED:
return 0
text = extract_text(filepath)
if not text.strip():
_record_failure(filepath, "Text extraction failed or empty")
return 0
folder_rel = None
if root is not None:
try:
folder_rel = str(filepath.parent.relative_to(root))
except ValueError:
pass
try:
rows = chunk_and_embed(text, filepath.name, embedder,
filepath=filepath, folder=folder_rel)
except Exception as e:
_record_failure(filepath, f"Embedding failed: {e}")
return 0
if not rows:
return 0
try:
pg = get_pg()
try:
write_embeddings_batch(pg, rows)
finally:
pg.close()
except Exception as e:
_record_failure(filepath, f"pgvector write failed: {e}")
return 0
print(f" Indexed {len(rows)} chunks: {filepath.name}")
_resolve_failure(filepath.name)
if not os.getenv("SKIP_STAGE2_ENQUEUE"):
enqueue_stage2(filepath.name, text)
return len(rows)
def ingest_directory(folder, embedder=None) -> dict:
"""Programmatic entry point. Returns {scanned, ingested, failed, total_chunks}.
If embedder is None, loads its own SentenceTransformer (CLI back-compat path).
Caller (e.g. api.py /api/reindex) should pass its module-level embedder so
the ~200MB model isn't reloaded per call.
"""
folder = Path(folder)
if not folder.exists():
return {"scanned": 0, "ingested": 0, "failed": 0, "total_chunks": 0,
"error": f"folder not found: {folder}"}
if embedder is None:
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
files = [f for f in folder.rglob("*")
if f.suffix.lower() in SUPPORTED
and not f.name.startswith(("~$", "."))]
print(f"Found {len(files)} files to process")
ingested = failed = total_chunks = 0
for f in files:
n = _ingest_one(f, embedder, root=folder)
if n > 0:
ingested += 1
total_chunks += n
else:
failed += 1
return {"scanned": len(files), "ingested": ingested, "failed": failed,
"total_chunks": total_chunks}
def ingest_folder(folder_path): def ingest_folder(folder_path):
folder = Path(folder_path) """CLI back-compat wrapper. Loads its own embedder."""
if not folder.exists(): result = ingest_directory(Path(folder_path))
print(f"Folder not found: {folder_path}") print(f"\nDone. {result['ingested']} files / {result['total_chunks']} chunks indexed; "
sys.exit(1) f"{result['failed']} failed.")
supported = [".docx", ".pdf", ".pptx", ".txt", ".md"]
files = [f for f in folder.rglob("*")
if f.suffix.lower() in supported
and not f.name.startswith("~$")
and not f.name.startswith(".")]
if not files:
print("No supported files found.")
sys.exit(1)
print(f"Found {len(files)} files to process\n")
total_chunks = 0
for f in files:
total_chunks += ingest_file(f)
print(f"\nDone. Total chunks indexed: {total_chunks}")
if __name__ == "__main__": if __name__ == "__main__":
target = sys.argv[1] if len(sys.argv) > 1 else str(Path.home() / "aaronai" / "docs") target = sys.argv[1] if len(sys.argv) > 1 else str(Path.home() / "aaronai" / "docs")
+29 -85
View File
@@ -19,7 +19,6 @@ Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3
import os import os
import time import time
import json import json
import hashlib
import logging import logging
import threading import threading
from pathlib import Path from pathlib import Path
@@ -30,9 +29,11 @@ from sentence_transformers import SentenceTransformer
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
from docx import Document as DocxDocument from encoding import extract_text, chunk_and_embed, write_embeddings_batch, SUPPORTED
from pypdf import PdfReader from failures import (
from pptx import Presentation record_ingest_failure as _record_failure_sql,
resolve_ingest_failure as _resolve_failure_sql,
)
load_dotenv(Path.home() / "aaronai" / ".env", override=True) load_dotenv(Path.home() / "aaronai" / ".env", override=True)
@@ -42,10 +43,7 @@ STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json" STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat" HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
DEBOUNCE_SECONDS = 120 DEBOUNCE_SECONDS = 120
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
EMBED_MODEL = "all-MiniLM-L6-v2" EMBED_MODEL = "all-MiniLM-L6-v2"
PG_DSN = os.getenv("PG_DSN") PG_DSN = os.getenv("PG_DSN")
@@ -76,49 +74,6 @@ def get_pg():
return psycopg2.connect(PG_DSN) return psycopg2.connect(PG_DSN)
def extract_text(path: Path) -> str:
suffix = path.suffix.lower()
try:
if suffix == ".docx":
doc = DocxDocument(path)
return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
elif suffix == ".pdf":
reader = PdfReader(path)
return "".join(
page.extract_text() + "\n"
for page in reader.pages if page.extract_text()
)
elif suffix == ".pptx":
prs = Presentation(path)
return "\n".join(
shape.text for slide in prs.slides
for shape in slide.shapes
if hasattr(shape, "text") and shape.text.strip()
)
elif suffix in {".txt", ".md"}:
return path.read_text(encoding="utf-8", errors="ignore")
except Exception as e:
log.warning(f"Text extraction failed for {path.name}: {e}")
record_ingest_failure(path, f"Text extraction failed: {e}")
return ""
def chunk_text(text: str) -> list:
words = text.split()
chunks = []
start = 0
while start < len(words):
chunk = " ".join(words[start:start + CHUNK_SIZE])
if chunk.strip():
chunks.append(chunk)
start += CHUNK_SIZE - CHUNK_OVERLAP
return chunks
def make_chunk_id(filepath: Path, chunk_index: int) -> str:
return hashlib.md5(str(filepath).encode()).hexdigest()[:8] + f"_{chunk_index}"
def enqueue_stage2(source: str, full_text: str): def enqueue_stage2(source: str, full_text: str):
if os.getenv("SKIP_STAGE2_ENQUEUE"): if os.getenv("SKIP_STAGE2_ENQUEUE"):
return return
@@ -143,20 +98,14 @@ def enqueue_stage2(source: str, full_text: str):
def record_ingest_failure(filepath: Path, error: str): def record_ingest_failure(filepath: Path, error: str):
"""Write extraction or ingest failure to ingest_failures table for UI visibility.""" """Write extraction or ingest failure to ingest_failures table for UI visibility.
Local wrapper around failures.record_ingest_failure — opens conn, delegates,
logs non-fatal errors so the caller never has to handle them."""
try: try:
pg = get_pg() pg = get_pg()
cur = pg.cursor() try:
cur.execute(""" _record_failure_sql(pg, filepath.name, filepath, error)
INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at) finally:
VALUES (%s, %s, %s, 0, NOW(), NOW())
ON CONFLICT (source) DO UPDATE SET
error = EXCLUDED.error,
retry_count = ingest_failures.retry_count + 1,
last_failed_at = NOW(),
resolved = FALSE
""", (filepath.name, str(filepath), error[:1000]))
pg.commit()
pg.close() pg.close()
except Exception as e: except Exception as e:
log.warning(f"Could not record ingest failure (non-fatal): {e}") log.warning(f"Could not record ingest failure (non-fatal): {e}")
@@ -166,9 +115,9 @@ def resolve_ingest_failure(source: str):
"""Mark a previously failed file as resolved after successful ingest.""" """Mark a previously failed file as resolved after successful ingest."""
try: try:
pg = get_pg() pg = get_pg()
cur = pg.cursor() try:
cur.execute("UPDATE ingest_failures SET resolved = TRUE WHERE source = %s", (source,)) _resolve_failure_sql(pg, source)
pg.commit() finally:
pg.close() pg.close()
except Exception as e: except Exception as e:
log.warning(f"Could not resolve ingest failure record (non-fatal): {e}") log.warning(f"Could not resolve ingest failure record (non-fatal): {e}")
@@ -181,42 +130,37 @@ def ingest_file(filepath: Path, embedder) -> int:
return 0 return 0
text = extract_text(filepath) text = extract_text(filepath)
if not text.strip(): if not text.strip():
record_ingest_failure(filepath, "Text extraction failed or empty")
return 0 return 0
chunks = chunk_text(text) folder_rel = None
if not chunks:
return 0
try: try:
embeddings = embedder.encode(chunks).tolist() folder_rel = str(filepath.parent.relative_to(NEXTCLOUD_PATH))
except ValueError:
pass
try:
rows = chunk_and_embed(text, filepath.name, embedder,
filepath=filepath, folder=folder_rel)
except Exception as e: except Exception as e:
log.error(f"Embedding failed for {filepath.name}: {e}") log.error(f"Embedding failed for {filepath.name}: {e}")
record_ingest_failure(filepath, f"Embedding failed: {e}") record_ingest_failure(filepath, f"Embedding failed: {e}")
return 0 return 0
if not rows:
return 0
source = filepath.name source = filepath.name
try: try:
pg = get_pg() pg = get_pg()
cur = pg.cursor() try:
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)): write_embeddings_batch(pg, rows)
chunk_id = make_chunk_id(filepath, i) finally:
cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
metadata = EXCLUDED.metadata
""", (chunk_id, chunk, embedding, source, "document",
json.dumps({"source": source, "filepath": str(filepath)})))
pg.commit()
pg.close() pg.close()
except Exception as e: except Exception as e:
log.error(f"pgvector write failed for {filepath.name}: {e}") log.error(f"pgvector write failed for {filepath.name}: {e}")
record_ingest_failure(filepath, f"pgvector write failed: {e}") record_ingest_failure(filepath, f"pgvector write failed: {e}")
return 0 return 0
log.info(f"Indexed {len(chunks)} chunks: {filepath.name}") log.info(f"Indexed {len(rows)} chunks: {filepath.name}")
resolve_ingest_failure(source) resolve_ingest_failure(source)
enqueue_stage2(source, text) enqueue_stage2(source, text)
return len(chunks) return len(rows)
def ingest_files(paths: list, embedder, state: dict) -> dict: def ingest_files(paths: list, embedder, state: dict) -> dict: