"""Watch the Nextcloud files directory and run the ingestion script on changes.

File-system events for supported document types arm a debounce timer. Once
no further event has arrived for DEBOUNCE_SECONDS, the tree is scanned and,
if any file's mtime differs from the persisted state, INGEST_SCRIPT is run
over the whole directory.
"""
import json
import logging
import subprocess
import time
from pathlib import Path

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
INGEST_SCRIPT = "/home/aaron/aaronai/scripts/ingest.py"
PYTHON = "/home/aaron/aaronai/venv/bin/python3"
LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
SUPPORTED = {'.pdf', '.docx', '.pptx', '.txt', '.md'}
DEBOUNCE_SECONDS = 120
INGEST_TIMEOUT_SECONDS = 600  # upper bound for one ingestion subprocess run
POLL_SECONDS = 5  # how often the main loop checks the debounce timer

# Event types that describe read-only access. Reacting to them would re-arm
# the watcher every time a file is merely opened for reading.
# NOTE(review): names follow the watchdog event_type strings; which of them
# are emitted depends on the installed watchdog version and platform.
_READ_ONLY_EVENTS = frozenset({"opened", "closed_no_write"})

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)


def _is_candidate(path: Path) -> bool:
    """Return True if *path* has a supported suffix and is not hidden/temporary."""
    if path.suffix.lower() not in SUPPORTED:
        return False
    return not path.name.startswith(('.', '~$'))


def load_state() -> dict:
    """Return the persisted ``{path: mtime}`` mapping from STATE_FILE.

    A missing, unreadable, or malformed state file yields an empty mapping
    (so every file counts as changed) instead of raising.
    """
    try:
        with open(STATE_FILE, encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and decoding errors.
        logging.warning(f"Could not read state file {STATE_FILE}: {exc}")
        return {}
    return state if isinstance(state, dict) else {}


def save_state(state) -> None:
    """Write *state* to STATE_FILE as JSON.

    The data is written to a sibling temporary file and then renamed over
    STATE_FILE, so an interrupted write cannot leave a truncated state file.
    """
    target = Path(STATE_FILE)
    tmp = target.with_name(target.name + ".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f)
    tmp.replace(target)


def _scan_mtimes() -> dict:
    """Return ``{str(path): str(mtime)}`` for every candidate file under NEXTCLOUD_PATH."""
    snapshot = {}
    for path in Path(NEXTCLOUD_PATH).rglob("*"):
        if not _is_candidate(path) or path.is_dir():
            continue
        try:
            # mtime is stored as a string, matching existing state files.
            snapshot[str(path)] = str(path.stat().st_mtime)
        except OSError:
            # The file vanished (or became unreadable) between listing and stat.
            continue
    return snapshot


def _detect_changes() -> tuple:
    """Return ``(changed, state, snapshot)`` comparing the tree with saved state."""
    state = load_state()
    snapshot = _scan_mtimes()
    changed = [
        Path(key) for key, mtime in snapshot.items() if state.get(key) != mtime
    ]
    return changed, state, snapshot


def get_changed_files():
    """Return ``(changed, state)``.

    ``changed`` is the list of candidate files whose current mtime differs
    from the persisted state; ``state`` is the persisted mapping as loaded.
    """
    changed, state, _ = _detect_changes()
    return changed, state


def run_ingestion():
    """Run INGEST_SCRIPT over NEXTCLOUD_PATH if any file is new or changed.

    On success the mtimes captured *before* the run are persisted, so a file
    modified while ingestion was in progress is detected again next time
    rather than being marked as ingested. Entries for files that no longer
    exist are dropped. All failures are logged; nothing is raised to the
    caller.
    """
    try:
        changed, _, snapshot = _detect_changes()
        if not changed:
            logging.info("No new or changed files detected — skipping ingestion.")
            return
        logging.info(f"Found {len(changed)} new or changed files — starting ingestion...")
        result = subprocess.run(
            [PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH],
            capture_output=True,
            text=True,
            timeout=INGEST_TIMEOUT_SECONDS,
        )
        if result.returncode != 0:
            logging.error(f"Ingestion error: {result.stderr}")
            return
        save_state(snapshot)
        logging.info("Ingestion complete. State updated.")
    except subprocess.TimeoutExpired:
        logging.error("Ingestion timed out.")
    except Exception as e:
        # Top-level boundary of a long-running watcher: log and keep running.
        logging.error(f"Ingestion failed: {e}")


class IngestHandler(FileSystemEventHandler):
    """Record that a relevant file event happened and when.

    Attributes are read by the main loop:
    ``pending`` is True when an ingestion check is due;
    ``last_event`` is the ``time.time()`` of the most recent relevant event.
    """

    def __init__(self):
        super().__init__()
        self.pending = False
        self.last_event = 0

    def on_any_event(self, event):
        """Arm the debounce timer for events touching a supported file."""
        if event.is_directory:
            return
        if event.event_type in _READ_ONLY_EVENTS:
            return
        # A move carries both a source and a destination; a file renamed
        # *into* a supported name (e.g. from a temporary upload name) must
        # count even though its source name is not supported.
        touched = (event.src_path, getattr(event, "dest_path", ""))
        if not any(_is_candidate(Path(p)) for p in touched if p):
            return
        # Set the timestamp first: the main loop reads `pending` and then
        # `last_event`, so it must never see pending=True with a stale time.
        self.last_event = time.time()
        self.pending = True


def main():
    """Start the observer and run ingestion after each quiet period."""
    logging.info("Aaron AI Watcher starting...")
    logging.info(f"Watching: {NEXTCLOUD_PATH}")
    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()
    try:
        while True:
            quiet_for = time.time() - handler.last_event
            if handler.pending and quiet_for >= DEBOUNCE_SECONDS:
                handler.pending = False
                run_ingestion()
            time.sleep(POLL_SECONDS)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the observer on every exit path, not only on Ctrl-C.
        observer.stop()
        observer.join()
        logging.info("Watcher stopped.")


if __name__ == "__main__":
    main()