"""Backfill embeddings.type and embeddings.created_at (Improvement #2 / A.3).

Idempotent on cohort predicates: every candidate SELECT filters on IS NULL for
the target column, and every UPDATE re-checks that IS NULL guard, so a re-run
never overwrites a value that is already populated.

Writes provenance to metadata.type_source and metadata.created_at_source so
each row is auditable and revertable per-source.

Dry-run is the default; pass --apply to commit.

Order of batches:
  T1. type backfill: WHERE type IS NULL -> 'document' (extension-classified, all hit).
  C1. created_at: WHERE ca IS NULL AND metadata.filepath stat-resolves -> filesystem mtime.
  C2. created_at: WHERE ca IS NULL AND source has unique watcher_state path -> watcher mtime.
  C3. created_at: WHERE ca IS NULL AND source has watcher_state collision -> most-recent mtime.
  C4. created_at: WHERE type='chatgpt_conversation' AND ca IS NULL -> export-resolved create_time.
  C5. created_at: WHERE ca IS NULL (residual) -> sentinel.

Snapshot table embeddings_backup_2026_05_03 must exist before --apply.

Usage:
  venv/bin/python3 scripts/experiments/embeddings_backfill_apply.py          # dry-run
  venv/bin/python3 scripts/experiments/embeddings_backfill_apply.py --apply  # write

Exits non-zero if PG_DSN is unset, or on --apply if the snapshot is missing or
its row count differs from the live table.
"""
import argparse
import json
import os
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path

import psycopg2
from psycopg2.extras import RealDictCursor, Json
from dotenv import load_dotenv

load_dotenv(Path.home() / "aaronai" / ".env")

PG_DSN = os.getenv("PG_DSN")
WATCHER_STATE = Path.home() / "aaronai" / "watcher_state.json"
CHATGPT_EXPORT_DIR = Path("/home/aaron/nextcloud/data/data/aaron/files/Archive/Misc/ChatGPT Export")
SNAPSHOT_TABLE = "embeddings_backup_2026_05_03"
SENTINEL_ISO = "2026-04-26T00:00:00Z"


# ─── Helpers ────────────────────────────────────────────────────────────────

def get_pg():
    """Open a psycopg2 connection to PG_DSN whose cursors return dict-like rows."""
    return psycopg2.connect(PG_DSN, cursor_factory=RealDictCursor)


def header(t):
    """Print *t* framed above and below by a 70-character '=' rule."""
    bar = "=" * 70
    print(f"\n{bar}\n{t}\n{bar}")


def fmt_ts_unix(ts):
    """Format a Unix timestamp (number or numeric string) as ISO-8601 UTC with a 'Z' suffix."""
    return datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat().replace("+00:00", "Z")


def fmt_ts_mtime(p):
    """Return the mtime of path *p* as ISO-8601 UTC 'Z', or None if it cannot be determined."""
    try:
        return fmt_ts_unix(p.stat().st_mtime)
    except (OSError, OverflowError, ValueError):
        # Missing / unreadable file, or an mtime outside datetime's supported range.
        return None


def _filepath_mtime(fp):
    """Resolve a metadata.filepath value to an ISO mtime, or None if empty or unstattable.

    Shared by C1 (which writes it) and C2/C3 (which skip rows C1 can resolve),
    so both batches agree on what "filepath stat-resolves" means.
    """
    if not fp:
        # Path("") == Path("."), which would silently resolve to the cwd's mtime.
        return None
    return fmt_ts_mtime(Path(fp))


def load_watcher_state():
    """Index watcher_state.json by file basename.

    The .items() loop requires the JSON to be an object; presumably it maps full
    path -> mtime (TODO confirm against the watcher). Returns a defaultdict(list)
    of basename -> [(full_path, mtime), ...]; more than one entry for a basename
    is a collision (C3).
    """
    state = json.loads(WATCHER_STATE.read_text(encoding="utf-8"))
    by_name = defaultdict(list)
    for path, mtime in state.items():
        by_name[Path(path).name].append((path, mtime))
    return by_name


def load_chatgpt_index():
    """Map ChatGPT conversation id -> create_time from conversations*.json in the export dir.

    Files are read in sorted order, so on duplicate ids the later file wins.
    A missing directory or an unreadable / unexpected file is reported and
    skipped; the affected rows then fall through to the C5 sentinel.
    """
    if not CHATGPT_EXPORT_DIR.exists():
        print(f"  WARNING: ChatGPT export dir not found: {CHATGPT_EXPORT_DIR} (C4 will resolve nothing)")
        return {}
    index = {}
    for f in sorted(CHATGPT_EXPORT_DIR.glob("conversations*.json")):
        try:
            data = json.loads(f.read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            print(f"  WARNING: skipping unreadable export file {f.name}: {e}")
            continue
        if not isinstance(data, list):
            print(f"  WARNING: skipping {f.name}: expected a JSON array of conversations")
            continue
        for convo in data:
            if not isinstance(convo, dict):
                continue
            cid = convo.get("id") or convo.get("conversation_id")
            ct = convo.get("create_time")
            if cid and ct is not None:
                index[cid] = ct
    return index


def assert_snapshot(cur):
    """Exit with status 2 unless SNAPSHOT_TABLE exists and has the same row count as embeddings."""
    cur.execute("SELECT to_regclass(%s) AS t;", (SNAPSHOT_TABLE,))
    if cur.fetchone()["t"] is None:
        print(f"ERROR: snapshot table '{SNAPSHOT_TABLE}' not found. Run A.2 first.")
        sys.exit(2)
    # SNAPSHOT_TABLE is a module constant, not external input, so interpolation is safe here.
    cur.execute(f"SELECT COUNT(*) AS n FROM {SNAPSHOT_TABLE};")
    snap = cur.fetchone()["n"]
    cur.execute("SELECT COUNT(*) AS n FROM embeddings;")
    live = cur.fetchone()["n"]
    print(f"snapshot {SNAPSHOT_TABLE}: {snap} rows; live embeddings: {live} rows")
    if snap != live:
        print(f"ERROR: snapshot row count != live ({snap} vs {live}). Refresh snapshot before --apply.")
        sys.exit(2)


# ─── Batch primitive ────────────────────────────────────────────────────────

def run_batch(cur, label, candidates, apply_mode):
    """Execute one batch of per-row UPDATEs and return the number of rows written.

    candidates: list of (id, set_type, set_ca, type_source, ca_source).
    set_type / set_ca may be None to leave that column alone; a truthy
    type_source / ca_source is merged into metadata as type_source /
    created_at_source.

    Each UPDATE re-checks IS NULL on every column it sets, so a value written
    by another session (or an earlier run) is never overwritten; such rows
    are counted as skipped.

    In dry-run we still execute the UPDATEs inside the outer transaction
    (rolled back by main) so subsequent batches' SELECTs see the correct
    intermediate state.
    """
    n = len(candidates)
    print(f" {label}: {n} rows queued")
    if n == 0:
        return 0
    for c in candidates[:3]:
        print(f" sample: id={c[0]} type={c[1]!r} ca={c[2]!r} type_src={c[3]} ca_src={c[4]}")

    n_written = 0
    for row_id, set_type, set_ca, type_src, ca_src in candidates:
        meta_patch = {}
        if type_src:
            meta_patch["type_source"] = type_src
        if ca_src:
            meta_patch["created_at_source"] = ca_src

        # Build SET list and IS NULL guards dynamically from what this row changes.
        sets, guards, params = [], [], []
        if set_type is not None:
            sets.append("type = %s")
            guards.append("type IS NULL")
            params.append(set_type)
        if set_ca is not None:
            sets.append("created_at = %s")
            guards.append("created_at IS NULL")
            params.append(set_ca)
        if meta_patch:
            sets.append("metadata = COALESCE(metadata, '{}'::jsonb) || %s::jsonb")
            params.append(json.dumps(meta_patch))
        if not sets:
            # Nothing to write; an empty SET clause would be a SQL syntax error.
            continue
        params.append(row_id)
        where = " AND ".join(["id = %s", *guards])
        cur.execute(f"UPDATE embeddings SET {', '.join(sets)} WHERE {where};", params)
        n_written += cur.rowcount

    print(f" {n_written} rows updated{' (will rollback)' if not apply_mode else ''}")
    if n_written < n:
        print(f" {n - n_written} rows skipped (target column no longer NULL or nothing to set)")
    return n_written


# ─── Batches ────────────────────────────────────────────────────────────────

def batch_T1_type(cur, apply_mode):
    """type IS NULL -> 'document'. All cohort A rows have a SUPPORTED extension."""
    cur.execute("""
        SELECT id
        FROM embeddings
        WHERE type IS NULL
        ORDER BY id;
    """)
    rows = cur.fetchall()
    cands = [(r["id"], "document", None, "inferred_extension", None) for r in rows]
    return run_batch(cur, "T1 type IS NULL -> 'document'", cands, apply_mode)


def batch_C1_filepath_stat(cur, apply_mode):
    """ca IS NULL AND metadata.filepath stat-resolves -> mtime."""
    cur.execute("""
        SELECT id, source, metadata->>'filepath' AS fp
        FROM embeddings
        WHERE created_at IS NULL AND metadata->>'filepath' IS NOT NULL
        ORDER BY id;
    """)
    rows = cur.fetchall()
    cands, n_skipped_missing = [], 0
    for r in rows:
        mt = _filepath_mtime(r["fp"])
        if mt:
            cands.append((r["id"], None, mt, None, "filepath_stat"))
        else:
            n_skipped_missing += 1
    print(f" C1 candidates: {len(cands)} (skipped {n_skipped_missing} where filepath gone or unstattable)")
    return run_batch(cur, "C1 ca IS NULL AND filepath stat-resolves -> mtime", cands, apply_mode)


def batch_C2_C3_watcher_state(cur, apply_mode):
    """ca IS NULL AND filepath unresolvable -> watcher_state by source basename.

    C2 = unique hit, C3 = collision pick-latest.
    """
    by_name = load_watcher_state()
    cur.execute("""
        SELECT id, source, metadata->>'filepath' AS fp
        FROM embeddings
        WHERE created_at IS NULL
        ORDER BY id;
    """)
    rows = cur.fetchall()
    c2, c3 = [], []
    skipped_no_match = 0
    for r in rows:
        # Skip rows the C1 path can resolve (same predicate C1 uses).
        if _filepath_mtime(r["fp"]) is not None:
            continue
        src = r["source"]
        # NOTE(review): lookup is by exact source value, so this assumes
        # embeddings.source holds a bare basename; a full path never matches.
        if not src or src not in by_name:
            skipped_no_match += 1
            continue
        candidates = by_name[src]
        if len(candidates) == 1:
            mt = fmt_ts_unix(candidates[0][1])
            c2.append((r["id"], None, mt, None, "watcher_state_unique"))
        else:
            latest = max(candidates, key=lambda x: float(x[1]))
            mt = fmt_ts_unix(latest[1])
            c3.append((r["id"], None, mt, None, f"watcher_state_collision_pick_latest_of_{len(candidates)}"))
    print(f" C2/C3 source-basename fallback: {len(c2)} unique, {len(c3)} collision, "
          f"{skipped_no_match} unmatched (will fall to C4/C5)")
    n2 = run_batch(cur, "C2 ca IS NULL AND watcher_state unique -> mtime", c2, apply_mode)
    n3 = run_batch(cur, "C3 ca IS NULL AND watcher_state collision -> latest mtime", c3, apply_mode)
    return n2 + n3


def batch_C4_chatgpt_export(cur, apply_mode):
    """type='chatgpt_conversation' AND ca IS NULL -> create_time from the ChatGPT export.

    The conversation id is parsed from row ids of the form chatgpt_<cid>_<digits>.
    Rows whose id does not match, whose cid is absent from the export, or whose
    create_time is not a usable timestamp are left for C5.
    """
    index = load_chatgpt_index()
    cur.execute("""
        SELECT id, source
        FROM embeddings
        WHERE type='chatgpt_conversation' AND created_at IS NULL
        ORDER BY id;
    """)
    rows = cur.fetchall()
    id_re = re.compile(r"^chatgpt_(.+)_(\d+)$")
    cands, unresolved = [], 0
    for r in rows:
        m = id_re.match(str(r["id"]))
        ct = index.get(m.group(1)) if m else None
        if ct is None:
            unresolved += 1
            continue
        try:
            ct_iso = fmt_ts_unix(ct)
        except (TypeError, ValueError, OverflowError, OSError):
            # Non-numeric or out-of-range create_time in the export.
            unresolved += 1
            continue
        cands.append((r["id"], None, ct_iso, None, "chatgpt_export"))
    print(f" C4 chatgpt export resolution: {len(cands)} resolved, {unresolved} unresolved (fall to C5)")
    return run_batch(cur, "C4 type='chatgpt_conversation' AND ca IS NULL -> export create_time",
                     cands, apply_mode)


def batch_C5_sentinel(cur, apply_mode):
    """ca IS NULL (residual after C1-C4) -> SENTINEL_ISO, tagged created_at_source='sentinel'."""
    cur.execute("""
        SELECT id, type, source
        FROM embeddings
        WHERE created_at IS NULL
        ORDER BY id;
    """)
    rows = cur.fetchall()
    cands = [(r["id"], None, SENTINEL_ISO, None, "sentinel") for r in rows]
    if cands:
        sample_types = Counter(r["type"] for r in rows)
        print(f" C5 residual sentinel rows by type: {dict(sample_types)}")
    return run_batch(cur, f"C5 ca IS NULL residual -> sentinel {SENTINEL_ISO}", cands, apply_mode)


# ─── Pre/post counts ────────────────────────────────────────────────────────

def print_counts(cur, label):
    """Print total rows and how many still have NULL type / created_at."""
    cur.execute("""
        SELECT COUNT(*) AS total,
               COUNT(*) FILTER (WHERE type IS NULL) AS type_null,
               COUNT(*) FILTER (WHERE created_at IS NULL) AS ca_null
        FROM embeddings;
    """)
    r = cur.fetchone()
    print(f" [{label}] total={r['total']} type_null={r['type_null']} ca_null={r['ca_null']}")


# ─── Driver ─────────────────────────────────────────────────────────────────

def main():
    """Run all batches in one transaction; commit on --apply, otherwise roll back."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--apply", action="store_true", help="default false (dry-run)")
    args = ap.parse_args()
    apply_mode = args.apply

    if not PG_DSN:
        # Without this, psycopg2 would fall back to libpq defaults and might hit the wrong database.
        print("ERROR: PG_DSN is not set (expected in ~/aaronai/.env).")
        sys.exit(2)

    pg = get_pg()
    try:
        cur = pg.cursor()
        print(f"Mode: {'APPLY (writes will commit)' if apply_mode else 'DRY-RUN (no writes)'}")
        print(f"Sentinel: {SENTINEL_ISO}")
        if apply_mode:
            assert_snapshot(cur)

        header("PRE-COUNTS")
        print_counts(cur, "before")

        header("BATCHES")
        n_t1 = batch_T1_type(cur, apply_mode)
        n_c1 = batch_C1_filepath_stat(cur, apply_mode)
        n_c2c3 = batch_C2_C3_watcher_state(cur, apply_mode)
        n_c4 = batch_C4_chatgpt_export(cur, apply_mode)
        n_c5 = batch_C5_sentinel(cur, apply_mode)

        header("POST-COUNTS")
        print_counts(cur, "after" if apply_mode else "after (in-transaction, will rollback)")

        if apply_mode:
            pg.commit()
            print("\nCOMMITTED.")
        else:
            pg.rollback()
            print("\nROLLED BACK (dry-run).")

        print(f"\nSummary: T1={n_t1} C1={n_c1} C2+C3={n_c2c3} C4={n_c4} C5={n_c5}")
    finally:
        # Closing with an open transaction discards it, so a mid-run failure never commits.
        pg.close()


if __name__ == "__main__":
    main()