Files
aaronAI/scripts/ingest.py
T
aaron 9bb083f065 chat: cap retrieve_documents per turn, truncate displayed citations, broaden lock-file skip
- MAX_RETRIEVALS_PER_TURN (5): after five retrieve_documents calls in a single
  turn, further calls return a budget-exhausted message instead of executing.
  Caps cost on runaway multi-query loops without forbidding compound questions.

- MAX_CITED_SOURCES (5): accumulated_sources was growing to 14+ entries across
  multiple tool calls and showing chunks Claude never actually used. Cap the
  list returned to the UI at 5, preserving insertion order so the
  highest-relevance early-call results survive. Proper fix (Claude-driven
  inline citations) is bigger work, noted for later.

- ingest.py lock-file skip: changed prefix tuple from ("~$", ".") to ("~", ".")
  so it catches Office lock files even when Nextcloud's filesystem encoding has
  mangled the "$" into a unicode replacement char. Matches what watcher.py
  already does.
2026-05-20 02:22:54 +00:00

183 lines
5.9 KiB
Python

"""
Aaron AI bulk ingester. Two entry points:
- ingest_directory(folder, embedder=None) — programmatic; called from
api.py /api/reindex with the api process's shared embedder
- python3 scripts/ingest.py <folder> — CLI back-compat; loads its own embedder
Stage 1 helpers (extract / chunk / embed / write) live in scripts/encoding.py.
Failure tracking SQL lives in scripts/failures.py.
"""
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
from sentence_transformers import SentenceTransformer
from encoding import extract_blocks, chunk_and_embed, write_embeddings_batch, SUPPORTED
from failures import (
record_ingest_failure as _record_failure_sql,
resolve_ingest_failure as _resolve_failure_sql,
)
# Load settings from ~/aaronai/.env. override=True lets values from that file
# replace variables that are already present in the process environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# PostgreSQL connection string that get_pg() hands to psycopg2.connect();
# None when the PG_DSN variable is not set.
PG_DSN = os.getenv("PG_DSN")
def get_pg():
    """Open and return a new psycopg2 connection built from PG_DSN.

    Every call creates a fresh connection; the caller is responsible for
    closing it.
    """
    connection = psycopg2.connect(PG_DSN)
    return connection
def enqueue_stage2(source, full_text):
    """Enqueue document for Stage 2 (Mistral orientation) -> Stage 3 (Graphiti ingest).

    Upserts one row into stage_2_queue keyed on *source*. Re-enqueueing an
    existing source replaces its text and resets enqueued_at, completed_at,
    failed_at and attempts so the document is processed again.

    Best-effort: any exception (connection, SQL, bad argument) is printed and
    swallowed so a queue problem never aborts the ingest that called it.

    TEMPORARY: this queue feed will be removed when pgvector is decommissioned
    and the watcher calls Stage 2 directly.

    Args:
        source: identifier stored in the queue's unique `source` column.
        full_text: document text; its length is stored as char_length.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                    full_text = EXCLUDED.full_text,
                    char_length = EXCLUDED.char_length,
                    enqueued_at = NOW(),
                    completed_at = NULL,
                    failed_at = NULL,
                    attempts = 0
            """, (source, full_text, len(full_text)))
            pg.commit()
        finally:
            # Always release the connection, including when the insert or
            # commit raises; otherwise each failure leaks a connection.
            pg.close()
    except Exception as e:
        print(f" Stage 2 queue insert failed (non-fatal): {e}")
def _record_failure(filepath: Path, error: str) -> None:
    """Persist an ingest failure for *filepath*, best-effort.

    Opens a fresh connection, delegates to the failures-module SQL helper
    with the file's name, its path and the error text, and always closes the
    connection. Any exception along the way is printed and swallowed.
    """
    try:
        conn = get_pg()
        try:
            _record_failure_sql(conn, filepath.name, filepath, error)
        finally:
            conn.close()
    except Exception as exc:
        print(f" Could not record ingest failure (non-fatal): {exc}")
def _resolve_failure(source: str) -> None:
    """Mark any recorded ingest failure for *source* as resolved, best-effort.

    Opens a fresh connection, delegates to the failures-module SQL helper and
    always closes the connection. Any exception is printed and swallowed.
    """
    try:
        conn = get_pg()
        try:
            _resolve_failure_sql(conn, source)
        finally:
            conn.close()
    except Exception as exc:
        print(f" Could not resolve ingest failure record (non-fatal): {exc}")
# Names of folders directly under the ingest root whose contents are never
# ingested: _ingest_one returns 0 for any file whose path, relative to the
# root, starts with one of these names.
IGNORED_TOP_FOLDERS = {"Drafts"}
def _join_block_text(blocks) -> str:
    """Flatten extracted blocks into the newline-joined text sent to Stage 2.

    A block with a heading contributes "heading\\ntext"; one without
    contributes just its text. A missing or None "text" is treated as empty,
    matching the emptiness check in _ingest_one, instead of raising or
    rendering the literal string "None".
    """
    parts = []
    for b in blocks:
        text = b.get("text") or ""
        heading = b.get("heading")
        parts.append(f"{heading}\n{text}" if heading else text)
    return "\n".join(parts)


def _ingest_one(filepath: Path, embedder, root: Path = None) -> int:
    """Ingest a single file. Returns chunk count, 0 on skip/failure.

    Steps: skip lock/hidden/unsupported/ignored files, extract blocks,
    chunk and embed them, write the rows to pgvector, clear any earlier
    failure record, and (unless SKIP_STAGE2_ENQUEUE is set) enqueue the full
    text for Stage 2. Extraction, embedding and write failures are recorded
    via _record_failure and reported as 0.

    Args:
        filepath: file to ingest.
        embedder: embedding model passed through to chunk_and_embed.
        root: ingest root. When given and filepath lies beneath it, the
            parent folder relative to root is passed on as `folder`, and
            files under an IGNORED_TOP_FOLDERS entry are skipped.

    Returns:
        Number of rows written, or 0 when the file was skipped or failed.
    """
    # "~" catches Office lock files (~$) including the case where Nextcloud
    # filesystem encoding has mangled the "$" to a unicode replacement char.
    if filepath.name.startswith(("~", ".")):
        return 0
    if filepath.suffix.lower() not in SUPPORTED:
        return 0

    # Resolve the folder relative to root once; it drives both the
    # ignored-folder check and the `folder` value stored with the chunks.
    folder_rel = None
    if root is not None:
        try:
            rel = filepath.parent.relative_to(root)
        except ValueError:
            # filepath is not under root: no folder info, no ignore check.
            rel = None
        if rel is not None:
            if rel.parts and rel.parts[0] in IGNORED_TOP_FOLDERS:
                return 0
            folder_rel = str(rel)

    blocks = extract_blocks(filepath)
    if not blocks or not any(
        (b.get("text") or "").strip() or (b.get("heading") or "").strip()
        for b in blocks
    ):
        _record_failure(filepath, "Text extraction failed or empty")
        return 0

    try:
        rows = chunk_and_embed(blocks, filepath.name, embedder,
                               filepath=filepath, folder=folder_rel)
    except Exception as e:
        _record_failure(filepath, f"Embedding failed: {e}")
        return 0
    if not rows:
        return 0

    try:
        pg = get_pg()
        try:
            write_embeddings_batch(pg, rows)
        finally:
            pg.close()
    except Exception as e:
        _record_failure(filepath, f"pgvector write failed: {e}")
        return 0

    print(f" Indexed {len(rows)} chunks: {filepath.name}")
    _resolve_failure(filepath.name)
    if not os.getenv("SKIP_STAGE2_ENQUEUE"):
        enqueue_stage2(filepath.name, _join_block_text(blocks))
    return len(rows)
def ingest_directory(folder, embedder=None) -> dict:
    """Programmatic entry point. Returns {scanned, ingested, failed, total_chunks}.

    Walks *folder* recursively and runs _ingest_one on every regular file
    with a supported suffix whose name does not start with "~" or ".".

    If embedder is None, loads its own SentenceTransformer (CLI back-compat path).
    Caller (e.g. api.py /api/reindex) should pass its module-level embedder so
    the ~200MB model isn't reloaded per call.

    Args:
        folder: directory to ingest (str or Path).
        embedder: embedding model passed to _ingest_one, or None to load
            "all-MiniLM-L6-v2" here.

    Returns:
        dict with "scanned", "ingested", "failed" and "total_chunks". When
        the folder does not exist all counts are 0 and an "error" key is
        added. "failed" counts every scanned file for which _ingest_one
        returned 0, which includes files it skipped (e.g. ignored folders).
    """
    folder = Path(folder)
    if not folder.exists():
        return {"scanned": 0, "ingested": 0, "failed": 0, "total_chunks": 0,
                "error": f"folder not found: {folder}"}
    if embedder is None:
        print("Loading embedding model...")
        embedder = SentenceTransformer("all-MiniLM-L6-v2")

    # Same skip prefixes as _ingest_one: "~" (not "~$") so Office lock files
    # are excluded even when the "$" has been mangled by filesystem encoding.
    # Filtering them here keeps them out of the scanned/failed counts.
    skip_prefixes = ("~", ".")
    files = [f for f in folder.rglob("*")
             if f.is_file()  # rglob also yields directories, e.g. "notes.md/"
             and f.suffix.lower() in SUPPORTED
             and not f.name.startswith(skip_prefixes)]
    print(f"Found {len(files)} files to process")

    ingested = failed = total_chunks = 0
    for f in files:
        n = _ingest_one(f, embedder, root=folder)
        if n > 0:
            ingested += 1
            total_chunks += n
        else:
            failed += 1
    return {"scanned": len(files), "ingested": ingested, "failed": failed,
            "total_chunks": total_chunks}
def ingest_folder(folder_path):
    """CLI back-compat wrapper around ingest_directory.

    Passes no embedder, so ingest_directory loads its own model, then prints
    a one-line summary of files ingested, chunks indexed and failures.
    """
    summary = ingest_directory(Path(folder_path))
    files_done = summary["ingested"]
    chunk_total = summary["total_chunks"]
    failures = summary["failed"]
    print(f"\nDone. {files_done} files / {chunk_total} chunks indexed; "
          f"{failures} failed.")
if __name__ == "__main__":
    # Optional first CLI argument is the folder to ingest; without it the
    # default ~/aaronai/docs is used.
    cli_args = sys.argv[1:]
    if cli_args:
        target = cli_args[0]
    else:
        target = str(Path.home() / "aaronai" / "docs")
    print(f"Ingesting from: {target}\n")
    ingest_folder(target)