3c7c228db0
Backfills 9,815 type-NULL rows to 'document' (extension classifier, 100% hit)
and 12,109 created_at-NULL rows via five batches (rows updated; timestamp source):
  C1 filepath_stat: 9,649 rows; filesystem mtime via metadata.filepath
  C2 watcher_state_unique: 676 rows; unique source-name lookup in watcher_state
  C3 watcher_state_collision_pick_latest_of_N: 234 rows; collision, most-recent watcher mtime
  C4 chatgpt_export: 1,548 rows; convo create_time from export JSONs
     (168/168 distinct convo_ids resolved)
  C5 sentinel: 2 rows; 2026-04-26T00:00:00Z (pgvector migration date)
Provenance written to metadata.type_source and metadata.created_at_source
on every row changed by this run. type_source is empty on rows where the
type field was already populated pre-run; in those cases the snapshot
table is the source of truth for what changed.
Snapshot: embeddings_backup_2026_05_03 (CREATE TABLE AS SELECT id, type,
created_at, metadata FROM embeddings; 14,069 rows; revertable via id-join).
Verification:
V1 live counts: type_null=0 ca_null=0
V2 spot-check 11 rows across cohorts: provenance correct
V3 snapshot intact: 14,069 rows, pre-backfill NULL counts preserved
V4 cross-check vs snapshot: reconciles per-provenance to dry-run
Read-side use (B + C: writer enforcement + minimal retrieval read) deferred
to a separate session. The backfill is complete and verified, but the type
and created_at fields are not yet load-bearing — every current reader still
ignores them. Without B+C this lands as data prep, not behavior change.
305 lines
12 KiB
Python
305 lines
12 KiB
Python
"""Backfill embeddings.type and embeddings.created_at (Improvement #2 / A.3).

Idempotent on cohort predicates (every cohort SELECT includes IS NULL on the
target column). Writes provenance to metadata.type_source and
metadata.created_at_source so each row is auditable and revertable per-source.
Runs as a dry-run by default (all writes are rolled back); pass --apply to commit.

Order of batches:
  T1. type backfill: WHERE type IS NULL -> 'document' (extension-classified, all hit).
  C1. created_at: WHERE ca IS NULL AND metadata.filepath stat-resolves -> filesystem mtime.
  C2. created_at: WHERE ca IS NULL AND source has unique watcher_state path -> watcher mtime.
  C3. created_at: WHERE ca IS NULL AND source has watcher_state collision -> most-recent mtime.
  C4. created_at: WHERE type='chatgpt_conversation' AND ca IS NULL -> export-resolved create_time.
  C5. created_at: WHERE ca IS NULL (residual) -> sentinel.

Snapshot table embeddings_backup_2026_05_03 must exist before --apply.

Usage:
  venv/bin/python3 scripts/experiments/embeddings_backfill_apply.py            # dry-run
  venv/bin/python3 scripts/experiments/embeddings_backfill_apply.py --apply    # write

Exits non-zero if snapshot is missing on --apply.
"""
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import Counter, defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor, Json
|
|
from dotenv import load_dotenv
|
|
|
|
# Everything project-local lives under ~/aaronai; the .env there supplies PG_DSN.
_AARONAI_HOME = Path.home() / "aaronai"
load_dotenv(_AARONAI_HOME / ".env")

# Postgres connection string; None when the variable is not defined.
PG_DSN = os.environ.get("PG_DSN")

# JSON file mapping watched file paths to their recorded mtimes (read by C2/C3).
WATCHER_STATE = _AARONAI_HOME / "watcher_state.json"

# Directory scanned for conversations*.json ChatGPT export files (read by C4).
CHATGPT_EXPORT_DIR = Path(
    "/home/aaron/nextcloud/data/data/aaron/files/Archive/Misc/ChatGPT Export"
)

# Pre-backfill copy of the embeddings table; must exist before --apply.
SNAPSHOT_TABLE = "embeddings_backup_2026_05_03"

# created_at assigned by C5 to rows that no earlier batch could date.
SENTINEL_ISO = "2026-04-26T00:00:00Z"
|
|
|
|
|
|
# ─── Helpers ────────────────────────────────────────────────────────────────
|
|
|
|
def get_pg():
    """Open a new psycopg2 connection to the database named by ``PG_DSN``.

    Cursors created from the connection are ``RealDictCursor`` instances, so
    fetched rows are indexed by column name.

    Raises:
        RuntimeError: if ``PG_DSN`` was not found in the environment. Failing
            here gives a clear message instead of an opaque driver error.
    """
    if PG_DSN is None:
        raise RuntimeError(
            "PG_DSN is not set; define it in the environment or in ~/aaronai/.env"
        )
    return psycopg2.connect(PG_DSN, cursor_factory=RealDictCursor)
|
|
|
|
|
|
def header(t):
    """Print *t* as a section banner: a blank line, then *t* framed by two 70-char ``=`` rules."""
    rule = "=" * 70
    # Leading empty item + sep="\n" reproduces the blank line before the banner.
    print("", rule, f"{t}", rule, sep="\n")
|
|
|
|
|
|
def fmt_ts_unix(ts):
    """Render a Unix timestamp as an ISO-8601 UTC string with a ``Z`` suffix.

    *ts* may be anything ``float()`` accepts (number or numeric string).
    Fractional seconds are kept when present.
    """
    moment = datetime.fromtimestamp(float(ts), tz=timezone.utc)
    iso_text = moment.isoformat()
    # isoformat() spells UTC as "+00:00"; the stored format uses "Z" instead.
    return iso_text.replace("+00:00", "Z")
|
|
|
|
|
|
def fmt_ts_mtime(p):
    """Return the modification time of path *p* as an ISO-8601 UTC string (``Z`` suffix).

    Args:
        p: a ``pathlib.Path`` (anything with a ``stat()`` method).

    Returns:
        The formatted mtime, or ``None`` when the file cannot be stat'ed or its
        mtime cannot be converted to a datetime.
    """
    try:
        mtime = p.stat().st_mtime
        stamp = datetime.fromtimestamp(mtime, tz=timezone.utc)
    except (OSError, OverflowError, ValueError):
        # OSError: missing file / permission problem; ValueError/OverflowError:
        # malformed path or out-of-range timestamp. Anything else is a real bug
        # and is allowed to propagate instead of being silently swallowed.
        return None
    return stamp.isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
def load_watcher_state():
    """Load the watcher state file and group its entries by file basename.

    The state file is parsed as a JSON object whose items are iterated as
    ``(path, mtime)`` pairs.

    Returns:
        A ``defaultdict(list)`` mapping ``Path(path).name`` to a list of
        ``(path, mtime)`` tuples. A basename with more than one tuple is a
        collision (same file name under several directories).

    Raises:
        OSError / ValueError: if the state file is missing, unreadable or not
            valid JSON. This is deliberately not swallowed.
    """
    # JSON text is UTF-8; do not depend on the process locale for decoding.
    state = json.loads(WATCHER_STATE.read_text(encoding="utf-8"))
    by_name = defaultdict(list)
    for path, mtime in state.items():
        by_name[Path(path).name].append((path, mtime))
    return by_name
|
|
|
|
|
|
def load_chatgpt_index():
    """Build a ``conversation id -> create_time`` index from the ChatGPT export files.

    Scans ``CHATGPT_EXPORT_DIR`` for ``conversations*.json`` in sorted order.
    Each file is expected to hold a JSON array of conversation objects. The id
    is taken from ``"id"`` or, failing that, ``"conversation_id"``. Entries
    without an id or without a ``create_time`` are ignored. When the same id
    appears more than once, the last occurrence wins.

    Returns:
        dict mapping conversation id to its raw ``create_time`` value; empty
        when the export directory does not exist.
    """
    index = {}
    if not CHATGPT_EXPORT_DIR.exists():
        return index

    for export_file in sorted(CHATGPT_EXPORT_DIR.glob("conversations*.json")):
        try:
            data = json.loads(export_file.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # Still best-effort, but say so: a silently skipped file would send
            # its conversations to the sentinel batch with no trace of why.
            print(f"WARNING: skipping unreadable ChatGPT export {export_file}: {exc}",
                  file=sys.stderr)
            continue
        if not isinstance(data, list):
            print(f"WARNING: skipping ChatGPT export {export_file}: top-level JSON is not a list",
                  file=sys.stderr)
            continue
        for convo in data:
            if not isinstance(convo, dict):
                continue
            cid = convo.get("id") or convo.get("conversation_id")
            ct = convo.get("create_time")
            if cid and ct is not None:
                index[cid] = ct
    return index
|
|
|
|
|
|
def assert_snapshot(cur):
    """Abort the run unless the snapshot table exists and matches the live row count.

    Exits the process with status 2 when ``SNAPSHOT_TABLE`` cannot be resolved
    by ``to_regclass`` or when its row count differs from ``embeddings``.
    Returns ``None`` when both checks pass.
    """
    cur.execute("SELECT to_regclass(%s) AS t;", (SNAPSHOT_TABLE,))
    snapshot_exists = cur.fetchone()["t"] is not None
    if not snapshot_exists:
        print(f"ERROR: snapshot table '{SNAPSHOT_TABLE}' not found. Run A.2 first.")
        sys.exit(2)

    # SNAPSHOT_TABLE is a module constant, not user input, so interpolation is safe here.
    cur.execute(f"SELECT COUNT(*) AS n FROM {SNAPSHOT_TABLE};")
    snapshot_rows = cur.fetchone()["n"]
    cur.execute("SELECT COUNT(*) AS n FROM embeddings;")
    live_rows = cur.fetchone()["n"]
    print(f"snapshot {SNAPSHOT_TABLE}: {snapshot_rows} rows; live embeddings: {live_rows} rows")

    if snapshot_rows == live_rows:
        return
    print(f"ERROR: snapshot row count != live ({snapshot_rows} vs {live_rows}). Refresh snapshot before --apply.")
    sys.exit(2)
|
|
|
|
|
|
# ─── Batch primitive ────────────────────────────────────────────────────────
|
|
|
|
def run_batch(cur, label, candidates, apply_mode):
    """Apply one batch of per-row UPDATEs to ``embeddings`` and return the rows written.

    Args:
        cur: open DB cursor; must expose ``execute`` and ``rowcount``.
        label: human-readable batch name used in progress output.
        candidates: list of ``(id, set_type, set_ca, type_source, ca_source)``.
            ``set_type`` / ``set_ca`` may be None to leave that column alone;
            the ``*_source`` values, when truthy, are merged into ``metadata``
            as ``type_source`` / ``created_at_source``.
        apply_mode: only affects the progress message. The UPDATEs are executed
            in dry-run too, inside the caller's transaction (rolled back at the
            end), so later batches' SELECTs see the correct intermediate state.

    Returns:
        Sum of ``cur.rowcount`` over the UPDATEs issued.

    Each UPDATE is guarded by ``<column> IS NULL`` for every column it sets, so
    a row populated by someone else after the cohort SELECT is left untouched
    (and gets no provenance stamp). Candidates with nothing to change are
    skipped rather than producing an empty, invalid SET clause.
    """
    n = len(candidates)
    print(f" {label}: {n} rows queued")
    if n == 0:
        return 0
    for c in candidates[:3]:
        print(f" sample: id={c[0]} type={c[1]!r} ca={c[2]!r} type_src={c[3]} ca_src={c[4]}")

    n_written = 0
    for row_id, set_type, set_ca, type_src, ca_src in candidates:
        sets, params = [], []
        guards = ["id = %s"]
        if set_type is not None:
            sets.append("type = %s")
            params.append(set_type)
            guards.append("type IS NULL")
        if set_ca is not None:
            sets.append("created_at = %s")
            params.append(set_ca)
            guards.append("created_at IS NULL")

        meta_patch = {}
        if type_src:
            meta_patch["type_source"] = type_src
        if ca_src:
            meta_patch["created_at_source"] = ca_src
        if meta_patch:
            # jsonb || jsonb merges keys; COALESCE covers rows whose metadata is NULL.
            sets.append("metadata = COALESCE(metadata, '{}'::jsonb) || %s::jsonb")
            params.append(json.dumps(meta_patch))

        if not sets:
            # Nothing to write for this row; "UPDATE ... SET  WHERE" would be a syntax error.
            continue

        params.append(row_id)
        cur.execute(
            f"UPDATE embeddings SET {', '.join(sets)} WHERE {' AND '.join(guards)};",
            params,
        )
        n_written += cur.rowcount

    print(f" {n_written} rows updated{' (will rollback)' if not apply_mode else ''}")
    return n_written
|
|
|
|
|
|
# ─── Batches ────────────────────────────────────────────────────────────────
|
|
|
|
def batch_T1_type(cur, apply_mode):
    """T1: set ``type`` to 'document' on every row where it is NULL.

    Every selected row gets the same value and the provenance tag
    ``inferred_extension``; no per-row classification happens here.
    Returns the number of rows written by ``run_batch``.
    """
    cur.execute("""
        SELECT id, source FROM embeddings WHERE type IS NULL ORDER BY id;
    """)
    queued = []
    for row in cur.fetchall():
        # (id, set_type, set_ca, type_source, ca_source)
        queued.append((row["id"], "document", None, "inferred_extension", None))
    return run_batch(cur, "T1 type IS NULL -> 'document'", queued, apply_mode)
|
|
|
|
|
|
def batch_C1_filepath_stat(cur, apply_mode):
    """C1: fill ``created_at`` from the filesystem mtime of ``metadata.filepath``.

    Selects rows with ``created_at IS NULL`` and a non-NULL filepath, stats each
    path and queues the rows whose mtime could be read (provenance
    ``filepath_stat``). Rows whose path is blank, missing or unstattable are
    only counted as skipped; they stay NULL for the later batches.

    Returns the number of rows written by ``run_batch``.
    """
    cur.execute("""
        SELECT id, source, metadata->>'filepath' AS fp
        FROM embeddings
        WHERE created_at IS NULL AND metadata->>'filepath' IS NOT NULL
        ORDER BY id;
    """)
    rows = cur.fetchall()

    cands, n_skipped_missing = [], 0
    for r in rows:
        fp = r["fp"]
        # A blank filepath would become Path("") == Path("."), which always
        # exists, and the row would get the working directory's mtime. Treat
        # it as unresolved instead.
        mt = fmt_ts_mtime(Path(fp)) if fp.strip() else None
        if mt:
            cands.append((r["id"], None, mt, None, "filepath_stat"))
        else:
            # fmt_ts_mtime returns None for a path that is gone or cannot be
            # stat'ed, so no separate exists() probe is needed.
            n_skipped_missing += 1

    print(f" C1 candidates: {len(cands)} (skipped {n_skipped_missing} where filepath gone or unstattable)")
    return run_batch(cur, "C1 ca IS NULL AND filepath stat-resolves -> mtime", cands, apply_mode)
|
|
|
|
|
|
def batch_C2_C3_watcher_state(cur, apply_mode):
    """C2/C3: fill ``created_at`` from watcher_state mtimes for rows C1 could not resolve.

    The row's ``source`` value is looked up as a key in the basename index built
    by ``load_watcher_state``:
      * exactly one watcher entry  -> C2, provenance ``watcher_state_unique``
      * several watcher entries    -> C3, the numerically largest mtime wins,
        provenance ``watcher_state_collision_pick_latest_of_<N>``
    Rows whose filepath still exists on disk are skipped. Rows with no
    ``source`` or no watcher entry are only counted as unmatched.

    Returns the combined number of rows written by the two batches.
    """
    by_name = load_watcher_state()
    cur.execute("""
        SELECT id, source, metadata->>'filepath' AS fp
        FROM embeddings
        WHERE created_at IS NULL
        ORDER BY id;
    """)

    unique_hits = []
    collision_hits = []
    unmatched = 0
    for row in cur.fetchall():
        fp = row["fp"]
        if fp and Path(fp).exists():
            # This path belongs to the C1 cohort; leave it alone here.
            continue

        src = row["source"]
        if not src or src not in by_name:
            unmatched += 1
            continue

        entries = by_name[src]
        if len(entries) != 1:
            newest = max(entries, key=lambda entry: float(entry[1]))
            provenance = f"watcher_state_collision_pick_latest_of_{len(entries)}"
            collision_hits.append((row["id"], None, fmt_ts_unix(newest[1]), None, provenance))
        else:
            only_mtime = entries[0][1]
            unique_hits.append((row["id"], None, fmt_ts_unix(only_mtime), None, "watcher_state_unique"))

    print(f" C2/C3 source-basename fallback: {len(unique_hits)} unique, {len(collision_hits)} collision, "
          f"{unmatched} unmatched (will fall to C4/C5)")
    written_unique = run_batch(
        cur, "C2 ca IS NULL AND watcher_state unique -> mtime", unique_hits, apply_mode
    )
    written_collision = run_batch(
        cur, "C3 ca IS NULL AND watcher_state collision -> latest mtime", collision_hits, apply_mode
    )
    return written_unique + written_collision
|
|
|
|
|
|
def batch_C4_chatgpt_export(cur, apply_mode):
    """C4: fill ``created_at`` on chatgpt_conversation rows from the export's create_time.

    The conversation id is taken from row ids shaped ``chatgpt_<convo-id>_<digits>``
    and looked up in the index built by ``load_chatgpt_index``. Rows whose id
    does not match that shape, or whose conversation is not in the index, are
    only counted as unresolved. Provenance tag: ``chatgpt_export``.

    Returns the number of rows written by ``run_batch``.
    """
    index = load_chatgpt_index()
    row_id_pattern = re.compile(r"^chatgpt_(.+)_(\d+)$")

    cur.execute("""
        SELECT id, source FROM embeddings
        WHERE type='chatgpt_conversation' AND created_at IS NULL ORDER BY id;
    """)

    resolved = []
    unresolved = 0
    for row in cur.fetchall():
        hit = row_id_pattern.match(row["id"])
        convo_id = None if hit is None else hit.group(1)
        create_time = index.get(convo_id)
        if create_time is None:
            unresolved += 1
            continue
        resolved.append((row["id"], None, fmt_ts_unix(create_time), None, "chatgpt_export"))

    print(f" C4 chatgpt export resolution: {len(resolved)} resolved, {unresolved} unresolved (fall to C5)")
    return run_batch(
        cur,
        "C4 type='chatgpt_conversation' AND ca IS NULL -> export create_time",
        resolved,
        apply_mode,
    )
|
|
|
|
|
|
def batch_C5_sentinel(cur, apply_mode):
    """C5: stamp every row still lacking ``created_at`` with ``SENTINEL_ISO``.

    Prints a per-type breakdown of the residual rows when there are any.
    Provenance tag: ``sentinel``. Returns the number of rows written by
    ``run_batch``.
    """
    cur.execute("""
        SELECT id, type, source FROM embeddings WHERE created_at IS NULL ORDER BY id;
    """)
    residual = cur.fetchall()

    queued = []
    for row in residual:
        queued.append((row["id"], None, SENTINEL_ISO, None, "sentinel"))

    if queued:
        by_type = Counter(row["type"] for row in residual)
        print(f" C5 residual sentinel rows by type: {dict(by_type)}")

    batch_label = f"C5 ca IS NULL residual -> sentinel {SENTINEL_ISO}"
    return run_batch(cur, batch_label, queued, apply_mode)
|
|
|
|
|
|
# ─── Pre/post counts ────────────────────────────────────────────────────────
|
|
|
|
def print_counts(cur, label):
    """Print total rows and the NULL counts for ``type`` and ``created_at``, tagged with *label*."""
    cur.execute("""
        SELECT
            COUNT(*) AS total,
            COUNT(*) FILTER (WHERE type IS NULL) AS type_null,
            COUNT(*) FILTER (WHERE created_at IS NULL) AS ca_null
        FROM embeddings;
    """)
    counts = cur.fetchone()
    total = counts["total"]
    type_null = counts["type_null"]
    ca_null = counts["ca_null"]
    print(f" [{label}] total={total} type_null={type_null} ca_null={ca_null}")
|
|
|
|
|
|
# ─── Driver ─────────────────────────────────────────────────────────────────
|
|
|
|
def main():
    """Run all backfill batches in one transaction; commit only with ``--apply``.

    Without ``--apply`` the batches still execute their UPDATEs so the printed
    counts are realistic, and the transaction is rolled back at the end. With
    ``--apply`` the snapshot check runs first (it exits with status 2 on
    failure) and the transaction is committed after the post-counts.

    The connection is closed on every exit path, including an exception in a
    batch or the ``sys.exit`` raised by the snapshot check.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--apply", action="store_true", help="default false (dry-run)")
    args = ap.parse_args()
    apply_mode = args.apply

    pg = get_pg()
    try:
        cur = pg.cursor()

        print(f"Mode: {'APPLY (writes will commit)' if apply_mode else 'DRY-RUN (no writes)'}")
        print(f"Sentinel: {SENTINEL_ISO}")

        if apply_mode:
            assert_snapshot(cur)

        header("PRE-COUNTS")
        print_counts(cur, "before")

        header("BATCHES")
        # Order matters: each batch selects only rows the previous ones left NULL.
        n_t1 = batch_T1_type(cur, apply_mode)
        n_c1 = batch_C1_filepath_stat(cur, apply_mode)
        n_c2c3 = batch_C2_C3_watcher_state(cur, apply_mode)
        n_c4 = batch_C4_chatgpt_export(cur, apply_mode)
        n_c5 = batch_C5_sentinel(cur, apply_mode)

        header("POST-COUNTS")
        print_counts(cur, "after" if apply_mode else "after (in-transaction, will rollback)")

        if apply_mode:
            pg.commit()
            print("\nCOMMITTED.")
        else:
            pg.rollback()
            print("\nROLLED BACK (dry-run).")

        print(f"\nSummary: T1={n_t1} C1={n_c1} C2+C3={n_c2c3} C4={n_c4} C5={n_c5}")
    finally:
        # Closing with an open transaction discards it, so a failure part-way
        # through the batches never leaves partial writes behind.
        pg.close()


if __name__ == "__main__":
    main()
|