"""
Aaron AI Watcher — Stage 1 of the encoding pipeline.

Watches the Nextcloud directory for new or changed files.
On detection, chunks + embeds documents in-process (no subprocess),
then enqueues to stage_2_queue for async cascade processing.

Design principles:
- Embedding model loaded ONCE at startup, reused across all ingest runs
- In-process ingest (no subprocess) — eliminates per-run model reload memory spike
- Missed-file recovery on startup — ingests anything new since last state
- Heartbeat file updated every loop tick — enables external health monitoring
- Parity principle: no filtering, no decisions, faithful capture
- Does NOT enqueue to stage_2_queue during bulk migration (SKIP_STAGE2_ENQUEUE env var)

Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3_queue -> Stage 3 (Graphiti)
"""

import os
import time
import json
import hashlib
import logging
import threading
from pathlib import Path
from typing import Optional

import psycopg2
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from docx import Document as DocxDocument
from pypdf import PdfReader
from pptx import Presentation

load_dotenv(Path.home() / "aaronai" / ".env", override=True)

NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"

SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
DEBOUNCE_SECONDS = 120   # quiet period after the last event before ingesting
CHUNK_SIZE = 500         # words per chunk
CHUNK_OVERLAP = 50       # words shared between consecutive chunks
EMBED_MODEL = "all-MiniLM-L6-v2"

PG_DSN = os.getenv("PG_DSN")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [watcher] %(levelname)s %(message)s",
    handlers=[logging.FileHandler(LOG_FILE)],
)
log = logging.getLogger("watcher")

# Guards ingestion_state and ingestion_thread. It is a plain (non-reentrant)
# Lock: never call set_ingestion_state() while already holding it.
ingestion_lock = threading.Lock()
ingestion_state = {
    "status": "idle", "message": "", "file_count": 0,
    "started_at": None, "finished_at": None, "last_error": "",
}
ingestion_thread = None


def load_embedder():
    """Load and return the SentenceTransformer model named by EMBED_MODEL."""
    log.info(f"Loading embedding model: {EMBED_MODEL}")
    model = SentenceTransformer(EMBED_MODEL)
    log.info("Embedding model ready.")
    return model


def get_pg():
    """Open a new PostgreSQL connection using PG_DSN; the caller must close it."""
    return psycopg2.connect(PG_DSN)


def extract_text(path: Path) -> str:
    """Return the plain text of a supported document.

    Returns "" for an unsupported suffix or when extraction raises (the
    failure is logged as a warning rather than propagated).
    """
    suffix = path.suffix.lower()
    try:
        if suffix == ".docx":
            doc = DocxDocument(path)
            return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        if suffix == ".pdf":
            reader = PdfReader(path)
            # Extract each page once; pages that yield no text are dropped.
            page_texts = (page.extract_text() for page in reader.pages)
            return "".join(f"{page_text}\n" for page_text in page_texts if page_text)
        if suffix == ".pptx":
            prs = Presentation(path)
            return "\n".join(
                shape.text
                for slide in prs.slides
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            )
        if suffix in {".txt", ".md"}:
            return path.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        log.warning(f"Text extraction failed for {path.name}: {e}")
    return ""


def chunk_text(text: str) -> list:
    """Split *text* into overlapping, space-joined word windows.

    Each chunk holds up to CHUNK_SIZE whitespace-separated words and
    consecutive chunks share CHUNK_OVERLAP words. Empty or whitespace-only
    text yields an empty list.
    """
    words = text.split()
    # Clamp the stride so a misconfigured overlap >= size cannot loop forever.
    step = max(1, CHUNK_SIZE - CHUNK_OVERLAP)
    return [
        " ".join(words[start:start + CHUNK_SIZE])
        for start in range(0, len(words), step)
    ]


def make_chunk_id(filepath: Path, chunk_index: int) -> str:
    """Return the row id for a chunk: 8 hex chars of md5(path) + "_<index>".

    md5 is used only as a stable fingerprint of the path string, not for
    security. The id format is kept unchanged because rows are upserted by id.
    """
    return hashlib.md5(str(filepath).encode()).hexdigest()[:8] + f"_{chunk_index}"


def enqueue_stage2(source: str, full_text: str):
    """Upsert *source* into stage_2_queue for later processing (best effort).

    Does nothing when the SKIP_STAGE2_ENQUEUE environment variable is set to
    a non-empty value. The stored text is truncated to 50000 characters while
    char_length records the untruncated length. Failures are logged and
    swallowed.
    """
    if os.getenv("SKIP_STAGE2_ENQUEUE"):
        return
    try:
        pg = get_pg()
        try:
            with pg.cursor() as cur:
                cur.execute("""
                    INSERT INTO stage_2_queue (source, full_text, char_length)
                    VALUES (%s, %s, %s)
                    ON CONFLICT (source) DO UPDATE SET
                        full_text = EXCLUDED.full_text,
                        char_length = EXCLUDED.char_length,
                        enqueued_at = NOW(),
                        completed_at = NULL,
                        failed_at = NULL,
                        attempts = 0
                """, (source, full_text[:50000], len(full_text)))
            pg.commit()
        finally:
            # Close on the failure path too, so a failed execute/commit does
            # not leak the connection.
            pg.close()
    except Exception as e:
        log.warning(f"Stage 2 enqueue failed (non-fatal): {e}")


def _ingest_file(filepath: Path, embedder) -> Optional[int]:
    """Chunk, embed and upsert one file.

    Returns the number of chunks written, 0 when there was nothing to index
    (ignored name, unsupported suffix, no extractable text), or None when
    embedding or the database write failed.
    """
    if filepath.name.startswith(("~$", ".")):
        return 0
    if filepath.suffix.lower() not in SUPPORTED:
        return 0
    text = extract_text(filepath)
    if not text.strip():
        return 0
    chunks = chunk_text(text)
    if not chunks:
        return 0
    try:
        embeddings = embedder.encode(chunks).tolist()
    except Exception as e:
        log.error(f"Embedding failed for {filepath.name}: {e}")
        return None

    # NOTE(review): `source` is the bare file name, and stage_2_queue upserts
    # on it, so same-named files in different folders share one queue row.
    source = filepath.name
    metadata = json.dumps({"source": source, "filepath": str(filepath)})
    try:
        pg = get_pg()
        try:
            with pg.cursor() as cur:
                for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                    cur.execute("""
                        INSERT INTO embeddings
                            (id, document, embedding, source, type, created_at, metadata)
                        VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
                        ON CONFLICT (id) DO UPDATE SET
                            document = EXCLUDED.document,
                            embedding = EXCLUDED.embedding,
                            source = EXCLUDED.source,
                            metadata = EXCLUDED.metadata
                    """, (make_chunk_id(filepath, i), chunk, embedding,
                          source, "document", metadata))
            pg.commit()
        finally:
            pg.close()
    except Exception as e:
        log.error(f"pgvector write failed for {filepath.name}: {e}")
        return None
    # NOTE(review): rows are upserted by index only; if a file now produces
    # fewer chunks than before, the higher-index rows are not removed here.
    log.info(f"Indexed {len(chunks)} chunks: {filepath.name}")
    enqueue_stage2(source, text)
    return len(chunks)


def ingest_file(filepath: Path, embedder) -> int:
    """Ingest one file and return the number of chunks indexed.

    Returns 0 both when there was nothing to index and when ingestion failed;
    failures are logged by the underlying steps.
    """
    return _ingest_file(filepath, embedder) or 0


def ingest_files(paths: list, embedder, state: dict) -> dict:
    """Ingest every path and record its mtime in *state*.

    *state* maps str(path) to str(mtime); it is updated in place and also
    returned. A file is recorded only when it was processed without error,
    so failed files are picked up again by the next scan.
    """
    total = 0
    for path in paths:
        try:
            # Read the mtime before ingesting: an edit that lands while the
            # file is being processed must still look "changed" afterwards.
            mtime = str(path.stat().st_mtime)
        except OSError as e:
            log.warning(f"Skipping {path}: {e}")
            continue
        count = _ingest_file(path, embedder)
        if count is None:
            continue  # failed: leave the state untouched so it is retried
        total += count
        state[str(path)] = mtime
    log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
    return state


def _atomic_write_text(path: str, text: str) -> None:
    """Write *text* to *path* via a sibling temp file and os.replace()."""
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w") as f:
        f.write(text)
    os.replace(tmp_path, path)


def load_state() -> dict:
    """Return the saved {path: mtime} mapping, or {} when absent or unreadable."""
    try:
        with open(STATE_FILE) as f:
            state = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        # An unreadable state means every file will look changed; say so.
        log.warning(f"Could not read state file {STATE_FILE}: {e}")
        return {}
    if not isinstance(state, dict):
        log.warning(f"State file {STATE_FILE} does not contain an object; ignoring it.")
        return {}
    return state


def save_state(state: dict):
    """Persist the {path: mtime} mapping to STATE_FILE.

    The file is replaced atomically so an interrupted write cannot leave a
    truncated state file behind.
    """
    _atomic_write_text(STATE_FILE, json.dumps(state))


def _is_watched(path: Path) -> bool:
    """Return True when *path* passes the watcher's name and location filters.

    Only the path string is inspected; the file system is not touched.
    """
    if path.suffix.lower() not in SUPPORTED:
        return False
    if path.name.startswith((".", "~$")):
        return False
    path_str = str(path)
    if "Admin/Backups" in path_str or "Backups" in path.parts:
        return False
    if "Journal/Media" in path_str:
        return False
    return True


def get_changed_files(state: dict) -> list:
    """Return watched files under NEXTCLOUD_PATH whose mtime differs from *state*."""
    changed = []
    for path in Path(NEXTCLOUD_PATH).rglob("*"):
        try:
            # NOTE(review): the is_file() guard is reconstructed; the original
            # line falls between two diff hunks. Confirm against the real file.
            if not _is_watched(path) or not path.is_file():
                continue
            mtime = str(path.stat().st_mtime)
        except OSError:
            continue  # removed or unreadable while walking the tree
        if state.get(str(path)) != mtime:
            changed.append(path)
    return changed


def set_ingestion_state(**kwargs):
    """Update the shared ingestion_state dict under ingestion_lock."""
    with ingestion_lock:
        ingestion_state.update(kwargs)


def write_status(handler):
    """Write a JSON status snapshot to STATUS_FILE (best effort).

    The snapshot contains the handler's pending flag and last event time plus
    a copy of ingestion_state. Write errors are ignored so the main loop
    keeps running.
    """
    with ingestion_lock:
        status = {
            "running": True, "timestamp": time.time(),
            "pending": handler.pending, "last_event": handler.last_event,
            "ingestion": dict(ingestion_state),
        }
    try:
        # Atomic replace so a reader never sees a half-written document.
        _atomic_write_text(STATUS_FILE, json.dumps(status))
    except Exception:
        pass


def write_heartbeat():
    """Write the current epoch time to HEARTBEAT_FILE (best effort)."""
    try:
        Path(HEARTBEAT_FILE).write_text(str(time.time()))
    except Exception:
        pass


def run_ingestion(embedder):
    """Scan for changed files, ingest them and persist the updated state.

    Progress and outcome are published through set_ingestion_state().
    """
    state = load_state()
    changed = get_changed_files(state)
    if not changed:
        log.info("No new or changed files — skipping ingestion.")
        set_ingestion_state(status="idle", message="No changes detected", file_count=0)
        return

    count = len(changed)
    log.info(f"Found {count} new or changed files — starting ingestion...")
    set_ingestion_state(
        status="ingesting", message=f"Ingesting {count} file(s)...",
        file_count=count, started_at=time.time(), finished_at=None, last_error="",
    )
    try:
        state = ingest_files(changed, embedder, state)
        save_state(state)
        set_ingestion_state(
            status="idle",
            message=f"Last run: ingested {count} file(s) successfully",
            finished_at=time.time(),
        )
    except Exception as e:
        log.error(f"Ingestion failed: {e}")
        set_ingestion_state(
            status="error", message=f"Ingestion exception: {e}",
            last_error=str(e), finished_at=time.time(),
        )


def _ingestion_running() -> bool:
    """Return True while a background ingestion thread is alive."""
    with ingestion_lock:
        return ingestion_thread is not None and ingestion_thread.is_alive()


def start_ingestion_thread(embedder):
    """Start run_ingestion() on a daemon thread unless one is already running."""
    global ingestion_thread
    with ingestion_lock:
        if ingestion_thread and ingestion_thread.is_alive():
            log.info("Ingestion already running — skipping.")
            return
        ingestion_thread = threading.Thread(
            target=run_ingestion, args=(embedder,), daemon=True
        )
        ingestion_thread.start()


class IngestHandler(FileSystemEventHandler):
    """Records that a relevant file event happened; main() does the debouncing."""

    def __init__(self):
        super().__init__()
        self.pending = False    # True once an event is waiting to be ingested
        self.last_event = 0     # time.time() of the most recent relevant event

    def on_any_event(self, event):
        """Flag a pending ingest for create/modify/move events on watched files."""
        # NOTE(review): this directory guard is reconstructed; the original
        # lines fall between two diff hunks. Confirm against the real file.
        if event.is_directory:
            return
        if event.event_type not in ("modified", "created", "moved"):
            return
        candidates = [event.src_path]
        if event.event_type == "moved":
            # A rename carries the new name in dest_path; check it too so a
            # rename from a temporary name into a watched one is not ignored.
            dest_path = getattr(event, "dest_path", "")
            if dest_path:
                candidates.append(dest_path)
        if not any(_is_watched(Path(candidate)) for candidate in candidates):
            return
        log.info(f"Event: {event.event_type} {event.src_path}")
        self.pending = True
        self.last_event = time.time()


def _startup_recovery(embedder) -> None:
    """Ingest files that changed while the watcher was not running."""
    log.info("Startup scan: checking for files missed since last run...")
    state = load_state()
    missed = get_changed_files(state)
    if not missed:
        log.info("Startup scan: no missed files.")
        return

    log.info(f"Startup recovery: {len(missed)} missed file(s) — ingesting now.")
    set_ingestion_state(
        status="ingesting",
        message=f"Startup recovery: ingesting {len(missed)} missed file(s)...",
        file_count=len(missed), started_at=time.time(),
    )
    try:
        state = ingest_files(missed, embedder, state)
        save_state(state)
        set_ingestion_state(
            status="idle",
            message=f"Startup recovery complete: {len(missed)} file(s) ingested.",
            finished_at=time.time(),
        )
    except Exception as e:
        log.error(f"Startup recovery failed: {e}")
        set_ingestion_state(status="error", message=str(e),
                            last_error=str(e), finished_at=time.time())


def main():
    """Run the watcher: load the model, recover missed files, then loop."""
    log.info("Aaron AI Watcher starting...")
    log.info(f"Watching: {NEXTCLOUD_PATH}")

    embedder = load_embedder()

    # Start observing before the startup scan so that files changed while the
    # scan is running still set handler.pending and are ingested afterwards.
    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()
    log.info("Observer started.")

    try:
        _startup_recovery(embedder)
        while True:
            write_heartbeat()
            write_status(handler)
            # While a worker is still running, leave the pending flag set so
            # the event is handled by a fresh run instead of being dropped.
            if handler.pending and not _ingestion_running():
                elapsed = time.time() - handler.last_event
                if elapsed >= DEBOUNCE_SECONDS:
                    handler.pending = False
                    start_ingestion_thread(embedder)
            time.sleep(5)
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt — stopping.")
    finally:
        # Stop the observer on every exit path, not only on Ctrl-C.
        observer.stop()
        observer.join()
        log.info("Watcher stopped.")


if __name__ == "__main__":
    # NOTE(review): the call below lies past the last visible diff line and is
    # reconstructed; confirm against the real file.
    main()