Files
aaron 9955c7e383 encoding: per-slide pptx chunking + extract_blocks API; api: recency tiebreak
extract_blocks(filepath) is the new structured-extraction entry point, returning
list[{heading, text, kind}]. chunk_and_embed accepts either str (blind-chunk
back-compat) or list[dict] (one chunk per block, blind-split if oversize, heading
prepended for retrieval context and stored in metadata).

- pptx: one block per slide. Slide title becomes block heading; speaker notes
  fold into the body. Image-only decks with title-only slides now produce
  heading-only chunks instead of being recorded as extraction failures.
- docx: deliberately single-block (back-compat). Heading-style section detection
  was implemented and rolled back: hand-formatted CVs are Normal-styled with
  bold-as-heading, and tying chunk boundaries to formatting choices would lock
  future-user into preserving those choices forever. Lexical + cross-encoder
  retrieval already handles substring matching inside blind-chunked CVs.
- pdf/txt/md: unchanged (single block, blind chunking).

Recency tiebreak in retrieve_context: pull created_at into the SELECT, use it
as secondary sort key in _rerank so memory/journal snapshots prefer the latest
copy among near-duplicate content.

reindex_docx_pptx.py now accepts --ext=pptx,docx... so re-ingest can target a
subset; previous hardcoded delete regex would have wiped both even with a
single-ext target.
2026-05-19 21:58:25 +00:00

332 lines
12 KiB
Python

"""
Aaron AI Stage 1 encoding helpers — single canonical implementation of:
- extract_blocks(filepath) — section-aware extraction (docx heading-bounded
sections, pptx per-slide, pdf/txt/md single-block)
- extract_text(filepath) — back-compat string concatenation over blocks
- chunk_text(text, chunk_size, overlap) — word-based blind chunking
- chunk_and_embed(text_or_blocks, source, embedder, filepath, folder) —
produce ready-to-write rows. Accepts str (blind) or list[dict] (section-aware).
- write_embeddings_batch(conn, batch) — server-side NOW() canonical INSERT
Used by watcher.py, ingest.py, corpus_integrity.py, and api.py /api/corpus/retry.
"""
import hashlib
import json
import logging
import re
from pathlib import Path
from docx import Document as DocxDocument
from pypdf import PdfReader
from pptx import Presentation
log = logging.getLogger("encoding")
SUPPORTED = {".docx", ".pdf", ".pptx", ".txt", ".md"}
DEFAULT_CHUNK_SIZE = 500
DEFAULT_CHUNK_OVERLAP = 50
_BOLD_KV_RE = re.compile(r"^\*\*[\w +/-]+?:\*\*")
def _strip_md_frontmatter(text: str) -> str:
"""Strip a leading frontmatter block from markdown, if present.
Recognizes two formats:
- YAML-style: file's first non-empty line is `---`, terminated by `---`.
Only triggered when no heading precedes — guards against `---`
horizontal rules that follow an H1.
- Capture-style: optional H1 heading, then one or more `**key:** value`
lines (and blanks), terminated by `---`. The H1 is preserved; the
key/value block + separator are removed.
Body `---` rules and body `**bold:**` lines are never touched — the scan
aborts as soon as a non-frontmatter line appears in the leading block.
"""
lines = text.splitlines()
n = len(lines)
i = 0
while i < n and not lines[i].strip():
i += 1
heading = None
if i < n and lines[i].startswith("# "):
heading = lines[i]
i += 1
while i < n and not lines[i].strip():
i += 1
if i >= n:
return text
first = lines[i].strip()
if heading is None and first == "---":
j = i + 1
while j < n and lines[j].strip() != "---":
j += 1
if j >= n:
return text
body_start = j + 1
elif _BOLD_KV_RE.match(first):
j = i
while j < n:
s = lines[j].strip()
if not s or _BOLD_KV_RE.match(s):
j += 1
continue
if s == "---":
body_start = j + 1
break
return text
else:
return text
else:
return text
body = "\n".join(lines[body_start:]).lstrip("\n")
return f"{heading}\n\n{body}" if heading else body
def _docx_cell_paragraphs(cell):
yield from (p for p in cell.paragraphs if p.text.strip())
for nested in cell.tables:
for row in nested.rows:
for c in row.cells:
yield from _docx_cell_paragraphs(c)
def _pptx_shape_text(shape):
from pptx.enum.shapes import MSO_SHAPE_TYPE
parts = []
if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
for sub in shape.shapes:
parts.extend(_pptx_shape_text(sub))
return parts
if hasattr(shape, "text") and shape.text.strip():
parts.append(shape.text)
if getattr(shape, "has_table", False):
for cell in shape.table.iter_cells():
if cell.text.strip():
parts.append(cell.text)
return parts
def _extract_docx_blocks(filepath: Path) -> list[dict]:
"""Return docx content as a single block. Earlier attempt at section-aware
chunking via Heading styles was rolled back: the user's docs are mostly
Normal-styled with bold-as-heading, and tying chunk boundaries to formatting
choices locks future-them into preserving those choices forever. Lexical
+ cross-encoder retrieval already finds the right substrings within a
blind-chunked CV, so the section structure isn't load-bearing for retrieval."""
from docx.oxml.ns import qn
doc = DocxDocument(filepath)
parts = [p.text for p in doc.paragraphs if p.text.strip()]
for tbl in doc.tables:
for row in tbl.rows:
for cell in row.cells:
parts.extend(p.text for p in _docx_cell_paragraphs(cell))
for section in doc.sections:
parts.extend(p.text for p in section.header.paragraphs if p.text.strip())
parts.extend(p.text for p in section.footer.paragraphs if p.text.strip())
for txbx in doc.element.body.findall(".//" + qn("w:txbxContent")):
for p in txbx.findall(".//" + qn("w:p")):
text = "".join(t.text or "" for t in p.findall(".//" + qn("w:t")))
if text.strip():
parts.append(text)
text = "\n".join(parts)
return [{"heading": None, "text": text, "kind": "doc"}] if text.strip() else []
def _extract_pptx_blocks(filepath: Path) -> list[dict]:
"""One block per slide. Heading = slide title (or 'Slide N' fallback).
Body = non-title shape text + speaker notes."""
prs = Presentation(filepath)
blocks = []
for i, slide in enumerate(prs.slides, 1):
title_shape = None
try:
title_shape = slide.shapes.title
except (AttributeError, KeyError):
pass
title = None
body_parts = []
for shape in slide.shapes:
if title_shape is not None and shape == title_shape and shape.has_text_frame:
title = shape.text_frame.text.strip() or None
continue
body_parts.extend(_pptx_shape_text(shape))
if slide.has_notes_slide:
notes = slide.notes_slide.notes_text_frame.text
if notes.strip():
body_parts.append(f"[Notes] {notes}")
if title or body_parts:
blocks.append({
"heading": title or f"Slide {i}",
"text": "\n".join(body_parts),
"kind": "slide",
})
return blocks
def extract_blocks(filepath: Path) -> list[dict]:
"""Structured extraction. Returns list of {heading, text, kind} blocks.
- docx: section-aware via Heading-style paragraphs (kind='section').
- pptx: one block per slide (kind='slide').
- pdf/txt/md: single block, no heading (kind='doc').
Empty list on any failure or unsupported extension."""
suffix = filepath.suffix.lower()
try:
if suffix == ".docx":
return _extract_docx_blocks(filepath)
if suffix == ".pptx":
return _extract_pptx_blocks(filepath)
if suffix == ".pdf":
reader = PdfReader(filepath)
text = "".join(
page.extract_text() + "\n"
for page in reader.pages if page.extract_text()
)
return [{"heading": None, "text": text, "kind": "doc"}] if text.strip() else []
if suffix in {".txt", ".md"}:
text = filepath.read_text(encoding="utf-8", errors="ignore")
if suffix == ".md":
text = _strip_md_frontmatter(text)
return [{"heading": None, "text": text, "kind": "doc"}] if text.strip() else []
except Exception as e:
log.warning(f"Extraction failed for {filepath.name}: {e}")
return []
def extract_text(filepath: Path) -> str:
"""Back-compat wrapper: concatenate extract_blocks() output. Section
structure is lost; use extract_blocks() directly for chunking."""
blocks = extract_blocks(filepath)
parts = []
for b in blocks:
if b.get("heading"):
parts.append(b["heading"])
if b.get("text"):
parts.append(b["text"])
return "\n".join(parts)
def chunk_text(text: str,
chunk_size: int = DEFAULT_CHUNK_SIZE,
overlap: int = DEFAULT_CHUNK_OVERLAP) -> list[str]:
"""Word-based chunking. Empty chunks filtered."""
words = text.split()
chunks = []
start = 0
while start < len(words):
chunk = " ".join(words[start:start + chunk_size])
if chunk.strip():
chunks.append(chunk)
start += chunk_size - overlap
return chunks
def _chunk_id(filepath, source: str, index: int) -> str:
basis = str(filepath) if filepath else source
return f"{hashlib.md5(basis.encode()).hexdigest()[:8]}_{index}"
def chunk_and_embed(text_or_blocks,
source: str,
embedder,
filepath=None,
folder=None) -> list[dict]:
"""Chunk + embed for write_embeddings_batch. Accepts either:
- str: blind chunking with 500-word windows (pdf/txt/md legacy path).
- list[dict]: section-aware path (docx Heading-bounded sections, pptx
slides). Each block emits one chunk if its text fits within
DEFAULT_CHUNK_SIZE words, otherwise is blind-split with overlap.
The block heading is prepended to the chunk text (so retrieval sees the
section context) and stored in metadata as heading/kind."""
if isinstance(text_or_blocks, str):
blocks = [{"heading": None, "text": text_or_blocks, "kind": "doc"}]
else:
blocks = text_or_blocks
chunks = []
for block in blocks:
body = block.get("text") or ""
heading = block.get("heading")
kind = block.get("kind", "doc")
if not body.strip() and not (heading and heading.strip()):
continue
if heading and body.strip():
contextualized = f"{heading}\n\n{body}"
elif heading:
contextualized = heading
else:
contextualized = body
if len(contextualized.split()) <= DEFAULT_CHUNK_SIZE:
chunks.append((contextualized, heading, kind))
else:
for sub in chunk_text(contextualized):
chunks.append((sub, heading, kind))
if not chunks:
return []
embeddings = embedder.encode([c[0] for c in chunks]).tolist()
rows = []
for i, ((chunk, heading, kind), emb) in enumerate(zip(chunks, embeddings)):
rows.append({
"id": _chunk_id(filepath, source, i),
"document": chunk,
"embedding": emb,
"source": source,
"type": "document",
"metadata": {
"source": source,
"filepath": str(filepath) if filepath else source,
"folder": folder,
"heading": heading,
"kind": kind,
},
})
return rows
def write_embeddings_batch(conn, batch: list[dict], commit: bool = True) -> int:
"""Single canonical INSERT. Sets created_at = NOW() server-side.
Every row dict must supply 'type'. created_at is SQL-supplied (NOW()), so
callers do not need to provide it. The application-layer assertion is the
primary enforcement point for type — the column lacks NOT NULL because
historical NULLs were resolved by the Improvement #2 backfill, and a
Python-level raise gives a faster, more debuggable failure than a
Postgres constraint error.
When commit=True (default), this function commits the connection itself.
When commit=False, the caller is responsible for committing. Use
commit=False when composing this write with other writes that must land
atomically in the same transaction.
"""
if not batch:
return 0
cur = conn.cursor()
for row in batch:
if not row.get("type"):
raise ValueError(
f"row {row.get('id')!r} missing 'type'; writers must supply it "
f"(see Improvement #2 in docs/birdai-component-inventory)"
)
cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
type = EXCLUDED.type,
created_at = COALESCE(embeddings.created_at, EXCLUDED.created_at),
metadata = EXCLUDED.metadata
""", (row["id"], row["document"], row["embedding"],
row["source"], row["type"], json.dumps(row["metadata"])))
if commit:
conn.commit()
return len(batch)