Files
aaronAI/scripts/watcher.py
T
aaron 465f2f725b Code review fixes: CV pinning, F1 (excluded_sources), F14 (50KB truncation), F37
- api.py: strip CV pinning workaround (parity violation, see architecture doc)
- dream.py: F1 — retrieve_graphiti() now accepts excluded_sources, over-fetches
  3x and filters in-process. Was silently dropping the parameter; would have
  confounded E3 with broken cross-stage exclusion in Graphiti arm.
- watcher.py + ingest.py: F14 — drop full_text[:50000] truncation. Was
  propagating through entire cascade. Postgres TEXT can hold up to 1GB.
- corpus_integrity.py: F37 — same truncation, third path now clean.

Backups: api.py.bak.*, dream.py.bak.*, watcher.py.bak.*, ingest.py.bak.*,
corpus_integrity.py.bak.* timestamped pre-fix.

Re-cascaded Shop Class as Soulcraft (only already-cascaded source affected
by F14, 414KB).
2026-05-01 02:26:37 +00:00

449 lines
14 KiB
Python

"""
Aaron AI Watcher — Stage 1 of the encoding pipeline.
Watches the Nextcloud directory for new or changed files.
On detection, chunks + embeds documents in-process (no subprocess),
then enqueues to stage_2_queue for async cascade processing.
Design principles:
- Embedding model loaded ONCE at startup, reused across all ingest runs
- In-process ingest (no subprocess) — eliminates per-run model reload memory spike
- Missed-file recovery on startup — ingests anything new since last state
- Heartbeat file updated every loop tick — enables external health monitoring
- Parity principle: no filtering, no decisions, faithful capture
- Does NOT enqueue to stage_2_queue during bulk migration (SKIP_STAGE2_ENQUEUE env var)
Architecture: Stage 1 (watcher) -> stage_2_queue -> Stage 2 (Mistral) -> stage_3_queue -> Stage 3 (Graphiti)
"""
import os
import time
import json
import hashlib
import logging
import threading
from pathlib import Path
import psycopg2
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from docx import Document as DocxDocument
from pypdf import PdfReader
from pptx import Presentation
# Load settings from ~/aaronai/.env before any os.getenv() call below.
# override=True is passed so values from the file replace variables that are
# already present in the process environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# Root directory that is scanned at startup and watched recursively.
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
# Destination of the file-only logging handler configured below.
LOG_FILE = "/home/aaron/aaronai/watcher.log"
# JSON map of str(path) -> str(mtime) for every file already processed.
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
# JSON status snapshot rewritten by write_status() on every main-loop tick.
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
# File holding the time.time() of the latest main-loop tick.
HEARTBEAT_FILE = "/home/aaron/aaronai/watcher_heartbeat"
# Lower-cased file suffixes that extract_text() knows how to read.
SUPPORTED = {".pdf", ".docx", ".pptx", ".txt", ".md"}
# Seconds without a new filesystem event before a pending ingestion starts.
DEBOUNCE_SECONDS = 120
# Chunk length and overlap, both counted in whitespace-separated words.
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
# Model name handed to SentenceTransformer() in load_embedder().
EMBED_MODEL = "all-MiniLM-L6-v2"
# Connection string passed to psycopg2.connect(); None when the variable is unset.
PG_DSN = os.getenv("PG_DSN")
# Log to LOG_FILE only (no console handler is installed here).
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [watcher] %(levelname)s %(message)s",
handlers=[logging.FileHandler(LOG_FILE)],
)
log = logging.getLogger("watcher")
# Guards ingestion_state and the check-and-start of ingestion_thread.
ingestion_lock = threading.Lock()
# Progress of the current/last ingestion run; copied into STATUS_FILE.
ingestion_state = {
"status": "idle", "message": "", "file_count": 0,
"started_at": None, "finished_at": None, "last_error": "",
}
# Background threading.Thread running run_ingestion(), or None before the first run.
ingestion_thread = None
def load_embedder():
    """Build the SentenceTransformer named by EMBED_MODEL and return it.

    One log line is written before the model is constructed and one after,
    so the log shows how long loading took.
    """
    log.info(f"Loading embedding model: {EMBED_MODEL}")
    embedder = SentenceTransformer(EMBED_MODEL)
    log.info("Embedding model ready.")
    return embedder
def get_pg():
    """Open a new psycopg2 connection using PG_DSN; the caller must close it."""
    connection = psycopg2.connect(PG_DSN)
    return connection
def extract_text(path: Path) -> str:
    """Return the plain text of a document, or "" when nothing can be read.

    The reader is chosen from the lower-cased file suffix:
    .docx -> non-blank paragraphs joined by newlines,
    .pdf  -> text of every page that yields any, each followed by a newline,
    .pptx -> text of every shape that has non-blank text, joined by newlines,
    .txt / .md -> file content decoded as UTF-8, undecodable bytes dropped.

    Any exception raised while reading is logged, recorded through
    record_ingest_failure(), and converted into an empty-string result.
    A suffix that matches none of the branches also yields "".
    """
    suffix = path.suffix.lower()
    try:
        if suffix == ".docx":
            doc = DocxDocument(path)
            return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        if suffix == ".pdf":
            reader = PdfReader(path)
            # Run the extractor exactly once per page; calling it in both the
            # filter and the value expression would double the parsing work.
            page_texts = (page.extract_text() for page in reader.pages)
            return "".join(f"{page_text}\n" for page_text in page_texts if page_text)
        if suffix == ".pptx":
            prs = Presentation(path)
            return "\n".join(
                shape.text
                for slide in prs.slides
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            )
        if suffix in {".txt", ".md"}:
            return path.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        log.warning(f"Text extraction failed for {path.name}: {e}")
        record_ingest_failure(path, f"Text extraction failed: {e}")
    # Reached after a handled failure or for a suffix with no reader.
    return ""
def chunk_text(text: str, chunk_size=None, overlap=None) -> list:
    """Split text into overlapping chunks of whitespace-separated words.

    Args:
        text: Input text; it is tokenised with str.split(), so any run of
            whitespace separates words and is replaced by a single space.
        chunk_size: Maximum words per chunk. Defaults to CHUNK_SIZE.
        overlap: Words shared between consecutive chunks. Defaults to
            CHUNK_OVERLAP.

    Returns:
        A list of chunk strings. Chunks start every (chunk_size - overlap)
        words; the last chunk may be shorter. Empty or blank text gives [].

    Raises:
        ValueError: if chunk_size is not positive or overlap is not in
            [0, chunk_size), which would otherwise never advance.
    """
    size = CHUNK_SIZE if chunk_size is None else chunk_size
    shared = CHUNK_OVERLAP if overlap is None else overlap
    if size <= 0 or not 0 <= shared < size:
        raise ValueError(
            f"chunk_size must be > 0 and 0 <= overlap < chunk_size "
            f"(got chunk_size={size}, overlap={shared})"
        )
    step = size - shared
    words = text.split()
    # Every window starting inside the word list is non-empty, so no
    # blank-chunk filtering is needed.
    return [" ".join(words[start:start + size]) for start in range(0, len(words), step)]
def make_chunk_id(filepath: Path, chunk_index: int) -> str:
    """Return the row id for one chunk of a file.

    The id is the first 8 hex characters of the MD5 of str(filepath),
    followed by an underscore and the chunk index.
    """
    path_digest = hashlib.md5(str(filepath).encode()).hexdigest()
    return f"{path_digest[:8]}_{chunk_index}"
def enqueue_stage2(source: str, full_text: str):
    """Insert or reset the stage_2_queue row for a source document.

    Does nothing when the SKIP_STAGE2_ENQUEUE environment variable is set to
    any non-empty value. Otherwise the row keyed by `source` is upserted with
    the full text and its character length; on conflict the enqueue time is
    refreshed and completed_at, failed_at and attempts are reset.

    Failures are logged as warnings and never raised to the caller.
    """
    if os.getenv("SKIP_STAGE2_ENQUEUE"):
        return
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO stage_2_queue (source, full_text, char_length)
                VALUES (%s, %s, %s)
                ON CONFLICT (source) DO UPDATE SET
                full_text = EXCLUDED.full_text,
                char_length = EXCLUDED.char_length,
                enqueued_at = NOW(),
                completed_at = NULL,
                failed_at = NULL,
                attempts = 0
            """, (source, full_text, len(full_text)))
            pg.commit()
        finally:
            # Close on the failure path too, so a failed execute/commit
            # does not leak the connection.
            pg.close()
    except Exception as e:
        log.warning(f"Stage 2 enqueue failed (non-fatal): {e}")
def record_ingest_failure(filepath: Path, error: str):
    """Write extraction or ingest failure to ingest_failures table for UI visibility.

    The row is keyed by the file name (filepath.name). A first failure
    inserts a row with retry_count 0; a repeated failure overwrites the
    error text, increments retry_count, refreshes last_failed_at and marks
    the row unresolved. The error text is truncated to 1000 characters.

    Failures of this bookkeeping itself are logged and never raised.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("""
                INSERT INTO ingest_failures (source, filepath, error, retry_count, first_failed_at, last_failed_at)
                VALUES (%s, %s, %s, 0, NOW(), NOW())
                ON CONFLICT (source) DO UPDATE SET
                error = EXCLUDED.error,
                retry_count = ingest_failures.retry_count + 1,
                last_failed_at = NOW(),
                resolved = FALSE
            """, (filepath.name, str(filepath), error[:1000]))
            pg.commit()
        finally:
            # Release the connection even when the statement or commit fails.
            pg.close()
    except Exception as e:
        log.warning(f"Could not record ingest failure (non-fatal): {e}")
def resolve_ingest_failure(source: str):
    """Mark a previously failed file as resolved after successful ingest.

    Sets resolved = TRUE on the ingest_failures row whose source matches;
    when no such row exists the UPDATE simply affects nothing. Database
    errors are logged as warnings and never raised.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("UPDATE ingest_failures SET resolved = TRUE WHERE source = %s", (source,))
            pg.commit()
        finally:
            # Release the connection even when the statement or commit fails.
            pg.close()
    except Exception as e:
        log.warning(f"Could not resolve ingest failure record (non-fatal): {e}")
def ingest_file(filepath: Path, embedder) -> int:
    """Chunk, embed and store one file, then hand it to stage 2.

    Args:
        filepath: File to ingest.
        embedder: Object whose encode(list_of_str) result has a tolist()
            method (the model returned by load_embedder()).

    Returns:
        Number of chunks written, or 0 when the file is skipped (hidden or
        lock file, unsupported suffix, no extractable text) or when
        embedding or the database write fails. Failures are logged and
        recorded through record_ingest_failure(); nothing is raised.
    """
    if filepath.name.startswith(("~$", ".")):
        return 0
    if filepath.suffix.lower() not in SUPPORTED:
        return 0
    text = extract_text(filepath)
    if not text.strip():
        return 0
    chunks = chunk_text(text)
    if not chunks:
        return 0
    try:
        embeddings = embedder.encode(chunks).tolist()
    except Exception as e:
        log.error(f"Embedding failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"Embedding failed: {e}")
        return 0
    source = filepath.name
    try:
        # Identical for every chunk of the file, so serialise it once.
        metadata = json.dumps({"source": source, "filepath": str(filepath)})
        pg = get_pg()
        try:
            cur = pg.cursor()
            # NOTE(review): rows are upserted by chunk id only; if the file
            # now yields fewer chunks than before, rows with higher indexes
            # from the earlier version appear to stay in the table — confirm
            # whether another component cleans them up.
            for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
                chunk_id = make_chunk_id(filepath, i)
                cur.execute("""
                    INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
                    VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
                    ON CONFLICT (id) DO UPDATE SET
                    document = EXCLUDED.document,
                    embedding = EXCLUDED.embedding,
                    source = EXCLUDED.source,
                    metadata = EXCLUDED.metadata
                """, (chunk_id, chunk, embedding, source, "document", metadata))
            # Single commit: either every chunk of the file lands or none.
            pg.commit()
        finally:
            # Close on the failure path too, so a failed write does not
            # leak the connection.
            pg.close()
    except Exception as e:
        log.error(f"pgvector write failed for {filepath.name}: {e}")
        record_ingest_failure(filepath, f"pgvector write failed: {e}")
        return 0
    log.info(f"Indexed {len(chunks)} chunks: {filepath.name}")
    resolve_ingest_failure(source)
    enqueue_stage2(source, text)
    return len(chunks)
def ingest_files(paths: list, embedder, state: dict) -> dict:
    """Ingest each path and record its modification time in `state`.

    Args:
        paths: Path objects to ingest.
        embedder: Embedding model forwarded to ingest_file().
        state: Map of str(path) -> str(mtime); updated in place.

    Returns:
        The same `state` dict, updated for every path that could be stat'ed.

    A path is recorded even when ingest_file() returns 0, so a file that is
    skipped or fails is not retried until its modification time changes.
    """
    total = 0
    for path in paths:
        try:
            # Read the mtime BEFORE ingesting: if the file is modified while
            # it is being processed, the stored value is stale and the next
            # scan picks the file up again instead of missing the change.
            mtime = str(path.stat().st_mtime)
        except OSError as e:
            # The file vanished (or became unreadable) since the scan; skip
            # it rather than aborting the rest of the batch.
            log.warning(f"Skipping {path}: cannot stat file ({e})")
            continue
        total += ingest_file(path, embedder)
        state[str(path)] = mtime
    log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
    return state
def load_state() -> dict:
    """Load the processed-file map from STATE_FILE.

    Returns:
        The dict stored in STATE_FILE, or {} when the file does not exist,
        cannot be read or parsed, or does not contain a JSON object. An
        empty result makes the next scan treat every file as changed, so
        every case other than a missing file is logged.
    """
    try:
        with open(STATE_FILE) as f:
            state = json.load(f)
    except FileNotFoundError:
        # Normal on first run: nothing has been ingested yet.
        return {}
    except Exception as e:
        log.warning(f"Could not load state file {STATE_FILE} ({e}); starting with empty state.")
        return {}
    if not isinstance(state, dict):
        log.warning(f"State file {STATE_FILE} does not contain a JSON object; starting with empty state.")
        return {}
    return state
def save_state(state: dict):
    """Persist the processed-file map to STATE_FILE as JSON.

    The data is written to a sibling temporary file and then moved over
    STATE_FILE with os.replace(), so a crash in the middle of the write
    cannot leave a truncated state file behind.

    Raises:
        OSError: if the temporary file cannot be written or renamed.
        TypeError: if `state` contains values json cannot serialise.
    """
    tmp_path = f"{STATE_FILE}.tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
        # Push the bytes to disk before the rename makes them the live copy.
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, STATE_FILE)
def get_changed_files(state: dict) -> list:
    """Return the files under NEXTCLOUD_PATH that are new or modified.

    A file is reported when its suffix is in SUPPORTED, its name does not
    start with "." or "~$", its path is not under a Backups or Journal/Media
    location, and str(mtime) differs from the value stored for it in `state`
    (or it has no entry).

    Args:
        state: Map of str(path) -> str(mtime) from the previous run.

    Returns:
        List of Path objects, in the order the directory walk yields them.
    """
    changed = []
    for path in Path(NEXTCLOUD_PATH).rglob("*"):
        # Name-based filters first: they need no filesystem access.
        if path.suffix.lower() not in SUPPORTED:
            continue
        if path.name.startswith((".", "~$")):
            continue
        path_str = str(path)
        if "Admin/Backups" in path_str or "Backups" in path.parts:
            continue
        if "Journal/Media" in path_str:
            continue
        if path.is_dir():
            continue
        try:
            mtime = str(path.stat().st_mtime)
        except OSError:
            # The entry disappeared between the directory listing and the
            # stat call (e.g. a temp file renamed away); nothing to ingest.
            continue
        if state.get(path_str) != mtime:
            changed.append(path)
    return changed
def set_ingestion_state(**kwargs):
    """Merge the given fields into ingestion_state while holding ingestion_lock."""
    with ingestion_lock:
        for field, value in kwargs.items():
            ingestion_state[field] = value
def write_status(handler):
    """Dump a JSON status snapshot to STATUS_FILE.

    The snapshot holds a running flag, the current time, the handler's
    pending flag and last event time, and a copy of ingestion_state taken
    under ingestion_lock. Any error while writing the file is ignored.
    """
    with ingestion_lock:
        ingestion_snapshot = dict(ingestion_state)
        payload = dict(
            running=True,
            timestamp=time.time(),
            pending=handler.pending,
            last_event=handler.last_event,
            ingestion=ingestion_snapshot,
        )
    try:
        with open(STATUS_FILE, "w") as out:
            json.dump(payload, out)
    except Exception:
        # Best effort: status reporting must not disturb the main loop.
        pass
def write_heartbeat():
    """Overwrite HEARTBEAT_FILE with the current time.time() value as text.

    Any error while writing is ignored.
    """
    try:
        stamp = str(time.time())
        Path(HEARTBEAT_FILE).write_text(stamp)
    except Exception:
        # Best effort: a missed heartbeat must not stop the watcher.
        pass
def run_ingestion(embedder):
    """Scan for changed files, ingest them and publish progress.

    Loads the saved state, collects new/changed files, ingests them with
    `embedder`, saves the updated state, and reflects each phase in
    ingestion_state. Intended as the target of the background ingestion
    thread: every exception, including one raised while loading state or
    scanning, is logged and recorded as status "error" instead of escaping.
    """
    try:
        state = load_state()
        changed = get_changed_files(state)
        if not changed:
            log.info("No new or changed files — skipping ingestion.")
            set_ingestion_state(status="idle", message="No changes detected", file_count=0)
            return
        count = len(changed)
        log.info(f"Found {count} new or changed files — starting ingestion...")
        set_ingestion_state(
            status="ingesting", message=f"Ingesting {count} file(s)...",
            file_count=count, started_at=time.time(), finished_at=None, last_error="",
        )
        state = ingest_files(changed, embedder, state)
        save_state(state)
        set_ingestion_state(
            status="idle",
            message=f"Last run: ingested {count} file(s) successfully",
            finished_at=time.time(),
        )
    except Exception as e:
        # Runs in a background thread: an uncaught exception would end the
        # thread without the status file ever showing that something failed.
        log.error(f"Ingestion failed: {e}")
        set_ingestion_state(
            status="error", message=f"Ingestion exception: {e}",
            last_error=str(e), finished_at=time.time(),
        )
def start_ingestion_thread(embedder):
    """Start run_ingestion(embedder) in a daemon thread unless one is alive.

    The liveness check and the start both happen under ingestion_lock. The
    new thread is stored in the module-level ingestion_thread before it is
    started. When a previous thread is still alive, only a log line is
    written.
    """
    global ingestion_thread
    with ingestion_lock:
        current = ingestion_thread
        if not (current and current.is_alive()):
            worker = threading.Thread(
                target=run_ingestion, args=(embedder,), daemon=True
            )
            ingestion_thread = worker
            worker.start()
            return
        log.info("Ingestion already running — skipping.")
class IngestHandler(FileSystemEventHandler):
    """Filesystem event handler that flags "something changed" for the main loop.

    It does no ingestion itself. For each relevant event it logs the path,
    stores the event time in `last_event` and sets `pending`; the main loop
    polls both to debounce and start an ingestion run.
    """

    def __init__(self):
        super().__init__()
        # True once a relevant event has been seen and not yet consumed.
        self.pending = False
        # time.time() of the most recent relevant event (0 = none yet).
        self.last_event = 0

    def _should_ignore(self, path: Path) -> bool:
        """Return True for hidden/lock files and Backups or Journal/Media paths."""
        if path.name.startswith((".", "~$")):
            return True
        if "Admin/Backups" in str(path) or "Backups" in path.parts:
            return True
        if "Journal/Media" in str(path):
            return True
        return False

    def _record_event(self, raw_path, label: str) -> None:
        """Log the event and mark work as pending if the path is relevant."""
        path = Path(raw_path)
        if path.suffix.lower() not in SUPPORTED or self._should_ignore(path):
            return
        log.info(f"Event: {label} {path}")
        # Store the timestamp BEFORE raising the flag: the main loop reads
        # `pending` first and then `last_event`, so the other order could let
        # it pair a fresh flag with a stale timestamp and skip the debounce.
        self.last_event = time.time()
        self.pending = True

    def on_created(self, event):
        if event.is_directory:
            return
        self._record_event(event.src_path, "created")

    def on_modified(self, event):
        if event.is_directory:
            return
        self._record_event(event.src_path, "modified")

    def on_moved(self, event):
        if event.is_directory:
            return
        # Nextcloud WebDAV writes .part temp files then renames to final path.
        # src_path is the .part file; dest_path is the final filename.
        self._record_event(event.dest_path, "moved ->")

    def on_closed(self, event):
        # FileClosedEvent fires on the final file after Nextcloud completes write.
        # Belt-and-suspenders catch for any write pattern not caught by on_moved.
        if event.is_directory:
            return
        self._record_event(event.src_path, "closed")
def main():
    """Run the watcher: load the model, recover missed files, then watch.

    Steps:
    1. Load the embedding model once.
    2. Ingest, synchronously, every file that changed since the saved state.
    3. Start a recursive filesystem observer on NEXTCLOUD_PATH.
    4. Every 5 seconds write the heartbeat and status files and, once
       DEBOUNCE_SECONDS have passed since the last relevant event, start a
       background ingestion run.

    The loop ends on KeyboardInterrupt; the observer is stopped and joined
    on every exit path.
    """
    log.info("Aaron AI Watcher starting...")
    log.info(f"Watching: {NEXTCLOUD_PATH}")
    embedder = load_embedder()
    log.info("Startup scan: checking for files missed since last run...")
    state = load_state()
    missed = get_changed_files(state)
    if missed:
        log.info(f"Startup recovery: {len(missed)} missed file(s) — ingesting now.")
        set_ingestion_state(
            status="ingesting",
            message=f"Startup recovery: ingesting {len(missed)} missed file(s)...",
            file_count=len(missed), started_at=time.time(),
        )
        try:
            state = ingest_files(missed, embedder, state)
            save_state(state)
            set_ingestion_state(
                status="idle",
                message=f"Startup recovery complete: {len(missed)} file(s) ingested.",
                finished_at=time.time(),
            )
        except Exception as e:
            log.error(f"Startup recovery failed: {e}")
            set_ingestion_state(status="error", message=str(e),
                                last_error=str(e), finished_at=time.time())
    else:
        log.info("Startup scan: no missed files.")
    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()
    log.info("Observer started.")
    try:
        while True:
            write_heartbeat()
            write_status(handler)
            if handler.pending and time.time() - handler.last_event >= DEBOUNCE_SECONDS:
                worker = ingestion_thread
                if worker is not None and worker.is_alive():
                    # A run is still in progress and scanned the tree before
                    # this event arrived. Keep `pending` set so the change is
                    # picked up on a later tick instead of being dropped.
                    log.debug("Ingestion still running; keeping change pending.")
                else:
                    handler.pending = False
                    start_ingestion_thread(embedder)
            time.sleep(5)
    except KeyboardInterrupt:
        log.info("KeyboardInterrupt — stopping.")
    finally:
        # Also reached when the loop dies from an unexpected exception, so
        # the observer thread is always shut down cleanly.
        observer.stop()
        observer.join()
        log.info("Watcher stopped.")


if __name__ == "__main__":
    main()