diff --git a/scripts/sweep_orphans.py b/scripts/sweep_orphans.py new file mode 100644 index 0000000..a41337e --- /dev/null +++ b/scripts/sweep_orphans.py @@ -0,0 +1,123 @@ +"""One-off: remove embeddings rows that no longer correspond to a file on disk. + +Two passes: + 1. Modern rows (metadata.filepath set): check each filepath, delete if missing. + 2. Legacy rows (metadata.filepath null): build a set of all basenames present + anywhere under NEXTCLOUD_PATH, then delete rows whose `source` basename + isn't in that set. + +Default mode is a dry-run (counts + sample paths, no writes). Pass --apply to +actually delete. +""" + +import os +import sys +from pathlib import Path +from collections import defaultdict + +from dotenv import load_dotenv +load_dotenv(Path.home() / "aaronai" / ".env", override=True) + +import psycopg2 + +NEXTCLOUD_PATH = Path("/home/aaron/nextcloud/data/data/aaron/files") +APPLY = "--apply" in sys.argv + + +def get_pg(): + return psycopg2.connect(os.environ["PG_DSN"]) + + +def scan_modern_orphans(): + """Rows with metadata.filepath whose file doesn't exist on disk.""" + pg = get_pg() + cur = pg.cursor() + cur.execute( + "SELECT id, source, metadata->>'filepath' AS filepath " + "FROM embeddings WHERE metadata->>'filepath' IS NOT NULL" + ) + orphans = [] + by_source = defaultdict(int) + for row in cur.fetchall(): + fp = row[2] + if fp and not Path(fp).exists(): + orphans.append(row) + by_source[row[1]] += 1 + pg.close() + return orphans, by_source + + +def scan_legacy_orphans(): + """Rows without metadata.filepath whose basename isn't anywhere under + NEXTCLOUD_PATH. Restricted to type='document' so conversations and memory + snapshots (which are synthetic sources, not files on disk) aren't flagged + as orphans. Walks the filesystem once to build the basename set.""" + print(f" walking {NEXTCLOUD_PATH} to build basename index...") + on_disk = set() + for p in NEXTCLOUD_PATH.rglob("*"): + if p.is_file(): + on_disk.add(p.name) + print(f" {len(on_disk):,} files on disk") + + pg = get_pg() + cur = pg.cursor() + cur.execute( + "SELECT id, source FROM embeddings " + "WHERE metadata->>'filepath' IS NULL AND type = 'document'" + ) + orphans = [] + by_source = defaultdict(int) + for row in cur.fetchall(): + if row[1] not in on_disk: + orphans.append(row) + by_source[row[1]] += 1 + pg.close() + return orphans, by_source + + +def delete_rows(ids): + pg = get_pg() + cur = pg.cursor() + cur.execute("DELETE FROM embeddings WHERE id = ANY(%s)", (list(ids),)) + deleted = cur.rowcount + pg.commit() + pg.close() + return deleted + + +def main(): + print(f"Mode: {'APPLY (destructive)' if APPLY else 'DRY-RUN (no writes)'}") + print(f"Target: {NEXTCLOUD_PATH}") + print() + + print("Pass 1 — modern rows (metadata.filepath set):") + modern, modern_by_src = scan_modern_orphans() + print(f" {len(modern):,} orphan rows across {len(modern_by_src):,} files") + for src, n in sorted(modern_by_src.items(), key=lambda kv: -kv[1])[:10]: + print(f" {n:>4} chunks — {src}") + print() + + print("Pass 2 — legacy rows (no metadata.filepath):") + legacy, legacy_by_src = scan_legacy_orphans() + print(f" {len(legacy):,} orphan rows across {len(legacy_by_src):,} files") + for src, n in sorted(legacy_by_src.items(), key=lambda kv: -kv[1])[:10]: + print(f" {n:>4} chunks — {src}") + print() + + total = len(modern) + len(legacy) + if total == 0: + print("Nothing to delete.") + return + + if not APPLY: + print(f"Dry-run only. Re-run with --apply to delete {total:,} rows.") + return + + print(f"Deleting {total:,} orphan rows...") + n1 = delete_rows([r[0] for r in modern]) if modern else 0 + n2 = delete_rows([r[0] for r in legacy]) if legacy else 0 + print(f" modern: {n1:,} legacy: {n2:,} total: {n1 + n2:,}") + + +if __name__ == "__main__": + main() diff --git a/scripts/watcher.py b/scripts/watcher.py index 2148949..cdf3b7f 100644 --- a/scripts/watcher.py +++ b/scripts/watcher.py @@ -123,6 +123,42 @@ def resolve_ingest_failure(source: str): log.warning(f"Could not resolve ingest failure record (non-fatal): {e}") +def delete_embeddings_for_path(filepath: Path): + """Remove embeddings rows for a file that no longer exists. Matches by + metadata.filepath so multi-folder same-basename files don't collide. + Legacy rows without filepath metadata are left alone — they get cleaned + by sweep_orphans.py.""" + try: + pg = get_pg() + try: + cur = pg.cursor() + cur.execute( + "DELETE FROM embeddings WHERE metadata->>'filepath' = %s", + (str(filepath),), + ) + deleted = cur.rowcount + pg.commit() + if deleted: + log.info(f"Deleted {deleted} chunks for removed file: {filepath}") + finally: + pg.close() + except Exception as e: + log.warning(f"Could not delete embeddings for {filepath} (non-fatal): {e}") + + +def remove_from_state(filepath: Path): + """Drop a deleted file from watcher_state.json so it isn't carried as + 'known mtime' indefinitely.""" + try: + state = load_state() + key = str(filepath) + if key in state: + del state[key] + save_state(state) + except Exception as e: + log.warning(f"Could not update state for deleted {filepath} (non-fatal): {e}") + + IGNORED_TOP_FOLDERS = {"Drafts"} @@ -350,15 +386,47 @@ class IngestHandler(FileSystemEventHandler): def on_moved(self, event): if event.is_directory: return + src = Path(event.src_path) + dest = Path(event.dest_path) + # If destination is outside NEXTCLOUD_PATH (e.g., Nextcloud trashbin at + # /home/aaron/nextcloud/data/data/aaron/files_trashbin/), treat as a + # delete — the file is no longer in the watched corpus. + try: + dest.relative_to(NEXTCLOUD_PATH) + except ValueError: + if src.suffix.lower() in SUPPORTED: + log.info(f"Event: moved out of tree {src} -> {dest}") + threading.Thread( + target=lambda: ( + delete_embeddings_for_path(src), + remove_from_state(src), + ), + daemon=True, + ).start() + return # Nextcloud WebDAV writes .part temp files then renames to final path. # src_path is the .part file; dest_path is the final filename. - dest = Path(event.dest_path) if dest.suffix.lower() not in SUPPORTED or self._should_ignore(dest): return log.info(f"Event: moved -> {dest}") self.pending = True self.last_event = time.time() + def on_deleted(self, event): + if event.is_directory: + return + path = Path(event.src_path) + if path.suffix.lower() not in SUPPORTED: + return + log.info(f"Event: deleted {path}") + threading.Thread( + target=lambda: ( + delete_embeddings_for_path(path), + remove_from_state(path), + ), + daemon=True, + ).start() + def on_closed(self, event): # FileClosedEvent fires on the final file after Nextcloud completes write. # Belt-and-suspenders catch for any write pattern not caught by on_moved.