""" Aaron AI Stage 1 encoding helpers — single canonical implementation of: - extract_text(filepath) — four-extension text extraction - chunk_text(text, chunk_size, overlap) — word-based chunking - chunk_and_embed(text, source, embedder, filepath, folder) — produce ready-to-write rows - write_embeddings_batch(conn, batch) — server-side NOW() canonical INSERT Used by watcher.py, ingest.py, corpus_integrity.py, and api.py /api/corpus/retry. Replaces four separate extract reimplementations and two extract-chunk-embed paths. """ import hashlib import json import logging from pathlib import Path from docx import Document as DocxDocument from pypdf import PdfReader from pptx import Presentation log = logging.getLogger("encoding") SUPPORTED = {".docx", ".pdf", ".pptx", ".txt", ".md"} DEFAULT_CHUNK_SIZE = 500 DEFAULT_CHUNK_OVERLAP = 50 def _docx_cell_paragraphs(cell): yield from (p for p in cell.paragraphs if p.text.strip()) for nested in cell.tables: for row in nested.rows: for c in row.cells: yield from _docx_cell_paragraphs(c) def _pptx_shape_text(shape): from pptx.enum.shapes import MSO_SHAPE_TYPE parts = [] if shape.shape_type == MSO_SHAPE_TYPE.GROUP: for sub in shape.shapes: parts.extend(_pptx_shape_text(sub)) return parts if hasattr(shape, "text") and shape.text.strip(): parts.append(shape.text) if getattr(shape, "has_table", False): for cell in shape.table.iter_cells(): if cell.text.strip(): parts.append(cell.text) return parts def extract_text(filepath: Path) -> str: """Return the text of a supported file. Returns "" on any failure or unsupported extension. Does not write to ingest_failures — caller decides.""" suffix = filepath.suffix.lower() try: if suffix == ".docx": doc = DocxDocument(filepath) parts = [p.text for p in doc.paragraphs if p.text.strip()] for tbl in doc.tables: for row in tbl.rows: for cell in row.cells: parts.extend(p.text for p in _docx_cell_paragraphs(cell)) for section in doc.sections: parts.extend(p.text for p in section.header.paragraphs if p.text.strip()) parts.extend(p.text for p in section.footer.paragraphs if p.text.strip()) from docx.oxml.ns import qn for txbx in doc.element.body.findall(".//" + qn("w:txbxContent")): for p in txbx.findall(".//" + qn("w:p")): text = "".join(t.text or "" for t in p.findall(".//" + qn("w:t"))) if text.strip(): parts.append(text) return "\n".join(parts) elif suffix == ".pdf": reader = PdfReader(filepath) return "".join( page.extract_text() + "\n" for page in reader.pages if page.extract_text() ) elif suffix == ".pptx": prs = Presentation(filepath) parts = [] for slide in prs.slides: for shape in slide.shapes: parts.extend(_pptx_shape_text(shape)) if slide.has_notes_slide: notes = slide.notes_slide.notes_text_frame.text if notes.strip(): parts.append(notes) return "\n".join(parts) elif suffix in {".txt", ".md"}: return filepath.read_text(encoding="utf-8", errors="ignore") except Exception as e: log.warning(f"Text extraction failed for {filepath.name}: {e}") return "" def chunk_text(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = DEFAULT_CHUNK_OVERLAP) -> list[str]: """Word-based chunking. Empty chunks filtered.""" words = text.split() chunks = [] start = 0 while start < len(words): chunk = " ".join(words[start:start + chunk_size]) if chunk.strip(): chunks.append(chunk) start += chunk_size - overlap return chunks def _chunk_id(filepath, source: str, index: int) -> str: basis = str(filepath) if filepath else source return f"{hashlib.md5(basis.encode()).hexdigest()[:8]}_{index}" def chunk_and_embed(text: str, source: str, embedder, filepath=None, folder=None) -> list[dict]: """Chunk text, embed each chunk, return rows ready for write_embeddings_batch.""" chunks = chunk_text(text) if not chunks: return [] embeddings = embedder.encode(chunks).tolist() rows = [] for i, (chunk, emb) in enumerate(zip(chunks, embeddings)): rows.append({ "id": _chunk_id(filepath, source, i), "document": chunk, "embedding": emb, "source": source, "type": "document", "metadata": { "source": source, "filepath": str(filepath) if filepath else source, "folder": folder, }, }) return rows def write_embeddings_batch(conn, batch: list[dict]) -> int: """Single canonical INSERT. Sets created_at = NOW() server-side. Commits. Every row dict must supply 'type'. created_at is SQL-supplied (NOW()), so callers do not need to provide it. The application-layer assertion is the primary enforcement point for type — the column lacks NOT NULL because historical NULLs were resolved by the Improvement #2 backfill, and a Python-level raise gives a faster, more debuggable failure than a Postgres constraint error. """ if not batch: return 0 cur = conn.cursor() for row in batch: if not row.get("type"): raise ValueError( f"row {row.get('id')!r} missing 'type'; writers must supply it " f"(see Improvement #2 in docs/birdai-component-inventory)" ) cur.execute(""" INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s) ON CONFLICT (id) DO UPDATE SET document = EXCLUDED.document, embedding = EXCLUDED.embedding, source = EXCLUDED.source, type = EXCLUDED.type, created_at = COALESCE(embeddings.created_at, EXCLUDED.created_at), metadata = EXCLUDED.metadata """, (row["id"], row["document"], row["embedding"], row["source"], row["type"], json.dumps(row["metadata"]))) conn.commit() return len(batch)