watcher.py: in-process ingest, embedder loaded once at startup, startup recovery, heartbeat, no duplicate logging

This commit is contained in:
2026-04-30 16:42:44 +00:00
parent 2fb50cce71
commit 2b3c2380a0
+282 -115
View File
@@ -1,62 +1,214 @@
"""
Aaron AI Watcher — Stage 1 of the encoding pipeline.
Watches the Nextcloud directory for new or changed files.
On detection, chunks + embeds documents in-process (no subprocess),
then enqueues to stage_2_queue for async cascade processing.
Design principles:
- Embedding model loaded ONCE at startup, reused across all ingest runs
- In-process ingest (no subprocess) — eliminates per-run model reload memory spike
- Missed-file recovery on startup — ingests anything new since last state
- Heartbeat file updated every loop tick — enables external health monitoring
- Parity principle: no filtering, no decisions, faithful capture
- Does NOT enqueue to stage_2_queue during bulk migration (SKIP_STAGE2_ENQUEUE env var)
Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3_queue -> Stage 3 (Graphiti)
"""
import os
import time import time
import subprocess
import logging
import json import json
import hashlib
import logging
import threading import threading
from pathlib import Path from pathlib import Path
import psycopg2
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files" from docx import Document as DocxDocument
INGEST_SCRIPT = "/home/aaron/aaronai/scripts/ingest.py" from pypdf import PdfReader
PYTHON = "/home/aaron/aaronai/venv/bin/python3" from pptx import Presentation
LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
SUPPORTED = {'.pdf', '.docx', '.pptx', '.txt', '.md'} load_dotenv(Path.home() / "aaronai" / ".env", override=True)
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
DEBOUNCE_SECONDS = 120 DEBOUNCE_SECONDS = 120
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json" CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
EMBED_MODEL = "all-MiniLM-L6-v2"
PG_DSN = os.getenv("PG_DSN")
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
format='%(asctime)s - %(message)s', format="%(asctime)s [watcher] %(levelname)s %(message)s",
handlers=[ handlers=[logging.FileHandler(LOG_FILE)],
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
) )
log = logging.getLogger("watcher")
ingestion_lock = threading.Lock()
ingestion_state = { ingestion_state = {
"status": "idle", "status": "idle", "message": "", "file_count": 0,
"message": "", "started_at": None, "finished_at": None, "last_error": "",
"file_count": 0,
"started_at": None,
"finished_at": None,
"last_error": "",
} }
ingestion_lock = threading.Lock()
ingestion_thread = None ingestion_thread = None
def set_ingestion_state(**kwargs): def load_embedder():
with ingestion_lock: log.info(f"Loading embedding model: {EMBED_MODEL}")
ingestion_state.update(kwargs) model = SentenceTransformer(EMBED_MODEL)
log.info("Embedding model ready.")
return model
def load_state(): def get_pg():
return psycopg2.connect(PG_DSN)
def extract_text(path: Path) -> str:
suffix = path.suffix.lower()
try:
if suffix == ".docx":
doc = DocxDocument(path)
return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
elif suffix == ".pdf":
reader = PdfReader(path)
return "".join(
page.extract_text() + "\n"
for page in reader.pages if page.extract_text()
)
elif suffix == ".pptx":
prs = Presentation(path)
return "\n".join(
shape.text for slide in prs.slides
for shape in slide.shapes
if hasattr(shape, "text") and shape.text.strip()
)
elif suffix in {".txt", ".md"}:
return path.read_text(encoding="utf-8", errors="ignore")
except Exception as e:
log.warning(f"Text extraction failed for {path.name}: {e}")
return ""
def chunk_text(text: str) -> list:
words = text.split()
chunks = []
start = 0
while start < len(words):
chunk = " ".join(words[start:start + CHUNK_SIZE])
if chunk.strip():
chunks.append(chunk)
start += CHUNK_SIZE - CHUNK_OVERLAP
return chunks
def make_chunk_id(filepath: Path, chunk_index: int) -> str:
return hashlib.md5(str(filepath).encode()).hexdigest()[:8] + f"_{chunk_index}"
def enqueue_stage2(source: str, full_text: str):
if os.getenv("SKIP_STAGE2_ENQUEUE"):
return
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("""
INSERT INTO stage_2_queue (source, full_text, char_length)
VALUES (%s, %s, %s)
ON CONFLICT (source) DO UPDATE SET
full_text = EXCLUDED.full_text,
char_length = EXCLUDED.char_length,
enqueued_at = NOW(),
completed_at = NULL,
failed_at = NULL,
attempts = 0
""", (source, full_text[:50000], len(full_text)))
pg.commit()
pg.close()
except Exception as e:
log.warning(f"Stage 2 enqueue failed (non-fatal): {e}")
def ingest_file(filepath: Path, embedder) -> int:
if filepath.name.startswith(("~$", ".")):
return 0
if filepath.suffix.lower() not in SUPPORTED:
return 0
text = extract_text(filepath)
if not text.strip():
return 0
chunks = chunk_text(text)
if not chunks:
return 0
try:
embeddings = embedder.encode(chunks).tolist()
except Exception as e:
log.error(f"Embedding failed for {filepath.name}: {e}")
return 0
source = filepath.name
try:
pg = get_pg()
cur = pg.cursor()
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
chunk_id = make_chunk_id(filepath, i)
cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
ON CONFLICT (id) DO UPDATE SET
document = EXCLUDED.document,
embedding = EXCLUDED.embedding,
source = EXCLUDED.source,
metadata = EXCLUDED.metadata
""", (chunk_id, chunk, embedding, source, "document",
json.dumps({"source": source, "filepath": str(filepath)})))
pg.commit()
pg.close()
except Exception as e:
log.error(f"pgvector write failed for {filepath.name}: {e}")
return 0
log.info(f"Indexed {len(chunks)} chunks: {filepath.name}")
enqueue_stage2(source, text)
return len(chunks)
def ingest_files(paths: list, embedder, state: dict) -> dict:
total = 0
for path in paths:
count = ingest_file(path, embedder)
total += count
state[str(path)] = str(path.stat().st_mtime)
log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
return state
def load_state() -> dict:
if Path(STATE_FILE).exists(): if Path(STATE_FILE).exists():
with open(STATE_FILE) as f: try:
return json.load(f) with open(STATE_FILE) as f:
return json.load(f)
except Exception:
pass
return {} return {}
def save_state(state): def save_state(state: dict):
with open(STATE_FILE, 'w') as f: with open(STATE_FILE, "w") as f:
json.dump(state, f) json.dump(state, f)
def get_changed_files(): def get_changed_files(state: dict) -> list:
state = load_state()
changed = [] changed = []
root = Path(NEXTCLOUD_PATH) root = Path(NEXTCLOUD_PATH)
for path in root.rglob("*"): for path in root.rglob("*"):
@@ -64,90 +216,87 @@ def get_changed_files():
continue continue
if path.suffix.lower() not in SUPPORTED: if path.suffix.lower() not in SUPPORTED:
continue continue
if path.name.startswith('.') or path.name.startswith('~$'): if path.name.startswith((".", "~$")):
continue continue
mtime = str(path.stat().st_mtime) if "Admin/Backups" in str(path) or "Backups" in path.parts:
key = str(path) continue
if state.get(key) != mtime: if "Journal/Media" in str(path):
continue
if state.get(str(path)) != str(path.stat().st_mtime):
changed.append(path) changed.append(path)
return changed, state return changed
def run_ingestion(): def set_ingestion_state(**kwargs):
changed, state = get_changed_files() with ingestion_lock:
ingestion_state.update(kwargs)
def write_status(handler):
with ingestion_lock:
status = {
"running": True, "timestamp": time.time(),
"pending": handler.pending, "last_event": handler.last_event,
"ingestion": dict(ingestion_state),
}
try:
with open(STATUS_FILE, "w") as f:
json.dump(status, f)
except Exception:
pass
def write_heartbeat():
try:
Path(HEARTBEAT_FILE).write_text(str(time.time()))
except Exception:
pass
def run_ingestion(embedder):
state = load_state()
changed = get_changed_files(state)
if not changed: if not changed:
logging.info("No new or changed files detected — skipping ingestion.") log.info("No new or changed files — skipping ingestion.")
set_ingestion_state(status="idle", message="No changes detected", file_count=0) set_ingestion_state(status="idle", message="No changes detected", file_count=0)
return return
count = len(changed) count = len(changed)
logging.info(f"Found {count} new or changed files — starting ingestion...") log.info(f"Found {count} new or changed files — starting ingestion...")
set_ingestion_state( set_ingestion_state(
status="ingesting", status="ingesting", message=f"Ingesting {count} file(s)...",
message=f"Ingesting {count} file(s)...", file_count=count, started_at=time.time(), finished_at=None, last_error="",
file_count=count,
started_at=time.time(),
finished_at=None,
last_error="",
) )
try: try:
result = subprocess.run( state = ingest_files(changed, embedder, state)
[PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH], save_state(state)
capture_output=True,
text=True,
timeout=1800
)
if result.returncode == 0:
root = Path(NEXTCLOUD_PATH)
for path in root.rglob("*"):
if path.is_file() and path.suffix.lower() in SUPPORTED:
state[str(path)] = str(path.stat().st_mtime)
save_state(state)
logging.info("Ingestion complete. State updated.")
set_ingestion_state(
status="idle",
message=f"Last run: ingested {count} file(s) successfully",
finished_at=time.time(),
)
else:
logging.error(f"Ingestion error: {result.stderr}")
set_ingestion_state(
status="error",
message="Ingestion failed — see log",
last_error=result.stderr[-300:],
finished_at=time.time(),
)
except subprocess.TimeoutExpired:
logging.error("Ingestion timed out.")
set_ingestion_state( set_ingestion_state(
status="error", status="idle",
message="Ingestion timed out (>30 min)", message=f"Last run: ingested {count} file(s) successfully",
last_error="TimeoutExpired",
finished_at=time.time(), finished_at=time.time(),
) )
except Exception as e: except Exception as e:
logging.error(f"Ingestion failed: {e}") log.error(f"Ingestion failed: {e}")
set_ingestion_state( set_ingestion_state(
status="error", status="error", message=f"Ingestion exception: {e}",
message=f"Ingestion exception: {e}", last_error=str(e), finished_at=time.time(),
last_error=str(e),
finished_at=time.time(),
) )
def start_ingestion_thread(): def start_ingestion_thread(embedder):
global ingestion_thread global ingestion_thread
if ingestion_thread and ingestion_thread.is_alive(): with ingestion_lock:
logging.info("Ingestion already running — skipping.") if ingestion_thread and ingestion_thread.is_alive():
return log.info("Ingestion already running — skipping.")
ingestion_thread = threading.Thread(target=run_ingestion, daemon=True) return
ingestion_thread.start() ingestion_thread = threading.Thread(
target=run_ingestion, args=(embedder,), daemon=True
)
ingestion_thread.start()
class IngestHandler(FileSystemEventHandler): class IngestHandler(FileSystemEventHandler):
def __init__(self): def __init__(self):
self.pending = False self.pending = False
self.last_event = 0 self.last_event = 0
def on_any_event(self, event): def on_any_event(self, event):
@@ -156,54 +305,72 @@ class IngestHandler(FileSystemEventHandler):
path = Path(event.src_path) path = Path(event.src_path)
if path.suffix.lower() not in SUPPORTED: if path.suffix.lower() not in SUPPORTED:
return return
if path.name.startswith('.') or path.name.startswith('~$'): if path.name.startswith((".", "~$")):
return return
if 'Admin/Backups' in str(path) or 'Backups' in path.parts: if "Admin/Backups" in str(path) or "Backups" in path.parts:
return return
if 'Journal/Media' in str(path): if "Journal/Media" in str(path):
return return
if event.event_type not in ('modified', 'created', 'moved'): if event.event_type not in ("modified", "created", "moved"):
return return
logging.info(f"Event: {event.event_type} {event.src_path}") log.info(f"Event: {event.event_type} {event.src_path}")
self.pending = True self.pending = True
self.last_event = time.time() self.last_event = time.time()
def write_status(handler):
with ingestion_lock:
status = {
"running": True,
"timestamp": time.time(),
"pending": handler.pending,
"last_event": handler.last_event,
"ingestion": dict(ingestion_state),
}
with open(STATUS_FILE, 'w') as f:
json.dump(status, f)
def main(): def main():
logging.info("Aaron AI Watcher starting...") log.info("Aaron AI Watcher starting...")
logging.info(f"Watching: {NEXTCLOUD_PATH}") log.info(f"Watching: {NEXTCLOUD_PATH}")
handler = IngestHandler() embedder = load_embedder()
log.info("Startup scan: checking for files missed since last run...")
state = load_state()
missed = get_changed_files(state)
if missed:
log.info(f"Startup recovery: {len(missed)} missed file(s) — ingesting now.")
set_ingestion_state(
status="ingesting",
message=f"Startup recovery: ingesting {len(missed)} missed file(s)...",
file_count=len(missed), started_at=time.time(),
)
try:
state = ingest_files(missed, embedder, state)
save_state(state)
set_ingestion_state(
status="idle",
message=f"Startup recovery complete: {len(missed)} file(s) ingested.",
finished_at=time.time(),
)
except Exception as e:
log.error(f"Startup recovery failed: {e}")
set_ingestion_state(status="error", message=str(e),
last_error=str(e), finished_at=time.time())
else:
log.info("Startup scan: no missed files.")
handler = IngestHandler()
observer = Observer() observer = Observer()
observer.schedule(handler, NEXTCLOUD_PATH, recursive=True) observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
observer.start() observer.start()
log.info("Observer started.")
try: try:
while True: while True:
write_heartbeat()
write_status(handler) write_status(handler)
if handler.pending: if handler.pending:
elapsed = time.time() - handler.last_event elapsed = time.time() - handler.last_event
if elapsed >= DEBOUNCE_SECONDS: if elapsed >= DEBOUNCE_SECONDS:
handler.pending = False handler.pending = False
start_ingestion_thread() start_ingestion_thread(embedder)
time.sleep(5) time.sleep(5)
except KeyboardInterrupt: except KeyboardInterrupt:
log.info("KeyboardInterrupt — stopping.")
observer.stop() observer.stop()
observer.join() observer.join()
logging.info("Watcher stopped.") log.info("Watcher stopped.")
if __name__ == "__main__": if __name__ == "__main__":