""" Aaron AI Stage 1 encoding helpers — single canonical implementation of: - extract_text(filepath) — four-extension text extraction - chunk_text(text, chunk_size, overlap) — word-based chunking - chunk_and_embed(text, source, embedder, filepath, folder) — produce ready-to-write rows - write_embeddings_batch(conn, batch) — server-side NOW() canonical INSERT Used by watcher.py, ingest.py, corpus_integrity.py, and api.py /api/corpus/retry. Replaces four separate extract reimplementations and two extract-chunk-embed paths. """ import hashlib import json import logging from pathlib import Path from docx import Document as DocxDocument from pypdf import PdfReader from pptx import Presentation log = logging.getLogger("encoding") SUPPORTED = {".docx", ".pdf", ".pptx", ".txt", ".md"} DEFAULT_CHUNK_SIZE = 500 DEFAULT_CHUNK_OVERLAP = 50 def extract_text(filepath: Path) -> str: """Return the text of a supported file. Returns "" on any failure or unsupported extension. Does not write to ingest_failures — caller decides.""" suffix = filepath.suffix.lower() try: if suffix == ".docx": doc = DocxDocument(filepath) return "\n".join(p.text for p in doc.paragraphs if p.text.strip()) elif suffix == ".pdf": reader = PdfReader(filepath) return "".join( page.extract_text() + "\n" for page in reader.pages if page.extract_text() ) elif suffix == ".pptx": prs = Presentation(filepath) return "\n".join( shape.text for slide in prs.slides for shape in slide.shapes if hasattr(shape, "text") and shape.text.strip() ) elif suffix in {".txt", ".md"}: return filepath.read_text(encoding="utf-8", errors="ignore") except Exception as e: log.warning(f"Text extraction failed for {filepath.name}: {e}") return "" def chunk_text(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = DEFAULT_CHUNK_OVERLAP) -> list[str]: """Word-based chunking. Empty chunks filtered.""" words = text.split() chunks = [] start = 0 while start < len(words): chunk = " ".join(words[start:start + chunk_size]) if chunk.strip(): chunks.append(chunk) start += chunk_size - overlap return chunks def _chunk_id(filepath, source: str, index: int) -> str: basis = str(filepath) if filepath else source return f"{hashlib.md5(basis.encode()).hexdigest()[:8]}_{index}" def chunk_and_embed(text: str, source: str, embedder, filepath=None, folder=None) -> list[dict]: """Chunk text, embed each chunk, return rows ready for write_embeddings_batch.""" chunks = chunk_text(text) if not chunks: return [] embeddings = embedder.encode(chunks).tolist() rows = [] for i, (chunk, emb) in enumerate(zip(chunks, embeddings)): rows.append({ "id": _chunk_id(filepath, source, i), "document": chunk, "embedding": emb, "source": source, "type": "document", "metadata": { "source": source, "filepath": str(filepath) if filepath else source, "folder": folder, }, }) return rows def write_embeddings_batch(conn, batch: list[dict]) -> int: """Single canonical INSERT. Sets created_at = NOW() server-side. Commits. Every row dict must supply 'type'. created_at is SQL-supplied (NOW()), so callers do not need to provide it. The application-layer assertion is the primary enforcement point for type — the column lacks NOT NULL because historical NULLs were resolved by the Improvement #2 backfill, and a Python-level raise gives a faster, more debuggable failure than a Postgres constraint error. """ if not batch: return 0 cur = conn.cursor() for row in batch: if not row.get("type"): raise ValueError( f"row {row.get('id')!r} missing 'type'; writers must supply it " f"(see Improvement #2 in docs/birdai-component-inventory)" ) cur.execute(""" INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s) ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document, embedding = EXCLUDED.embedding, source = EXCLUDED.source, type = EXCLUDED.type, created_at = COALESCE(embeddings.created_at, EXCLUDED.created_at), metadata = EXCLUDED.metadata """, (row["id"], row["document"], row["embedding"], row["source"], row["type"], json.dumps(row["metadata"]))) conn.commit() return len(batch)