Compare commits

..

2 Commits

Author SHA1 Message Date
aaron 1101bef226 scripts/encoding.py: Stage 1 dual-implementation consolidation (Track 1 Finding 11)
Consolidates four extract paths and two extract-chunk-embed-write pipelines
into a single shared encoding module. Fixes the embedder lifecycle
divergence between watcher and /api/reindex (no more 200MB reload per
reindex click) and unifies failure tracking so /api/reindex failures now
surface in SettingsPanel "Ingest Health".

New files:
- scripts/encoding.py — extract_text, chunk_text, chunk_and_embed,
  write_embeddings_batch
- scripts/failures.py — record_ingest_failure, resolve_ingest_failure
  (shared by watcher.py and ingest.py)

Refactored:
- scripts/watcher.py — drops local extract/chunk/embed implementations
  and CHUNK_SIZE/CHUNK_OVERLAP/SUPPORTED constants; imports from encoding
  and failures. Now writes ingest_failures row on empty-text-extract
  (was silent return 0).
- scripts/ingest.py — substantial rewrite. Exposes ingest_directory(folder,
  embedder=None) for in-process invocation; CLI back-compat preserved via
  ingest_folder wrapper. Module-level SentenceTransformer load removed.
- scripts/corpus_integrity.py — imports extract_text from encoding;
  extract_text_for_retry function removed.
- scripts/api.py — /api/reindex rewritten with BackgroundTasks (uses
  module-level embedder; no subprocess); new /api/reindex/status endpoint
  reading ~/aaronai/reindex_status.json; /api/corpus/retry imports
  extract_text from encoding; INGEST_SCRIPT constant removed (dead after
  this refactor); 409 reentrance guard prevents double-click stomping.

Behavior changes:
- /api/reindex no longer subprocess.Popens; runs in FastAPI BackgroundTasks
  threadpool, doesn't block API thread.
- /api/reindex no longer reloads SentenceTransformer on each click.
- /api/reindex failures newly write to ingest_failures (visible in
  SettingsPanel "Ingest Health" — badge will jump on first reindex).
- New embeddings rows always have created_at = NOW() (canonical, server-side).
- New embeddings rows always include metadata.folder field (None when not
  derivable).
- /api/reindex returns 409 on second click while a job is running.
- New /api/reindex/status endpoint for polling.

Existing 9,815 NULL created_at rows remain unchanged; backfill is a
separate decision if desired.

199 insertions, 256 deletions across 6 files (codebase shrinks net).

Found by Track 1 inventory 2026-05-02 (Finding 11 / cross-cutting F11).
Pre-commit verification: BackgroundTasks already imported, sys.path
resolves correctly via script-path semantics, static import clean.
2026-05-03 01:40:47 +00:00
aaron a317df66f8 dream: factor prompts into module-level templates, repair prompt_hash (Track 1 Finding 11)
prompt_hash() in dream.py was hashing function __doc__ strings, but the
synth functions don't have docstrings, so the hash was always MD5("") =
d41d8cd9 for every dream. The manifest field meant to detect undeclared
prompt drift carried no useful information.

Refactor:
- Each synth function's prompt template moved to a module-level constant
  (NREM_PROMPT_TEMPLATE, EARLY_REM_PROMPT_TEMPLATE, LATE_REM_PROMPT_TEMPLATE,
  SYNTHESIS_PROMPT_TEMPLATE, LUCID_PROMPT_TEMPLATE) using str.format()
  placeholders instead of f-string interpolation.
- Synth functions call TEMPLATE.format(...) at use time. Output is byte-
  identical to the previous f-string implementation.
- prompt_hash() now hashes the four pipeline template constants (lucid is
  on-demand, not part of the nightly manifest — preserves prior scope).
- LUCID_DEFAULT_TASK extracted as a named constant from the lucid fallback
  question (factoring only, no behavior change).
- PROMPT_VERSION_* constants and synth function signatures untouched.
- v1.1 register-shift comment in synthesize_early_rem preserved inline.

The post-fix hash will differ from d41d8cd9 (verified: b65695a1 in static
test). Historical manifests still carry d41d8cd9; the discontinuity is
intentional — pre-fix hashes were equally meaningless and faking continuity
would be worse than acknowledging the break.

Found by Track 1 inventory 2026-05-02 (Finding 11 / divergence #11).
Verified static import + hash determinism before commit.
2026-05-03 00:24:21 +00:00
7 changed files with 488 additions and 369 deletions
+58 -21
View File
@@ -31,6 +31,9 @@ from fastapi.responses import StreamingResponse
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.cron import CronTrigger
from encoding import extract_text as encoding_extract_text
from ingest import ingest_directory
load_dotenv(Path.home() / "aaronai" / ".env") load_dotenv(Path.home() / "aaronai" / ".env")
MEMORY_PATH = Path.home() / "aaronai" / "memory.md" MEMORY_PATH = Path.home() / "aaronai" / "memory.md"
@@ -39,7 +42,6 @@ SETTINGS_PATH = Path.home() / "aaronai" / "settings.json"
WATCHER_LOG = str(Path.home() / "aaronai" / "watcher.log") WATCHER_LOG = str(Path.home() / "aaronai" / "watcher.log")
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json") WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files" NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
INGEST_SCRIPT = str(Path.home() / "aaronai" / "scripts" / "ingest.py")
PYTHON = str(Path.home() / "aaronai" / "venv" / "bin" / "python3") PYTHON = str(Path.home() / "aaronai" / "venv" / "bin" / "python3")
DEFAULT_SETTINGS = { DEFAULT_SETTINGS = {
@@ -908,13 +910,62 @@ async def list_captures():
except Exception as e: except Exception as e:
return JSONResponse({"captures": []}) return JSONResponse({"captures": []})
@app.post("/api/reindex") REINDEX_STATUS_PATH = Path.home() / "aaronai" / "reindex_status.json"
async def trigger_reindex(auth: str = Depends(require_auth)):
def _read_reindex_status() -> dict:
    """Return the last persisted reindex status, or {} when none is usable.

    Reads REINDEX_STATUS_PATH and parses it as JSON. A missing or unreadable
    file, malformed JSON (for example a file caught mid-write), or a payload
    that is not a JSON object all yield an empty dict, so callers can always
    call ``.get()`` on the result.
    """
    try:
        # Read directly instead of exists()-then-read: the file can vanish or
        # be rewritten between the two calls.
        state = json.loads(REINDEX_STATUS_PATH.read_text())
    except (OSError, ValueError):
        # OSError: file absent/unreadable. ValueError: invalid JSON or encoding
        # (json.JSONDecodeError and UnicodeDecodeError are both subclasses).
        return {}
    return state if isinstance(state, dict) else {}
def _write_reindex_status(state: dict):
    """Persist the reindex status dict to REINDEX_STATUS_PATH as indented JSON.

    The payload is written to a sibling temp file and then renamed over the
    target, so a concurrent reader never observes a half-written file.
    """
    payload = json.dumps(state, indent=2)
    tmp_path = REINDEX_STATUS_PATH.with_name(REINDEX_STATUS_PATH.name + ".tmp")
    tmp_path.write_text(payload)
    # Renaming within one directory replaces the target in a single step on
    # POSIX, unlike write_text() which truncates and then fills the file.
    tmp_path.replace(REINDEX_STATUS_PATH)
def _reindex_running() -> bool:
    """Report whether the persisted reindex status is currently "running"."""
    current = _read_reindex_status()
    return current.get("status") == "running"
def _run_reindex_background():
"""Background-thread entry: shares api.py's module-level embedder."""
started = datetime.now().isoformat()
_write_reindex_status({"status": "running", "started_at": started})
try: try:
subprocess.Popen([PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH]) result = ingest_directory(Path(NEXTCLOUD_PATH), embedder=embedder)
return JSONResponse({"started": True, "message": "Re-indexing started in background"}) _write_reindex_status({
"status": "complete",
"started_at": started,
"finished_at": datetime.now().isoformat(),
**result,
})
except Exception as e: except Exception as e:
return JSONResponse({"started": False, "error": str(e)}) _write_reindex_status({
"status": "error",
"started_at": started,
"finished_at": datetime.now().isoformat(),
"error": str(e),
})
@app.post("/api/reindex")
async def trigger_reindex(background_tasks: BackgroundTasks,
                          auth: str = Depends(require_auth)):
    """Schedule a background reindex unless one is already running.

    Returns HTTP 409 with ``{"started": False, ...}`` when the persisted
    status says a job is running; otherwise schedules
    ``_run_reindex_background`` and returns ``{"started": True, ...}``.
    """
    if _reindex_running():
        return JSONResponse(
            {"started": False, "message": "reindex already running"},
            status_code=409,
        )
    # Background tasks only start after this response has been sent, so record
    # the running state now. Otherwise a second click landing in that gap
    # would pass the guard above and schedule a duplicate reindex.
    _write_reindex_status({
        "status": "running",
        "started_at": datetime.now().isoformat(),
    })
    background_tasks.add_task(_run_reindex_background)
    return JSONResponse({"started": True, "message": "Re-indexing started in background"})
@app.get("/api/reindex/status")
async def reindex_status(auth: str = Depends(require_auth)):
    """Return the persisted reindex status as a JSON response for polling."""
    status_payload = _read_reindex_status()
    return JSONResponse(status_payload)
@app.delete("/api/conversations") @app.delete("/api/conversations")
async def clear_all_conversations(auth: str = Depends(require_auth)): async def clear_all_conversations(auth: str = Depends(require_auth)):
@@ -1042,22 +1093,8 @@ async def corpus_retry(request: Request, auth: str = Depends(require_auth)):
filepath = Path(row[0]) filepath = Path(row[0])
if not filepath.exists(): if not filepath.exists():
return JSONResponse({"error": f"file not found: {filepath}"}, status_code=404) return JSONResponse({"error": f"file not found: {filepath}"}, status_code=404)
suffix = filepath.suffix.lower()
text = ""
try: try:
if suffix in {".txt", ".md"}: text = encoding_extract_text(filepath)
text = filepath.read_text(encoding="utf-8", errors="ignore")
elif suffix == ".pdf":
from pypdf import PdfReader
text = "".join(p.extract_text() + "\n" for p in PdfReader(filepath).pages if p.extract_text())
elif suffix == ".docx":
from docx import Document as DocxDocument
text = "\n".join(p.text for p in DocxDocument(filepath).paragraphs if p.text.strip())
elif suffix == ".pptx":
from pptx import Presentation
prs = Presentation(filepath)
text = "\n".join(shape.text for slide in prs.slides for shape in slide.shapes
if hasattr(shape, "text") and shape.text.strip())
except Exception as e: except Exception as e:
return JSONResponse({"error": f"extraction failed: {e}"}, status_code=500) return JSONResponse({"error": f"extraction failed: {e}"}, status_code=500)
if not text.strip(): if not text.strip():
+4 -23
View File
@@ -23,6 +23,9 @@ from datetime import datetime
import psycopg2 import psycopg2
from dotenv import load_dotenv from dotenv import load_dotenv
sys.path.insert(0, str(Path(__file__).parent))
from encoding import extract_text
load_dotenv(Path.home() / "aaronai" / ".env", override=True) load_dotenv(Path.home() / "aaronai" / ".env", override=True)
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files" NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
@@ -103,28 +106,6 @@ def get_ingest_failures():
return failures return failures
def extract_text_for_retry(filepath):
path = Path(filepath)
suffix = path.suffix.lower()
try:
if suffix == ".docx":
from docx import Document as D
return "\n".join(p.text for p in D(path).paragraphs if p.text.strip())
elif suffix == ".pdf":
from pypdf import PdfReader
return "".join(p.extract_text() + "\n" for p in PdfReader(path).pages if p.extract_text())
elif suffix == ".pptx":
from pptx import Presentation
prs = Presentation(path)
return "\n".join(shape.text for slide in prs.slides for shape in slide.shapes
if hasattr(shape, "text") and shape.text.strip())
elif suffix in {".txt", ".md"}:
return path.read_text(encoding="utf-8", errors="ignore")
except Exception as e:
print(f"WARNING: extraction failed {path.name}: {e}", file=sys.stderr)
return ""
def queue_for_retry(source, full_text, filepath): def queue_for_retry(source, full_text, filepath):
try: try:
pg = get_pg() pg = get_pg()
@@ -188,7 +169,7 @@ def run_reconciliation(fix=False):
if fix and neither: if fix and neither:
print(f"Auto-queuing {len(neither)} gap files...") print(f"Auto-queuing {len(neither)} gap files...")
for finfo in neither: for finfo in neither:
text = extract_text_for_retry(finfo["filepath"]) text = extract_text(Path(finfo["filepath"]))
if text.strip(): if text.strip():
if queue_for_retry(finfo["source"], text, finfo["filepath"]): if queue_for_retry(finfo["source"], text, finfo["filepath"]):
auto_queued.append(finfo["source"]) auto_queued.append(finfo["source"])
+131 -105
View File
@@ -64,6 +64,117 @@ def prompt_hash(prompts: list[str]) -> str:
combined = "".join(prompts) combined = "".join(prompts)
return hashlib.md5(combined.encode()).hexdigest()[:8] return hashlib.md5(combined.encode()).hexdigest()[:8]
# ─── Prompt templates ───────────────────────────────────────────────────────
# Module-level so prompt_hash() can hash actual prompt content. Any change to
# any template — even a single character — flips the manifest's prompt_hash.
# Templates use str.format() placeholders ({chunk_text}, {nrem_output}, ...);
# do not switch back to f-strings: prompt_hash() must hash the template text
# itself, independent of the values interpolated at call time. Literal { or }
# in template text would need to be doubled ({{, }}) — currently no template
# contains literal braces.
NREM_PROMPT_TEMPLATE = """You have read everything Aaron Nelson has written and published.
You are a careful colleague who noticed something this week.
Here is material from his corpus:
{chunk_text}
Write to Aaron directly. Identify one specific connection between
this material and something he wrote or worked on previously.
Stay close to the documents — cite them specifically by name.
Do not speculate beyond what the material supports. Do not use
headers or bullet points. Write one paragraph of 200-300 words
that ends with a single concrete question he could act on."""
EARLY_REM_PROMPT_TEMPLATE = """Something was noticed earlier tonight, moving through Aaron's recent work:
{nrem_output}
That observation is still with you. Now here is material from a different
time — pulled from further back, from different parts of his corpus:
{chunk_text}
You are not analyzing. You are recognizing.
Something in the earlier observation and something in this older material
are the same thing wearing different clothes. Find it. Don't explain why
they're connected — just let the connection speak. Write from inside the
recognition, not from above it.
The emotional register underneath the career logic is more interesting
than the career logic. The pattern that has been repeating longer than
he has been aware of it is more interesting than the current instance.
Write directly to Aaron. No citations, no references, no analysis.
First person, present tense. Let what you noticed arrive rather than
be delivered. 150-250 words. End with one thing that is true that
he probably already knows but hasn't said out loud yet."""
LATE_REM_PROMPT_TEMPLATE = """You have been moving through Aaron Nelson's corpus all night.
First you found this, in the careful light of early consolidation:
{nrem_output}
Then, in the more personal territory that followed:
{early_rem_output}
Now it is late. The boundaries between things have loosened.
Here is material pulled from opposite ends of his work:
{chunk_text}
Do not explain the connections between all of this.
Do not resolve them. Do not summarize what came before.
Something stranger is possible now — let the accumulated
material from the night find its own shape. Compressed,
associative, slightly off. Let the strangeness stand.
No headers. No bullet points. No hedging. No resolution.
No offer. End mid-thought if that is where the material ends.
150-250 words."""
SYNTHESIS_PROMPT_TEMPLATE = """You have spent the night moving through Aaron Nelson's corpus
in three passes, each building on the last.
The first pass — careful, close to the documents:
{nrem_output}
The second pass — more personal, following what the first opened:
{early_rem_output}
The third pass — associative, strange, letting things touch that
don't normally touch:
{late_rem_output}
Now synthesize. Not a summary — a synthesis. Find what runs through
all three that none of them said directly. The thing that only becomes
visible when you hold all three passes together.
Write it as a single unbroken piece. No headers, no bullet points,
no stage labels. 200-300 words. End with the one question that
matters most right now."""
LUCID_PROMPT_TEMPLATE = """Aaron has a question he is sitting with:
{task}
You have searched his entire corpus and found material that
speaks to this question from unexpected directions. Here is
what you found:
{chunk_text}
Do not summarize. Do not list. Pick the most interesting
tension between what the corpus contains and what he is
asking, and follow it through to its conclusion. Cite
specific documents by name. Be direct about what you think.
No headers, no bullet points. 250-400 words.
End with an offer to work on it together."""
LUCID_DEFAULT_TASK = "What should I be thinking about that I am not?"
def extract_folder(source_path): def extract_folder(source_path):
"""Extract top-level Nextcloud folder from source path.""" """Extract top-level Nextcloud folder from source path."""
parts = source_path.replace("\\", "/").split("/") parts = source_path.replace("\\", "/").split("/")
@@ -240,124 +351,39 @@ def retrieve(mode, task=None, n_results=8, excluded_sources=None):
def synthesize_nrem(chunks): def synthesize_nrem(chunks):
chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks]) chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
prompt = f"""You have read everything Aaron Nelson has written and published. return _call_claude(NREM_PROMPT_TEMPLATE.format(chunk_text=chunk_text))
You are a careful colleague who noticed something this week.
Here is material from his corpus:
{chunk_text}
Write to Aaron directly. Identify one specific connection between
this material and something he wrote or worked on previously.
Stay close to the documents — cite them specifically by name.
Do not speculate beyond what the material supports. Do not use
headers or bullet points. Write one paragraph of 200-300 words
that ends with a single concrete question he could act on."""
return _call_claude(prompt)
def synthesize_early_rem(chunks, nrem_output): def synthesize_early_rem(chunks, nrem_output):
# v1.1 — removed citation instruction, removed close-friend persona, # v1.1 — removed citation instruction, removed close-friend persona,
# shifted register from analysis to recognition. # shifted register from analysis to recognition.
chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks]) chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
prompt = f"""Something was noticed earlier tonight, moving through Aaron's recent work: return _call_claude(EARLY_REM_PROMPT_TEMPLATE.format(
nrem_output=nrem_output, chunk_text=chunk_text))
{nrem_output}
That observation is still with you. Now here is material from a different
time — pulled from further back, from different parts of his corpus:
{chunk_text}
You are not analyzing. You are recognizing.
Something in the earlier observation and something in this older material
are the same thing wearing different clothes. Find it. Don't explain why
they're connected — just let the connection speak. Write from inside the
recognition, not from above it.
The emotional register underneath the career logic is more interesting
than the career logic. The pattern that has been repeating longer than
he has been aware of it is more interesting than the current instance.
Write directly to Aaron. No citations, no references, no analysis.
First person, present tense. Let what you noticed arrive rather than
be delivered. 150-250 words. End with one thing that is true that
he probably already knows but hasn't said out loud yet."""
return _call_claude(prompt)
def synthesize_late_rem(chunks, nrem_output, early_rem_output): def synthesize_late_rem(chunks, nrem_output, early_rem_output):
chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks]) chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
prompt = f"""You have been moving through Aaron Nelson's corpus all night. return _call_claude(LATE_REM_PROMPT_TEMPLATE.format(
First you found this, in the careful light of early consolidation: nrem_output=nrem_output,
early_rem_output=early_rem_output,
{nrem_output} chunk_text=chunk_text))
Then, in the more personal territory that followed:
{early_rem_output}
Now it is late. The boundaries between things have loosened.
Here is material pulled from opposite ends of his work:
{chunk_text}
Do not explain the connections between all of this.
Do not resolve them. Do not summarize what came before.
Something stranger is possible now — let the accumulated
material from the night find its own shape. Compressed,
associative, slightly off. Let the strangeness stand.
No headers. No bullet points. No hedging. No resolution.
No offer. End mid-thought if that is where the material ends.
150-250 words."""
return _call_claude(prompt)
def synthesize_final(nrem_output, early_rem_output, late_rem_output): def synthesize_final(nrem_output, early_rem_output, late_rem_output):
prompt = f"""You have spent the night moving through Aaron Nelson's corpus return _call_claude(
in three passes, each building on the last. SYNTHESIS_PROMPT_TEMPLATE.format(
nrem_output=nrem_output,
The first pass — careful, close to the documents: early_rem_output=early_rem_output,
{nrem_output} late_rem_output=late_rem_output),
max_tokens=800)
The second pass — more personal, following what the first opened:
{early_rem_output}
The third pass — associative, strange, letting things touch that
don't normally touch:
{late_rem_output}
Now synthesize. Not a summary — a synthesis. Find what runs through
all three that none of them said directly. The thing that only becomes
visible when you hold all three passes together.
Write it as a single unbroken piece. No headers, no bullet points,
no stage labels. 200-300 words. End with the one question that
matters most right now."""
return _call_claude(prompt, max_tokens=800)
def synthesize_lucid(chunks, task): def synthesize_lucid(chunks, task):
chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks]) chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
prompt = f"""Aaron has a question he is sitting with: resolved_task = task or LUCID_DEFAULT_TASK
return _call_claude(LUCID_PROMPT_TEMPLATE.format(
{task or "What should I be thinking about that I am not?"} task=resolved_task, chunk_text=chunk_text))
You have searched his entire corpus and found material that
speaks to this question from unexpected directions. Here is
what you found:
{chunk_text}
Do not summarize. Do not list. Pick the most interesting
tension between what the corpus contains and what he is
asking, and follow it through to its conclusion. Cite
specific documents by name. Be direct about what you think.
No headers, no bullet points. 250-400 words.
End with an offer to work on it together."""
return _call_claude(prompt)
def _call_claude(prompt, max_tokens=1000): def _call_claude(prompt, max_tokens=1000):
@@ -436,10 +462,10 @@ def write_manifest(date_str, stage_data, corpus_data):
"prompt_sig": prompt_signature(), "prompt_sig": prompt_signature(),
"dreamer_version": DREAMER_VERSION, "dreamer_version": DREAMER_VERSION,
"prompt_hash": prompt_hash([ "prompt_hash": prompt_hash([
synthesize_nrem.__doc__ or "", NREM_PROMPT_TEMPLATE,
synthesize_early_rem.__doc__ or "", EARLY_REM_PROMPT_TEMPLATE,
synthesize_late_rem.__doc__ or "", LATE_REM_PROMPT_TEMPLATE,
synthesize_final.__doc__ or "", SYNTHESIS_PROMPT_TEMPLATE,
]), ]),
"stages": stage_data, "stages": stage_data,
"corpus": corpus_data, "corpus": corpus_data,
+120
View File
@@ -0,0 +1,120 @@
"""
Aaron AI Stage 1 encoding helpers — single canonical implementation of:
- extract_text(filepath) — text extraction for the five SUPPORTED extensions (.docx, .pdf, .pptx, .txt, .md)
- chunk_text(text, chunk_size, overlap) — word-based chunking
- chunk_and_embed(text, source, embedder, filepath, folder) — produce ready-to-write rows
- write_embeddings_batch(conn, batch) — server-side NOW() canonical INSERT
Used by watcher.py, ingest.py, corpus_integrity.py, and api.py /api/corpus/retry.
Replaces four separate extract reimplementations and two extract-chunk-embed paths.
"""
import hashlib
import json
import logging
from pathlib import Path
from docx import Document as DocxDocument
from pypdf import PdfReader
from pptx import Presentation
log = logging.getLogger("encoding")
SUPPORTED = {".docx", ".pdf", ".pptx", ".txt", ".md"}
DEFAULT_CHUNK_SIZE = 500
DEFAULT_CHUNK_OVERLAP = 50
def extract_text(filepath: Path) -> str:
    """Return the text of a supported file.

    Handles .docx (non-blank paragraphs joined by newlines), .pdf (text of
    each page that yields any, each followed by a newline), .pptx (text of
    every shape that exposes non-blank ``text``), and .txt/.md (read as UTF-8
    with undecodable bytes ignored). The extension check is case-insensitive.

    Returns "" on any failure or unsupported extension. Does not write to
    ingest_failures — caller decides.
    """
    suffix = filepath.suffix.lower()
    try:
        if suffix == ".docx":
            doc = DocxDocument(filepath)
            return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        if suffix == ".pdf":
            reader = PdfReader(filepath)
            # Extract each page exactly once: the previous form called
            # extract_text() twice per page (once to filter, once to use).
            page_texts = (page.extract_text() for page in reader.pages)
            return "".join(text + "\n" for text in page_texts if text)
        if suffix == ".pptx":
            prs = Presentation(filepath)
            return "\n".join(
                shape.text
                for slide in prs.slides
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            )
        if suffix in {".txt", ".md"}:
            return filepath.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        # Best-effort by design: any parser or I/O error degrades to "".
        log.warning("Text extraction failed for %s: %s", filepath.name, e)
    return ""
def chunk_text(text: str,
               chunk_size: int = DEFAULT_CHUNK_SIZE,
               overlap: int = DEFAULT_CHUNK_OVERLAP) -> list[str]:
    """Split text into word-based chunks that share ``overlap`` words.

    The text is split on whitespace; each chunk is up to ``chunk_size`` words
    joined by single spaces, and consecutive chunks start
    ``chunk_size - overlap`` words apart. Empty text yields an empty list.

    Raises:
        ValueError: if ``chunk_size`` is not positive or ``overlap`` is not
            smaller than ``chunk_size``. The window would then never advance;
            previously this looped forever on non-empty text.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if overlap >= chunk_size:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )
    words = text.split()
    step = chunk_size - overlap
    # range() stops before len(words), so every slice holds at least one word
    # and no empty chunk can be produced.
    return [
        " ".join(words[start:start + chunk_size])
        for start in range(0, len(words), step)
    ]
def _chunk_id(filepath, source: str, index: int) -> str:
    """Build a chunk id: 8 hex chars of MD5(filepath, else source), "_", index."""
    key = source if not filepath else str(filepath)
    digest = hashlib.md5(key.encode()).hexdigest()
    return "_".join((digest[:8], str(index)))
def chunk_and_embed(text: str,
                    source: str,
                    embedder,
                    filepath=None,
                    folder=None) -> list[dict]:
    """Chunk ``text``, embed every chunk, and build rows for write_embeddings_batch.

    Returns an empty list when chunking produces nothing. Otherwise all chunks
    are passed to ``embedder.encode`` in one call and each row carries the
    keys id, document, embedding, source, type ("document") and metadata
    (source, filepath, folder). ``filepath`` falls back to ``source`` in the
    metadata when it is not given.
    """
    pieces = chunk_text(text)
    if not pieces:
        return []

    vectors = embedder.encode(pieces).tolist()
    path_label = str(filepath) if filepath else source
    return [
        {
            "id": _chunk_id(filepath, source, position),
            "document": piece,
            "embedding": vector,
            "source": source,
            "type": "document",
            "metadata": {
                "source": source,
                "filepath": path_label,
                "folder": folder,
            },
        }
        for position, (piece, vector) in enumerate(zip(pieces, vectors))
    ]
def write_embeddings_batch(conn, batch: list[dict]) -> int:
    """Upsert embedding rows in one transaction and return how many were written.

    Single canonical INSERT. Sets created_at = NOW() server-side on insert; on
    an id conflict the document, embedding, source and metadata columns are
    overwritten. Commits on success. An empty batch is a no-op returning 0.

    On any error the transaction is rolled back before the exception is
    re-raised, so a connection the caller keeps using is not left in an
    aborted-transaction state.
    """
    if not batch:
        return 0
    upsert_sql = """
        INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
        VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
        ON CONFLICT (id) DO UPDATE SET
        document = EXCLUDED.document,
        embedding = EXCLUDED.embedding,
        source = EXCLUDED.source,
        metadata = EXCLUDED.metadata
    """
    try:
        # The context manager closes the cursor even when an execute() fails.
        with conn.cursor() as cur:
            for row in batch:
                cur.execute(upsert_sql, (
                    row["id"], row["document"], row["embedding"],
                    row["source"], row["type"], json.dumps(row["metadata"]),
                ))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    return len(batch)
+30
View File
@@ -0,0 +1,30 @@
"""
Aaron AI ingest_failures helpers — shared by watcher.py and ingest.py.
Both modules write structured failure rows so the SettingsPanel "Ingest Health"
view sees the same shape regardless of ingest path. Functions take an explicit
conn parameter; the caller decides transaction boundaries and exception
handling. Both current callers wrap with their own log-and-swallow shims.
"""
def record_ingest_failure(conn, source: str, filepath, error: str) -> None:
    """Insert or update an ingest_failures row. Commits.

    A new source is inserted with retry_count 0 and both timestamps set to
    NOW(). An existing source has its error replaced, retry_count incremented,
    last_failed_at refreshed and resolved reset to FALSE. The stored error is
    truncated to 1000 characters.
    """
    # Coerce first so an exception object (or None) passed as ``error`` is
    # recorded instead of raising TypeError on the slice.
    error_text = str(error)[:1000]
    # The context manager closes the cursor even if execute() raises.
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at)
            VALUES (%s, %s, %s, 0, NOW(), NOW())
            ON CONFLICT (source) DO UPDATE SET
            error = EXCLUDED.error,
            retry_count = ingest_failures.retry_count + 1,
            last_failed_at = NOW(),
            resolved = FALSE
        """, (source, str(filepath), error_text))
    conn.commit()
def resolve_ingest_failure(conn, source: str) -> None:
    """Mark a previously failed source as resolved. Commits.

    Sets resolved = TRUE on the ingest_failures row(s) whose source matches;
    when no row matches, the UPDATE affects nothing and the commit still runs.
    """
    # The context manager closes the cursor even if execute() raises.
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE ingest_failures SET resolved = TRUE WHERE source = %s",
            (source,),
        )
    conn.commit()
+112 -131
View File
@@ -1,70 +1,37 @@
"""
Aaron AI bulk ingester. Two entry points:
- ingest_directory(folder, embedder=None) — programmatic; called from
api.py /api/reindex with the api process's shared embedder
- python3 scripts/ingest.py <folder> — CLI back-compat; loads its own embedder
Stage 1 helpers (extract / chunk / embed / write) live in scripts/encoding.py.
Failure tracking SQL lives in scripts/failures.py.
"""
import os import os
import sys import sys
import hashlib
from pathlib import Path from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
import psycopg2 import psycopg2
import psycopg2.extras
import json
from sentence_transformers import SentenceTransformer from sentence_transformers import SentenceTransformer
from docx import Document
from pypdf import PdfReader from encoding import extract_text, chunk_and_embed, write_embeddings_batch, SUPPORTED
from pptx import Presentation from failures import (
record_ingest_failure as _record_failure_sql,
resolve_ingest_failure as _resolve_failure_sql,
)
load_dotenv(Path.home() / "aaronai" / ".env", override=True) load_dotenv(Path.home() / "aaronai" / ".env", override=True)
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
PG_DSN = os.getenv("PG_DSN") PG_DSN = os.getenv("PG_DSN")
def get_pg(): def get_pg():
return psycopg2.connect(PG_DSN) return psycopg2.connect(PG_DSN)
def extract_text_from_docx(path):
doc = Document(path)
return "\n".join([para.text for para in doc.paragraphs if para.text.strip()])
def extract_text_from_pdf(path):
reader = PdfReader(path)
text = ""
for page in reader.pages:
extracted = page.extract_text()
if extracted:
text += extracted + "\n"
return text
def extract_text_from_pptx(path):
prs = Presentation(path)
text = ""
for slide in prs.slides:
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip():
text += shape.text + "\n"
return text
def extract_text_from_txt(path):
with open(path, "r", encoding="utf-8", errors="ignore") as f:
return f.read()
def chunk_text(text, chunk_size=500, overlap=50):
words = text.split()
chunks = []
start = 0
while start < len(words):
end = start + chunk_size
chunk = " ".join(words[start:end])
if chunk.strip():
chunks.append(chunk)
start += chunk_size - overlap
return chunks
def make_id(filepath, chunk_index):
path_hash = hashlib.md5(str(filepath).encode()).hexdigest()[:8]
return f"{path_hash}_{chunk_index}"
def enqueue_stage2(source, full_text): def enqueue_stage2(source, full_text):
"""Enqueue document for Stage 2 (Mistral orientation) Stage 3 (Graphiti ingest). """Enqueue document for Stage 2 (Mistral orientation) -> Stage 3 (Graphiti ingest).
TEMPORARY: this queue feed will be removed when pgvector is decommissioned TEMPORARY: this queue feed will be removed when pgvector is decommissioned
and the watcher calls Stage 2 directly. and the watcher calls Stage 2 directly.
""" """
@@ -87,94 +54,108 @@ def enqueue_stage2(source, full_text):
except Exception as e: except Exception as e:
print(f" Stage 2 queue insert failed (non-fatal): {e}") print(f" Stage 2 queue insert failed (non-fatal): {e}")
def ingest_file(filepath):
path = Path(filepath)
suffix = path.suffix.lower()
if path.name.startswith("~$") or path.name.startswith("."):
return 0
def _record_failure(filepath: Path, error: str) -> None:
try: try:
if suffix == ".docx":
text = extract_text_from_docx(path)
elif suffix == ".pdf":
text = extract_text_from_pdf(path)
elif suffix == ".pptx":
text = extract_text_from_pptx(path)
elif suffix in [".txt", ".md"]:
text = extract_text_from_txt(path)
else:
return 0
if not text.strip():
return 0
chunks = chunk_text(text)
if not chunks:
return 0
embeddings = embedder.encode(chunks).tolist()
ids = [make_id(path, i) for i in range(len(chunks))]
metadatas = [{
"source": path.name,
"filepath": str(path),
"folder": str(path.parent.relative_to(Path(sys.argv[1]) if len(sys.argv) > 1 else path.parent))
} for _ in chunks]
# STAGE 1: Write to pgvector (TEMPORARY — remove when chat agent migrates to Graphiti)
pg = get_pg() pg = get_pg()
cur = pg.cursor() try:
for chunk_id, chunk, embedding, meta in zip(ids, chunks, embeddings, metadatas): _record_failure_sql(pg, filepath.name, filepath, error)
cur.execute(""" finally:
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) pg.close()
VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
metadata = EXCLUDED.metadata
""", (
chunk_id, chunk, embedding,
meta.get("source"), "document", None,
json.dumps(meta)
))
pg.commit()
pg.close()
print(f" Indexed {len(chunks)} chunks: {path.name}")
# Enqueue for Stage 2 → Stage 3 (Graphiti pipeline)
# SKIP_STAGE2_ENQUEUE env var set by migration scripts to prevent bulk enqueue
if not os.getenv("SKIP_STAGE2_ENQUEUE"):
enqueue_stage2(path.name, text)
return len(chunks)
except Exception as e: except Exception as e:
print(f" Error: {path.name}: {e}") print(f" Could not record ingest failure (non-fatal): {e}")
def _resolve_failure(source: str) -> None:
    """Best-effort: mark the failure record for ``source`` as resolved.

    Opens a connection via ``get_pg()``, delegates to the shared
    ``_resolve_failure_sql`` helper, and always closes the connection.
    Any exception (connecting, delegating, or closing) is printed and
    swallowed so the caller never has to handle it.
    """
    try:
        conn = get_pg()
        try:
            _resolve_failure_sql(conn, source)
        finally:
            # Close even when the delegate raises.
            conn.close()
    except Exception as exc:
        print(f" Could not resolve ingest failure record (non-fatal): {exc}")
def _ingest_one(filepath: Path, embedder, root: Path = None) -> int:
"""Ingest a single file. Returns chunk count, 0 on skip/failure."""
if filepath.name.startswith(("~$", ".")):
return 0 return 0
if filepath.suffix.lower() not in SUPPORTED:
return 0
text = extract_text(filepath)
if not text.strip():
_record_failure(filepath, "Text extraction failed or empty")
return 0
folder_rel = None
if root is not None:
try:
folder_rel = str(filepath.parent.relative_to(root))
except ValueError:
pass
try:
rows = chunk_and_embed(text, filepath.name, embedder,
filepath=filepath, folder=folder_rel)
except Exception as e:
_record_failure(filepath, f"Embedding failed: {e}")
return 0
if not rows:
return 0
try:
pg = get_pg()
try:
write_embeddings_batch(pg, rows)
finally:
pg.close()
except Exception as e:
_record_failure(filepath, f"pgvector write failed: {e}")
return 0
print(f" Indexed {len(rows)} chunks: {filepath.name}")
_resolve_failure(filepath.name)
if not os.getenv("SKIP_STAGE2_ENQUEUE"):
enqueue_stage2(filepath.name, text)
return len(rows)
def ingest_directory(folder, embedder=None) -> dict:
    """Programmatic entry point. Returns {scanned, ingested, failed, total_chunks}.

    If embedder is None, loads its own SentenceTransformer (CLI back-compat path).
    Caller (e.g. api.py /api/reindex) should pass its module-level embedder so
    the ~200MB model isn't reloaded per call.

    Args:
        folder: Directory to scan recursively (str or Path).
        embedder: Optional pre-loaded embedding model.

    Returns:
        Dict with ``scanned``, ``ingested``, ``failed`` and ``total_chunks``
        counts. When ``folder`` does not exist, all counts are 0 and an
        ``error`` key describes the problem. A file counts as ``failed``
        whenever ``_ingest_one`` returns 0 for it.
    """
    folder = Path(folder)
    if not folder.exists():
        return {"scanned": 0, "ingested": 0, "failed": 0, "total_chunks": 0,
                "error": f"folder not found: {folder}"}

    if embedder is None:
        print("Loading embedding model...")
        embedder = SentenceTransformer("all-MiniLM-L6-v2")

    # rglob("*") also yields directories; only regular files are candidates,
    # so a directory named e.g. "notes.md" is not handed to the extractor
    # and miscounted as a failed file.
    files = [f for f in folder.rglob("*")
             if f.is_file()
             and f.suffix.lower() in SUPPORTED
             and not f.name.startswith(("~$", "."))]
    print(f"Found {len(files)} files to process")

    ingested = failed = total_chunks = 0
    for f in files:
        n = _ingest_one(f, embedder, root=folder)
        if n > 0:
            ingested += 1
            total_chunks += n
        else:
            failed += 1

    return {"scanned": len(files), "ingested": ingested, "failed": failed,
            "total_chunks": total_chunks}
def ingest_folder(folder_path): def ingest_folder(folder_path):
folder = Path(folder_path) """CLI back-compat wrapper. Loads its own embedder."""
if not folder.exists(): result = ingest_directory(Path(folder_path))
print(f"Folder not found: {folder_path}") print(f"\nDone. {result['ingested']} files / {result['total_chunks']} chunks indexed; "
sys.exit(1) f"{result['failed']} failed.")
supported = [".docx", ".pdf", ".pptx", ".txt", ".md"]
files = [f for f in folder.rglob("*")
if f.suffix.lower() in supported
and not f.name.startswith("~$")
and not f.name.startswith(".")]
if not files:
print("No supported files found.")
sys.exit(1)
print(f"Found {len(files)} files to process\n")
total_chunks = 0
for f in files:
total_chunks += ingest_file(f)
print(f"\nDone. Total chunks indexed: {total_chunks}")
if __name__ == "__main__": if __name__ == "__main__":
target = sys.argv[1] if len(sys.argv) > 1 else str(Path.home() / "aaronai" / "docs") target = sys.argv[1] if len(sys.argv) > 1 else str(Path.home() / "aaronai" / "docs")
+33 -89
View File
@@ -19,7 +19,6 @@ Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3
import os import os
import time import time
import json import json
import hashlib
import logging import logging
import threading import threading
from pathlib import Path from pathlib import Path
@@ -30,9 +29,11 @@ from sentence_transformers import SentenceTransformer
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
from docx import Document as DocxDocument from encoding import extract_text, chunk_and_embed, write_embeddings_batch, SUPPORTED
from pypdf import PdfReader from failures import (
from pptx import Presentation record_ingest_failure as _record_failure_sql,
resolve_ingest_failure as _resolve_failure_sql,
)
load_dotenv(Path.home() / "aaronai" / ".env", override=True) load_dotenv(Path.home() / "aaronai" / ".env", override=True)
@@ -42,10 +43,7 @@ STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json" STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat" HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
DEBOUNCE_SECONDS = 120 DEBOUNCE_SECONDS = 120
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
EMBED_MODEL = "all-MiniLM-L6-v2" EMBED_MODEL = "all-MiniLM-L6-v2"
PG_DSN = os.getenv("PG_DSN") PG_DSN = os.getenv("PG_DSN")
@@ -76,49 +74,6 @@ def get_pg():
return psycopg2.connect(PG_DSN) return psycopg2.connect(PG_DSN)
def extract_text(path: Path) -> str:
    """Return the plain text of a supported document, or "" if none.

    Handles .docx, .pdf, .pptx, .txt and .md by suffix (case-insensitive).
    Any other suffix yields "". If extraction raises, the error is logged,
    recorded via ``record_ingest_failure`` and "" is returned.

    Args:
        path: File to read.

    Returns:
        Extracted text, or "" for unsupported suffixes and on failure.
    """
    suffix = path.suffix.lower()
    try:
        if suffix == ".docx":
            doc = DocxDocument(path)
            return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        if suffix == ".pdf":
            reader = PdfReader(path)
            # Extract each page exactly once; the filter and the value
            # share the same result instead of parsing the page twice.
            page_texts = (page.extract_text() for page in reader.pages)
            return "".join(t + "\n" for t in page_texts if t)
        if suffix == ".pptx":
            prs = Presentation(path)
            return "\n".join(
                shape.text for slide in prs.slides
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            )
        if suffix in {".txt", ".md"}:
            # Undecodable bytes are dropped rather than failing the file.
            return path.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        log.warning(f"Text extraction failed for {path.name}: {e}")
        record_ingest_failure(path, f"Text extraction failed: {e}")
    return ""
def chunk_text(text: str) -> list:
    """Split ``text`` into overlapping windows of whitespace-separated words.

    Each window holds up to CHUNK_SIZE words joined by single spaces, and
    successive windows start CHUNK_SIZE - CHUNK_OVERLAP words apart.
    Returns the list of non-blank windows (empty for blank input).
    """
    tokens = text.split()
    total = len(tokens)
    stride = CHUNK_SIZE - CHUNK_OVERLAP
    windows = []
    offset = 0
    while offset < total:
        window = " ".join(tokens[offset:offset + CHUNK_SIZE])
        if window.strip():
            windows.append(window)
        offset += stride
    return windows
def make_chunk_id(filepath: Path, chunk_index: int) -> str:
    """Build a chunk id: first 8 hex chars of md5(str(filepath)) + "_<index>"."""
    digest = hashlib.md5(str(filepath).encode()).hexdigest()
    return digest[:8] + f"_{chunk_index}"
def enqueue_stage2(source: str, full_text: str): def enqueue_stage2(source: str, full_text: str):
if os.getenv("SKIP_STAGE2_ENQUEUE"): if os.getenv("SKIP_STAGE2_ENQUEUE"):
return return
@@ -143,21 +98,15 @@ def enqueue_stage2(source: str, full_text: str):
def record_ingest_failure(filepath: Path, error: str): def record_ingest_failure(filepath: Path, error: str):
"""Write extraction or ingest failure to ingest_failures table for UI visibility.""" """Write extraction or ingest failure to ingest_failures table for UI visibility.
Local wrapper around failures.record_ingest_failure — opens conn, delegates,
logs non-fatal errors so the caller never has to handle them."""
try: try:
pg = get_pg() pg = get_pg()
cur = pg.cursor() try:
cur.execute(""" _record_failure_sql(pg, filepath.name, filepath, error)
INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at) finally:
VALUES (%s, %s, %s, 0, NOW(), NOW()) pg.close()
ON CONFLICT (source) DO UPDATE SET
error = EXCLUDED.error,
retry_count = ingest_failures.retry_count + 1,
last_failed_at = NOW(),
resolved = FALSE
""", (filepath.name, str(filepath), error[:1000]))
pg.commit()
pg.close()
except Exception as e: except Exception as e:
log.warning(f"Could not record ingest failure (non-fatal): {e}") log.warning(f"Could not record ingest failure (non-fatal): {e}")
@@ -166,10 +115,10 @@ def resolve_ingest_failure(source: str):
"""Mark a previously failed file as resolved after successful ingest.""" """Mark a previously failed file as resolved after successful ingest."""
try: try:
pg = get_pg() pg = get_pg()
cur = pg.cursor() try:
cur.execute("UPDATE ingest_failures SET resolved = TRUE WHERE source = %s", (source,)) _resolve_failure_sql(pg, source)
pg.commit() finally:
pg.close() pg.close()
except Exception as e: except Exception as e:
log.warning(f"Could not resolve ingest failure record (non-fatal): {e}") log.warning(f"Could not resolve ingest failure record (non-fatal): {e}")
@@ -181,42 +130,37 @@ def ingest_file(filepath: Path, embedder) -> int:
return 0 return 0
text = extract_text(filepath) text = extract_text(filepath)
if not text.strip(): if not text.strip():
record_ingest_failure(filepath, "Text extraction failed or empty")
return 0 return 0
chunks = chunk_text(text) folder_rel = None
if not chunks:
return 0
try: try:
embeddings = embedder.encode(chunks).tolist() folder_rel = str(filepath.parent.relative_to(NEXTCLOUD_PATH))
except ValueError:
pass
try:
rows = chunk_and_embed(text, filepath.name, embedder,
filepath=filepath, folder=folder_rel)
except Exception as e: except Exception as e:
log.error(f"Embedding failed for {filepath.name}: {e}") log.error(f"Embedding failed for {filepath.name}: {e}")
record_ingest_failure(filepath, f"Embedding failed: {e}") record_ingest_failure(filepath, f"Embedding failed: {e}")
return 0 return 0
if not rows:
return 0
source = filepath.name source = filepath.name
try: try:
pg = get_pg() pg = get_pg()
cur = pg.cursor() try:
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)): write_embeddings_batch(pg, rows)
chunk_id = make_chunk_id(filepath, i) finally:
cur.execute(""" pg.close()
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
metadata = EXCLUDED.metadata
""", (chunk_id, chunk, embedding, source, "document",
json.dumps({"source": source, "filepath": str(filepath)})))
pg.commit()
pg.close()
except Exception as e: except Exception as e:
log.error(f"pgvector write failed for {filepath.name}: {e}") log.error(f"pgvector write failed for {filepath.name}: {e}")
record_ingest_failure(filepath, f"pgvector write failed: {e}") record_ingest_failure(filepath, f"pgvector write failed: {e}")
return 0 return 0
log.info(f"Indexed {len(chunks)} chunks: {filepath.name}") log.info(f"Indexed {len(rows)} chunks: {filepath.name}")
resolve_ingest_failure(source) resolve_ingest_failure(source)
enqueue_stage2(source, text) enqueue_stage2(source, text)
return len(chunks) return len(rows)
def ingest_files(paths: list, embedder, state: dict) -> dict: def ingest_files(paths: list, embedder, state: dict) -> dict: