Files
aaronAI/scripts/watcher.py
T

211 lines
6.0 KiB
Python

import time
import subprocess
import logging
import json
import threading
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
INGEST_SCRIPT = "/home/aaron/aaronai/scripts/ingest.py"
PYTHON = "/home/aaron/aaronai/venv/bin/python3"
LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"
SUPPORTED = {'.pdf', '.docx', '.pptx', '.txt', '.md'}
DEBOUNCE_SECONDS = 120
STATUS_FILE = "/home/aaron/aaronai/watcher_status.json"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(message)s',
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler()
]
)
ingestion_state = {
"status": "idle",
"message": "",
"file_count": 0,
"started_at": None,
"finished_at": None,
"last_error": "",
}
ingestion_lock = threading.Lock()
ingestion_thread = None
def set_ingestion_state(**kwargs):
with ingestion_lock:
ingestion_state.update(kwargs)
def load_state():
if Path(STATE_FILE).exists():
with open(STATE_FILE) as f:
return json.load(f)
return {}
def save_state(state):
with open(STATE_FILE, 'w') as f:
json.dump(state, f)
def get_changed_files():
state = load_state()
changed = []
root = Path(NEXTCLOUD_PATH)
for path in root.rglob("*"):
if path.is_dir():
continue
if path.suffix.lower() not in SUPPORTED:
continue
if path.name.startswith('.') or path.name.startswith('~$'):
continue
mtime = str(path.stat().st_mtime)
key = str(path)
if state.get(key) != mtime:
changed.append(path)
return changed, state
def run_ingestion():
changed, state = get_changed_files()
if not changed:
logging.info("No new or changed files detected — skipping ingestion.")
set_ingestion_state(status="idle", message="No changes detected", file_count=0)
return
count = len(changed)
logging.info(f"Found {count} new or changed files — starting ingestion...")
set_ingestion_state(
status="ingesting",
message=f"Ingesting {count} file(s)...",
file_count=count,
started_at=time.time(),
finished_at=None,
last_error="",
)
try:
result = subprocess.run(
[PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH],
capture_output=True,
text=True,
timeout=1800
)
if result.returncode == 0:
root = Path(NEXTCLOUD_PATH)
for path in root.rglob("*"):
if path.is_file() and path.suffix.lower() in SUPPORTED:
state[str(path)] = str(path.stat().st_mtime)
save_state(state)
logging.info("Ingestion complete. State updated.")
set_ingestion_state(
status="idle",
message=f"Last run: ingested {count} file(s) successfully",
finished_at=time.time(),
)
else:
logging.error(f"Ingestion error: {result.stderr}")
set_ingestion_state(
status="error",
message="Ingestion failed — see log",
last_error=result.stderr[-300:],
finished_at=time.time(),
)
except subprocess.TimeoutExpired:
logging.error("Ingestion timed out.")
set_ingestion_state(
status="error",
message="Ingestion timed out (>30 min)",
last_error="TimeoutExpired",
finished_at=time.time(),
)
except Exception as e:
logging.error(f"Ingestion failed: {e}")
set_ingestion_state(
status="error",
message=f"Ingestion exception: {e}",
last_error=str(e),
finished_at=time.time(),
)
def start_ingestion_thread():
global ingestion_thread
if ingestion_thread and ingestion_thread.is_alive():
logging.info("Ingestion already running — skipping.")
return
ingestion_thread = threading.Thread(target=run_ingestion, daemon=True)
ingestion_thread.start()
class IngestHandler(FileSystemEventHandler):
def __init__(self):
self.pending = False
self.last_event = 0
def on_any_event(self, event):
if event.is_directory:
return
path = Path(event.src_path)
if path.suffix.lower() not in SUPPORTED:
return
if path.name.startswith('.') or path.name.startswith('~$'):
return
if 'Admin/Backups' in str(path) or 'Backups' in path.parts:
return
if 'Journal/Media' in str(path):
return
if event.event_type not in ('modified', 'created', 'moved'):
return
logging.info(f"Event: {event.event_type} {event.src_path}")
self.pending = True
self.last_event = time.time()
def write_status(handler):
with ingestion_lock:
status = {
"running": True,
"timestamp": time.time(),
"pending": handler.pending,
"last_event": handler.last_event,
"ingestion": dict(ingestion_state),
}
with open(STATUS_FILE, 'w') as f:
json.dump(status, f)
def main():
logging.info("Aaron AI Watcher starting...")
logging.info(f"Watching: {NEXTCLOUD_PATH}")
handler = IngestHandler()
observer = Observer()
observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
observer.start()
try:
while True:
write_status(handler)
if handler.pending:
elapsed = time.time() - handler.last_event
if elapsed >= DEBOUNCE_SECONDS:
handler.pending = False
start_ingestion_thread()
time.sleep(5)
except KeyboardInterrupt:
observer.stop()
observer.join()
logging.info("Watcher stopped.")
if __name__ == "__main__":
main()