Files
aaronAI/scripts/experiments/embeddings_backfill_apply.py
aaron 3c7c228db0 embeddings: backfill type and created_at (Improvement #2 part A)
Backfills 9,815 type-NULL rows to 'document' (extension classifier, 100% hit)
and 12,109 created_at-NULL rows via five batches:

  C1 filepath_stat:        9,649  filesystem mtime via metadata.filepath
  C2 watcher_state_unique:   676  unique source-name lookup in watcher_state
  C3 watcher_state_collision_pick_latest_of_N:
                             234  collision; most-recent watcher mtime
  C4 chatgpt_export:       1,548  convo create_time from export JSONs
                                  (168/168 distinct convo_ids resolved)
  C5 sentinel:                 2  2026-04-26T00:00:00Z (pgvector migration date)

Provenance written to metadata.type_source and metadata.created_at_source
on every row changed by this run. type_source is empty on rows where the
type field was already populated pre-run; in those cases the snapshot
table is the source of truth for what changed.

Snapshot: embeddings_backup_2026_05_03 (CREATE TABLE AS SELECT id, type,
created_at, metadata FROM embeddings; 14,069 rows; revertable via id-join).

Verification:
  V1 live counts:      type_null=0  ca_null=0
  V2 spot-check 11 rows across cohorts: provenance correct
  V3 snapshot intact: 14,069 rows, pre-backfill NULL counts preserved
  V4 cross-check vs snapshot: reconciles per-provenance to dry-run

Read-side use (B + C: writer enforcement + minimal retrieval read) deferred
to a separate session. The backfill is complete and verified, but the type
and created_at fields are not yet load-bearing — every current reader still
ignores them. Without B+C this lands as data prep, not behavior change.
2026-05-03 23:58:53 +00:00

305 lines
12 KiB
Python

"""Backfill embeddings.type and embeddings.created_at (Improvement #2 / A.3).
Idempotent on cohort predicates (every WHERE clause includes IS NULL on the
target column). Writes provenance to metadata.type_source and metadata.created_at_source
so each row is auditable and revertable per-source. Dry-run is the default; pass --apply to commit.
Order of batches:
T1. type backfill: WHERE type IS NULL -> 'document' (extension-classified, all hit).
C1. created_at: WHERE ca IS NULL AND metadata.filepath stat-resolves -> filesystem mtime.
C2. created_at: WHERE ca IS NULL AND source has unique watcher_state path -> watcher mtime.
C3. created_at: WHERE ca IS NULL AND source has watcher_state collision -> most-recent mtime.
C4. created_at: WHERE type='chatgpt_conversation' AND ca IS NULL -> export-resolved create_time.
C5. created_at: WHERE ca IS NULL (residual) -> sentinel.
Snapshot table embeddings_backup_2026_05_03 must exist before --apply.
Usage:
venv/bin/python3 scripts/experiments/embeddings_backfill_apply.py # dry-run
venv/bin/python3 scripts/experiments/embeddings_backfill_apply.py --apply # write
Exits non-zero on --apply if the snapshot table is missing or its row count differs from live.
"""
import argparse
import json
import os
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
import psycopg2
from psycopg2.extras import RealDictCursor, Json
from dotenv import load_dotenv
# Load ~/aaronai/.env before reading any setting from the environment.
load_dotenv(Path.home() / "aaronai" / ".env")
# Connection string handed to psycopg2.connect() in get_pg(); None when unset.
PG_DSN = os.getenv("PG_DSN")
# JSON file read by load_watcher_state(); iterated there as path -> mtime pairs.
WATCHER_STATE = Path.home() / "aaronai" / "watcher_state.json"
# Directory globbed for "conversations*.json" by load_chatgpt_index().
CHATGPT_EXPORT_DIR = Path("/home/aaron/nextcloud/data/data/aaron/files/Archive/Misc/ChatGPT Export")
# Pre-backfill snapshot table; assert_snapshot() requires it before --apply.
SNAPSHOT_TABLE = "embeddings_backup_2026_05_03"
# created_at value written by batch C5 to rows no other batch could resolve.
SENTINEL_ISO = "2026-04-26T00:00:00Z"
# ─── Helpers ────────────────────────────────────────────────────────────────
def get_pg():
    """Open a PostgreSQL connection whose cursors return rows as dicts.

    Returns:
        A psycopg2 connection built from ``PG_DSN`` with ``RealDictCursor``
        as its cursor factory.

    Raises:
        RuntimeError: if ``PG_DSN`` is unset or empty.
    """
    # Fail fast: without a DSN the driver may fall back to libpq defaults,
    # and this script issues UPDATEs -- never guess which database to touch.
    if not PG_DSN:
        raise RuntimeError(
            "PG_DSN is not set; define it in the environment or in ~/aaronai/.env"
        )
    return psycopg2.connect(PG_DSN, cursor_factory=RealDictCursor)
def header(t):
    """Print *t* as a section title framed above and below by a 70-char '=' rule."""
    rule = "=" * 70
    # Leading empty field reproduces the blank line before the top rule.
    print("", rule, t, rule, sep="\n")
def fmt_ts_unix(ts):
    """Format a Unix timestamp as an ISO-8601 UTC string ending in 'Z'.

    *ts* is passed through ``float()``, so numbers and numeric strings work.
    """
    seconds = float(ts)
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    iso_text = moment.isoformat()
    return iso_text.replace("+00:00", "Z")
def fmt_ts_mtime(p):
    """Return the mtime of path *p* as an ISO-8601 UTC string ending in 'Z'.

    Best-effort: any failure while stat-ing or formatting yields ``None``
    instead of raising.
    """
    try:
        modified = p.stat().st_mtime
        stamp = datetime.fromtimestamp(modified, tz=timezone.utc)
        return stamp.isoformat().replace("+00:00", "Z")
    except Exception:
        # Callers treat None as "could not resolve"; nothing is re-raised.
        return None
def load_watcher_state(state_path=None):
    """Index the watcher state file by file basename.

    Args:
        state_path: JSON file to read. Defaults to ``WATCHER_STATE``.

    Returns:
        A ``defaultdict(list)`` mapping each basename to a list of
        ``(full_path, mtime)`` tuples in file order. More than one tuple
        under a basename means several tracked paths share that name.

    Raises:
        OSError / ValueError: if the file cannot be read or is not valid
        JSON. This is deliberately not swallowed: an empty index would
        silently push rows down to later fallback batches.
    """
    source = WATCHER_STATE if state_path is None else Path(state_path)
    # Decode explicitly as UTF-8 rather than relying on the locale default.
    state = json.loads(source.read_text(encoding="utf-8"))
    by_name = defaultdict(list)
    for path, mtime in state.items():
        by_name[Path(path).name].append((path, mtime))
    return by_name
def load_chatgpt_index(export_dir=None):
    """Build a conversation-id -> create_time index from ChatGPT export files.

    Args:
        export_dir: directory holding ``conversations*.json`` files.
            Defaults to ``CHATGPT_EXPORT_DIR``.

    Returns:
        dict mapping conversation id (``id`` or, failing that,
        ``conversation_id``) to the raw ``create_time`` value. Files are
        read in sorted name order, so a later file wins on duplicate ids.
        An empty dict is returned when the directory does not exist.
    """
    root = CHATGPT_EXPORT_DIR if export_dir is None else Path(export_dir)
    if not root.exists():
        return {}
    index = {}
    for f in sorted(root.glob("conversations*.json")):
        try:
            data = json.loads(f.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # Still best-effort, but say so: a skipped file means its rows
            # will quietly fall through to the sentinel batch.
            print(f"WARNING: skipping unreadable ChatGPT export {f}: {exc}", file=sys.stderr)
            continue
        if not isinstance(data, list):
            print(f"WARNING: skipping ChatGPT export {f}: top-level JSON is not a list", file=sys.stderr)
            continue
        for convo in data:
            if not isinstance(convo, dict):
                continue  # malformed entry; nothing to index
            cid = convo.get("id") or convo.get("conversation_id")
            ct = convo.get("create_time")
            if cid and ct is not None:
                index[cid] = ct
    return index
def assert_snapshot(cur):
    """Exit with status 2 unless the pre-backfill snapshot table is usable.

    Two checks run through *cur*: the table named by ``SNAPSHOT_TABLE`` must
    exist, and its row count must equal the current row count of
    ``embeddings``. Either failure prints an ERROR line and calls
    ``sys.exit(2)``.
    """
    def count_rows(sql):
        # Each query aliases its COUNT(*) as "n".
        cur.execute(sql)
        return cur.fetchone()["n"]

    cur.execute("SELECT to_regclass(%s) AS t;", (SNAPSHOT_TABLE,))
    resolved = cur.fetchone()["t"]
    if resolved is None:
        print(f"ERROR: snapshot table '{SNAPSHOT_TABLE}' not found. Run A.2 first.")
        sys.exit(2)

    # SNAPSHOT_TABLE is a module constant, not user input, so it is
    # interpolated directly as the table identifier.
    snap = count_rows(f"SELECT COUNT(*) AS n FROM {SNAPSHOT_TABLE};")
    live = count_rows("SELECT COUNT(*) AS n FROM embeddings;")
    print(f"snapshot {SNAPSHOT_TABLE}: {snap} rows; live embeddings: {live} rows")
    if snap == live:
        return
    print(f"ERROR: snapshot row count != live ({snap} vs {live}). Refresh snapshot before --apply.")
    sys.exit(2)
# ─── Batch primitive ────────────────────────────────────────────────────────
def run_batch(cur, label, candidates, apply_mode):
    """Apply one batch of per-row backfill UPDATEs and report what happened.

    Args:
        cur: open database cursor; one UPDATE is executed per candidate.
        label: human-readable batch name used in the progress output.
        candidates: list of ``(id, set_type, set_ca, type_source, ca_source)``.
            ``set_type`` / ``set_ca`` may be None to leave that column alone;
            the ``*_source`` values, when truthy, are merged into ``metadata``
            as ``type_source`` / ``created_at_source``.
        apply_mode: only affects the printed "(will rollback)" note. The
            UPDATEs run in dry-run too, inside the caller's transaction
            (rolled back at the end), so later batches' SELECTs see the
            correct intermediate state.

    Returns:
        Number of rows actually updated (sum of ``cur.rowcount``).

    Each UPDATE is guarded by ``<column> IS NULL`` for every column it sets,
    so a value populated after the candidate list was selected is never
    overwritten; such rows simply do not count as updated.
    """
    n = len(candidates)
    print(f" {label}: {n} rows queued")
    if n == 0:
        return 0
    for c in candidates[:3]:
        print(f" sample: id={c[0]} type={c[1]!r} ca={c[2]!r} type_src={c[3]} ca_src={c[4]}")
    n_written = 0
    for row_id, set_type, set_ca, type_src, ca_src in candidates:
        meta_patch = {}
        if type_src:
            meta_patch["type_source"] = type_src
        if ca_src:
            meta_patch["created_at_source"] = ca_src
        # Build the SET list and the matching NULL guards together.
        sets, params, guards = [], [], ["id = %s"]
        if set_type is not None:
            sets.append("type = %s")
            params.append(set_type)
            guards.append("type IS NULL")
        if set_ca is not None:
            sets.append("created_at = %s")
            params.append(set_ca)
            guards.append("created_at IS NULL")
        if meta_patch:
            sets.append("metadata = COALESCE(metadata, '{}'::jsonb) || %s::jsonb")
            params.append(json.dumps(meta_patch))
        if not sets:
            # Nothing to change for this row; an empty SET list is invalid SQL.
            continue
        params.append(row_id)
        cur.execute(
            f"UPDATE embeddings SET {', '.join(sets)} WHERE {' AND '.join(guards)};",
            params,
        )
        n_written += cur.rowcount
    print(f" {n_written} rows updated{' (will rollback)' if not apply_mode else ''}")
    return n_written
# ─── Batches ────────────────────────────────────────────────────────────────
def batch_T1_type(cur, apply_mode):
    """T1: set type to 'document' on every row whose type is NULL.

    Each row is tagged with type_source 'inferred_extension'. Returns the
    row count reported by run_batch.
    """
    query = (
        "\n"
        "SELECT id, source FROM embeddings WHERE type IS NULL ORDER BY id;\n"
    )
    cur.execute(query)
    cands = []
    for record in cur.fetchall():
        # (id, set_type, set_ca, type_source, ca_source)
        cands.append((record["id"], "document", None, "inferred_extension", None))
    return run_batch(cur, "T1 type IS NULL -> 'document'", cands, apply_mode)
def batch_C1_filepath_stat(cur, apply_mode):
    """C1: fill created_at from the filesystem mtime of metadata.filepath.

    Selects rows where created_at is NULL and metadata.filepath is present,
    stats each path, and queues the rows whose mtime could be read with
    created_at_source 'filepath_stat'. Rows whose path is empty, missing or
    otherwise unstattable are counted as skipped and left for later batches.

    Returns the row count reported by run_batch.
    """
    cur.execute(
        "\n"
        "SELECT id, source, metadata->>'filepath' AS fp\n"
        "FROM embeddings\n"
        "WHERE created_at IS NULL AND metadata->>'filepath' IS NOT NULL\n"
        "ORDER BY id;\n"
    )
    rows = cur.fetchall()
    cands, n_skipped_missing = [], 0
    for r in rows:
        fp = r["fp"]
        # An empty string would become Path("") == "." and pick up the
        # working directory's mtime, so treat it as unresolvable.
        # fmt_ts_mtime already returns None for anything it cannot stat,
        # so no separate exists() probe is needed.
        mt = fmt_ts_mtime(Path(fp)) if fp else None
        if mt:
            cands.append((r["id"], None, mt, None, "filepath_stat"))
        else:
            n_skipped_missing += 1
    print(f" C1 candidates: {len(cands)} (skipped {n_skipped_missing} where filepath gone or unstattable)")
    return run_batch(cur, "C1 ca IS NULL AND filepath stat-resolves -> mtime", cands, apply_mode)
def _filepath_resolves(fp):
    """Return True when *fp* is non-empty and exists on disk; False on any OS error."""
    if not fp:
        return False
    try:
        return Path(fp).exists()
    except OSError:
        # e.g. permission denied on a parent directory: not resolvable here.
        return False


def batch_C2_C3_watcher_state(cur, apply_mode):
    """C2/C3: fill created_at from watcher_state, matched on the row's source.

    For rows with created_at NULL whose metadata.filepath does not resolve on
    disk, ``source`` is looked up among the basenames of the watcher-state
    paths:

    * C2 -- exactly one path has that basename: use its mtime
      (created_at_source 'watcher_state_unique').
    * C3 -- several paths share it: use the most recent mtime
      (created_at_source 'watcher_state_collision_pick_latest_of_<N>').

    Rows with no source or no match are left for C4/C5. Returns the combined
    row count reported by run_batch for both batches.
    """
    by_name = load_watcher_state()
    cur.execute(
        "\n"
        "SELECT id, source, metadata->>'filepath' AS fp\n"
        "FROM embeddings\n"
        "WHERE created_at IS NULL\n"
        "ORDER BY id;\n"
    )
    rows = cur.fetchall()
    c2, c3 = [], []
    skipped_no_match = 0
    for r in rows:
        # Rows whose filepath resolves belong to the C1 path; skip them here.
        if _filepath_resolves(r["fp"]):
            continue
        src = r["source"]
        if not src or src not in by_name:
            skipped_no_match += 1
            continue
        candidates = by_name[src]
        if len(candidates) == 1:
            mt = fmt_ts_unix(candidates[0][1])
            c2.append((r["id"], None, mt, None, "watcher_state_unique"))
        else:
            # Basename collision: take the newest of the competing mtimes.
            latest = max(candidates, key=lambda x: float(x[1]))
            mt = fmt_ts_unix(latest[1])
            c3.append((r["id"], None, mt, None, f"watcher_state_collision_pick_latest_of_{len(candidates)}"))
    print(f" C2/C3 source-basename fallback: {len(c2)} unique, {len(c3)} collision, "
          f"{skipped_no_match} unmatched (will fall to C4/C5)")
    n2 = run_batch(cur, "C2 ca IS NULL AND watcher_state unique -> mtime", c2, apply_mode)
    n3 = run_batch(cur, "C3 ca IS NULL AND watcher_state collision -> latest mtime", c3, apply_mode)
    return n2 + n3
def batch_C4_chatgpt_export(cur, apply_mode):
    """C4: fill created_at on chatgpt_conversation rows from the export index.

    The conversation id is taken from a row id shaped like
    ``chatgpt_<conversation>_<digits>`` and looked up in the index built by
    load_chatgpt_index(). Resolved rows get the conversation's create_time
    (created_at_source 'chatgpt_export'); the rest are counted as unresolved
    and left for C5. Returns the row count reported by run_batch.
    """
    index = load_chatgpt_index()
    cur.execute(
        "\n"
        "SELECT id, source FROM embeddings\n"
        "WHERE type='chatgpt_conversation' AND created_at IS NULL ORDER BY id;\n"
    )
    id_pattern = re.compile(r"^chatgpt_(.+)_(\d+)$")
    cands = []
    unresolved = 0
    for record in cur.fetchall():
        hit = id_pattern.match(record["id"])
        convo_id = hit.group(1) if hit else None
        created = index.get(convo_id)
        if created is not None:
            moment = datetime.fromtimestamp(float(created), tz=timezone.utc)
            ct_iso = moment.isoformat().replace("+00:00", "Z")
            cands.append((record["id"], None, ct_iso, None, "chatgpt_export"))
        else:
            unresolved += 1
    print(f" C4 chatgpt export resolution: {len(cands)} resolved, {unresolved} unresolved (fall to C5)")
    return run_batch(cur, "C4 type='chatgpt_conversation' AND ca IS NULL -> export create_time", cands, apply_mode)
def batch_C5_sentinel(cur, apply_mode):
    """C5: stamp every row still lacking created_at with SENTINEL_ISO.

    When there is anything to stamp, a per-type breakdown of the residual
    rows is printed first. Rows are tagged created_at_source 'sentinel'.
    Returns the row count reported by run_batch.
    """
    cur.execute(
        "\n"
        "SELECT id, type, source FROM embeddings WHERE created_at IS NULL ORDER BY id;\n"
    )
    rows = cur.fetchall()
    cands = []
    sample_types = Counter()
    for record in rows:
        cands.append((record["id"], None, SENTINEL_ISO, None, "sentinel"))
        sample_types[record["type"]] += 1
    if cands:
        print(f" C5 residual sentinel rows by type: {dict(sample_types)}")
    return run_batch(cur, f"C5 ca IS NULL residual -> sentinel {SENTINEL_ISO}", cands, apply_mode)
# ─── Pre/post counts ────────────────────────────────────────────────────────
def print_counts(cur, label):
    """Print total rows plus NULL counts for type and created_at, tagged with *label*."""
    query = (
        "\n"
        "SELECT\n"
        "COUNT(*) AS total,\n"
        "COUNT(*) FILTER (WHERE type IS NULL) AS type_null,\n"
        "COUNT(*) FILTER (WHERE created_at IS NULL) AS ca_null\n"
        "FROM embeddings;\n"
    )
    cur.execute(query)
    counts = cur.fetchone()
    total = counts["total"]
    type_null = counts["type_null"]
    ca_null = counts["ca_null"]
    print(f" [{label}] total={total} type_null={type_null} ca_null={ca_null}")
# ─── Driver ─────────────────────────────────────────────────────────────────
def main():
    """Run all backfill batches in order inside a single transaction.

    Without ``--apply`` this is a dry-run: the UPDATEs execute so that later
    batches see the intermediate state, then the transaction is rolled back.
    With ``--apply`` the snapshot is verified first (assert_snapshot exits
    with status 2 on failure) and the transaction is committed at the end.

    The connection is always closed, including when a batch raises or
    assert_snapshot exits; closing without a commit discards pending changes.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--apply", action="store_true", help="default false (dry-run)")
    args = ap.parse_args()
    apply_mode = args.apply
    pg = get_pg()
    try:
        cur = pg.cursor()
        print(f"Mode: {'APPLY (writes will commit)' if apply_mode else 'DRY-RUN (no writes)'}")
        print(f"Sentinel: {SENTINEL_ISO}")
        if apply_mode:
            assert_snapshot(cur)
        header("PRE-COUNTS")
        print_counts(cur, "before")
        header("BATCHES")
        # Order matters: each batch only sees rows the earlier ones left NULL.
        n_t1 = batch_T1_type(cur, apply_mode)
        n_c1 = batch_C1_filepath_stat(cur, apply_mode)
        n_c2c3 = batch_C2_C3_watcher_state(cur, apply_mode)
        n_c4 = batch_C4_chatgpt_export(cur, apply_mode)
        n_c5 = batch_C5_sentinel(cur, apply_mode)
        header("POST-COUNTS")
        print_counts(cur, "after" if apply_mode else "after (in-transaction, will rollback)")
        if apply_mode:
            pg.commit()
            print("\nCOMMITTED.")
        else:
            pg.rollback()
            print("\nROLLED BACK (dry-run).")
        print(f"\nSummary: T1={n_t1} C1={n_c1} C2+C3={n_c2c3} C4={n_c4} C5={n_c5}")
    finally:
        pg.close()


if __name__ == "__main__":
    main()