"""Watch a Nextcloud data directory and run the ingest script after changes.

A watchdog observer flags relevant file events; once no new event has been
seen for DEBOUNCE_SECONDS, a background thread runs INGEST_SCRIPT over the
whole tree. Per-file modification times are persisted in STATE_FILE so a run
is skipped when nothing changed, and a status snapshot is written to
STATUS_FILE on every loop iteration of main().
"""

import json
import logging
import os
import subprocess
import threading
import time
from pathlib import Path

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
INGEST_SCRIPT = "/home/aaron/aaronai/scripts/ingest.py"
PYTHON = "/home/aaron/aaronai/venv/bin/python3"
LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
SUPPORTED = {'.pdf', '.docx', '.pptx', '.txt', '.md'}
DEBOUNCE_SECONDS = 120
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)

# Shared between the ingestion thread (writer) and the main loop (reader via
# write_status); always access it while holding ingestion_lock.
ingestion_state = {
    "status": "idle",
    "message": "",
    "file_count": 0,
    "started_at": None,
    "finished_at": None,
    "last_error": "",
}
ingestion_lock = threading.Lock()
ingestion_thread = None


def set_ingestion_state(**kwargs):
    """Merge the given fields into ingestion_state while holding the lock."""
    with ingestion_lock:
        ingestion_state.update(kwargs)


def _write_json_atomic(path, payload):
    """Serialize payload as JSON to path via a temp file and os.replace.

    Readers of path therefore see either the previous or the new complete
    document, never a partially written one.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, 'w', encoding="utf-8") as f:
        json.dump(payload, f)
    os.replace(tmp_path, path)


def load_state():
    """Return the persisted {path: mtime-string} mapping from STATE_FILE.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object; in the latter cases every file
    will be treated as changed on the next scan.
    """
    try:
        with open(STATE_FILE, encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        logging.warning("Could not read state file %s: %s", STATE_FILE, exc)
        return {}
    if not isinstance(state, dict):
        logging.warning("State file %s is not a JSON object; ignoring it.", STATE_FILE)
        return {}
    return state


def save_state(state):
    """Persist the {path: mtime-string} mapping to STATE_FILE atomically."""
    _write_json_atomic(STATE_FILE, state)


def _is_ingestable(path):
    """Return True for supported suffixes that are not hidden or '~$' files."""
    if path.suffix.lower() not in SUPPORTED:
        return False
    return not path.name.startswith(('.', '~$'))


def _snapshot_mtimes():
    """Return {str(path): str(mtime)} for every ingestable file in the tree.

    Entries that disappear or cannot be stat'ed while the tree is being
    walked are skipped instead of aborting the scan.
    """
    snapshot = {}
    for path in Path(NEXTCLOUD_PATH).rglob("*"):
        if not _is_ingestable(path):
            continue
        try:
            if path.is_dir():
                continue
            mtime = path.stat().st_mtime
        except OSError:
            continue
        snapshot[str(path)] = str(mtime)
    return snapshot


def _scan_for_changes():
    """Return (changed paths, saved state, current mtime snapshot)."""
    state = load_state()
    snapshot = _snapshot_mtimes()
    changed = [
        Path(key) for key, mtime in snapshot.items()
        if state.get(key) != mtime
    ]
    return changed, state, snapshot


def get_changed_files():
    """Return (changed, state).

    changed is the list of ingestable files whose current mtime differs from
    the one recorded in the saved state (or that have no record at all);
    state is the mapping loaded from STATE_FILE.
    """
    changed, state, _ = _scan_for_changes()
    return changed, state


def run_ingestion():
    """Run the ingest script if any file changed and record the outcome.

    Progress and results are published through set_ingestion_state. On
    success the mtimes observed *before* the subprocess started are saved,
    so a file modified while the script was running is still seen as changed
    by the next scan. Saving the snapshot also drops entries for files that
    no longer exist.
    """
    try:
        changed, _, snapshot = _scan_for_changes()
        if not changed:
            logging.info("No new or changed files detected — skipping ingestion.")
            set_ingestion_state(status="idle", message="No changes detected", file_count=0)
            return

        count = len(changed)
        logging.info("Found %d new or changed files — starting ingestion...", count)
        set_ingestion_state(
            status="ingesting",
            message=f"Ingesting {count} file(s)...",
            file_count=count,
            started_at=time.time(),
            finished_at=None,
            last_error="",
        )

        result = subprocess.run(
            [PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH],
            capture_output=True,
            text=True,
            timeout=1800
        )
        if result.returncode == 0:
            save_state(snapshot)
            logging.info("Ingestion complete. State updated.")
            set_ingestion_state(
                status="idle",
                message=f"Last run: ingested {count} file(s) successfully",
                finished_at=time.time(),
            )
        else:
            logging.error("Ingestion error: %s", result.stderr)
            set_ingestion_state(
                status="error",
                message="Ingestion failed — see log",
                # Keep only the tail of stderr in the status payload.
                last_error=result.stderr[-300:],
                finished_at=time.time(),
            )
    except subprocess.TimeoutExpired:
        logging.error("Ingestion timed out.")
        set_ingestion_state(
            status="error",
            message="Ingestion timed out (>30 min)",
            last_error="TimeoutExpired",
            finished_at=time.time(),
        )
    except Exception as e:
        # Thread entry point: record the failure instead of letting the
        # thread die without updating the published status.
        logging.exception("Ingestion failed: %s", e)
        set_ingestion_state(
            status="error",
            message=f"Ingestion exception: {e}",
            last_error=str(e),
            finished_at=time.time(),
        )


def _ingestion_running():
    """Return True while a previously started ingestion thread is alive."""
    return ingestion_thread is not None and ingestion_thread.is_alive()


def start_ingestion_thread():
    """Start run_ingestion in a daemon thread unless one is already running.

    Returns True when a new thread was started, False when the call was
    skipped because an ingestion is still in progress.
    """
    global ingestion_thread
    if _ingestion_running():
        logging.info("Ingestion already running — skipping.")
        return False
    ingestion_thread = threading.Thread(target=run_ingestion, daemon=True)
    ingestion_thread.start()
    return True


class IngestHandler(FileSystemEventHandler):
    """Flag relevant file events so the main loop can debounce ingestion.

    Attributes are written by the observer thread and read by main():
    pending is True when an unprocessed event exists, last_event is the
    time.time() of the most recent relevant event.
    """

    def __init__(self):
        super().__init__()
        self.pending = False
        self.last_event = 0

    @staticmethod
    def _is_relevant(path):
        """Return True if path is a supported file outside excluded folders."""
        if path.suffix.lower() not in SUPPORTED:
            return False
        if path.name.startswith('.') or path.name.startswith('~$'):
            return False
        if 'Admin/Backups' in str(path) or 'Backups' in path.parts:
            return False
        if 'Journal/Media' in str(path):
            return False
        return True

    def on_any_event(self, event):
        """Record modified/created/moved events that touch a relevant file."""
        if event.is_directory:
            return
        if event.event_type not in ('modified', 'created', 'moved'):
            return

        candidates = [event.src_path]
        dest_path = getattr(event, 'dest_path', '') if event.event_type == 'moved' else ''
        if dest_path:
            # A rename from a temporary or hidden name to a supported name is
            # only recognisable by its destination path.
            candidates.append(dest_path)
        if not any(self._is_relevant(Path(candidate)) for candidate in candidates):
            return

        if dest_path:
            logging.info("Event: %s %s -> %s", event.event_type, event.src_path, dest_path)
        else:
            logging.info("Event: %s %s", event.event_type, event.src_path)
        # Update the timestamp before raising the flag so the main loop never
        # pairs pending=True with a stale last_event and skips the debounce.
        self.last_event = time.time()
        self.pending = True


def write_status(handler, running=True):
    """Write the watcher status snapshot to STATUS_FILE atomically.

    Args:
        handler: object exposing the pending and last_event attributes.
        running: value stored under the "running" key; defaults to True.
    """
    with ingestion_lock:
        ingestion_snapshot = dict(ingestion_state)
    status = {
        "running": running,
        "timestamp": time.time(),
        "pending": handler.pending,
        "last_event": handler.last_event,
        "ingestion": ingestion_snapshot,
    }
    _write_json_atomic(STATUS_FILE, status)


def main():
    """Run the observer and the debounce loop until interrupted."""
    logging.info("Aaron AI Watcher starting...")
    logging.info(f"Watching: {NEXTCLOUD_PATH}")
    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()
    try:
        while True:
            try:
                write_status(handler)
            except OSError as exc:
                # A failed status write must not take the watcher down.
                logging.warning("Could not write status file: %s", exc)
            # Leave pending set while an ingestion is still running so the
            # change is picked up by a fresh run once the current one ends.
            if handler.pending and not _ingestion_running():
                elapsed = time.time() - handler.last_event
                if elapsed >= DEBOUNCE_SECONDS:
                    handler.pending = False
                    start_ingestion_thread()
            time.sleep(5)
    except KeyboardInterrupt:
        pass
    finally:
        observer.stop()
        observer.join()
        try:
            write_status(handler, running=False)
        except OSError as exc:
            logging.warning("Could not write final status file: %s", exc)
        logging.info("Watcher stopped.")


if __name__ == "__main__":
    main()