5b4a299414
Adds an optional commit=True parameter to write_embeddings_batch. When True (default, matching prior behavior), the function commits the connection after the per-row UPSERT loop. When False, the caller manages the transaction. This unblocks fix #1 (pgvector-bypass paths) and fix #2 (watcher two-transaction pattern), both of which need to compose embeddings writes with other database writes in the same transaction. Without this lever, either fix would require duplicating the UPSERT logic outside this helper or introducing a second commit boundary inside an otherwise atomic operation. No behavior change for existing callers — they all use the default commit=True and continue working unchanged.
244 lines
8.8 KiB
Python
244 lines
8.8 KiB
Python
"""
|
|
Aaron AI Stage 1 encoding helpers — single canonical implementation of:
|
|
- extract_text(filepath) — four-extension text extraction
|
|
- chunk_text(text, chunk_size, overlap) — word-based chunking
|
|
- chunk_and_embed(text, source, embedder, filepath, folder) — produce ready-to-write rows
|
|
- write_embeddings_batch(conn, batch) — server-side NOW() canonical INSERT
|
|
|
|
Used by watcher.py, ingest.py, corpus_integrity.py, and api.py /api/corpus/retry.
|
|
Replaces four separate extract reimplementations and two extract-chunk-embed paths.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import re
|
|
from pathlib import Path
|
|
|
|
from docx import Document as DocxDocument
|
|
from pypdf import PdfReader
|
|
from pptx import Presentation
|
|
|
|
log = logging.getLogger("encoding")
|
|
|
|
SUPPORTED = {".docx", ".pdf", ".pptx", ".txt", ".md"}
|
|
DEFAULT_CHUNK_SIZE = 500
|
|
DEFAULT_CHUNK_OVERLAP = 50
|
|
|
|
_BOLD_KV_RE = re.compile(r"^\*\*[\w +/-]+?:\*\*")
|
|
|
|
|
|
def _skip_blank_lines(lines: list[str], pos: int) -> int:
    """Return the first index >= pos whose line is not blank/whitespace-only."""
    while pos < len(lines) and not lines[pos].strip():
        pos += 1
    return pos


def _yaml_frontmatter_end(lines: list[str], opener: int):
    """Index just past the `---` that closes a YAML block opened at *opener*.

    Returns None when no closing `---` exists (block is unterminated).
    """
    for idx in range(opener + 1, len(lines)):
        if lines[idx].strip() == "---":
            return idx + 1
    return None


def _capture_frontmatter_end(lines: list[str], first: int):
    """Index just past the `---` ending a `**key:** value` block starting at *first*.

    Blank lines and bold key/value lines are part of the block. Any other line
    before the `---` (or running out of lines) means this is not frontmatter,
    and None is returned.
    """
    for idx in range(first, len(lines)):
        stripped = lines[idx].strip()
        if not stripped or _BOLD_KV_RE.match(stripped):
            continue
        return idx + 1 if stripped == "---" else None
    return None


def _strip_md_frontmatter(text: str) -> str:
    """Remove a leading frontmatter block from markdown text, if one is present.

    Two layouts are recognised:

    - YAML-style: the first non-blank line is `---` and a later `---` closes
      the block. Only considered when no `# ` heading comes first, so a `---`
      horizontal rule under an H1 is never mistaken for frontmatter.
    - Capture-style: an optional `# ` heading, then `**key:** value` lines
      (blank lines allowed) ending at a `---` line. The heading is kept and
      the key/value block plus its separator are dropped.

    Only the leading block is examined. As soon as something that is not
    frontmatter appears there, the original text is returned unchanged.
    Later `---` rules and bold lines in the body are never affected.
    """
    lines = text.splitlines()

    cursor = _skip_blank_lines(lines, 0)
    heading = None
    if cursor < len(lines) and lines[cursor].startswith("# "):
        heading = lines[cursor]
        cursor = _skip_blank_lines(lines, cursor + 1)
    if cursor >= len(lines):
        return text

    opener = lines[cursor].strip()
    if heading is None and opener == "---":
        body_start = _yaml_frontmatter_end(lines, cursor)
    elif _BOLD_KV_RE.match(opener):
        body_start = _capture_frontmatter_end(lines, cursor)
    else:
        body_start = None

    if body_start is None:
        return text

    body = "\n".join(lines[body_start:]).lstrip("\n")
    return body if heading is None else f"{heading}\n\n{body}"
|
|
|
|
|
|
def _docx_cell_paragraphs(cell):
    """Yield every non-blank paragraph of a python-docx table cell.

    The walk is depth-first pre-order. A cell's own paragraphs come first.
    Then each table nested inside it is walked row by row and cell by cell,
    applying the same rule to every nested cell before moving on to the next.
    """
    stack = [cell]
    while stack:
        current = stack.pop()
        for paragraph in current.paragraphs:
            if paragraph.text.strip():
                yield paragraph
        nested_cells = [
            inner
            for table in current.tables
            for row in table.rows
            for inner in row.cells
        ]
        # Push in reverse so the first nested cell is the next one visited.
        stack.extend(reversed(nested_cells))
|
|
|
|
|
|
def _pptx_shape_text(shape):
    """Return the list of text strings carried by one python-pptx shape.

    Group shapes are flattened recursively and contribute only their
    children's text. Any other shape contributes its own ``text``, when it has
    that attribute and the text is non-blank, followed by each non-blank table
    cell when the shape reports ``has_table``.
    """
    from pptx.enum.shapes import MSO_SHAPE_TYPE

    if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
        return [piece for child in shape.shapes for piece in _pptx_shape_text(child)]

    texts = []
    if hasattr(shape, "text") and shape.text.strip():
        texts.append(shape.text)
    if getattr(shape, "has_table", False):
        texts.extend(
            table_cell.text
            for table_cell in shape.table.iter_cells()
            if table_cell.text.strip()
        )
    return texts
|
|
|
|
|
|
def _extract_docx(filepath: Path) -> str:
    """Return the text of a .docx file, one non-blank paragraph per line.

    Order: body paragraphs, table cells (including nested tables), each
    section's explicitly-defined header then footer, then text boxes.
    """
    from docx.oxml.ns import qn

    # Markup-compatibility Fallback element. Word stores each modern text box
    # twice: a DrawingML copy under mc:Choice and a VML copy under mc:Fallback.
    mc_fallback = "{http://schemas.openxmlformats.org/markup-compatibility/2006}Fallback"

    doc = DocxDocument(filepath)
    parts = [p.text for p in doc.paragraphs if p.text.strip()]
    for tbl in doc.tables:
        for row in tbl.rows:
            for cell in row.cells:
                parts.extend(p.text for p in _docx_cell_paragraphs(cell))
    for section in doc.sections:
        for header_or_footer in (section.header, section.footer):
            # A "linked to previous" header/footer has no definition of its
            # own. Reading its .paragraphs resolves to the prior section's
            # part (or adds an empty definition for the first section). That
            # would repeat text already collected, so read explicit ones only.
            if header_or_footer.is_linked_to_previous:
                continue
            parts.extend(
                p.text for p in header_or_footer.paragraphs if p.text.strip()
            )
    for txbx in doc.element.body.findall(".//" + qn("w:txbxContent")):
        # Skip the VML fallback copy so each text box is extracted once.
        if any(anc.tag == mc_fallback for anc in txbx.iterancestors()):
            continue
        for p in txbx.findall(".//" + qn("w:p")):
            text = "".join(t.text or "" for t in p.findall(".//" + qn("w:t")))
            if text.strip():
                parts.append(text)
    return "\n".join(parts)


def _extract_pdf(filepath: Path) -> str:
    """Return each page's extracted text followed by a newline; empty pages are skipped."""
    reader = PdfReader(filepath)
    # Extract each page once: text extraction is the expensive step.
    page_texts = (page.extract_text() for page in reader.pages)
    return "".join(text + "\n" for text in page_texts if text)


def _extract_pptx(filepath: Path) -> str:
    """Return shape, table and speaker-note text of every slide, one item per line."""
    prs = Presentation(filepath)
    parts = []
    for slide in prs.slides:
        for shape in slide.shapes:
            parts.extend(_pptx_shape_text(shape))
        if slide.has_notes_slide:
            # notes_text_frame is None when the notes slide has no notes
            # placeholder. Without this guard the whole deck would fail.
            notes_frame = slide.notes_slide.notes_text_frame
            if notes_frame is not None and notes_frame.text.strip():
                parts.append(notes_frame.text)
    return "\n".join(parts)


def extract_text(filepath: Path) -> str:
    """Return the text of a supported file.

    Supported suffixes (case-insensitive): .docx, .pdf, .pptx, .txt, .md.
    Markdown has its leading frontmatter stripped. Text files are read as
    UTF-8 with undecodable bytes ignored.

    Returns "" on any extraction failure (logged as a warning) and for an
    unsupported extension. Does not write to ingest_failures; the caller
    decides how to record failures.
    """
    suffix = filepath.suffix.lower()
    try:
        if suffix == ".docx":
            return _extract_docx(filepath)
        if suffix == ".pdf":
            return _extract_pdf(filepath)
        if suffix == ".pptx":
            return _extract_pptx(filepath)
        if suffix in {".txt", ".md"}:
            text = filepath.read_text(encoding="utf-8", errors="ignore")
            return _strip_md_frontmatter(text) if suffix == ".md" else text
    except Exception as e:
        # Deliberate best-effort boundary: one unreadable file must not abort
        # a batch ingest.
        log.warning("Text extraction failed for %s: %s", filepath.name, e)
        return ""
    # Unsupported extension: return "" (not an implicit None) so callers can
    # always treat the result as a str.
    return ""
|
|
|
|
|
|
def chunk_text(text: str,
               chunk_size: int = DEFAULT_CHUNK_SIZE,
               overlap: int = DEFAULT_CHUNK_OVERLAP) -> list[str]:
    """Split *text* into overlapping word-based chunks.

    The text is split on whitespace. Each chunk is up to *chunk_size* words
    joined by single spaces, and each chunk starts ``chunk_size - overlap``
    words after the previous one, so consecutive chunks share *overlap*
    words. Empty or whitespace-only text yields ``[]``; no chunk is ever empty.

    The last window can consist only of words already in the previous chunk
    (e.g. 500 words -> [0:500] and [450:500]). It is kept so that chunk
    indices, and therefore chunk ids, stay stable.

    Raises:
        ValueError: if chunk_size <= 0 or overlap >= chunk_size. Either would
            stop the window from advancing (the loop would never terminate).
    """
    step = chunk_size - overlap
    if chunk_size <= 0 or step <= 0:
        raise ValueError(
            f"chunk_size must be > 0 and overlap < chunk_size "
            f"(got chunk_size={chunk_size}, overlap={overlap})"
        )
    words = text.split()
    # Every start is < len(words) and chunk_size >= 1, so each slice holds at
    # least one (non-empty) word.
    return [" ".join(words[start:start + chunk_size])
            for start in range(0, len(words), step)]
|
|
|
|
|
|
def _chunk_id(filepath, source: str, index: int) -> str:
    """Return a deterministic chunk id of the form "<8 hex digits>_<index>".

    The prefix is the first 8 hex digits of the MD5 of ``str(filepath)``, or
    of *source* when *filepath* is falsy. The same path and index always
    produce the same id.

    NOTE(review): 8 hex digits is only 32 bits, so two distinct paths can
    collide and share ids. Widening the prefix would change every existing id.
    """
    key = source if not filepath else str(filepath)
    digest = hashlib.md5(key.encode()).hexdigest()
    return "{}_{}".format(digest[:8], index)
|
|
|
|
|
|
def chunk_and_embed(text: str,
                    source: str,
                    embedder,
                    filepath=None,
                    folder=None) -> list[dict]:
    """Chunk *text*, embed all chunks in one call, and return upsert-ready rows.

    Each row is a dict with keys id, document, embedding, source, type and
    metadata, the shape write_embeddings_batch consumes. 'type' is always
    "document". metadata records source, filepath (``str(filepath)``, or
    *source* when no filepath is given) and folder. Ids come from _chunk_id.

    *embedder* must expose ``encode(list[str])`` returning an object with
    ``.tolist()``. Presumably this is a sentence-transformers-style model
    returning a NumPy array (TODO confirm against callers).

    Returns [] when the text yields no chunks.
    """
    pieces = chunk_text(text)
    if not pieces:
        return []

    vectors = embedder.encode(pieces).tolist()
    location = str(filepath) if filepath else source

    # zip stops at the shorter input; this assumes encode returns one vector
    # per chunk. Each row gets its own metadata dict.
    return [
        {
            "id": _chunk_id(filepath, source, index),
            "document": piece,
            "embedding": vector,
            "source": source,
            "type": "document",
            "metadata": {
                "source": source,
                "filepath": location,
                "folder": folder,
            },
        }
        for index, (piece, vector) in enumerate(zip(pieces, vectors))
    ]
|
|
|
|
|
|
_EMBEDDINGS_UPSERT_SQL = """
    INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
    VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
    ON CONFLICT (id) DO UPDATE SET
        document = EXCLUDED.document,
        embedding = EXCLUDED.embedding,
        source = EXCLUDED.source,
        type = EXCLUDED.type,
        created_at = COALESCE(embeddings.created_at, EXCLUDED.created_at),
        metadata = EXCLUDED.metadata
"""


def write_embeddings_batch(conn, batch: list[dict], commit: bool = True) -> int:
    """Single canonical embeddings UPSERT; return the number of rows sent.

    Every row dict must supply 'id', 'document', 'embedding', 'source',
    'type' and 'metadata'. The metadata value is stored as JSON text.
    created_at is SQL-supplied: NOW() on insert. On an id conflict, the other
    columns are overwritten but an existing non-NULL created_at is kept.

    The application-layer 'type' check is the primary enforcement point. The
    column lacks NOT NULL because historical NULLs were resolved by the
    Improvement #2 backfill, and a Python-level raise gives a faster, more
    debuggable failure than a Postgres constraint error.

    All rows are validated and their parameters built before any statement
    is executed. A bad row therefore raises without leaving part of the batch
    written into the open transaction.

    When commit=True (default), this function commits the connection itself.
    When commit=False, the caller is responsible for committing. Use
    commit=False when composing this write with other writes that must land
    atomically in the same transaction.

    Args:
        conn: DB-API connection. Presumably psycopg, given the %s placeholders
            and ::vector cast (TODO confirm).
        batch: rows as produced by chunk_and_embed.
        commit: whether to commit after writing.

    Raises:
        ValueError: a row has a missing or empty 'type'.
        KeyError: a row lacks one of the other required keys.
    """
    if not batch:
        return 0

    params = []
    for row in batch:
        if not row.get("type"):
            raise ValueError(
                f"row {row.get('id')!r} missing 'type'; writers must supply it "
                f"(see Improvement #2 in docs/birdai-component-inventory)"
            )
        params.append((row["id"], row["document"], row["embedding"],
                       row["source"], row["type"], json.dumps(row["metadata"])))

    cur = conn.cursor()
    try:
        cur.executemany(_EMBEDDINGS_UPSERT_SQL, params)
    finally:
        cur.close()

    if commit:
        conn.commit()
    return len(batch)
|