72e07afc03
ingest_files() updated state[path] = mtime unconditionally after every ingest_file() call. ingest_file() returns 0 when text extraction fails, embedding fails, no chunks are produced, or the pgvector write fails — in every one of those cases, the path was still recorded as ingested at the current mtime. On the next pass, get_changed_files() saw the mtime match and skipped the file, locking it out of the corpus until something modified it on disk. record_ingest_failure() writes to a UI-visible failures table, but nothing reads that table to retry. So failures accumulated silently: the file was simultaneously logged as failed AND tracked in watcher_state as up-to-date, and the second condition won. Fix: only update watcher_state when ingest_file returns count > 0. Failed ingests will be retried on the next watcher cycle until they succeed or are explicitly excluded. Diagnostic at fix time: 129 rows in ingest_failures, 128 currently locked out of the corpus (filepath in watcher_state with mtime matching current disk). 128/129 are text_extraction failures, mostly scanned PDFs (106 .pdf, 13 .docx, 7 .pptx, 2 .md, 1 .txt). 1 source no longer exists on disk. 0 have had their disk mtime change since failing — i.e. without this fix, none of them would ever retry. Cross-check shows watcher_state has 1466 paths vs. 1061 distinct sources in pgvector embeddings, leaving a residual silent-gap of ~276 files after accounting for failures. Historical cleanup of files already locked out by this bug is tracked separately. New failures from this commit forward will retry naturally.
394 lines
12 KiB
Python
394 lines
12 KiB
Python
"""
|
|
Aaron AI Watcher — Stage 1 of the encoding pipeline.
|
|
|
|
Watches the Nextcloud directory for new or changed files.
|
|
On detection, chunks + embeds documents in-process (no subprocess),
|
|
then enqueues to stage_2_queue for async cascade processing.
|
|
|
|
Design principles:
|
|
- Embedding model loaded ONCE at startup, reused across all ingest runs
|
|
- In-process ingest (no subprocess) — eliminates per-run model reload memory spike
|
|
- Missed-file recovery on startup — ingests anything new since last state
|
|
- Heartbeat file updated every loop tick — enables external health monitoring
|
|
- Parity principle: no filtering, no decisions, faithful capture
|
|
- Does NOT enqueue to stage_2_queue during bulk migration (SKIP_STAGE2_ENQUEUE env var)
|
|
|
|
Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3_queue -> Stage 3 (Graphiti)
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import json
|
|
import logging
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
import psycopg2
|
|
from dotenv import load_dotenv
|
|
from sentence_transformers import SentenceTransformer
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
|
|
from encoding import extract_text, chunk_and_embed, write_embeddings_batch, SUPPORTED
|
|
from failures import (
|
|
record_ingest_failure as _record_failure_sql,
|
|
resolve_ingest_failure as _resolve_failure_sql,
|
|
)
|
|
|
|
# Load settings from the project .env file; override=True lets values in the
# file win over variables already present in the process environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)

# Directory tree that is scanned and watched for documents.
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"

# Files written by this process: log, per-path mtime state, status snapshot,
# and liveness heartbeat.
LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"

# Quiet period (seconds) after the last filesystem event before ingesting.
DEBOUNCE_SECONDS = 120
# Name passed to SentenceTransformer when the embedder is loaded.
EMBED_MODEL = "all-MiniLM-L6-v2"

# Connection string handed to psycopg2.connect(); None when PG_DSN is unset.
PG_DSN = os.getenv("PG_DSN")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [watcher] %(levelname)s %(message)s",
    handlers=[logging.FileHandler(LOG_FILE)],
)
log = logging.getLogger("watcher")

# Guards ingestion_state and the ingestion_thread handle.
ingestion_lock = threading.Lock()
# Progress snapshot embedded in the status file by write_status().
ingestion_state = {
    "status": "idle",
    "message": "",
    "file_count": 0,
    "started_at": None,
    "finished_at": None,
    "last_error": "",
}
# Background thread running run_ingestion(), or None if none was started yet.
ingestion_thread = None
|
|
|
|
|
|
def load_embedder():
    """Build the SentenceTransformer named by EMBED_MODEL and return it.

    Logs one line before construction and one line once the model is ready.
    """
    log.info(f"Loading embedding model: {EMBED_MODEL}")
    embedder = SentenceTransformer(EMBED_MODEL)
    log.info("Embedding model ready.")
    return embedder
|
|
|
|
|
|
def get_pg():
    """Open and return a new psycopg2 connection using PG_DSN.

    Every call creates a fresh connection; closing it is the caller's job.
    """
    connection = psycopg2.connect(PG_DSN)
    return connection
|
|
|
|
|
|
def enqueue_stage2(source: str, full_text: str):
    """Upsert a document into stage_2_queue.

    Does nothing when the SKIP_STAGE2_ENQUEUE environment variable is set to
    any non-empty value. On a conflicting ``source`` the existing row is
    overwritten and its enqueued_at / completed_at / failed_at / attempts
    columns are reset.

    Args:
        source: Key of the queue row (the ON CONFLICT target).
        full_text: Full document text; its length is stored as char_length.

    Returns:
        None. Any exception is logged as a warning and swallowed (non-fatal).
    """
    if os.getenv("SKIP_STAGE2_ENQUEUE"):
        return
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute(
                """
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                    full_text = EXCLUDED.full_text,
                    char_length = EXCLUDED.char_length,
                    enqueued_at = NOW(),
                    completed_at = NULL,
                    failed_at = NULL,
                    attempts = 0
                """,
                (source, full_text, len(full_text)),
            )
            pg.commit()
        finally:
            # Close even when execute/commit raises; previously the connection
            # leaked on every failed enqueue.
            pg.close()
    except Exception as e:
        log.warning(f"Stage 2 enqueue failed (non-fatal): {e}")
|
|
|
|
|
|
def record_ingest_failure(filepath: Path, error: str):
    """Store an extraction/ingest failure in the ingest_failures table.

    Thin wrapper over failures.record_ingest_failure: it opens a connection,
    delegates the write, and always closes the connection. Any exception is
    logged as a warning and suppressed so callers never need to handle it.
    """

    def _persist():
        conn = get_pg()
        try:
            _record_failure_sql(conn, filepath.name, filepath, error)
        finally:
            conn.close()

    try:
        _persist()
    except Exception as exc:
        log.warning(f"Could not record ingest failure (non-fatal): {exc}")
|
|
|
|
|
|
def resolve_ingest_failure(source: str):
    """Flag an earlier failure for ``source`` as resolved after a good ingest.

    Opens a connection, delegates to failures.resolve_ingest_failure and
    closes the connection. Errors are logged as warnings and never raised.
    """
    try:
        conn = get_pg()
        try:
            _resolve_failure_sql(conn, source)
        finally:
            conn.close()
    except Exception as exc:
        log.warning(f"Could not resolve ingest failure record (non-fatal): {exc}")
|
|
|
|
|
|
def ingest_file(filepath: Path, embedder) -> int:
    """Extract, chunk, embed and store one file; return the chunk count.

    Args:
        filepath: File to ingest.
        embedder: Embedding model handed through to chunk_and_embed.

    Returns:
        Number of rows written, or 0 when the file is skipped (name starts
        with "~$" or ".", or suffix not in SUPPORTED) or when any stage fails.
        Every failure is recorded via record_ingest_failure; this function
        does not raise for per-file problems, so one bad file cannot abort
        the batch that called it.
    """
    if filepath.name.startswith(("~$", ".")):
        return 0
    if filepath.suffix.lower() not in SUPPORTED:
        return 0

    # Contain extractor crashes here instead of letting them propagate into
    # the batch loop.
    try:
        text = extract_text(filepath)
    except Exception as e:
        log.error(f"Text extraction failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"Text extraction failed: {e}")
        return 0
    # `not text` also covers a None return, which would otherwise raise
    # AttributeError on .strip().
    if not text or not text.strip():
        record_ingest_failure(filepath, "Text extraction failed or empty")
        return 0

    # Folder relative to the watched root; stays None for files outside it.
    folder_rel = None
    try:
        folder_rel = str(filepath.parent.relative_to(NEXTCLOUD_PATH))
    except ValueError:
        pass

    try:
        rows = chunk_and_embed(text, filepath.name, embedder,
                               filepath=filepath, folder=folder_rel)
    except Exception as e:
        log.error(f"Embedding failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"Embedding failed: {e}")
        return 0
    if not rows:
        # Returning 0 means the caller will retry this file; record why so
        # the retry loop is not silent.
        log.warning(f"No chunks produced for {filepath.name}")
        record_ingest_failure(filepath, "No chunks produced")
        return 0

    source = filepath.name
    try:
        pg = get_pg()
        try:
            write_embeddings_batch(pg, rows)
        finally:
            pg.close()
    except Exception as e:
        log.error(f"pgvector write failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"pgvector write failed: {e}")
        return 0

    log.info(f"Indexed {len(rows)} chunks: {filepath.name}")
    resolve_ingest_failure(source)
    enqueue_stage2(source, text)
    return len(rows)
|
|
|
|
|
|
def ingest_files(paths: list, embedder, state: dict) -> dict:
    """Ingest each path and record successes in ``state``.

    Args:
        paths: Path objects to ingest.
        embedder: Embedding model passed through to ingest_file.
        state: Mapping of str(path) -> str(mtime); mutated in place.

    Returns:
        The same ``state`` dict. Only files for which ingest_file returned a
        count > 0 are recorded, so failed files are picked up again by the
        next scan.
    """
    total = 0
    ingested = 0
    for path in paths:
        # Sample the mtime BEFORE ingesting: if the file is edited while it
        # is being processed, the stored value will not match the new mtime
        # and the next scan re-ingests it instead of missing the edit.
        try:
            mtime = str(path.stat().st_mtime)
        except OSError as e:
            # The file vanished or became unreadable between scan and ingest.
            log.warning(f"Skipping {path}: cannot stat ({e})")
            continue
        count = ingest_file(path, embedder)
        total += count
        if count > 0:
            state[str(path)] = mtime
            ingested += 1
    log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
    if ingested < len(paths):
        log.warning(
            f"{len(paths) - ingested} of {len(paths)} file(s) were not ingested "
            "and remain eligible for retry."
        )
    return state
|
|
|
|
|
|
def load_state() -> dict:
    """Load the path -> mtime mapping from STATE_FILE.

    Returns:
        The parsed dict, or an empty dict when the file is missing,
        unreadable, not valid JSON, or not a JSON object. Never raises.
        Every case except "file missing" is logged, because falling back to
        an empty state makes the next scan treat every file as changed.
    """
    try:
        with open(STATE_FILE) as f:
            state = json.load(f)
    except FileNotFoundError:
        # First run: no state yet.
        return {}
    except Exception as e:
        log.warning(
            f"Could not load watcher state from {STATE_FILE}: {e} — using empty state."
        )
        return {}
    if not isinstance(state, dict):
        # Callers use state.get(...); anything but a dict would crash them.
        log.warning(
            f"Watcher state in {STATE_FILE} is not a JSON object — using empty state."
        )
        return {}
    return state
|
|
|
|
|
|
def save_state(state: dict):
    """Write ``state`` to STATE_FILE as JSON, atomically.

    The data goes to a sibling temp file first and is then renamed over
    STATE_FILE with os.replace, so an interrupted write cannot leave a
    truncated state file behind (load_state would then fall back to an empty
    state and every file would be re-ingested).

    Raises:
        OSError: if the temp file cannot be written or renamed.
        TypeError / ValueError: if ``state`` is not JSON-serializable.
    """
    tmp_path = f"{STATE_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, STATE_FILE)
|
|
|
|
|
|
def get_changed_files(state: dict) -> list:
    """Return the files under NEXTCLOUD_PATH whose mtime differs from ``state``.

    A file is a candidate when its suffix is in SUPPORTED, its name does not
    start with "." or "~$", and its path is not under a Backups or
    Journal/Media location. Candidates whose str(st_mtime) does not equal the
    stored value (or that have no stored value) are returned.

    Args:
        state: Mapping of str(path) -> str(mtime) from a previous run.

    Returns:
        List of Path objects that are new or changed.
    """
    changed = []
    root = Path(NEXTCLOUD_PATH)
    for path in root.rglob("*"):
        # Cheap name-based filters first; they need no filesystem access.
        if path.suffix.lower() not in SUPPORTED:
            continue
        if path.name.startswith((".", "~$")):
            continue
        path_str = str(path)
        if "Admin/Backups" in path_str or "Backups" in path.parts:
            continue
        if "Journal/Media" in path_str:
            continue
        try:
            if path.is_dir():
                continue
            mtime = str(path.stat().st_mtime)
        except OSError as e:
            # The file can vanish between directory listing and stat, or be
            # a broken symlink. Skip it rather than abort the whole scan.
            log.warning(f"Scan: skipping {path}: {e}")
            continue
        if state.get(path_str) != mtime:
            changed.append(path)
    return changed
|
|
|
|
|
|
def set_ingestion_state(**kwargs):
    """Merge the given keyword fields into ingestion_state under ingestion_lock."""
    with ingestion_lock:
        for field, value in kwargs.items():
            ingestion_state[field] = value
|
|
|
|
|
|
def write_status(handler):
    """Dump a JSON status snapshot to STATUS_FILE (best effort).

    The snapshot holds a running flag, the current timestamp, the handler's
    pending / last_event fields and a copy of ingestion_state. It is built
    while holding ingestion_lock; write errors are deliberately ignored.
    """
    with ingestion_lock:
        snapshot = {}
        snapshot["running"] = True
        snapshot["timestamp"] = time.time()
        snapshot["pending"] = handler.pending
        snapshot["last_event"] = handler.last_event
        snapshot["ingestion"] = dict(ingestion_state)
    try:
        with open(STATUS_FILE, "w") as out:
            json.dump(snapshot, out)
    except Exception:
        pass
|
|
|
|
|
|
def write_heartbeat():
    """Overwrite HEARTBEAT_FILE with the current epoch time (best effort).

    Any error while writing is ignored.
    """
    try:
        heartbeat_path = Path(HEARTBEAT_FILE)
        stamp = str(time.time())
        heartbeat_path.write_text(stamp)
    except Exception:
        pass
|
|
|
|
|
|
def run_ingestion(embedder):
    """Scan for new or changed files, ingest them, and persist the state.

    Progress and outcome are published through set_ingestion_state. This
    function is the target of the background ingestion thread, so it handles
    its own errors instead of raising.

    Args:
        embedder: Embedding model passed through to ingest_files.
    """

    def _report_error(exc):
        # One place for the error status so the scan and ingest paths agree.
        log.exception(f"Ingestion failed: {exc}")
        set_ingestion_state(
            status="error", message=f"Ingestion exception: {exc}",
            last_error=str(exc), finished_at=time.time(),
        )

    # A scan failure used to kill the thread without updating the status.
    try:
        state = load_state()
        changed = get_changed_files(state)
    except Exception as e:
        _report_error(e)
        return

    if not changed:
        log.info("No new or changed files — skipping ingestion.")
        set_ingestion_state(status="idle", message="No changes detected", file_count=0)
        return

    count = len(changed)
    log.info(f"Found {count} new or changed files — starting ingestion...")
    set_ingestion_state(
        status="ingesting", message=f"Ingesting {count} file(s)...",
        file_count=count, started_at=time.time(), finished_at=None, last_error="",
    )

    # Snapshot taken to count how many entries this run actually updated.
    before = dict(state)
    try:
        state = ingest_files(changed, embedder, state)
        save_state(state)
    except Exception as e:
        # ingest_files mutates `state` in place, so it already holds every
        # file that succeeded before the error. Persist it so those files
        # are not re-ingested on the next run.
        try:
            save_state(state)
        except Exception as save_err:
            log.warning(f"Could not save partial watcher state: {save_err}")
        _report_error(e)
        return

    succeeded = sum(
        1 for path in changed if state.get(str(path)) != before.get(str(path))
    )
    if succeeded == count:
        message = f"Last run: ingested {count} file(s) successfully"
    else:
        # Do not claim success for files that failed and will be retried.
        message = (
            f"Last run: ingested {succeeded} of {count} file(s); "
            f"{count - succeeded} failed"
        )
    set_ingestion_state(status="idle", message=message, finished_at=time.time())
|
|
|
|
|
|
def start_ingestion_thread(embedder):
    """Launch run_ingestion in a daemon thread unless one is still alive.

    The check and the start both happen while holding ingestion_lock. When a
    previous thread is still running, a log line is written and nothing is
    started.
    """
    global ingestion_thread
    with ingestion_lock:
        if ingestion_thread is None or not ingestion_thread.is_alive():
            worker = threading.Thread(
                target=run_ingestion, args=(embedder,), daemon=True
            )
            ingestion_thread = worker
            worker.start()
        else:
            log.info("Ingestion already running — skipping.")
|
|
|
|
|
|
class IngestHandler(FileSystemEventHandler):
    """Watchdog handler that flags relevant file events for the main loop.

    It never ingests anything itself. It records that something changed
    (``pending``) and when (``last_event``); the main loop reads both fields
    to debounce and trigger ingestion.
    """

    def __init__(self):
        super().__init__()
        # True once a relevant event has been seen and not yet acted upon.
        self.pending = False
        # time.time() of the most recent relevant event; 0 before the first.
        self.last_event = 0

    def _should_ignore(self, path: Path) -> bool:
        """Return True for hidden/"~$" names and Backups or Journal/Media paths."""
        if path.name.startswith((".", "~$")):
            return True
        if "Admin/Backups" in str(path) or "Backups" in path.parts:
            return True
        if "Journal/Media" in str(path):
            return True
        return False

    def _note_event(self, event, raw_path, label: str):
        """Shared body of every on_* hook: filter, log, then flag pending."""
        if event.is_directory:
            return
        path = Path(raw_path)
        if path.suffix.lower() not in SUPPORTED or self._should_ignore(path):
            return
        log.info(f"Event: {label} {path}")
        # Update the timestamp BEFORE raising the flag: the main loop runs in
        # another thread and must never see pending=True with a stale
        # last_event, which would bypass the debounce.
        self.last_event = time.time()
        self.pending = True

    def on_created(self, event):
        """Flag a newly created supported file."""
        self._note_event(event, event.src_path, "created")

    def on_modified(self, event):
        """Flag a modified supported file."""
        self._note_event(event, event.src_path, "modified")

    def on_moved(self, event):
        """Flag a move/rename, judged by its destination path."""
        # Nextcloud WebDAV writes .part temp files then renames to final path.
        # src_path is the .part file; dest_path is the final filename.
        self._note_event(event, event.dest_path, "moved ->")

    def on_closed(self, event):
        """Flag a supported file that was closed after writing."""
        # FileClosedEvent fires on the final file after Nextcloud completes write.
        # Belt-and-suspenders catch for any write pattern not caught by on_moved.
        self._note_event(event, event.src_path, "closed")
|
|
|
|
|
|
def main():
    """Run the watcher: load the model, recover missed files, then watch.

    Steps:
      1. Load the embedding model once.
      2. Startup scan: ingest any file whose mtime differs from saved state.
      3. Start a recursive watchdog observer on NEXTCLOUD_PATH.
      4. Loop every 5 seconds: write heartbeat and status, and start a
         background ingestion once DEBOUNCE_SECONDS have passed since the
         last relevant event.

    Runs until interrupted; the observer is stopped and joined on any exit.
    """
    log.info("Aaron AI Watcher starting...")
    log.info(f"Watching: {NEXTCLOUD_PATH}")

    embedder = load_embedder()

    log.info("Startup scan: checking for files missed since last run...")
    state = load_state()
    # The scan sits inside the try so that a scan error is reported through
    # the status file and the watcher still starts.
    try:
        missed = get_changed_files(state)
        if missed:
            log.info(f"Startup recovery: {len(missed)} missed file(s) — ingesting now.")
            set_ingestion_state(
                status="ingesting",
                message=f"Startup recovery: ingesting {len(missed)} missed file(s)...",
                file_count=len(missed), started_at=time.time(),
            )
            state = ingest_files(missed, embedder, state)
            save_state(state)
            set_ingestion_state(
                status="idle",
                message=f"Startup recovery complete: {len(missed)} file(s) ingested.",
                finished_at=time.time(),
            )
        else:
            log.info("Startup scan: no missed files.")
    except Exception as e:
        log.exception(f"Startup recovery failed: {e}")
        set_ingestion_state(status="error", message=str(e),
                            last_error=str(e), finished_at=time.time())

    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()
    log.info("Observer started.")

    try:
        while True:
            write_heartbeat()
            write_status(handler)
            if handler.pending:
                elapsed = time.time() - handler.last_event
                if elapsed >= DEBOUNCE_SECONDS:
                    running = (
                        ingestion_thread is not None and ingestion_thread.is_alive()
                    )
                    # Clear the flag only when a run can actually start.
                    # Clearing it while an ingestion is in flight dropped the
                    # trigger: start_ingestion_thread would skip, and files
                    # changed during the run waited for a later event.
                    if not running:
                        handler.pending = False
                        start_ingestion_thread(embedder)
            time.sleep(5)
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt — stopping.")
    finally:
        # Stop the observer on every exit path, not only on Ctrl-C.
        observer.stop()
        observer.join()
        log.info("Watcher stopped.")
|
|
|
|
|
|
# Start the watcher only when this file is executed as a script.
if __name__ == "__main__":
    main()
|