9955c7e383
extract_blocks(filepath) is the new structured-extraction entry point, returning
list[{heading, text, kind}]. chunk_and_embed accepts either str (blind-chunk
back-compat) or list[dict] (one chunk per block, blind-split if oversize, heading
prepended for retrieval context and stored in metadata).
- pptx: one block per slide. Slide title becomes block heading; speaker notes
fold into the body. Image-only decks with title-only slides now produce
heading-only chunks instead of being recorded as extraction failures.
- docx: deliberately single-block (back-compat). Heading-style section detection
was implemented and rolled back: hand-formatted CVs are Normal-styled with
bold-as-heading, and tying chunk boundaries to formatting choices would lock
future users into preserving those choices forever. Lexical + cross-encoder
retrieval already handles substring matching inside blind-chunked CVs.
- pdf/txt/md: unchanged (single block, blind chunking).
Recency tiebreak in retrieve_context: pull created_at into the SELECT and use it
as the secondary sort key in _rerank, so memory/journal snapshots prefer the
latest copy among near-duplicate content.
reindex_docx_pptx.py now accepts --ext=pptx,docx,... so re-ingest can target a
subset; the previous hard-coded delete regex would have wiped both types even
with a single-extension target.
423 lines
14 KiB
Python
423 lines
14 KiB
Python
"""
Aaron AI Watcher — Stage 1 of the encoding pipeline.

Watches the Nextcloud directory for new or changed files.
On detection, it chunks and embeds documents in-process (no subprocess),
then enqueues them to stage_2_queue for asynchronous cascade processing.

Design principles:
- Embedding model loaded ONCE at startup, reused across all ingest runs
- In-process ingest (no subprocess) — eliminates the per-run model-reload memory spike
- Missed-file recovery on startup — ingests anything new or changed since the last saved state
- Heartbeat file updated every loop tick — enables external health monitoring
- Parity principle: no filtering, no decisions, faithful capture
  (files are skipped only by extension, zero size, and the fixed path/name exclusion lists)
- Does NOT enqueue to stage_2_queue when the SKIP_STAGE2_ENQUEUE env var is set
  (used during bulk migration)

Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3_queue -> Stage 3 (Graphiti)
"""
|
|
|
|
import os
|
|
import time
|
|
import json
|
|
import logging
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
import psycopg2
|
|
from dotenv import load_dotenv
|
|
from sentence_transformers import SentenceTransformer
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
|
|
from encoding import extract_blocks, chunk_and_embed, write_embeddings_batch, SUPPORTED
|
|
from failures import (
|
|
record_ingest_failure as _record_failure_sql,
|
|
resolve_ingest_failure as _resolve_failure_sql,
|
|
)
|
|
|
|
# Pull secrets such as PG_DSN from ~/aaronai/.env; override=True lets values in
# the .env file replace anything already set in the process environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)

# --- Filesystem locations --------------------------------------------------
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"  # watched root
LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"     # path -> ingested mtime
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"   # per-tick status snapshot
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"  # per-tick liveness stamp

# --- Tuning ----------------------------------------------------------------
# Quiet period (seconds) after the most recent filesystem event before an
# ingestion run is started.
DEBOUNCE_SECONDS = 120
EMBED_MODEL = "all-MiniLM-L6-v2"

# Must be read after load_dotenv so a value from the .env file is visible.
PG_DSN = os.environ.get("PG_DSN")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [watcher] %(levelname)s %(message)s",
    handlers=[logging.FileHandler(LOG_FILE)],
)
log = logging.getLogger("watcher")

# State shared between the main loop and the background ingestion thread;
# ingestion_lock guards ingestion_state and the ingestion_thread handle.
ingestion_lock = threading.Lock()
ingestion_state = dict(
    status="idle",
    message="",
    file_count=0,
    started_at=None,
    finished_at=None,
    last_error="",
)
ingestion_thread = None
|
|
|
|
|
|
def load_embedder():
    """Instantiate and return the SentenceTransformer named by EMBED_MODEL.

    Logs before and after loading so slow model downloads are visible in
    the watcher log.
    """
    model_name = EMBED_MODEL
    log.info(f"Loading embedding model: {model_name}")
    embedder = SentenceTransformer(model_name)
    log.info("Embedding model ready.")
    return embedder
|
|
|
|
|
|
def get_pg():
    """Open and return a new psycopg2 connection to PG_DSN.

    Every call creates a fresh connection; callers in this module close it
    themselves when done.
    """
    connection = psycopg2.connect(PG_DSN)
    return connection
|
|
|
|
|
|
def enqueue_stage2(source: str, full_text: str):
    """Upsert *source*'s full text into stage_2_queue for Stage 2 processing.

    Re-enqueuing an existing source replaces its text and resets the row
    (enqueued_at=NOW(), completed_at/failed_at cleared, attempts=0) so it is
    picked up again.

    Does nothing when the SKIP_STAGE2_ENQUEUE environment variable is set to
    any non-empty value (used during bulk migration). Every error is logged
    as a warning and swallowed: the caller has already written embeddings,
    and a failed enqueue must not undo or abort that.
    """
    if os.getenv("SKIP_STAGE2_ENQUEUE"):
        return
    try:
        pg = get_pg()
        try:
            with pg.cursor() as cur:
                cur.execute("""
                    INSERT INTO stage_2_queue (source, full_text, char_length)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (source) DO UPDATE SET
                        full_text = EXCLUDED.full_text,
                        char_length = EXCLUDED.char_length,
                        enqueued_at = NOW(),
                        completed_at = NULL,
                        failed_at = NULL,
                        attempts = 0
                """, (source, full_text, len(full_text)))
            pg.commit()
        finally:
            # Release the connection even when execute/commit raised; the
            # original code leaked it on that path.
            pg.close()
    except Exception as e:
        log.warning(f"Stage 2 enqueue failed (non-fatal): {e}")
|
|
|
|
|
|
def record_ingest_failure(filepath: Path, error: str):
    """Best-effort: store *error* for *filepath* in the ingest_failures table.

    Thin wrapper over failures.record_ingest_failure that owns the database
    connection. Any exception — while connecting, writing or closing — is
    logged as a warning and swallowed, so callers can fire and forget.
    """
    try:
        conn = get_pg()
    except Exception as exc:
        log.warning(f"Could not record ingest failure (non-fatal): {exc}")
        return

    try:
        try:
            _record_failure_sql(conn, filepath.name, filepath, error)
        finally:
            conn.close()
    except Exception as exc:
        log.warning(f"Could not record ingest failure (non-fatal): {exc}")
|
|
|
|
|
|
def resolve_ingest_failure(source: str):
    """Best-effort: mark a previously failed *source* as resolved.

    Called after a successful ingest. Delegates to
    failures.resolve_ingest_failure on a fresh connection; any exception is
    logged as a warning and swallowed.
    """
    try:
        conn = get_pg()
    except Exception as exc:
        log.warning(f"Could not resolve ingest failure record (non-fatal): {exc}")
        return

    try:
        try:
            _resolve_failure_sql(conn, source)
        finally:
            conn.close()
    except Exception as exc:
        log.warning(f"Could not resolve ingest failure record (non-fatal): {exc}")
|
|
|
|
|
|
def _blocks_to_full_text(blocks) -> str:
    """Flatten extracted blocks into one document string for Stage 2.

    A block with a heading renders as the heading, a newline, then its text;
    a block without one renders as just its text. Blocks are joined with
    newlines. Missing or None fields count as empty strings, so a heading-only
    block (text=None) cannot break the join.
    """
    parts = []
    for b in blocks:
        heading = b.get("heading") or ""
        text = b.get("text") or ""
        parts.append(f"{heading}\n{text}" if heading else text)
    return "\n".join(parts)


def ingest_file(filepath: Path, embedder) -> int:
    """Extract, chunk, embed and store one file, then enqueue it for Stage 2.

    Args:
        filepath: File to ingest (normally under NEXTCLOUD_PATH).
        embedder: The embedding model loaded once at startup.

    Returns:
        Number of chunk rows written to pgvector. Returns 0 if the file was
        skipped (temp/hidden name, unsupported extension) or any step failed.
        Failures are logged and recorded via record_ingest_failure rather than
        raised, so one bad file cannot abort a batch.
    """
    # Office lock files ("~$..."), "~" backups and dotfiles are never ingested.
    if filepath.name.startswith(("~$", "~", ".")):
        return 0
    if filepath.suffix.lower() not in SUPPORTED:
        return 0

    # Parsers run on arbitrary user documents; a crash on one corrupt file
    # must be recorded here instead of propagating into the whole batch.
    try:
        blocks = extract_blocks(filepath)
    except Exception as e:
        log.error(f"Extraction failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"Text extraction failed: {e}")
        return 0

    # A block has content if it has body text OR a heading (title-only slides
    # legitimately produce heading-only chunks).
    if not blocks or not any(
        (b.get("text") or "").strip() or (b.get("heading") or "").strip()
        for b in blocks
    ):
        record_ingest_failure(filepath, "Text extraction failed or empty")
        return 0

    # Folder relative to the Nextcloud root, passed to chunk_and_embed;
    # None when the file lies outside that root (relative_to raises).
    try:
        folder_rel = str(filepath.parent.relative_to(NEXTCLOUD_PATH))
    except ValueError:
        folder_rel = None

    try:
        rows = chunk_and_embed(blocks, filepath.name, embedder,
                               filepath=filepath, folder=folder_rel)
    except Exception as e:
        log.error(f"Embedding failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"Embedding failed: {e}")
        return 0
    if not rows:
        log.warning(f"No chunks produced for {filepath.name}")
        return 0

    source = filepath.name
    try:
        pg = get_pg()
        try:
            write_embeddings_batch(pg, rows)
        finally:
            pg.close()
    except Exception as e:
        log.error(f"pgvector write failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"pgvector write failed: {e}")
        return 0

    log.info(f"Indexed {len(rows)} chunks: {filepath.name}")
    resolve_ingest_failure(source)
    enqueue_stage2(source, _blocks_to_full_text(blocks))
    return len(rows)
|
|
|
|
|
|
def ingest_files(paths: list, embedder, state: dict) -> dict:
    """Ingest each path and record successes in *state* (path -> mtime string).

    The mtime is captured BEFORE ingesting. If a file is modified while it is
    being processed, its recorded mtime is therefore stale, and the next scan
    picks up the newer version instead of silently treating it as current.

    Two failure cases are logged and skipped so the rest of the batch, and
    the progress already written into *state*, survive:
    - files that vanish before they can be stat'ed;
    - files whose ingest raises unexpectedly.

    Returns:
        The same *state* dict, updated in place.
    """
    total = 0
    for path in paths:
        try:
            mtime = path.stat().st_mtime
        except OSError as e:
            log.warning(f"Skipping {path}: cannot stat ({e})")
            continue
        try:
            count = ingest_file(path, embedder)
        except Exception as e:
            log.exception(f"Unexpected error ingesting {path}: {e}")
            continue
        total += count
        if count > 0:
            state[str(path)] = str(mtime)
    log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
    return state
|
|
|
|
|
|
def load_state(path=None) -> dict:
    """Return the persisted {file path: mtime string} map, or {} if unavailable.

    Args:
        path: State file to read; defaults to STATE_FILE.

    A missing file is the normal first-run case and returns {} silently. An
    unreadable, corrupt or non-object file also returns {}, which makes every
    file look changed. That case is logged, so a full re-ingest is explained.
    """
    if path is None:
        path = STATE_FILE
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
        log.warning(f"Could not read state file {path} ({e}); using empty state.")
        return {}
    if not isinstance(data, dict):
        log.warning(f"State file {path} does not hold a JSON object; using empty state.")
        return {}
    return data
|
|
|
|
|
|
def save_state(state: dict, path=None):
    """Persist *state* (file path -> mtime string) as JSON, atomically.

    The data is written to a temporary sibling file, fsync'ed, then renamed
    over the target with os.replace. A crash mid-write therefore leaves the
    previous state file intact instead of a truncated one that would force
    every file to be re-ingested.

    Args:
        state: Map of file path -> mtime string.
        path: Destination file; defaults to STATE_FILE.

    Raises:
        OSError / TypeError: on write or serialisation failure, as before.
        The temp file is removed on failure.
    """
    if path is None:
        path = STATE_FILE
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except Exception:
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def _is_excluded(path: Path) -> bool:
    """True for temp/hidden files and the fixed set of excluded folders/decks.

    Mirrors IngestHandler._should_ignore; keep the two rule sets identical so
    the startup/rescan path and live filesystem events agree on what is
    ingested.
    """
    name, parts, text = path.name, path.parts, str(path)
    if name.startswith((".", "~$", "~")):
        return True
    if "Admin/Backups" in text or "Backups" in parts:
        return True
    if "Journal/Media" in text:
        return True
    if "Generative Design" in parts and "Processing" in parts:
        return True
    if "Computational Design 2017" in parts and "Student Work" in parts:
        return True
    if name in ("Renders.pptx", "Ribbon Cutting Slideshow.pptx") \
            and "Presentations" in parts:
        return True
    if name == "GH Slicer Notes [Autosaved].pptx" \
            and "DDF555 3D Computational" in parts:
        return True
    return False


def get_changed_files(state: dict) -> list:
    """Return ingestible files under NEXTCLOUD_PATH whose mtime differs from *state*.

    A file qualifies when all of the following hold:
    - it has a SUPPORTED extension;
    - it is not excluded by name or path;
    - it is a non-empty regular entry (not a directory);
    - its str(st_mtime) differs from state[str(path)] (or the path is absent).

    Files that disappear or become unreadable between rglob yielding them and
    the stat call are logged and skipped instead of aborting the whole scan.
    """
    changed = []
    for path in Path(NEXTCLOUD_PATH).rglob("*"):
        # Name/path rules first: they need no filesystem access.
        if path.suffix.lower() not in SUPPORTED or _is_excluded(path):
            continue
        try:
            if path.is_dir():
                continue
            st = path.stat()  # one stat serves both the size and mtime checks
        except OSError as e:
            log.warning(f"Skipping {path}: cannot stat ({e})")
            continue
        if st.st_size == 0:
            continue
        if state.get(str(path)) != str(st.st_mtime):
            changed.append(path)
    return changed
|
|
|
|
|
|
def set_ingestion_state(**kwargs):
    """Merge the given keyword fields into ingestion_state while holding ingestion_lock.

    Example: set_ingestion_state(status="idle", finished_at=time.time()).
    """
    updates = dict(kwargs)
    with ingestion_lock:
        ingestion_state.update(updates)
|
|
|
|
|
|
def write_status(handler):
    """Write a JSON status snapshot to STATUS_FILE for external monitoring.

    The snapshot combines the handler's debounce fields with a copy of
    ingestion_state taken under ingestion_lock. It is written to a temp file
    and renamed into place with os.replace, so a reader never sees a
    half-written file. Write errors are deliberately ignored: this runs on
    every main-loop tick and must never take the watcher down.
    """
    with ingestion_lock:
        status = {
            "running": True,
            "timestamp": time.time(),
            "pending": handler.pending,
            "last_event": handler.last_event,
            "ingestion": dict(ingestion_state),
        }
    tmp_path = f"{STATUS_FILE}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(status, f)
        os.replace(tmp_path, STATUS_FILE)
    except Exception:
        # Best-effort; the next tick writes a fresh snapshot.
        pass
|
|
|
|
|
|
def write_heartbeat():
    """Best-effort: overwrite HEARTBEAT_FILE with the current epoch time.

    Lets an external health monitor confirm the main loop is still ticking.
    Any error is swallowed so the heartbeat can never crash the watcher.
    """
    try:
        stamp = str(time.time())
        Path(HEARTBEAT_FILE).write_text(stamp)
    except Exception:
        return
|
|
|
|
|
|
def run_ingestion(embedder):
    """Scan for new/changed files, ingest them and publish progress.

    This is the body of the background thread launched by
    start_ingestion_thread. Progress and outcome go through
    set_ingestion_state. Any exception, including one from the initial state
    load or scan, is logged with its traceback and reported as status="error"
    instead of silently ending the thread.

    If ingestion fails part-way, the entries ingest_files has already written
    into the state dict (it updates it in place) are still saved, so
    completed files are not redone next run.
    """
    state = None
    ingest_started = False
    try:
        state = load_state()
        changed = get_changed_files(state)
        if not changed:
            log.info("No new or changed files — skipping ingestion.")
            set_ingestion_state(status="idle", message="No changes detected", file_count=0)
            return

        count = len(changed)
        log.info(f"Found {count} new or changed files — starting ingestion...")
        set_ingestion_state(
            status="ingesting", message=f"Ingesting {count} file(s)...",
            file_count=count, started_at=time.time(), finished_at=None, last_error="",
        )
        ingest_started = True
        state = ingest_files(changed, embedder, state)
        save_state(state)
        set_ingestion_state(
            status="idle",
            message=f"Last run: ingested {count} file(s) successfully",
            finished_at=time.time(),
        )
    except Exception as e:
        log.exception(f"Ingestion failed: {e}")
        # Only persist once ingestion began: saving a never-loaded/empty state
        # would wipe the real state file and force a full re-ingest.
        if ingest_started and state is not None:
            try:
                save_state(state)
            except Exception as save_err:
                log.error(f"Could not save partial ingestion state: {save_err}")
        set_ingestion_state(
            status="error", message=f"Ingestion exception: {e}",
            last_error=str(e), finished_at=time.time(),
        )
|
|
|
|
|
|
def start_ingestion_thread(embedder):
    """Start run_ingestion on a daemon thread unless a run is already alive.

    The liveness check and the update of the module-level ingestion_thread
    both happen while holding ingestion_lock.
    """
    global ingestion_thread
    with ingestion_lock:
        busy = ingestion_thread is not None and ingestion_thread.is_alive()
        if busy:
            log.info("Ingestion already running — skipping.")
            return
        worker = threading.Thread(target=run_ingestion, args=(embedder,), daemon=True)
        ingestion_thread = worker
        worker.start()
|
|
|
|
|
|
class IngestHandler(FileSystemEventHandler):
    """Watchdog handler that turns relevant filesystem events into a debounced
    "ingestion pending" flag.

    Events only stamp ``last_event`` and set ``pending``; the main loop starts
    an ingestion run once DEBOUNCE_SECONDS have passed since the last event.
    Both attributes are written from the observer thread and read from the
    main loop without a lock.
    """

    def __init__(self):
        super().__init__()
        self.pending = False  # True while events are waiting to be ingested
        self.last_event = 0   # time.time() of the most recent relevant event

    def _should_ignore(self, path: Path) -> bool:
        """True for temp/hidden files and the fixed set of excluded folders/decks.

        Keep in sync with the exclusion rules used by get_changed_files so
        live events and the startup/rescan agree on what is ingested.
        """
        name, parts, text = path.name, path.parts, str(path)
        if name.startswith((".", "~$", "~")):
            return True
        if "Admin/Backups" in text or "Backups" in parts:
            return True
        if "Journal/Media" in text:
            return True
        if "Generative Design" in parts and "Processing" in parts:
            return True
        if "Computational Design 2017" in parts and "Student Work" in parts:
            return True
        if name in ("Renders.pptx", "Ribbon Cutting Slideshow.pptx") \
                and "Presentations" in parts:
            return True
        if name == "GH Slicer Notes [Autosaved].pptx" \
                and "DDF555 3D Computational" in parts:
            return True
        return False

    def _is_relevant(self, path: Path) -> bool:
        """True when *path* has a SUPPORTED extension and is not ignored."""
        return path.suffix.lower() in SUPPORTED and not self._should_ignore(path)

    def _mark_pending(self, description: str):
        """Log the event and (re)arm the debounce timer."""
        log.info(f"Event: {description}")
        # Stamp the time BEFORE raising the flag: the main loop reads `pending`
        # first, and must never pair it with a stale last_event (which would
        # skip the debounce window).
        self.last_event = time.time()
        self.pending = True

    def on_created(self, event):
        if event.is_directory:
            return
        path = Path(event.src_path)
        if self._is_relevant(path):
            self._mark_pending(f"created {path}")

    def on_modified(self, event):
        if event.is_directory:
            return
        path = Path(event.src_path)
        if self._is_relevant(path):
            self._mark_pending(f"modified {path}")

    def on_moved(self, event):
        if event.is_directory:
            return
        # Nextcloud WebDAV writes .part temp files then renames to final path.
        # src_path is the .part file; dest_path is the final filename.
        dest = Path(event.dest_path)
        if self._is_relevant(dest):
            self._mark_pending(f"moved -> {dest}")

    def on_closed(self, event):
        # FileClosedEvent fires on the final file after Nextcloud completes write.
        # Belt-and-suspenders catch for any write pattern not caught by on_moved.
        if event.is_directory:
            return
        path = Path(event.src_path)
        if self._is_relevant(path):
            self._mark_pending(f"closed {path}")
|
|
|
|
|
|
def _startup_recovery(embedder):
    """Synchronously ingest files that changed while the watcher was down.

    Failures, including a failed scan, are logged and reported through
    ingestion_state; they never prevent the observer from starting.
    """
    log.info("Startup scan: checking for files missed since last run...")
    try:
        state = load_state()
        missed = get_changed_files(state)
    except Exception as e:
        log.exception(f"Startup recovery failed: {e}")
        set_ingestion_state(status="error", message=str(e),
                            last_error=str(e), finished_at=time.time())
        return

    if not missed:
        log.info("Startup scan: no missed files.")
        return

    log.info(f"Startup recovery: {len(missed)} missed file(s) — ingesting now.")
    set_ingestion_state(
        status="ingesting",
        message=f"Startup recovery: ingesting {len(missed)} missed file(s)...",
        file_count=len(missed), started_at=time.time(),
    )
    try:
        state = ingest_files(missed, embedder, state)
        save_state(state)
        set_ingestion_state(
            status="idle",
            message=f"Startup recovery complete: {len(missed)} file(s) ingested.",
            finished_at=time.time(),
        )
    except Exception as e:
        log.exception(f"Startup recovery failed: {e}")
        set_ingestion_state(status="error", message=str(e),
                            last_error=str(e), finished_at=time.time())


def main():
    """Run the watcher: load the model, recover missed files, then watch forever.

    Every 5 seconds the loop writes the heartbeat and status files. Once
    DEBOUNCE_SECONDS have passed since the last relevant event, it starts a
    background ingestion run. The observer is always stopped and joined on
    exit.
    """
    log.info("Aaron AI Watcher starting...")
    log.info(f"Watching: {NEXTCLOUD_PATH}")

    embedder = load_embedder()
    _startup_recovery(embedder)

    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()
    log.info("Observer started.")

    try:
        while True:
            write_heartbeat()
            write_status(handler)
            if handler.pending and time.time() - handler.last_event >= DEBOUNCE_SECONDS:
                # Only consume the pending flag when a new run can actually
                # start. A run already in progress may have scanned before
                # these events, so clearing the flag now would drop them until
                # some unrelated future event.
                if ingestion_thread is None or not ingestion_thread.is_alive():
                    handler.pending = False
                    start_ingestion_thread(embedder)
            time.sleep(5)
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt — stopping.")
    finally:
        observer.stop()
        observer.join()
    log.info("Watcher stopped.")
|
|
|
|
|
|
# Start the watcher only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|