embeddings: backfill type and created_at (Improvement #2 part A)
Backfills 9,815 type-NULL rows to 'document' (extension classifier, 100% hit)
and 12,109 created_at-NULL rows via five batches:
C1 filepath_stat: 9,649 filesystem mtime via metadata.filepath
C2 watcher_state_unique: 676 unique source-name lookup in watcher_state
C3 watcher_state_collision_pick_latest_of_N:
234 collision; most-recent watcher mtime
C4 chatgpt_export: 1,548 convo create_time from export JSONs
(168/168 distinct convo_ids resolved)
C5 sentinel: 2 2026-04-26T00:00:00Z (pgvector migration date)
Provenance written to metadata.type_source and metadata.created_at_source
on every row changed by this run. type_source is empty on rows where the
type field was already populated pre-run; in those cases the snapshot
table is the source of truth for what changed.
Snapshot: embeddings_backup_2026_05_03 (CREATE TABLE AS SELECT id, type,
created_at, metadata FROM embeddings; 14,069 rows; revertable via id-join).
Verification:
V1 live counts: type_null=0 ca_null=0
V2 spot-check 11 rows across cohorts: provenance correct
V3 snapshot intact: 14,069 rows, pre-backfill NULL counts preserved
V4 cross-check vs snapshot: reconciles per-provenance to dry-run
Read-side use (B + C: writer enforcement + minimal retrieval read) deferred
to a separate session. The backfill is complete and verified, but the type
and created_at fields are not yet load-bearing — every current reader still
ignores them. Without B+C this lands as data prep, not behavior change.
This commit is contained in:
@@ -0,0 +1,304 @@
|
||||
"""Backfill embeddings.type and embeddings.created_at (Improvement #2 / A.3).
|
||||
|
||||
Idempotent on cohort predicates (every WHERE clause includes IS NULL on the
|
||||
target column). Writes provenance to metadata.type_source and metadata.created_at_source
|
||||
so each row is auditable and revertable per-source. Default --dry-run=True.
|
||||
|
||||
Order of batches:
|
||||
T1. type backfill: WHERE type IS NULL -> 'document' (extension-classified, all hit).
|
||||
C1. created_at: WHERE ca IS NULL AND metadata.filepath stat-resolves -> filesystem mtime.
|
||||
C2. created_at: WHERE ca IS NULL AND source has unique watcher_state path -> watcher mtime.
|
||||
C3. created_at: WHERE ca IS NULL AND source has watcher_state collision -> most-recent mtime.
|
||||
C4. created_at: WHERE type='chatgpt_conversation' AND ca IS NULL -> export-resolved create_time.
|
||||
C5. created_at: WHERE ca IS NULL (residual) -> sentinel.
|
||||
|
||||
Snapshot table embeddings_backup_2026_05_03 must exist before --apply.
|
||||
|
||||
Usage:
|
||||
venv/bin/python3 scripts/experiments/embeddings_backfill_apply.py # dry-run
|
||||
venv/bin/python3 scripts/experiments/embeddings_backfill_apply.py --apply # write
|
||||
|
||||
Exits non-zero if snapshot is missing on --apply.
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import Counter, defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import psycopg2
|
||||
from psycopg2.extras import RealDictCursor, Json
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(Path.home() / "aaronai" / ".env")
|
||||
|
||||
PG_DSN = os.getenv("PG_DSN")
|
||||
WATCHER_STATE = Path.home() / "aaronai" / "watcher_state.json"
|
||||
CHATGPT_EXPORT_DIR = Path("/home/aaron/nextcloud/data/data/aaron/files/Archive/Misc/ChatGPT Export")
|
||||
SNAPSHOT_TABLE = "embeddings_backup_2026_05_03"
|
||||
SENTINEL_ISO = "2026-04-26T00:00:00Z"
|
||||
|
||||
|
||||
# ─── Helpers ────────────────────────────────────────────────────────────────
|
||||
|
||||
def get_pg():
|
||||
return psycopg2.connect(PG_DSN, cursor_factory=RealDictCursor)
|
||||
|
||||
|
||||
def header(t):
|
||||
bar = "=" * 70
|
||||
print(f"\n{bar}\n{t}\n{bar}")
|
||||
|
||||
|
||||
def fmt_ts_unix(ts):
|
||||
return datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
|
||||
def fmt_ts_mtime(p):
|
||||
try:
|
||||
return datetime.fromtimestamp(p.stat().st_mtime, tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def load_watcher_state():
|
||||
state = json.loads(WATCHER_STATE.read_text())
|
||||
by_name = defaultdict(list)
|
||||
for path, mtime in state.items():
|
||||
by_name[Path(path).name].append((path, mtime))
|
||||
return by_name
|
||||
|
||||
|
||||
def load_chatgpt_index():
|
||||
if not CHATGPT_EXPORT_DIR.exists():
|
||||
return {}
|
||||
index = {}
|
||||
for f in sorted(CHATGPT_EXPORT_DIR.glob("conversations*.json")):
|
||||
try:
|
||||
data = json.loads(f.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
continue
|
||||
for convo in data:
|
||||
cid = convo.get("id") or convo.get("conversation_id")
|
||||
ct = convo.get("create_time")
|
||||
if cid and ct is not None:
|
||||
index[cid] = ct
|
||||
return index
|
||||
|
||||
|
||||
def assert_snapshot(cur):
|
||||
cur.execute("SELECT to_regclass(%s) AS t;", (SNAPSHOT_TABLE,))
|
||||
if cur.fetchone()["t"] is None:
|
||||
print(f"ERROR: snapshot table '{SNAPSHOT_TABLE}' not found. Run A.2 first.")
|
||||
sys.exit(2)
|
||||
cur.execute(f"SELECT COUNT(*) AS n FROM {SNAPSHOT_TABLE};")
|
||||
snap = cur.fetchone()["n"]
|
||||
cur.execute("SELECT COUNT(*) AS n FROM embeddings;")
|
||||
live = cur.fetchone()["n"]
|
||||
print(f"snapshot {SNAPSHOT_TABLE}: {snap} rows; live embeddings: {live} rows")
|
||||
if snap != live:
|
||||
print(f"ERROR: snapshot row count != live ({snap} vs {live}). Refresh snapshot before --apply.")
|
||||
sys.exit(2)
|
||||
|
||||
|
||||
# ─── Batch primitive ────────────────────────────────────────────────────────
|
||||
|
||||
def run_batch(cur, label, candidates, apply_mode):
|
||||
"""candidates: list of (id, set_type, set_ca, type_source, ca_source).
|
||||
set_type / set_ca may be None to leave that column alone.
|
||||
In dry-run we still execute UPDATEs inside an outer transaction (rolled back
|
||||
at the end) so subsequent batches' SELECTs see the correct intermediate state."""
|
||||
n = len(candidates)
|
||||
print(f" {label}: {n} rows queued")
|
||||
if n == 0:
|
||||
return 0
|
||||
for c in candidates[:3]:
|
||||
print(f" sample: id={c[0]} type={c[1]!r} ca={c[2]!r} type_src={c[3]} ca_src={c[4]}")
|
||||
n_written = 0
|
||||
for row_id, set_type, set_ca, type_src, ca_src in candidates:
|
||||
meta_patch = {}
|
||||
if type_src:
|
||||
meta_patch["type_source"] = type_src
|
||||
if ca_src:
|
||||
meta_patch["created_at_source"] = ca_src
|
||||
# Build set list dynamically.
|
||||
sets, params = [], []
|
||||
if set_type is not None:
|
||||
sets.append("type = %s")
|
||||
params.append(set_type)
|
||||
if set_ca is not None:
|
||||
sets.append("created_at = %s")
|
||||
params.append(set_ca)
|
||||
if meta_patch:
|
||||
sets.append("metadata = COALESCE(metadata, '{}'::jsonb) || %s::jsonb")
|
||||
params.append(json.dumps(meta_patch))
|
||||
params.append(row_id)
|
||||
cur.execute(f"UPDATE embeddings SET {', '.join(sets)} WHERE id = %s;", params)
|
||||
n_written += cur.rowcount
|
||||
print(f" {n_written} rows updated{' (will rollback)' if not apply_mode else ''}")
|
||||
return n_written
|
||||
|
||||
|
||||
# ─── Batches ────────────────────────────────────────────────────────────────
|
||||
|
||||
def batch_T1_type(cur, apply_mode):
|
||||
"""type IS NULL -> 'document'. All cohort A rows have a SUPPORTED extension."""
|
||||
cur.execute("""
|
||||
SELECT id, source FROM embeddings WHERE type IS NULL ORDER BY id;
|
||||
""")
|
||||
rows = cur.fetchall()
|
||||
cands = [(r["id"], "document", None, "inferred_extension", None) for r in rows]
|
||||
return run_batch(cur, "T1 type IS NULL -> 'document'", cands, apply_mode)
|
||||
|
||||
|
||||
def batch_C1_filepath_stat(cur, apply_mode):
|
||||
"""ca IS NULL AND metadata.filepath stat-resolves -> mtime."""
|
||||
cur.execute("""
|
||||
SELECT id, source, metadata->>'filepath' AS fp
|
||||
FROM embeddings
|
||||
WHERE created_at IS NULL AND metadata->>'filepath' IS NOT NULL
|
||||
ORDER BY id;
|
||||
""")
|
||||
rows = cur.fetchall()
|
||||
cands, n_skipped_missing = [], 0
|
||||
for r in rows:
|
||||
p = Path(r["fp"])
|
||||
if p.exists():
|
||||
mt = fmt_ts_mtime(p)
|
||||
if mt:
|
||||
cands.append((r["id"], None, mt, None, "filepath_stat"))
|
||||
continue
|
||||
n_skipped_missing += 1
|
||||
print(f" C1 candidates: {len(cands)} (skipped {n_skipped_missing} where filepath gone or unstattable)")
|
||||
return run_batch(cur, "C1 ca IS NULL AND filepath stat-resolves -> mtime", cands, apply_mode)
|
||||
|
||||
|
||||
def batch_C2_C3_watcher_state(cur, apply_mode):
|
||||
"""ca IS NULL AND filepath unresolvable -> watcher_state by source basename.
|
||||
C2 = unique hit, C3 = collision pick-latest."""
|
||||
by_name = load_watcher_state()
|
||||
cur.execute("""
|
||||
SELECT id, source, metadata->>'filepath' AS fp
|
||||
FROM embeddings
|
||||
WHERE created_at IS NULL
|
||||
ORDER BY id;
|
||||
""")
|
||||
rows = cur.fetchall()
|
||||
c2, c3 = [], []
|
||||
skipped_no_match = 0
|
||||
for r in rows:
|
||||
# skip rows already targeted by C1 path
|
||||
if r["fp"] and Path(r["fp"]).exists():
|
||||
continue
|
||||
src = r["source"]
|
||||
if not src or src not in by_name:
|
||||
skipped_no_match += 1
|
||||
continue
|
||||
candidates = by_name[src]
|
||||
if len(candidates) == 1:
|
||||
mt = fmt_ts_unix(candidates[0][1])
|
||||
c2.append((r["id"], None, mt, None, "watcher_state_unique"))
|
||||
else:
|
||||
latest = max(candidates, key=lambda x: float(x[1]))
|
||||
mt = fmt_ts_unix(latest[1])
|
||||
c3.append((r["id"], None, mt, None, f"watcher_state_collision_pick_latest_of_{len(candidates)}"))
|
||||
print(f" C2/C3 source-basename fallback: {len(c2)} unique, {len(c3)} collision, "
|
||||
f"{skipped_no_match} unmatched (will fall to C4/C5)")
|
||||
n2 = run_batch(cur, "C2 ca IS NULL AND watcher_state unique -> mtime", c2, apply_mode)
|
||||
n3 = run_batch(cur, "C3 ca IS NULL AND watcher_state collision -> latest mtime", c3, apply_mode)
|
||||
return n2 + n3
|
||||
|
||||
|
||||
def batch_C4_chatgpt_export(cur, apply_mode):
|
||||
index = load_chatgpt_index()
|
||||
cur.execute("""
|
||||
SELECT id, source FROM embeddings
|
||||
WHERE type='chatgpt_conversation' AND created_at IS NULL ORDER BY id;
|
||||
""")
|
||||
rows = cur.fetchall()
|
||||
cands, unresolved = [], 0
|
||||
for r in rows:
|
||||
m = re.match(r"^chatgpt_(.+)_(\d+)$", r["id"])
|
||||
cid = m.group(1) if m else None
|
||||
ct = index.get(cid)
|
||||
if ct is None:
|
||||
unresolved += 1
|
||||
continue
|
||||
ct_iso = datetime.fromtimestamp(float(ct), tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
cands.append((r["id"], None, ct_iso, None, "chatgpt_export"))
|
||||
print(f" C4 chatgpt export resolution: {len(cands)} resolved, {unresolved} unresolved (fall to C5)")
|
||||
return run_batch(cur, "C4 type='chatgpt_conversation' AND ca IS NULL -> export create_time", cands, apply_mode)
|
||||
|
||||
|
||||
def batch_C5_sentinel(cur, apply_mode):
|
||||
cur.execute("""
|
||||
SELECT id, type, source FROM embeddings WHERE created_at IS NULL ORDER BY id;
|
||||
""")
|
||||
rows = cur.fetchall()
|
||||
cands = [(r["id"], None, SENTINEL_ISO, None, "sentinel") for r in rows]
|
||||
if cands:
|
||||
sample_types = Counter(r["type"] for r in rows)
|
||||
print(f" C5 residual sentinel rows by type: {dict(sample_types)}")
|
||||
return run_batch(cur, f"C5 ca IS NULL residual -> sentinel {SENTINEL_ISO}", cands, apply_mode)
|
||||
|
||||
|
||||
# ─── Pre/post counts ────────────────────────────────────────────────────────
|
||||
|
||||
def print_counts(cur, label):
|
||||
cur.execute("""
|
||||
SELECT
|
||||
COUNT(*) AS total,
|
||||
COUNT(*) FILTER (WHERE type IS NULL) AS type_null,
|
||||
COUNT(*) FILTER (WHERE created_at IS NULL) AS ca_null
|
||||
FROM embeddings;
|
||||
""")
|
||||
r = cur.fetchone()
|
||||
print(f" [{label}] total={r['total']} type_null={r['type_null']} ca_null={r['ca_null']}")
|
||||
|
||||
|
||||
# ─── Driver ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--apply", action="store_true", help="default false (dry-run)")
|
||||
args = ap.parse_args()
|
||||
apply_mode = args.apply
|
||||
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
|
||||
print(f"Mode: {'APPLY (writes will commit)' if apply_mode else 'DRY-RUN (no writes)'}")
|
||||
print(f"Sentinel: {SENTINEL_ISO}")
|
||||
|
||||
if apply_mode:
|
||||
assert_snapshot(cur)
|
||||
|
||||
header("PRE-COUNTS")
|
||||
print_counts(cur, "before")
|
||||
|
||||
header("BATCHES")
|
||||
n_t1 = batch_T1_type(cur, apply_mode)
|
||||
n_c1 = batch_C1_filepath_stat(cur, apply_mode)
|
||||
n_c2c3 = batch_C2_C3_watcher_state(cur, apply_mode)
|
||||
n_c4 = batch_C4_chatgpt_export(cur, apply_mode)
|
||||
n_c5 = batch_C5_sentinel(cur, apply_mode)
|
||||
|
||||
header("POST-COUNTS")
|
||||
print_counts(cur, "after" if apply_mode else "after (in-transaction, will rollback)")
|
||||
|
||||
if apply_mode:
|
||||
pg.commit()
|
||||
print("\nCOMMITTED.")
|
||||
else:
|
||||
pg.rollback()
|
||||
print("\nROLLED BACK (dry-run).")
|
||||
|
||||
print(f"\nSummary: T1={n_t1} C1={n_c1} C2+C3={n_c2c3} C4={n_c4} C5={n_c5}")
|
||||
pg.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,557 @@
|
||||
"""Read-only inspection for the embeddings.type / embeddings.created_at backfill (Improvement #2 / A.1).
|
||||
|
||||
Produces a survey of every backfill source-of-truth question without writing
|
||||
to the database. Output is a human-readable report on stdout plus a JSON
|
||||
sidecar at experiments/embeddings_backfill_inspection_<date>.json.
|
||||
|
||||
Sections:
|
||||
1. Cohort recap (counts; should match prior investigation).
|
||||
2. Cohort A type inference: extension classifier coverage.
|
||||
3. created_at inference for cohort A + B-doc-old:
|
||||
- rows with metadata.filepath: stat the file, check existence.
|
||||
- rows without filepath: lookup source against watcher_state.json.
|
||||
- filename-collision shape audit (live+backup, live+archive, ambiguous).
|
||||
4. ChatGPT export resolution (Plan A.1 addition #1):
|
||||
- existence of /home/aaron/nextcloud/.../ChatGPT Export/.
|
||||
- sample 5 B-chatgpt rows; resolve convo_id -> create_time.
|
||||
5. Sentinel date discovery (Plan A.1 addition #3):
|
||||
- earliest non-NULL created_at per type (already-populated rows are the
|
||||
lower bound for when the substrate started carrying timestamps).
|
||||
- git log for the pgvector migration commit.
|
||||
- any ChromaDB sqlite still on disk.
|
||||
- propose a sentinel with reasoning, or flag as arbitrary.
|
||||
6. 50-row stratified sample: derived (type, created_at, source) per row.
|
||||
|
||||
Usage: venv/bin/python3 scripts/experiments/embeddings_backfill_inspection.py
|
||||
|
||||
Read-only. No DB writes. No filesystem writes outside experiments/.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from collections import Counter, defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import psycopg2
|
||||
from psycopg2.extras import RealDictCursor
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(Path.home() / "aaronai" / ".env")
|
||||
|
||||
PG_DSN = os.getenv("PG_DSN")
|
||||
WATCHER_STATE = Path.home() / "aaronai" / "watcher_state.json"
|
||||
CHATGPT_EXPORT_DIR = Path("/home/aaron/nextcloud/data/data/aaron/files/Archive/Misc/ChatGPT Export")
|
||||
NEXTCLOUD_ROOT = Path("/home/aaron/nextcloud/data/data/aaron/files")
|
||||
OUT_PATH = Path.home() / "aaronai" / "experiments" / f"embeddings_backfill_inspection_{datetime.now().strftime('%Y-%m-%d')}.json"
|
||||
|
||||
SUPPORTED_EXT = {".pdf", ".docx", ".pptx", ".txt", ".md"}
|
||||
|
||||
random.seed(20260503)
|
||||
|
||||
|
||||
# ─── Helpers ────────────────────────────────────────────────────────────────
|
||||
|
||||
def get_pg():
|
||||
return psycopg2.connect(PG_DSN, cursor_factory=RealDictCursor)
|
||||
|
||||
|
||||
def header(title):
|
||||
bar = "=" * 70
|
||||
print(f"\n{bar}\n{title}\n{bar}")
|
||||
|
||||
|
||||
def sub(title):
|
||||
print(f"\n--- {title} ---")
|
||||
|
||||
|
||||
def fmt_ts_from_unix(ts):
|
||||
"""Watcher state stores unix timestamps as strings."""
|
||||
try:
|
||||
return datetime.fromtimestamp(float(ts), tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def fmt_ts_from_st_mtime(p):
|
||||
try:
|
||||
return datetime.fromtimestamp(p.stat().st_mtime, tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def load_watcher_state():
|
||||
"""Returns (path -> mtime_str), and (basename -> [(path, mtime_str), ...])."""
|
||||
state = json.loads(WATCHER_STATE.read_text())
|
||||
by_path = state
|
||||
by_name = defaultdict(list)
|
||||
for path, mtime in state.items():
|
||||
by_name[Path(path).name].append((path, mtime))
|
||||
return by_path, by_name
|
||||
|
||||
|
||||
def classify_collision_shape(paths):
|
||||
"""Categorize a filename-collision group:
|
||||
- 'live+backup' : exactly one path doesn't contain backup/.bak markers
|
||||
and others do
|
||||
- 'live+archive' : exactly one is outside Archive/ and others are inside
|
||||
- 'multi-live' : >=2 paths look like live (no backup/archive markers)
|
||||
- 'all-archive' : every path is inside Archive/ or backup-like
|
||||
- 'other'
|
||||
"""
|
||||
def is_backup(p):
|
||||
s = p.lower()
|
||||
return ".bak" in s or "/backup" in s or "backups/" in s
|
||||
def is_archive(p):
|
||||
s = p.lower()
|
||||
return "/archive/" in s
|
||||
backups = [p for p in paths if is_backup(p)]
|
||||
archives = [p for p in paths if is_archive(p)]
|
||||
live = [p for p in paths if not is_backup(p) and not is_archive(p)]
|
||||
if len(live) == 1 and len(backups) >= 1 and len(archives) == 0:
|
||||
return "live+backup"
|
||||
if len(live) == 1 and len(archives) >= 1 and len(backups) == 0:
|
||||
return "live+archive"
|
||||
if len(live) == 1 and (len(backups) + len(archives)) >= 1:
|
||||
return "live+mixed-old"
|
||||
if len(live) >= 2:
|
||||
return "multi-live"
|
||||
if len(live) == 0:
|
||||
return "all-archive-or-backup"
|
||||
return "other"
|
||||
|
||||
|
||||
# ─── Section 1: Cohort recap ────────────────────────────────────────────────
|
||||
|
||||
def section_1_cohort_recap(cur):
|
||||
header("1. COHORT RECAP")
|
||||
cur.execute("""
|
||||
SELECT
|
||||
COUNT(*) AS total,
|
||||
COUNT(*) FILTER (WHERE type IS NULL) AS type_null,
|
||||
COUNT(*) FILTER (WHERE created_at IS NULL) AS ca_null,
|
||||
COUNT(*) FILTER (WHERE type IS NULL AND created_at IS NULL) AS both_null,
|
||||
COUNT(*) FILTER (WHERE type IS NOT NULL AND created_at IS NOT NULL) AS both_set
|
||||
FROM embeddings;
|
||||
""")
|
||||
overall = cur.fetchone()
|
||||
print(f"Total: {overall['total']} type_null: {overall['type_null']} "
|
||||
f"ca_null: {overall['ca_null']} both_null: {overall['both_null']} "
|
||||
f"both_set: {overall['both_set']}")
|
||||
|
||||
cur.execute("""
|
||||
SELECT type, created_at IS NULL AS ca_null, COUNT(*) AS n
|
||||
FROM embeddings GROUP BY type, ca_null ORDER BY type NULLS LAST, ca_null;
|
||||
""")
|
||||
cohorts = cur.fetchall()
|
||||
sub("Per-(type, ca_null) cohorts")
|
||||
for r in cohorts:
|
||||
print(f" type={r['type'] or 'NULL':<22} ca_null={r['ca_null']!s:<5} n={r['n']}")
|
||||
return {"overall": overall, "cohorts": cohorts}
|
||||
|
||||
|
||||
# ─── Section 2: Cohort A type inference ─────────────────────────────────────
|
||||
|
||||
def section_2_type_inference(cur):
|
||||
header("2. COHORT A TYPE INFERENCE (extension classifier)")
|
||||
cur.execute("""
|
||||
SELECT LOWER(SUBSTRING(source FROM '\.[^.]+$')) AS ext, COUNT(*) AS rows
|
||||
FROM embeddings WHERE type IS NULL
|
||||
GROUP BY ext ORDER BY rows DESC;
|
||||
""")
|
||||
by_ext = cur.fetchall()
|
||||
classified = sum(r["rows"] for r in by_ext if r["ext"] in SUPPORTED_EXT)
|
||||
unknown = sum(r["rows"] for r in by_ext if r["ext"] not in SUPPORTED_EXT)
|
||||
print(f"NULL-type rows by extension:")
|
||||
for r in by_ext:
|
||||
flag = "OK" if r["ext"] in SUPPORTED_EXT else "??"
|
||||
print(f" {flag} {r['ext'] or '(none)':<8} rows={r['rows']}")
|
||||
print(f"\nClassified as 'document' via extension: {classified}")
|
||||
print(f"Unclassifiable (no SUPPORTED extension): {unknown}")
|
||||
return {"by_ext": by_ext, "classified": classified, "unclassifiable": unknown}
|
||||
|
||||
|
||||
# ─── Section 3: created_at inference ────────────────────────────────────────
|
||||
|
||||
def section_3_created_at_inference(cur):
|
||||
header("3. CREATED_AT INFERENCE — file-derived rows")
|
||||
by_path, by_name = load_watcher_state()
|
||||
print(f"watcher_state.json: {len(by_path)} tracked paths, "
|
||||
f"{len(by_name)} distinct filenames, "
|
||||
f"{sum(1 for v in by_name.values() if len(v) > 1)} filename collisions")
|
||||
|
||||
# 3a. Rows with metadata.filepath: probe stat()
|
||||
sub("3a. Rows with metadata.filepath — stat probe")
|
||||
cur.execute("""
|
||||
SELECT id, source, metadata->>'filepath' AS filepath
|
||||
FROM embeddings
|
||||
WHERE created_at IS NULL AND metadata->>'filepath' IS NOT NULL;
|
||||
""")
|
||||
rows_with_fp = cur.fetchall()
|
||||
fp_exists = 0
|
||||
fp_missing = 0
|
||||
fp_outside_root = 0
|
||||
sample_resolved = []
|
||||
for r in rows_with_fp:
|
||||
p = Path(r["filepath"])
|
||||
if not str(p).startswith(str(NEXTCLOUD_ROOT)):
|
||||
fp_outside_root += 1
|
||||
if p.exists():
|
||||
fp_exists += 1
|
||||
if len(sample_resolved) < 5:
|
||||
sample_resolved.append({
|
||||
"id": r["id"], "source": r["source"],
|
||||
"filepath": str(p), "mtime": fmt_ts_from_st_mtime(p),
|
||||
})
|
||||
else:
|
||||
fp_missing += 1
|
||||
print(f" rows with metadata.filepath: {len(rows_with_fp)}")
|
||||
print(f" exists on disk: {fp_exists}")
|
||||
print(f" missing on disk: {fp_missing}")
|
||||
print(f" outside Nextcloud root: {fp_outside_root}")
|
||||
print(f" Sample of 5 resolved mtimes:")
|
||||
for s in sample_resolved:
|
||||
print(f" {s['id']:<15} {s['source'][:60]:<60} mtime={s['mtime']}")
|
||||
|
||||
# 3b. Rows without metadata.filepath: watcher_state lookup
|
||||
sub("3b. Rows without metadata.filepath — watcher_state lookup")
|
||||
cur.execute("""
|
||||
SELECT id, source FROM embeddings
|
||||
WHERE created_at IS NULL
|
||||
AND metadata->>'filepath' IS NULL
|
||||
AND type IS NULL OR (type='document' AND created_at IS NULL AND metadata->>'filepath' IS NULL);
|
||||
""")
|
||||
rows_no_fp = cur.fetchall()
|
||||
# Distinct source basenames to look up
|
||||
basenames_to_resolve = sorted({r["source"] for r in rows_no_fp if r["source"]})
|
||||
n_resolved_unique = sum(1 for n in basenames_to_resolve if len(by_name.get(n, [])) == 1)
|
||||
n_collision_unique = sum(1 for n in basenames_to_resolve if len(by_name.get(n, [])) > 1)
|
||||
n_unfound = sum(1 for n in basenames_to_resolve if n not in by_name)
|
||||
print(f" rows without filepath: {len(rows_no_fp)}")
|
||||
print(f" distinct source basenames to resolve: {len(basenames_to_resolve)}")
|
||||
print(f" unique watcher_state hit (no collision): {n_resolved_unique}")
|
||||
print(f" collision in watcher_state (>1 path): {n_collision_unique}")
|
||||
print(f" not in watcher_state at all: {n_unfound}")
|
||||
|
||||
# 3c. Collision-shape audit
|
||||
sub("3c. Collision-shape audit — all collisions in watcher_state")
|
||||
collisions = {n: [(p, m) for p, m in by_name[n]] for n in by_name if len(by_name[n]) > 1}
|
||||
shape_counts = Counter()
|
||||
rows_affected_by_shape = Counter()
|
||||
# Map from basename to count of NULL-ca rows that need it (rows_no_fp)
|
||||
rows_no_fp_by_name = Counter(r["source"] for r in rows_no_fp)
|
||||
sample_per_shape = defaultdict(list)
|
||||
for name, paths_mtimes in collisions.items():
|
||||
paths = [p for p, _ in paths_mtimes]
|
||||
shape = classify_collision_shape(paths)
|
||||
shape_counts[shape] += 1
|
||||
rows_affected_by_shape[shape] += rows_no_fp_by_name.get(name, 0)
|
||||
if len(sample_per_shape[shape]) < 3:
|
||||
entry = {
|
||||
"name": name,
|
||||
"rows_no_fp_using_this_name": rows_no_fp_by_name.get(name, 0),
|
||||
"candidates": [
|
||||
{"path": p, "mtime": fmt_ts_from_unix(m)}
|
||||
for p, m in sorted(paths_mtimes, key=lambda x: -float(x[1]))
|
||||
],
|
||||
}
|
||||
sample_per_shape[shape].append(entry)
|
||||
print(f" collisions in watcher_state: {len(collisions)}")
|
||||
print(f" shape breakdown:")
|
||||
for shape, n in shape_counts.most_common():
|
||||
print(f" {shape:<22} collisions={n:<4} rows_affected={rows_affected_by_shape[shape]}")
|
||||
print(f"\n Up-to-3 sample collisions per shape (sorted by mtime desc):")
|
||||
for shape, samples in sample_per_shape.items():
|
||||
print(f" [{shape}]")
|
||||
for s in samples:
|
||||
print(f" {s['name']} (rows_no_fp using this name: {s['rows_no_fp_using_this_name']})")
|
||||
for c in s["candidates"]:
|
||||
print(f" {c['mtime']} {c['path']}")
|
||||
|
||||
return {
|
||||
"watcher_state_paths": len(by_path),
|
||||
"watcher_state_basenames": len(by_name),
|
||||
"watcher_state_collisions": len(collisions),
|
||||
"rows_with_filepath": {
|
||||
"total": len(rows_with_fp),
|
||||
"exists": fp_exists, "missing": fp_missing,
|
||||
"outside_root": fp_outside_root,
|
||||
"sample": sample_resolved,
|
||||
},
|
||||
"rows_without_filepath": {
|
||||
"total": len(rows_no_fp),
|
||||
"distinct_basenames": len(basenames_to_resolve),
|
||||
"unique_hit": n_resolved_unique,
|
||||
"collision_hit": n_collision_unique,
|
||||
"unfound": n_unfound,
|
||||
},
|
||||
"collision_shapes": {
|
||||
"total": len(collisions),
|
||||
"shape_counts": dict(shape_counts),
|
||||
"rows_affected_by_shape": dict(rows_affected_by_shape),
|
||||
"samples": {k: v for k, v in sample_per_shape.items()},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# ─── Section 4: ChatGPT export resolution ───────────────────────────────────
|
||||
|
||||
def section_4_chatgpt_export(cur):
|
||||
header("4. CHATGPT EXPORT RESOLUTION (Plan addition #1)")
|
||||
print(f"Probing: {CHATGPT_EXPORT_DIR}")
|
||||
if not CHATGPT_EXPORT_DIR.exists():
|
||||
print(" NOT FOUND — plan on sentinel for entire B-chatgpt cohort.")
|
||||
return {"export_dir_exists": False, "files": []}
|
||||
files = sorted(CHATGPT_EXPORT_DIR.glob("conversations*.json"))
|
||||
print(f" found {len(files)} export file(s):")
|
||||
for f in files:
|
||||
print(f" {f.name} size={f.stat().st_size:,} mtime={fmt_ts_from_st_mtime(f)}")
|
||||
|
||||
# Build convo_id -> create_time index from all export files.
|
||||
print("\nLoading export(s) to build convo_id -> create_time index...")
|
||||
convo_index = {}
|
||||
for f in files:
|
||||
try:
|
||||
data = json.loads(f.read_text(encoding="utf-8"))
|
||||
except Exception as e:
|
||||
print(f" failed to parse {f.name}: {e}")
|
||||
continue
|
||||
for convo in data:
|
||||
cid = convo.get("id") or convo.get("conversation_id")
|
||||
ct = convo.get("create_time")
|
||||
if cid and ct is not None:
|
||||
convo_index[cid] = ct
|
||||
print(f" indexed {len(convo_index)} conversations across {len(files)} export files")
|
||||
|
||||
# Sample 5 chatgpt_conversation rows; resolve.
|
||||
cur.execute("""
|
||||
SELECT id, source FROM embeddings
|
||||
WHERE type='chatgpt_conversation' AND created_at IS NULL
|
||||
ORDER BY random() LIMIT 5;
|
||||
""")
|
||||
sample = cur.fetchall()
|
||||
sub("Sample of 5 B-chatgpt rows: convo lookup")
|
||||
resolved = 0
|
||||
sample_results = []
|
||||
for r in sample:
|
||||
# IDs look like chatgpt_<uuid>_<idx>; uuid extends until last underscore.
|
||||
m = re.match(r"^chatgpt_(.+)_(\d+)$", r["id"])
|
||||
cid = m.group(1) if m else None
|
||||
ct = convo_index.get(cid)
|
||||
ct_iso = None
|
||||
if ct is not None:
|
||||
try:
|
||||
ct_iso = datetime.fromtimestamp(float(ct), tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
except Exception:
|
||||
ct_iso = None
|
||||
if ct_iso:
|
||||
resolved += 1
|
||||
sample_results.append({
|
||||
"id": r["id"], "source": r["source"], "convo_id": cid,
|
||||
"create_time": ct, "create_time_iso": ct_iso,
|
||||
"resolved": ct_iso is not None,
|
||||
})
|
||||
print(f" {r['id']} cid={cid}")
|
||||
print(f" -> create_time={ct} iso={ct_iso}")
|
||||
print(f"\nResolved {resolved}/5. "
|
||||
f"{'PROCEED with re-derive for full cohort.' if resolved == 5 else 'PARTIAL — plan re-derive + sentinel for unresolved.'}")
|
||||
|
||||
# Estimate full-cohort coverage by counting how many B-chatgpt convo_ids appear in the index.
|
||||
cur.execute("""
|
||||
SELECT DISTINCT regexp_replace(id, '^chatgpt_(.+)_\\d+$', '\\1') AS cid
|
||||
FROM embeddings WHERE type='chatgpt_conversation' AND created_at IS NULL;
|
||||
""")
|
||||
distinct_cids = [r["cid"] for r in cur.fetchall()]
|
||||
in_index = sum(1 for c in distinct_cids if c in convo_index)
|
||||
print(f"Full-cohort coverage estimate: {in_index} / {len(distinct_cids)} distinct convo_ids "
|
||||
f"resolvable from export.")
|
||||
return {
|
||||
"export_dir_exists": True,
|
||||
"files": [{"name": f.name, "size": f.stat().st_size, "mtime": fmt_ts_from_st_mtime(f)} for f in files],
|
||||
"convo_index_size": len(convo_index),
|
||||
"sample_results": sample_results,
|
||||
"sample_resolved": resolved,
|
||||
"full_cohort": {
|
||||
"distinct_convo_ids": len(distinct_cids),
|
||||
"resolvable_from_export": in_index,
|
||||
"unresolvable": len(distinct_cids) - in_index,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# ─── Section 5: Sentinel date discovery ─────────────────────────────────────
|
||||
|
||||
def section_5_sentinel(cur):
|
||||
header("5. SENTINEL DATE DISCOVERY (Plan addition #3)")
|
||||
|
||||
# 5a. Earliest non-NULL created_at per type: lower bound on substrate age.
|
||||
sub("5a. Earliest non-NULL created_at per type")
|
||||
cur.execute("""
|
||||
SELECT type, MIN(created_at) AS earliest, MAX(created_at) AS latest, COUNT(*) AS rows
|
||||
FROM embeddings WHERE created_at IS NOT NULL GROUP BY type ORDER BY type;
|
||||
""")
|
||||
rows = cur.fetchall()
|
||||
for r in rows:
|
||||
print(f" {r['type']:<22} earliest={r['earliest']:<32} latest={r['latest']}")
|
||||
|
||||
# 5b. git log for the pgvector-migration commit.
|
||||
sub("5b. Git log — pgvector migration commits")
|
||||
git_findings = []
|
||||
try:
|
||||
out = subprocess.run(
|
||||
["git", "log", "--all", "--format=%H %ci %s",
|
||||
"--", "deprecated/migrate_to_pgvector.py", "scripts/migrate_to_pgvector.py"],
|
||||
cwd=str(Path.home() / "aaronai"), capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
for line in out.stdout.strip().splitlines():
|
||||
print(f" {line}")
|
||||
git_findings.append(line)
|
||||
except Exception as e:
|
||||
print(f" git log failed: {e}")
|
||||
# Also: when did the api/ingest scripts cut over to pgvector?
|
||||
try:
|
||||
out = subprocess.run(
|
||||
["git", "log", "--all", "--format=%H %ci %s", "--grep=pgvector", "-i"],
|
||||
cwd=str(Path.home() / "aaronai"), capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
print("\n Commits mentioning pgvector:")
|
||||
for line in out.stdout.strip().splitlines()[:10]:
|
||||
print(f" {line}")
|
||||
git_findings.append(line)
|
||||
except Exception as e:
|
||||
print(f" git log (pgvector grep) failed: {e}")
|
||||
|
||||
# 5c. ChromaDB sqlite still on disk?
|
||||
sub("5c. ChromaDB dump on disk?")
|
||||
candidates = []
|
||||
for root in [Path.home() / "aaronai", Path.home() / "aaronai" / "db"]:
|
||||
if root.exists():
|
||||
for p in root.rglob("chroma*.sqlite*"):
|
||||
candidates.append({"path": str(p), "mtime": fmt_ts_from_st_mtime(p)})
|
||||
if candidates:
|
||||
for c in candidates:
|
||||
print(f" found: {c['path']} mtime={c['mtime']}")
|
||||
else:
|
||||
print(" no ChromaDB sqlite found under ~/aaronai")
|
||||
|
||||
# 5d. Propose sentinel.
|
||||
sub("5d. Sentinel proposal")
|
||||
# Earliest doc cutover: per query, document=2026-04-30. Migration commit f78b830 was
|
||||
# 2026-04-26. Most defensible sentinel for "rows that entered pgvector before NOW()
|
||||
# writes were canonical" = the migration commit date.
|
||||
proposed = "2026-04-26T00:00:00Z"
|
||||
reasoning = (
|
||||
"git f78b830 'Migrate to pgvector — remove ChromaDB from api.py, ingest scripts, "
|
||||
"dream.py' is dated 2026-04-26. The earliest type='document' row with a non-NULL "
|
||||
"created_at lands 2026-04-30 (the F11 canonical-encoding cutover). Rows with NULL "
|
||||
"created_at all predate F11 and most predate the pgvector cutover itself. "
|
||||
"2026-04-26 is the date the ChromaDB->pgvector migration script was committed, "
|
||||
"so any row currently in the embeddings table with NULL created_at must have been "
|
||||
"ingested on or after that date (when the table came into existence in current form). "
|
||||
"It is the tightest defensible upper bound on 'the row entered pgvector before "
|
||||
"timestamps were tracked', so it is the right sentinel."
|
||||
)
|
||||
print(f" Proposed sentinel: {proposed}")
|
||||
print(f" Reasoning: {reasoning}")
|
||||
|
||||
return {
|
||||
"earliest_per_type": rows,
|
||||
"git_findings": git_findings,
|
||||
"chromadb_candidates": candidates,
|
||||
"proposed_sentinel": proposed,
|
||||
"reasoning": reasoning,
|
||||
}
|
||||
|
||||
|
||||
# ─── Section 6: 50-row stratified sample ────────────────────────────────────
|
||||
|
||||
def section_6_stratified_sample(cur, sentinel_iso):
|
||||
header("6. 50-ROW STRATIFIED SAMPLE — derived (type, created_at, source)")
|
||||
by_path, by_name = load_watcher_state()
|
||||
|
||||
cohorts = [
|
||||
("A (type NULL, ca NULL)", "type IS NULL AND created_at IS NULL", 10),
|
||||
("B-doc-old (type='document', ca NULL)", "type='document' AND created_at IS NULL", 10),
|
||||
("B-chatgpt (type='chatgpt_conversation', ca NULL)", "type='chatgpt_conversation' AND created_at IS NULL", 10),
|
||||
("C-doc-new (type='document', ca set)", "type='document' AND created_at IS NOT NULL", 10),
|
||||
("C-claude (type='claude_conversation', ca set)", "type='claude_conversation' AND created_at IS NOT NULL", 5),
|
||||
("C-aaronai (type='aaronai_conversation', ca set)", "type='aaronai_conversation' AND created_at IS NOT NULL", 5),
|
||||
]
|
||||
|
||||
samples = []
|
||||
for label, predicate, n in cohorts:
|
||||
sub(f"{label} (sample size: {n})")
|
||||
cur.execute(f"""
|
||||
SELECT id, source, type, created_at, metadata
|
||||
FROM embeddings WHERE {predicate}
|
||||
ORDER BY random() LIMIT %s;
|
||||
""", (n,))
|
||||
rows = cur.fetchall()
|
||||
for r in rows:
|
||||
row_meta = r["metadata"] or {}
|
||||
fp = row_meta.get("filepath")
|
||||
inferred_type = r["type"] or ("document" if (r["source"] or "").lower().endswith(tuple(SUPPORTED_EXT)) else "?")
|
||||
inferred_ca = r["created_at"]
|
||||
inferred_ca_source = "preserved" if inferred_ca else None
|
||||
if not inferred_ca:
|
||||
if fp and Path(fp).exists():
|
||||
inferred_ca = fmt_ts_from_st_mtime(Path(fp))
|
||||
inferred_ca_source = "filepath_stat"
|
||||
elif r["source"] and r["source"] in by_name:
|
||||
candidates = by_name[r["source"]]
|
||||
if len(candidates) == 1:
|
||||
inferred_ca = fmt_ts_from_unix(candidates[0][1])
|
||||
inferred_ca_source = "watcher_state_unique"
|
||||
else:
|
||||
# take most recent
|
||||
latest = max(candidates, key=lambda x: float(x[1]))
|
||||
inferred_ca = fmt_ts_from_unix(latest[1])
|
||||
inferred_ca_source = f"watcher_state_collision_pick_latest_of_{len(candidates)}"
|
||||
else:
|
||||
inferred_ca = sentinel_iso
|
||||
inferred_ca_source = "sentinel"
|
||||
print(f" id={r['id']:<22} src={(r['source'] or '')[:38]:<38}")
|
||||
print(f" existing: type={r['type']!r:<22} ca={r['created_at']!r}")
|
||||
print(f" inferred: type={inferred_type!r:<22} ca={inferred_ca!r} ({inferred_ca_source})")
|
||||
samples.append({
|
||||
"cohort": label, "id": r["id"], "source": r["source"],
|
||||
"existing_type": r["type"], "existing_ca": r["created_at"],
|
||||
"inferred_type": inferred_type, "inferred_ca": inferred_ca,
|
||||
"inferred_ca_source": inferred_ca_source,
|
||||
})
|
||||
return samples
|
||||
|
||||
|
||||
# ─── Driver ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
|
||||
out = {"generated_at": datetime.now(timezone.utc).isoformat()}
|
||||
out["section_1"] = section_1_cohort_recap(cur)
|
||||
out["section_2"] = section_2_type_inference(cur)
|
||||
out["section_3"] = section_3_created_at_inference(cur)
|
||||
out["section_4"] = section_4_chatgpt_export(cur)
|
||||
out["section_5"] = section_5_sentinel(cur)
|
||||
sentinel_iso = out["section_5"]["proposed_sentinel"]
|
||||
out["section_6"] = section_6_stratified_sample(cur, sentinel_iso)
|
||||
|
||||
pg.close()
|
||||
|
||||
# JSON sidecar — strip non-serializables.
|
||||
def _serialize(o):
|
||||
if isinstance(o, datetime):
|
||||
return o.isoformat()
|
||||
return str(o)
|
||||
|
||||
OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
OUT_PATH.write_text(json.dumps(out, indent=2, default=_serialize))
|
||||
print(f"\nJSON sidecar written: {OUT_PATH}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user