b9eea6cb62
Three rows in ingest_failures were Office lockfile leftovers whose
filenames start with ~\uFFFD (~ followed by U+FFFD, the Unicode replacement
character) instead of ~$. Somewhere in the Nextcloud sync chain the $
byte was lost or replaced; the file now lives on disk as a real file
with this corrupted name. The watcher's ("~$", ".") prefix filter
didn't match, so each cycle tried to ingest these as pptx, hit
BadZipFile inside python-pptx (lockfiles aren't real Office documents),
and they ended up permanently in ingest_failures.
Three filter sites in watcher.py applied the lockfile prefix check:
- ingest_file() at :127
- get_changed_files() at :200
- IngestHandler._should_ignore() at :290
All three now match ("~$", "~", ".") — broadened to catch any tilde
prefix, not just ~$. The cross-check against pgvector embeddings and
disk found zero legitimate tilde-prefixed files in the corpus, so the
broader filter has no false-positive risk in this corpus.
Cleanup: 3 ingest_failures rows resolved (filepath LIKE '%/~%').
Unresolved count drops 97 → 94.
If a fourth filter site is ever added, the right shape is to consolidate
the lockfile prefix check into a shared function or constant. Three
parallel sites with three different tuple orderings are acceptable for
now, but worth normalizing if the surface grows.
404 lines
13 KiB
Python
404 lines
13 KiB
Python
"""
|
|
Aaron AI Watcher — Stage 1 of the encoding pipeline.
|
|
|
|
Watches the Nextcloud directory for new or changed files.
|
|
On detection, chunks + embeds documents in-process (no subprocess),
|
|
then enqueues to stage_2_queue for async cascade processing.
|
|
|
|
Design principles:
|
|
- Embedding model loaded ONCE at startup, reused across all ingest runs
|
|
- In-process ingest (no subprocess) — eliminates per-run model reload memory spike
|
|
- Missed-file recovery on startup — ingests anything new since last state
|
|
- Heartbeat file updated every loop tick — enables external health monitoring
|
|
- Parity principle: no filtering, no decisions, faithful capture
|
|
- Does NOT enqueue to stage_2_queue during bulk migration (SKIP_STAGE2_ENQUEUE env var)
|
|
|
|
Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3_queue -> Stage 3 (Graphiti)
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import json
|
|
import logging
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
import psycopg2
|
|
from dotenv import load_dotenv
|
|
from sentence_transformers import SentenceTransformer
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
|
|
from encoding import extract_text, chunk_and_embed, write_embeddings_batch, SUPPORTED
|
|
from failures import (
|
|
record_ingest_failure as _record_failure_sql,
|
|
resolve_ingest_failure as _resolve_failure_sql,
|
|
)
|
|
|
|
# Load environment variables from ~/aaronai/.env before anything reads them.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)

# Directory tree that is scanned and watched for documents.
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"

# Files the watcher writes: log, per-file mtime state, status snapshot, heartbeat.
LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"

# Quiet period (seconds) after the last filesystem event before ingestion starts.
DEBOUNCE_SECONDS = 120
# Model name handed to SentenceTransformer.
EMBED_MODEL = "all-MiniLM-L6-v2"

# Postgres connection string; None when the variable is not set.
PG_DSN = os.environ.get("PG_DSN")

logging.basicConfig(
    format="%(asctime)s [watcher] %(levelname)s %(message)s",
    level=logging.INFO,
    handlers=[logging.FileHandler(LOG_FILE)],
)
log = logging.getLogger("watcher")

# Guards ingestion_state and the ingestion_thread check-and-start.
ingestion_lock = threading.Lock()
ingestion_state = {
    "status": "idle",
    "message": "",
    "file_count": 0,
    "started_at": None,
    "finished_at": None,
    "last_error": "",
}
# The background ingestion Thread, or None before the first run.
ingestion_thread = None
|
|
|
|
|
|
def load_embedder():
    """Build the SentenceTransformer named by EMBED_MODEL and return it.

    A log line is written before and after the model is constructed.
    """
    log.info(f"Loading embedding model: {EMBED_MODEL}")
    embedder = SentenceTransformer(EMBED_MODEL)
    log.info("Embedding model ready.")
    return embedder
|
|
|
|
|
|
def get_pg():
    """Open a new psycopg2 connection using PG_DSN; the caller closes it."""
    connection = psycopg2.connect(PG_DSN)
    return connection
|
|
|
|
|
|
def enqueue_stage2(source: str, full_text: str):
    """Upsert one document into stage_2_queue.

    Inserts (source, full_text, char_length). When a row for ``source``
    already exists, its text and length are replaced, enqueued_at is set to
    NOW(), completed_at/failed_at are cleared and attempts is reset to 0.

    Does nothing when the SKIP_STAGE2_ENQUEUE environment variable is set to
    a non-empty value. Any failure is logged as a warning and never raised.
    """
    if os.getenv("SKIP_STAGE2_ENQUEUE"):
        return
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute(
                """
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                    full_text = EXCLUDED.full_text,
                    char_length = EXCLUDED.char_length,
                    enqueued_at = NOW(),
                    completed_at = NULL,
                    failed_at = NULL,
                    attempts = 0
                """,
                (source, full_text, len(full_text)),
            )
            pg.commit()
        finally:
            # Always release the connection, even when execute/commit raises.
            pg.close()
    except Exception as e:
        log.warning(f"Stage 2 enqueue failed (non-fatal): {e}")
|
|
|
|
|
|
def record_ingest_failure(filepath: Path, error: str):
    """Record a failed extraction/ingest for ``filepath`` via the failures module.

    Opens a connection, passes it with the file name, full path and error
    text to failures.record_ingest_failure, and closes the connection.
    Any exception along the way is logged as a warning and suppressed.
    """
    try:
        conn = get_pg()
        try:
            _record_failure_sql(conn, filepath.name, filepath, error)
        finally:
            conn.close()
    except Exception as exc:
        log.warning(f"Could not record ingest failure (non-fatal): {exc}")
|
|
|
|
|
|
def resolve_ingest_failure(source: str):
    """Resolve the failure record for ``source`` via the failures module.

    Opens a connection, delegates to failures.resolve_ingest_failure and
    closes the connection. Exceptions are logged as warnings and suppressed.
    """
    try:
        conn = get_pg()
        try:
            _resolve_failure_sql(conn, source)
        finally:
            conn.close()
    except Exception as exc:
        log.warning(f"Could not resolve ingest failure record (non-fatal): {exc}")
|
|
|
|
|
|
def ingest_file(filepath: Path, embedder) -> int:
    """Extract, chunk, embed and store one file; return the number of chunks written.

    Returns 0 without side effects when the file name starts with "~" or "."
    or the suffix is not in SUPPORTED. Returns 0 and records an ingest
    failure when extraction raises or yields no text, when embedding raises,
    or when the pgvector write raises. Returns 0 silently when chunking
    produces no rows.

    On success the previous failure record for the file name is resolved and
    the full text is handed to enqueue_stage2.
    """
    # "~" covers Office "~$" lockfiles as well as lockfiles whose "$" was
    # mangled in sync; "." covers hidden files.
    if filepath.name.startswith(("~", ".")):
        return 0
    if filepath.suffix.lower() not in SUPPORTED:
        return 0

    # Extraction is guarded like the later stages so one unreadable file
    # cannot abort the batch it belongs to.
    try:
        text = extract_text(filepath)
    except Exception as e:
        log.error(f"Text extraction failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"Text extraction failed: {e}")
        return 0
    if not text or not text.strip():
        record_ingest_failure(filepath, "Text extraction failed or empty")
        return 0

    # Folder relative to the watched root; stays None for files outside it.
    folder_rel = None
    try:
        folder_rel = str(filepath.parent.relative_to(NEXTCLOUD_PATH))
    except ValueError:
        pass

    try:
        rows = chunk_and_embed(text, filepath.name, embedder,
                               filepath=filepath, folder=folder_rel)
    except Exception as e:
        log.error(f"Embedding failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"Embedding failed: {e}")
        return 0
    if not rows:
        return 0

    source = filepath.name
    try:
        pg = get_pg()
        try:
            write_embeddings_batch(pg, rows)
        finally:
            pg.close()
    except Exception as e:
        log.error(f"pgvector write failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"pgvector write failed: {e}")
        return 0

    log.info(f"Indexed {len(rows)} chunks: {filepath.name}")
    resolve_ingest_failure(source)
    enqueue_stage2(source, text)
    return len(rows)
|
|
|
|
|
|
def ingest_files(paths: list, embedder, state: dict) -> dict:
    """Ingest each path and record its mtime in ``state`` when chunks were written.

    ``state`` maps str(path) to str(st_mtime). It is updated in place and
    also returned. Files for which ingest_file returns 0 are left out of the
    state.
    """
    total = 0
    for path in paths:
        # Read the mtime BEFORE ingesting: if the file is modified while it is
        # being processed, the stored value will be stale and the next scan
        # picks the file up again instead of silently missing the edit.
        try:
            mtime = str(path.stat().st_mtime)
        except OSError as e:
            # File vanished or is unreadable; skip it rather than abort the
            # batch and lose the state for files already ingested.
            log.warning(f"Skipping {path}: cannot stat ({e})")
            continue
        count = ingest_file(path, embedder)
        total += count
        if count > 0:
            state[str(path)] = mtime
    log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
    return state
|
|
|
|
|
|
def load_state() -> dict:
    """Read the path -> mtime mapping from STATE_FILE.

    Returns an empty dict when the file does not exist, cannot be read,
    is not valid JSON, or does not contain a JSON object. Every case except
    a missing file is logged, because an empty state makes the next scan
    treat every file as changed.
    """
    try:
        with open(STATE_FILE) as f:
            state = json.load(f)
    except FileNotFoundError:
        # First run: no state yet.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.warning(f"Could not read state file {STATE_FILE}; starting empty: {e}")
        return {}
    if not isinstance(state, dict):
        log.warning(f"State file {STATE_FILE} is not a JSON object; starting empty.")
        return {}
    return state
|
|
|
|
|
|
def save_state(state: dict):
    """Write ``state`` to STATE_FILE as JSON, replacing the old file atomically.

    The data is written to a sibling temporary file first and then renamed
    over STATE_FILE, so an interrupted write cannot leave a truncated state
    file behind.
    """
    tmp_path = f"{STATE_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, STATE_FILE)
|
|
|
|
|
|
def get_changed_files(state: dict) -> list:
    """Return the files under NEXTCLOUD_PATH whose mtime differs from ``state``.

    ``state`` maps str(path) to str(st_mtime). A file is skipped when it is a
    directory, has an unsupported suffix, has a name starting with "~" or ".",
    lies in one of the excluded folders, is empty, or can no longer be
    stat'ed (for example because it was removed while the scan was running).
    """
    changed = []
    root = Path(NEXTCLOUD_PATH)
    for path in root.rglob("*"):
        # Cheap name-based filters first; they need no filesystem access.
        if path.suffix.lower() not in SUPPORTED:
            continue
        # "~" also covers "~$" Office lockfiles; "." covers hidden files.
        if path.name.startswith(("~", ".")):
            continue
        path_str = str(path)
        parts = path.parts
        if "Admin/Backups" in path_str or "Backups" in parts:
            continue
        if "Journal/Media" in path_str:
            continue
        if "Generative Design" in parts and "Processing" in parts:
            continue
        if "Computational Design 2017" in parts and "Student Work" in parts:
            continue
        if path.is_dir():
            continue
        # Stat once and tolerate files that disappear mid-scan.
        try:
            info = path.stat()
        except OSError as e:
            log.warning(f"Skipping {path}: cannot stat ({e})")
            continue
        if info.st_size == 0:
            continue
        if state.get(path_str) != str(info.st_mtime):
            changed.append(path)
    return changed
|
|
|
|
|
|
def set_ingestion_state(**kwargs):
    """Merge the given keyword fields into ingestion_state under ingestion_lock."""
    with ingestion_lock:
        for field, value in kwargs.items():
            ingestion_state[field] = value
|
|
|
|
|
|
def write_status(handler):
    """Dump a status snapshot to STATUS_FILE as JSON.

    The snapshot holds a running flag, the current time, the handler's
    ``pending`` and ``last_event`` values and a copy of ingestion_state,
    all read while holding ingestion_lock. Write errors are ignored.
    """
    with ingestion_lock:
        snapshot = {
            "running": True,
            "timestamp": time.time(),
            "pending": handler.pending,
            "last_event": handler.last_event,
            "ingestion": dict(ingestion_state),
        }
    try:
        with open(STATUS_FILE, "w") as out:
            json.dump(snapshot, out)
    except Exception:
        # Best effort only: a failed status write must not stop the loop.
        pass
|
|
|
|
|
|
def write_heartbeat():
    """Overwrite HEARTBEAT_FILE with the current Unix time; errors are ignored."""
    try:
        stamp = str(time.time())
        Path(HEARTBEAT_FILE).write_text(stamp)
    except Exception:
        # Best effort only: a failed heartbeat write must not stop the loop.
        pass
|
|
|
|
|
|
def run_ingestion(embedder):
    """Run one scan-and-ingest pass and publish progress via ingestion_state.

    Loads the saved state, collects changed files and returns early when
    there are none. Otherwise marks the state as "ingesting", ingests the
    files, saves the updated state and marks the state "idle". Any exception
    from ingestion or saving is logged and reported as status "error".
    """
    state = load_state()
    targets = get_changed_files(state)
    if not targets:
        log.info("No new or changed files — skipping ingestion.")
        set_ingestion_state(status="idle", message="No changes detected", file_count=0)
        return

    count = len(targets)
    log.info(f"Found {count} new or changed files — starting ingestion...")
    set_ingestion_state(
        status="ingesting",
        message=f"Ingesting {count} file(s)...",
        file_count=count,
        started_at=time.time(),
        finished_at=None,
        last_error="",
    )

    try:
        state = ingest_files(targets, embedder, state)
        save_state(state)
        done_message = f"Last run: ingested {count} file(s) successfully"
        set_ingestion_state(status="idle", message=done_message, finished_at=time.time())
    except Exception as exc:
        log.error(f"Ingestion failed: {exc}")
        set_ingestion_state(
            status="error",
            message=f"Ingestion exception: {exc}",
            last_error=str(exc),
            finished_at=time.time(),
        )
|
|
|
|
|
|
def start_ingestion_thread(embedder):
    """Start run_ingestion in a daemon thread unless one is still alive.

    The liveness check and the assignment of the module-level
    ingestion_thread both happen while holding ingestion_lock.
    """
    global ingestion_thread
    with ingestion_lock:
        still_running = bool(ingestion_thread) and ingestion_thread.is_alive()
        if still_running:
            log.info("Ingestion already running — skipping.")
            return
        worker = threading.Thread(target=run_ingestion, args=(embedder,), daemon=True)
        ingestion_thread = worker
        worker.start()
|
|
|
|
|
|
class IngestHandler(FileSystemEventHandler):
    """Watchdog handler that flags relevant file events for debounced ingestion.

    Attributes:
        pending: True once a relevant event arrived and ingestion has not
            been started for it yet; the main loop resets it to False.
        last_event: time.time() of the most recent relevant event (0 before
            the first one).
    """

    def __init__(self):
        super().__init__()
        self.pending = False
        self.last_event = 0

    def _should_ignore(self, path: Path) -> bool:
        """Return True for lockfiles, hidden files and files in excluded folders."""
        # "~" also covers "~$" Office lockfiles; "." covers hidden files.
        if path.name.startswith(("~", ".")):
            return True
        path_str = str(path)
        parts = path.parts
        if "Admin/Backups" in path_str or "Backups" in parts:
            return True
        if "Journal/Media" in path_str:
            return True
        if "Generative Design" in parts and "Processing" in parts:
            return True
        if "Computational Design 2017" in parts and "Student Work" in parts:
            return True
        return False

    def _note_event(self, label: str, raw_path) -> None:
        """Log the event and mark ingestion pending if ``raw_path`` is relevant."""
        path = Path(raw_path)
        if path.suffix.lower() not in SUPPORTED or self._should_ignore(path):
            return
        log.info(f"Event: {label} {path}")
        # Write the timestamp BEFORE raising the flag: the main loop reads
        # pending first, so it must never see pending=True together with a
        # stale last_event (which would bypass the debounce window).
        self.last_event = time.time()
        self.pending = True

    def on_created(self, event):
        if event.is_directory:
            return
        self._note_event("created", event.src_path)

    def on_modified(self, event):
        if event.is_directory:
            return
        self._note_event("modified", event.src_path)

    def on_moved(self, event):
        if event.is_directory:
            return
        # Nextcloud WebDAV writes .part temp files then renames to final path.
        # src_path is the .part file; dest_path is the final filename.
        self._note_event("moved ->", event.dest_path)

    def on_closed(self, event):
        # FileClosedEvent fires on the final file after Nextcloud completes write.
        # Belt-and-suspenders catch for any write pattern not caught by on_moved.
        if event.is_directory:
            return
        self._note_event("closed", event.src_path)
|
|
|
|
|
|
def main():
    """Run the watcher: load the model, recover missed files, then watch forever.

    Steps:
      1. Load the embedding model once.
      2. Startup scan: ingest every file that changed since the saved state.
      3. Start a recursive watchdog observer on NEXTCLOUD_PATH.
      4. Every 5 seconds write the heartbeat and status files and, once
         DEBOUNCE_SECONDS have passed since the last relevant event, start
         a background ingestion run.

    The observer is always stopped and joined when the loop exits, whether
    through KeyboardInterrupt or an unexpected exception (which is logged
    and re-raised).
    """
    log.info("Aaron AI Watcher starting...")
    log.info(f"Watching: {NEXTCLOUD_PATH}")

    embedder = load_embedder()

    log.info("Startup scan: checking for files missed since last run...")
    state = load_state()
    missed = get_changed_files(state)
    if missed:
        log.info(f"Startup recovery: {len(missed)} missed file(s) — ingesting now.")
        set_ingestion_state(
            status="ingesting",
            message=f"Startup recovery: ingesting {len(missed)} missed file(s)...",
            file_count=len(missed), started_at=time.time(),
        )
        try:
            state = ingest_files(missed, embedder, state)
            save_state(state)
            set_ingestion_state(
                status="idle",
                message=f"Startup recovery complete: {len(missed)} file(s) ingested.",
                finished_at=time.time(),
            )
        except Exception as e:
            log.error(f"Startup recovery failed: {e}")
            set_ingestion_state(status="error", message=str(e),
                                last_error=str(e), finished_at=time.time())
    else:
        log.info("Startup scan: no missed files.")

    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()
    log.info("Observer started.")

    try:
        while True:
            write_heartbeat()
            write_status(handler)
            if handler.pending:
                elapsed = time.time() - handler.last_event
                if elapsed >= DEBOUNCE_SECONDS:
                    # Clear the flag before starting so events that arrive
                    # during ingestion schedule another run.
                    handler.pending = False
                    start_ingestion_thread(embedder)
            time.sleep(5)
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt — stopping.")
    except Exception:
        # Leave a trace in the log file before the process dies.
        log.exception("Watcher loop crashed.")
        raise
    finally:
        # Shut the observer down on every exit path, not only on Ctrl-C.
        observer.stop()
        observer.join()
        log.info("Watcher stopped.")


if __name__ == "__main__":
    main()
|