125 lines
3.6 KiB
Python
125 lines
3.6 KiB
Python
import time
|
|
import subprocess
|
|
import logging
|
|
import json
|
|
from pathlib import Path
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
|
|
# --- Filesystem locations ---------------------------------------------------
# Directory tree that is watched and passed to the ingestion script.
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
# Interpreter and script used to launch an ingestion run.
PYTHON = "/home/aaron/aaronai/venv/bin/python3"
INGEST_SCRIPT = "/home/aaron/aaronai/scripts/ingest.py"
# Log output, and the JSON file remembering each file's last recorded mtime.
LOG_FILE = "/home/aaron/aaronai/watcher.log"
STATE_FILE = "/home/aaron/aaronai/watcher_state.json"

# --- Behaviour --------------------------------------------------------------
# Lower-cased file suffixes that are eligible for ingestion.
SUPPORTED = {".txt", ".md", ".pdf", ".docx", ".pptx"}
# Quiet period (seconds) required after the last relevant event before an
# ingestion run is started.
DEBOUNCE_SECONDS = 120
|
|
|
|
# Root logger: INFO and above, timestamped, sent both to LOG_FILE and to a
# StreamHandler created with its default stream.
logging.basicConfig(
    format="%(asctime)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
)
|
|
|
|
def load_state(state_file=None):
    """Load the saved ``path -> mtime`` mapping from disk.

    Args:
        state_file: Optional path of the JSON state file. Defaults to the
            module-level ``STATE_FILE``.

    Returns:
        The dict stored in the file, or an empty dict when the file does not
        exist, cannot be decoded as JSON, or does not hold a JSON object.
    """
    target = Path(STATE_FILE if state_file is None else state_file)
    try:
        # Open directly instead of checking exists() first: the file may
        # disappear between the check and the open.
        with target.open(encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        # Nothing has been recorded yet.
        return {}
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # A truncated or corrupt file must not stop the watcher. Starting
        # from an empty state only means every file counts as changed once.
        logging.warning(f"Ignoring unreadable state file {target}: {exc}")
        return {}

    if not isinstance(state, dict):
        # Callers use dict methods (.get / item assignment) on the result.
        logging.warning(f"Ignoring state file {target}: not a JSON object")
        return {}
    return state
|
|
|
|
def save_state(state, state_file=None):
    """Persist the ``path -> mtime`` mapping as JSON.

    The data is written to a temporary sibling file and then renamed over the
    target, so an interruption in the middle of the write cannot leave a
    truncated state file behind.

    Args:
        state: JSON-serialisable mapping to store.
        state_file: Optional destination path. Defaults to the module-level
            ``STATE_FILE``.
    """
    target = Path(STATE_FILE if state_file is None else state_file)
    scratch = target.with_name(target.name + ".tmp")
    with scratch.open("w", encoding="utf-8") as f:
        json.dump(state, f)
    # Path.replace overwrites an existing destination.
    scratch.replace(target)
|
|
|
|
def get_changed_files():
    """Find supported files whose mtime differs from the saved state.

    Walks ``NEXTCLOUD_PATH`` recursively and compares each candidate file's
    current modification time (as a string) with the value stored for its
    path in the state returned by ``load_state()``.

    Returns:
        A ``(changed, state)`` tuple. ``changed`` is a list of ``Path``
        objects that are new or whose mtime differs; ``state`` is the loaded
        mapping, returned unmodified.
    """
    state = load_state()
    changed = []
    for path in Path(NEXTCLOUD_PATH).rglob("*"):
        # Cheap name-based filters first; they need no filesystem access.
        if path.suffix.lower() not in SUPPORTED:
            continue
        if path.name.startswith(('.', '~$')):
            continue
        try:
            if path.is_dir():
                continue
            mtime = str(path.stat().st_mtime)
        except OSError as exc:
            # The entry was deleted during the scan or is a broken symlink.
            # Skip it rather than aborting the whole scan.
            logging.warning(f"Skipping unreadable file {path}: {exc}")
            continue
        if state.get(str(path)) != mtime:
            changed.append(path)
    return changed, state
|
|
|
|
def _snapshot_mtimes(root):
    """Return ``{str(path): str(mtime)}`` for every supported file under *root*."""
    snapshot = {}
    for path in Path(root).rglob("*"):
        if path.suffix.lower() not in SUPPORTED:
            continue
        try:
            if path.is_file():
                snapshot[str(path)] = str(path.stat().st_mtime)
        except OSError:
            # The file vanished between listing and stat; nothing to record.
            continue
    return snapshot


def run_ingestion():
    """Run the ingestion script if any supported file is new or changed.

    Uses ``get_changed_files()`` to decide whether work is needed. When it
    is, the ingest script is started as a subprocess (30-minute timeout) and,
    if it exits with code 0, the saved state is updated and written with
    ``save_state()``. Failures are logged; nothing is returned.
    """
    changed, state = get_changed_files()
    if not changed:
        logging.info("No new or changed files detected — skipping ingestion.")
        return

    logging.info(f"Found {len(changed)} new or changed files — starting ingestion...")
    try:
        # Record mtimes *before* the run starts. Reading them afterwards
        # would mark files edited while ingestion was running as already
        # ingested, and those edits would never be picked up.
        snapshot = _snapshot_mtimes(NEXTCLOUD_PATH)
        result = subprocess.run(
            [PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH],
            capture_output=True,
            text=True,
            timeout=1800
        )
        if result.returncode == 0:
            state.update(snapshot)
            save_state(state)
            logging.info("Ingestion complete. State updated.")
        else:
            logging.error(f"Ingestion error: {result.stderr}")
    except subprocess.TimeoutExpired:
        logging.error("Ingestion timed out.")
    except Exception as e:
        # Top-level boundary for one ingestion attempt: log and carry on so
        # the caller's loop can retry on the next trigger.
        logging.error(f"Ingestion failed: {e}")
|
|
|
|
class IngestHandler(FileSystemEventHandler):
    """Record that a supported file changed so an ingestion can be scheduled.

    The handler does no work itself. It only updates two attributes that the
    polling loop in ``main()`` reads:

    * ``pending``: True once a relevant event has been seen.
    * ``last_event``: ``time.time()`` of the most recent relevant event.
    """

    def __init__(self):
        super().__init__()
        self.pending = False
        self.last_event = 0

    @staticmethod
    def _is_relevant(raw_path):
        """Return True when *raw_path* names a file that should trigger ingestion."""
        path = Path(raw_path)
        if path.suffix.lower() not in SUPPORTED:
            return False
        if path.name.startswith(('.', '~$')):
            return False
        if 'Admin/Backups' in str(path) or 'Backups' in path.parts:
            return False
        return True

    def on_any_event(self, event):
        """Mark an ingestion as pending if *event* touches a relevant file."""
        if event.is_directory:
            return
        # Move/rename events also carry the new location in ``dest_path``.
        # Checking it catches files that only become relevant after a rename
        # (e.g. a temporary name renamed to "report.pdf").
        candidates = (event.src_path, getattr(event, 'dest_path', ''))
        if not any(p and self._is_relevant(p) for p in candidates):
            return
        # Set the timestamp before the flag: the polling thread checks
        # ``pending`` first and must never pair it with a stale timestamp.
        self.last_event = time.time()
        self.pending = True
|
|
|
|
def main():
    """Watch ``NEXTCLOUD_PATH`` and run ingestion after changes settle.

    Starts a recursive watchdog observer with an ``IngestHandler`` and polls
    it every 5 seconds. Once an event is pending and at least
    ``DEBOUNCE_SECONDS`` have passed since the last one, the pending flag is
    cleared and ``run_ingestion()`` is called. Runs until interrupted.
    """
    logging.info("Aaron AI Watcher starting...")
    logging.info(f"Watching: {NEXTCLOUD_PATH}")

    handler = IngestHandler()
    observer = Observer()
    observer.schedule(handler, NEXTCLOUD_PATH, recursive=True)
    observer.start()

    try:
        while True:
            if handler.pending:
                elapsed = time.time() - handler.last_event
                if elapsed >= DEBOUNCE_SECONDS:
                    # Clear the flag before running so events that arrive
                    # during ingestion schedule another run.
                    handler.pending = False
                    run_ingestion()
            time.sleep(5)
    except KeyboardInterrupt:
        # Normal shutdown request; cleanup happens in ``finally``.
        pass
    finally:
        # Stop the observer thread on every exit path, not only on Ctrl-C.
        observer.stop()
        observer.join()
        logging.info("Watcher stopped.")
|
|
|
|
# Start the watcher only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|