diff --git a/scripts/api.py b/scripts/api.py index 3fac813..9073679 100644 --- a/scripts/api.py +++ b/scripts/api.py @@ -302,14 +302,19 @@ def classify_retrieval_intent(query: str): def _rerank(query: str, candidates: list[tuple]) -> list[tuple]: - """Cross-encoder rerank. Candidates are (id, document, source, folder) tuples. - Returns the same tuples reordered by reranker score (highest first).""" + """Cross-encoder rerank. Candidates are (id, document, source, folder, created_at) + tuples. Returns the same tuples reordered by reranker score with created_at as + secondary key — so when two chunks score similarly the newer one wins, which + keeps memory/journal files biased toward the latest snapshot.""" if not candidates: return [] pairs = [(query, row[1]) for row in candidates] scores = reranker.predict(pairs) - return [row for row, _ in sorted(zip(candidates, scores), - key=lambda x: x[1], reverse=True)] + return [row for row, _ in sorted( + zip(candidates, scores), + key=lambda x: (float(x[1]), x[0][4] or ""), + reverse=True, + )] def _format_source(source: str, folder: str) -> str: @@ -374,7 +379,7 @@ def retrieve_context(query, n_results=FINAL_LIMIT, cur.execute("SET LOCAL hnsw.ef_search = 500") cur.execute(f""" - SELECT id, document, source, metadata->>'folder' AS folder + SELECT id, document, source, metadata->>'folder' AS folder, created_at FROM embeddings {common_where} ORDER BY embedding <=> %s::vector @@ -387,7 +392,7 @@ def retrieve_context(query, n_results=FINAL_LIMIT, lex_match = "to_tsvector('english', document) @@ websearch_to_tsquery('english', %s)" lex_where = ("WHERE " + " AND ".join([lex_match] + where_clauses)) cur.execute(f""" - SELECT id, document, source, metadata->>'folder' AS folder + SELECT id, document, source, metadata->>'folder' AS folder, created_at FROM embeddings {lex_where} ORDER BY ts_rank(to_tsvector('english', document), @@ -411,7 +416,7 @@ def retrieve_context(query, n_results=FINAL_LIMIT, candidates = [rows_by_id[doc_id] for doc_id, _ in rrf_ranked] seen = set() - for _id, doc, source, folder in _rerank(query, candidates): + for _id, doc, source, folder, _created_at in _rerank(query, candidates): key = _dedup_key(doc) if key in seen: continue diff --git a/scripts/encoding.py b/scripts/encoding.py index a3db6bb..96d9dfe 100644 --- a/scripts/encoding.py +++ b/scripts/encoding.py @@ -1,12 +1,14 @@ """ Aaron AI Stage 1 encoding helpers — single canonical implementation of: - - extract_text(filepath) — four-extension text extraction - - chunk_text(text, chunk_size, overlap) — word-based chunking - - chunk_and_embed(text, source, embedder, filepath, folder) — produce ready-to-write rows + - extract_blocks(filepath) — section-aware extraction (docx heading-bounded + sections, pptx per-slide, pdf/txt/md single-block) + - extract_text(filepath) — back-compat string concatenation over blocks + - chunk_text(text, chunk_size, overlap) — word-based blind chunking + - chunk_and_embed(text_or_blocks, source, embedder, filepath, folder) — + produce ready-to-write rows. Accepts str (blind) or list[dict] (section-aware). - write_embeddings_batch(conn, batch) — server-side NOW() canonical INSERT Used by watcher.py, ingest.py, corpus_integrity.py, and api.py /api/corpus/retry. -Replaces four separate extract reimplementations and two extract-chunk-embed paths. """ import hashlib @@ -106,53 +108,106 @@ def _pptx_shape_text(shape): return parts -def extract_text(filepath: Path) -> str: - """Return the text of a supported file. Returns "" on any failure or - unsupported extension. Does not write to ingest_failures — caller decides.""" +def _extract_docx_blocks(filepath: Path) -> list[dict]: + """Return docx content as a single block. Earlier attempt at section-aware + chunking via Heading styles was rolled back: the user's docs are mostly + Normal-styled with bold-as-heading, and tying chunk boundaries to formatting + choices locks future-them into preserving those choices forever. Lexical + + cross-encoder retrieval already finds the right substrings within a + blind-chunked CV, so the section structure isn't load-bearing for retrieval.""" + from docx.oxml.ns import qn + + doc = DocxDocument(filepath) + parts = [p.text for p in doc.paragraphs if p.text.strip()] + for tbl in doc.tables: + for row in tbl.rows: + for cell in row.cells: + parts.extend(p.text for p in _docx_cell_paragraphs(cell)) + for section in doc.sections: + parts.extend(p.text for p in section.header.paragraphs if p.text.strip()) + parts.extend(p.text for p in section.footer.paragraphs if p.text.strip()) + for txbx in doc.element.body.findall(".//" + qn("w:txbxContent")): + for p in txbx.findall(".//" + qn("w:p")): + text = "".join(t.text or "" for t in p.findall(".//" + qn("w:t"))) + if text.strip(): + parts.append(text) + text = "\n".join(parts) + return [{"heading": None, "text": text, "kind": "doc"}] if text.strip() else [] + + +def _extract_pptx_blocks(filepath: Path) -> list[dict]: + """One block per slide. Heading = slide title (or 'Slide N' fallback). + Body = non-title shape text + speaker notes.""" + prs = Presentation(filepath) + blocks = [] + for i, slide in enumerate(prs.slides, 1): + title_shape = None + try: + title_shape = slide.shapes.title + except (AttributeError, KeyError): + pass + title = None + body_parts = [] + for shape in slide.shapes: + if title_shape is not None and shape == title_shape and shape.has_text_frame: + title = shape.text_frame.text.strip() or None + continue + body_parts.extend(_pptx_shape_text(shape)) + if slide.has_notes_slide: + notes = slide.notes_slide.notes_text_frame.text + if notes.strip(): + body_parts.append(f"[Notes] {notes}") + if title or body_parts: + blocks.append({ + "heading": title or f"Slide {i}", + "text": "\n".join(body_parts), + "kind": "slide", + }) + return blocks + + +def extract_blocks(filepath: Path) -> list[dict]: + """Structured extraction. Returns list of {heading, text, kind} blocks. + + - docx: section-aware via Heading-style paragraphs (kind='section'). + - pptx: one block per slide (kind='slide'). + - pdf/txt/md: single block, no heading (kind='doc'). + + Empty list on any failure or unsupported extension.""" suffix = filepath.suffix.lower() try: if suffix == ".docx": - doc = DocxDocument(filepath) - parts = [p.text for p in doc.paragraphs if p.text.strip()] - for tbl in doc.tables: - for row in tbl.rows: - for cell in row.cells: - parts.extend(p.text for p in _docx_cell_paragraphs(cell)) - for section in doc.sections: - parts.extend(p.text for p in section.header.paragraphs if p.text.strip()) - parts.extend(p.text for p in section.footer.paragraphs if p.text.strip()) - from docx.oxml.ns import qn - for txbx in doc.element.body.findall(".//" + qn("w:txbxContent")): - for p in txbx.findall(".//" + qn("w:p")): - text = "".join(t.text or "" for t in p.findall(".//" + qn("w:t"))) - if text.strip(): - parts.append(text) - return "\n".join(parts) - elif suffix == ".pdf": + return _extract_docx_blocks(filepath) + if suffix == ".pptx": + return _extract_pptx_blocks(filepath) + if suffix == ".pdf": reader = PdfReader(filepath) - return "".join( + text = "".join( page.extract_text() + "\n" for page in reader.pages if page.extract_text() ) - elif suffix == ".pptx": - prs = Presentation(filepath) - parts = [] - for slide in prs.slides: - for shape in slide.shapes: - parts.extend(_pptx_shape_text(shape)) - if slide.has_notes_slide: - notes = slide.notes_slide.notes_text_frame.text - if notes.strip(): - parts.append(notes) - return "\n".join(parts) - elif suffix in {".txt", ".md"}: + return [{"heading": None, "text": text, "kind": "doc"}] if text.strip() else [] + if suffix in {".txt", ".md"}: text = filepath.read_text(encoding="utf-8", errors="ignore") if suffix == ".md": - return _strip_md_frontmatter(text) - return text + text = _strip_md_frontmatter(text) + return [{"heading": None, "text": text, "kind": "doc"}] if text.strip() else [] except Exception as e: - log.warning(f"Text extraction failed for {filepath.name}: {e}") - return "" + log.warning(f"Extraction failed for {filepath.name}: {e}") + return [] + + +def extract_text(filepath: Path) -> str: + """Back-compat wrapper: concatenate extract_blocks() output. Section + structure is lost; use extract_blocks() directly for chunking.""" + blocks = extract_blocks(filepath) + parts = [] + for b in blocks: + if b.get("heading"): + parts.append(b["heading"]) + if b.get("text"): + parts.append(b["text"]) + return "\n".join(parts) def chunk_text(text: str, @@ -175,18 +230,49 @@ def _chunk_id(filepath, source: str, index: int) -> str: return f"{hashlib.md5(basis.encode()).hexdigest()[:8]}_{index}" -def chunk_and_embed(text: str, +def chunk_and_embed(text_or_blocks, source: str, embedder, filepath=None, folder=None) -> list[dict]: - """Chunk text, embed each chunk, return rows ready for write_embeddings_batch.""" - chunks = chunk_text(text) + """Chunk + embed for write_embeddings_batch. Accepts either: + + - str: blind chunking with 500-word windows (pdf/txt/md legacy path). + - list[dict]: section-aware path (docx Heading-bounded sections, pptx + slides). Each block emits one chunk if its text fits within + DEFAULT_CHUNK_SIZE words, otherwise is blind-split with overlap. + + The block heading is prepended to the chunk text (so retrieval sees the + section context) and stored in metadata as heading/kind.""" + if isinstance(text_or_blocks, str): + blocks = [{"heading": None, "text": text_or_blocks, "kind": "doc"}] + else: + blocks = text_or_blocks + + chunks = [] + for block in blocks: + body = block.get("text") or "" + heading = block.get("heading") + kind = block.get("kind", "doc") + if not body.strip() and not (heading and heading.strip()): + continue + if heading and body.strip(): + contextualized = f"{heading}\n\n{body}" + elif heading: + contextualized = heading + else: + contextualized = body + if len(contextualized.split()) <= DEFAULT_CHUNK_SIZE: + chunks.append((contextualized, heading, kind)) + else: + for sub in chunk_text(contextualized): + chunks.append((sub, heading, kind)) + if not chunks: return [] - embeddings = embedder.encode(chunks).tolist() + embeddings = embedder.encode([c[0] for c in chunks]).tolist() rows = [] - for i, (chunk, emb) in enumerate(zip(chunks, embeddings)): + for i, ((chunk, heading, kind), emb) in enumerate(zip(chunks, embeddings)): rows.append({ "id": _chunk_id(filepath, source, i), "document": chunk, @@ -197,6 +283,8 @@ def chunk_and_embed(text: str, "source": source, "filepath": str(filepath) if filepath else source, "folder": folder, + "heading": heading, + "kind": kind, }, }) return rows diff --git a/scripts/ingest.py b/scripts/ingest.py index 9cbfa6d..76bc140 100644 --- a/scripts/ingest.py +++ b/scripts/ingest.py @@ -15,7 +15,7 @@ from dotenv import load_dotenv import psycopg2 from sentence_transformers import SentenceTransformer -from encoding import extract_text, chunk_and_embed, write_embeddings_batch, SUPPORTED +from encoding import extract_blocks, chunk_and_embed, write_embeddings_batch, SUPPORTED from failures import ( record_ingest_failure as _record_failure_sql, resolve_ingest_failure as _resolve_failure_sql, @@ -83,8 +83,11 @@ def _ingest_one(filepath: Path, embedder, root: Path = None) -> int: return 0 if filepath.suffix.lower() not in SUPPORTED: return 0 - text = extract_text(filepath) - if not text.strip(): + blocks = extract_blocks(filepath) + if not blocks or not any( + (b.get("text") or "").strip() or (b.get("heading") or "").strip() + for b in blocks + ): _record_failure(filepath, "Text extraction failed or empty") return 0 folder_rel = None @@ -94,7 +97,7 @@ def _ingest_one(filepath: Path, embedder, root: Path = None) -> int: except ValueError: pass try: - rows = chunk_and_embed(text, filepath.name, embedder, + rows = chunk_and_embed(blocks, filepath.name, embedder, filepath=filepath, folder=folder_rel) except Exception as e: _record_failure(filepath, f"Embedding failed: {e}") @@ -113,7 +116,11 @@ def _ingest_one(filepath: Path, embedder, root: Path = None) -> int: print(f" Indexed {len(rows)} chunks: {filepath.name}") _resolve_failure(filepath.name) if not os.getenv("SKIP_STAGE2_ENQUEUE"): - enqueue_stage2(filepath.name, text) + full_text = "\n".join( + f"{b['heading']}\n{b['text']}" if b.get("heading") else b.get("text", "") + for b in blocks + ) + enqueue_stage2(filepath.name, full_text) return len(rows) diff --git a/scripts/reindex_docx_pptx.py b/scripts/reindex_docx_pptx.py index d19b3ab..2167fb3 100644 --- a/scripts/reindex_docx_pptx.py +++ b/scripts/reindex_docx_pptx.py @@ -12,6 +12,7 @@ Without --apply: dry-run. Counts files and chunks, prints a sample, writes nothi """ import os +import re import sys import time from pathlib import Path @@ -28,19 +29,29 @@ sys.path.insert(0, str(Path(__file__).parent)) from ingest import _ingest_one, get_pg NEXTCLOUD_PATH = Path("/home/aaron/nextcloud/data/data/aaron/files") -TARGET_EXTS = {".docx", ".pptx"} APPLY = "--apply" in sys.argv +_ext_args = [a for a in sys.argv[1:] if a.startswith("--ext=")] +if _ext_args: + TARGET_EXTS = {("." + e.lstrip(".")) for arg in _ext_args + for e in arg.split("=", 1)[1].split(",")} +else: + TARGET_EXTS = {".docx", ".pptx"} + + +def _ext_regex(): + inner = "|".join(re.escape(e.lstrip(".")) for e in sorted(TARGET_EXTS)) + return f"\\.({inner})$" def count_stale(): pg = get_pg() cur = pg.cursor() cur.execute( - "SELECT lower(substring(source from '\\.[^.]+$')) AS ext, " - "COUNT(DISTINCT source) AS files, COUNT(*) AS chunks " - "FROM embeddings WHERE lower(source) ~ '\\.(docx|pptx)$' " - "GROUP BY 1 ORDER BY 1" + f"SELECT lower(substring(source from '\\.[^.]+$')) AS ext, " + f"COUNT(DISTINCT source) AS files, COUNT(*) AS chunks " + f"FROM embeddings WHERE lower(source) ~ '{_ext_regex()}' " + f"GROUP BY 1 ORDER BY 1" ) rows = cur.fetchall() pg.close() @@ -50,7 +61,7 @@ def count_stale(): def delete_stale(): pg = get_pg() cur = pg.cursor() - cur.execute("DELETE FROM embeddings WHERE lower(source) ~ '\\.(docx|pptx)$'") + cur.execute(f"DELETE FROM embeddings WHERE lower(source) ~ '{_ext_regex()}'") deleted = cur.rowcount pg.commit() pg.close() diff --git a/scripts/watcher.py b/scripts/watcher.py index db04703..a938591 100644 --- a/scripts/watcher.py +++ b/scripts/watcher.py @@ -29,7 +29,7 @@ from sentence_transformers import SentenceTransformer from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler -from encoding import extract_text, chunk_and_embed, write_embeddings_batch, SUPPORTED +from encoding import extract_blocks, chunk_and_embed, write_embeddings_batch, SUPPORTED from failures import ( record_ingest_failure as _record_failure_sql, resolve_ingest_failure as _resolve_failure_sql, @@ -128,8 +128,11 @@ def ingest_file(filepath: Path, embedder) -> int: return 0 if filepath.suffix.lower() not in SUPPORTED: return 0 - text = extract_text(filepath) - if not text.strip(): + blocks = extract_blocks(filepath) + if not blocks or not any( + (b.get("text") or "").strip() or (b.get("heading") or "").strip() + for b in blocks + ): record_ingest_failure(filepath, "Text extraction failed or empty") return 0 folder_rel = None @@ -138,7 +141,7 @@ def ingest_file(filepath: Path, embedder) -> int: except ValueError: pass try: - rows = chunk_and_embed(text, filepath.name, embedder, + rows = chunk_and_embed(blocks, filepath.name, embedder, filepath=filepath, folder=folder_rel) except Exception as e: log.error(f"Embedding failed for {filepath.name}: {e}") @@ -159,7 +162,11 @@ def ingest_file(filepath: Path, embedder) -> int: return 0 log.info(f"Indexed {len(rows)} chunks: {filepath.name}") resolve_ingest_failure(source) - enqueue_stage2(source, text) + full_text = "\n".join( + f"{b['heading']}\n{b['text']}" if b.get("heading") else b.get("text", "") + for b in blocks + ) + enqueue_stage2(source, full_text) return len(rows)