watcher: handle deletes; sweep_orphans cleans existing phantom chunks
watcher.py now listens for on_deleted events and treats on_moved destinations that fall outside NEXTCLOUD_PATH (Nextcloud trashbin, moves to other volumes) as deletes. Both cases call delete_embeddings_for_path (DELETE WHERE metadata.filepath = ...) and remove_from_state to drop the file from watcher_state.json so it isn't carried as known-mtime. Match is by metadata.filepath, not source basename, so files that share a name across folders don't collide. scripts/sweep_orphans.py is the one-time cleanup for chunks the watcher missed before this fix: - Modern pass: rows with metadata.filepath whose file no longer exists. - Legacy pass: rows with NULL filepath and type='document' whose basename isn't anywhere on disk. type='document' restriction skips conversations and memory snapshots (synthetic sources, not files on disk). First run cleaned 629 rows: 628 from moved-file duplicates (e.g., BirdAI docs that traveled across Journal/, Library/, Journal/Projects/BirdAI/) plus the AARON_NELSON_BIO.pdf phantom Aaron flagged.
This commit is contained in:
@@ -0,0 +1,123 @@
|
|||||||
|
"""One-off: remove embeddings rows that no longer correspond to a file on disk.
|
||||||
|
|
||||||
|
Two passes:
|
||||||
|
1. Modern rows (metadata.filepath set): check each filepath, delete if missing.
|
||||||
|
2. Legacy rows (metadata.filepath null): build a set of all basenames present
|
||||||
|
anywhere under NEXTCLOUD_PATH, then delete rows whose `source` basename
|
||||||
|
isn't in that set.
|
||||||
|
|
||||||
|
Default mode is a dry-run (counts + sample paths, no writes). Pass --apply to
|
||||||
|
actually delete.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
|
||||||
|
NEXTCLOUD_PATH = Path("/home/aaron/nextcloud/data/data/aaron/files")
|
||||||
|
APPLY = "--apply" in sys.argv
|
||||||
|
|
||||||
|
|
||||||
|
def get_pg():
|
||||||
|
return psycopg2.connect(os.environ["PG_DSN"])
|
||||||
|
|
||||||
|
|
||||||
|
def scan_modern_orphans():
|
||||||
|
"""Rows with metadata.filepath whose file doesn't exist on disk."""
|
||||||
|
pg = get_pg()
|
||||||
|
cur = pg.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"SELECT id, source, metadata->>'filepath' AS filepath "
|
||||||
|
"FROM embeddings WHERE metadata->>'filepath' IS NOT NULL"
|
||||||
|
)
|
||||||
|
orphans = []
|
||||||
|
by_source = defaultdict(int)
|
||||||
|
for row in cur.fetchall():
|
||||||
|
fp = row[2]
|
||||||
|
if fp and not Path(fp).exists():
|
||||||
|
orphans.append(row)
|
||||||
|
by_source[row[1]] += 1
|
||||||
|
pg.close()
|
||||||
|
return orphans, by_source
|
||||||
|
|
||||||
|
|
||||||
|
def scan_legacy_orphans():
|
||||||
|
"""Rows without metadata.filepath whose basename isn't anywhere under
|
||||||
|
NEXTCLOUD_PATH. Restricted to type='document' so conversations and memory
|
||||||
|
snapshots (which are synthetic sources, not files on disk) aren't flagged
|
||||||
|
as orphans. Walks the filesystem once to build the basename set."""
|
||||||
|
print(f" walking {NEXTCLOUD_PATH} to build basename index...")
|
||||||
|
on_disk = set()
|
||||||
|
for p in NEXTCLOUD_PATH.rglob("*"):
|
||||||
|
if p.is_file():
|
||||||
|
on_disk.add(p.name)
|
||||||
|
print(f" {len(on_disk):,} files on disk")
|
||||||
|
|
||||||
|
pg = get_pg()
|
||||||
|
cur = pg.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"SELECT id, source FROM embeddings "
|
||||||
|
"WHERE metadata->>'filepath' IS NULL AND type = 'document'"
|
||||||
|
)
|
||||||
|
orphans = []
|
||||||
|
by_source = defaultdict(int)
|
||||||
|
for row in cur.fetchall():
|
||||||
|
if row[1] not in on_disk:
|
||||||
|
orphans.append(row)
|
||||||
|
by_source[row[1]] += 1
|
||||||
|
pg.close()
|
||||||
|
return orphans, by_source
|
||||||
|
|
||||||
|
|
||||||
|
def delete_rows(ids):
|
||||||
|
pg = get_pg()
|
||||||
|
cur = pg.cursor()
|
||||||
|
cur.execute("DELETE FROM embeddings WHERE id = ANY(%s)", (list(ids),))
|
||||||
|
deleted = cur.rowcount
|
||||||
|
pg.commit()
|
||||||
|
pg.close()
|
||||||
|
return deleted
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
print(f"Mode: {'APPLY (destructive)' if APPLY else 'DRY-RUN (no writes)'}")
|
||||||
|
print(f"Target: {NEXTCLOUD_PATH}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
print("Pass 1 — modern rows (metadata.filepath set):")
|
||||||
|
modern, modern_by_src = scan_modern_orphans()
|
||||||
|
print(f" {len(modern):,} orphan rows across {len(modern_by_src):,} files")
|
||||||
|
for src, n in sorted(modern_by_src.items(), key=lambda kv: -kv[1])[:10]:
|
||||||
|
print(f" {n:>4} chunks — {src}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
print("Pass 2 — legacy rows (no metadata.filepath):")
|
||||||
|
legacy, legacy_by_src = scan_legacy_orphans()
|
||||||
|
print(f" {len(legacy):,} orphan rows across {len(legacy_by_src):,} files")
|
||||||
|
for src, n in sorted(legacy_by_src.items(), key=lambda kv: -kv[1])[:10]:
|
||||||
|
print(f" {n:>4} chunks — {src}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
total = len(modern) + len(legacy)
|
||||||
|
if total == 0:
|
||||||
|
print("Nothing to delete.")
|
||||||
|
return
|
||||||
|
|
||||||
|
if not APPLY:
|
||||||
|
print(f"Dry-run only. Re-run with --apply to delete {total:,} rows.")
|
||||||
|
return
|
||||||
|
|
||||||
|
print(f"Deleting {total:,} orphan rows...")
|
||||||
|
n1 = delete_rows([r[0] for r in modern]) if modern else 0
|
||||||
|
n2 = delete_rows([r[0] for r in legacy]) if legacy else 0
|
||||||
|
print(f" modern: {n1:,} legacy: {n2:,} total: {n1 + n2:,}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
+69
-1
@@ -123,6 +123,42 @@ def resolve_ingest_failure(source: str):
|
|||||||
log.warning(f"Could not resolve ingest failure record (non-fatal): {e}")
|
log.warning(f"Could not resolve ingest failure record (non-fatal): {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def delete_embeddings_for_path(filepath: Path):
|
||||||
|
"""Remove embeddings rows for a file that no longer exists. Matches by
|
||||||
|
metadata.filepath so multi-folder same-basename files don't collide.
|
||||||
|
Legacy rows without filepath metadata are left alone — they get cleaned
|
||||||
|
by sweep_orphans.py."""
|
||||||
|
try:
|
||||||
|
pg = get_pg()
|
||||||
|
try:
|
||||||
|
cur = pg.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"DELETE FROM embeddings WHERE metadata->>'filepath' = %s",
|
||||||
|
(str(filepath),),
|
||||||
|
)
|
||||||
|
deleted = cur.rowcount
|
||||||
|
pg.commit()
|
||||||
|
if deleted:
|
||||||
|
log.info(f"Deleted {deleted} chunks for removed file: {filepath}")
|
||||||
|
finally:
|
||||||
|
pg.close()
|
||||||
|
except Exception as e:
|
||||||
|
log.warning(f"Could not delete embeddings for {filepath} (non-fatal): {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def remove_from_state(filepath: Path):
|
||||||
|
"""Drop a deleted file from watcher_state.json so it isn't carried as
|
||||||
|
'known mtime' indefinitely."""
|
||||||
|
try:
|
||||||
|
state = load_state()
|
||||||
|
key = str(filepath)
|
||||||
|
if key in state:
|
||||||
|
del state[key]
|
||||||
|
save_state(state)
|
||||||
|
except Exception as e:
|
||||||
|
log.warning(f"Could not update state for deleted {filepath} (non-fatal): {e}")
|
||||||
|
|
||||||
|
|
||||||
IGNORED_TOP_FOLDERS = {"Drafts"}
|
IGNORED_TOP_FOLDERS = {"Drafts"}
|
||||||
|
|
||||||
|
|
||||||
@@ -350,15 +386,47 @@ class IngestHandler(FileSystemEventHandler):
|
|||||||
def on_moved(self, event):
|
def on_moved(self, event):
|
||||||
if event.is_directory:
|
if event.is_directory:
|
||||||
return
|
return
|
||||||
|
src = Path(event.src_path)
|
||||||
|
dest = Path(event.dest_path)
|
||||||
|
# If destination is outside NEXTCLOUD_PATH (e.g., Nextcloud trashbin at
|
||||||
|
# /home/aaron/nextcloud/data/data/aaron/files_trashbin/), treat as a
|
||||||
|
# delete — the file is no longer in the watched corpus.
|
||||||
|
try:
|
||||||
|
dest.relative_to(NEXTCLOUD_PATH)
|
||||||
|
except ValueError:
|
||||||
|
if src.suffix.lower() in SUPPORTED:
|
||||||
|
log.info(f"Event: moved out of tree {src} -> {dest}")
|
||||||
|
threading.Thread(
|
||||||
|
target=lambda: (
|
||||||
|
delete_embeddings_for_path(src),
|
||||||
|
remove_from_state(src),
|
||||||
|
),
|
||||||
|
daemon=True,
|
||||||
|
).start()
|
||||||
|
return
|
||||||
# Nextcloud WebDAV writes .part temp files then renames to final path.
|
# Nextcloud WebDAV writes .part temp files then renames to final path.
|
||||||
# src_path is the .part file; dest_path is the final filename.
|
# src_path is the .part file; dest_path is the final filename.
|
||||||
dest = Path(event.dest_path)
|
|
||||||
if dest.suffix.lower() not in SUPPORTED or self._should_ignore(dest):
|
if dest.suffix.lower() not in SUPPORTED or self._should_ignore(dest):
|
||||||
return
|
return
|
||||||
log.info(f"Event: moved -> {dest}")
|
log.info(f"Event: moved -> {dest}")
|
||||||
self.pending = True
|
self.pending = True
|
||||||
self.last_event = time.time()
|
self.last_event = time.time()
|
||||||
|
|
||||||
|
def on_deleted(self, event):
|
||||||
|
if event.is_directory:
|
||||||
|
return
|
||||||
|
path = Path(event.src_path)
|
||||||
|
if path.suffix.lower() not in SUPPORTED:
|
||||||
|
return
|
||||||
|
log.info(f"Event: deleted {path}")
|
||||||
|
threading.Thread(
|
||||||
|
target=lambda: (
|
||||||
|
delete_embeddings_for_path(path),
|
||||||
|
remove_from_state(path),
|
||||||
|
),
|
||||||
|
daemon=True,
|
||||||
|
).start()
|
||||||
|
|
||||||
def on_closed(self, event):
|
def on_closed(self, event):
|
||||||
# FileClosedEvent fires on the final file after Nextcloud completes write.
|
# FileClosedEvent fires on the final file after Nextcloud completes write.
|
||||||
# Belt-and-suspenders catch for any write pattern not caught by on_moved.
|
# Belt-and-suspenders catch for any write pattern not caught by on_moved.
|
||||||
|
|||||||
Reference in New Issue
Block a user