"""One-off: backfill last_consolidated_at + consolidation_count on embeddings
from the dream-manifest-*.json files already in Journal/Dreams/.

Why this exists: the consolidation cursor columns added by the dreamer
redesign migration default to NULL / 0. Without history, the
underprocessed-count signal in dream_observation.observe_corpus() reports
"every chunk is underprocessed" (degenerate percentile), and NREM has no
basis to bias replay toward least-recently-consolidated chunks.

We have ~25 historical dream manifests in Nextcloud/Journal/Dreams/, each
listing the sources retrieved per stage. This script aggregates every
(manifest, source) appearance and then, for each distinct source:
  - finds embeddings rows whose `source` column equals the manifest string
    exactly
    (NOTE(review): earlier notes described this as a "basename match"; the
    SQL is plain equality, so confirm manifests and embeddings.source use
    the same form)
  - sets consolidation_count to the number of appearances
  - sets last_consolidated_at to the newest manifest date the source
    appeared in (a date-only manifest date becomes UTC midnight)

Idempotent: the values written are absolute (not increments) and the touched
rows are reset to NULL/0 first, so re-running never double-counts. Pass
--dry-run to print what would change without writing.
"""

import json
import os
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from dotenv import load_dotenv
import psycopg2

load_dotenv(Path.home() / "aaronai" / ".env", override=True)

PG_DSN = os.getenv("PG_DSN")
DREAMS_DIR = Path("/home/aaron/nextcloud/data/data/aaron/files/Journal/Dreams")
DRY_RUN = "--dry-run" in sys.argv

# Manifest stages whose "sources" lists are counted.
STAGE_NAMES = ("nrem", "early_rem", "late_rem", "synthesis")


def get_pg():
    """Open a new psycopg2 connection using the PG_DSN environment value."""
    return psycopg2.connect(PG_DSN)


def _parse_manifest_date(raw) -> Optional[datetime]:
    """Parse a manifest "date" value into a timezone-aware UTC datetime.

    Returns None when the value is missing, not a string, or not ISO-8601
    as understood by datetime.fromisoformat().
    """
    if not isinstance(raw, str) or not raw:
        return None
    try:
        parsed = datetime.fromisoformat(raw)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Naive values (including date-only strings, which parse to
        # midnight) are interpreted as UTC.
        return parsed.replace(tzinfo=timezone.utc)
    # An explicit offset must be converted, not overwritten, or the instant
    # would shift by that offset.
    return parsed.astimezone(timezone.utc)


def collect_manifest_records(dreams_dir=None) -> List[Tuple[str, datetime]]:
    """Return a list of (source, manifest_date_utc) tuples from all
    dream-manifest-*.json files.

    One tuple is produced per appearance of a source in a stage's "sources"
    list, so a source listed in several stages of one manifest yields
    several tuples.

    Args:
        dreams_dir: directory to scan; defaults to DREAMS_DIR.

    Returns:
        The collected tuples, in sorted-filename order. Empty when the
        directory does not exist. Unreadable, malformed, or undated
        manifests are reported on stdout and skipped.
    """
    root = DREAMS_DIR if dreams_dir is None else Path(dreams_dir)
    pairs: List[Tuple[str, datetime]] = []
    if not root.is_dir():
        return pairs

    for path in sorted(root.glob("dream-manifest-*.json")):
        try:
            # json.JSONDecodeError and UnicodeDecodeError are both
            # ValueError subclasses.
            manifest = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:
            print(f"  skip {path.name}: {e}")
            continue
        if not isinstance(manifest, dict):
            print(f"  skip {path.name}: top-level JSON value is not an object")
            continue

        dt = _parse_manifest_date(manifest.get("date"))
        if dt is None:
            print(f"  skip {path.name}: missing or unparseable 'date'")
            continue

        stages = manifest.get("stages")
        if not isinstance(stages, dict):
            continue
        for stage_name in STAGE_NAMES:
            stage = stages.get(stage_name)
            if not isinstance(stage, dict):
                continue
            sources = stage.get("sources")
            if not isinstance(sources, list):
                continue
            # Only non-empty strings are usable: they serve as dict keys
            # during aggregation and are compared to embeddings.source.
            pairs.extend(
                (src, dt) for src in sources if isinstance(src, str) and src
            )
    return pairs


def _aggregate(pairs) -> Tuple[Counter, Dict[str, datetime]]:
    """Fold (source, date) pairs into per-source appearance counts and the
    newest date seen for each source."""
    counts: Counter = Counter()
    latest: Dict[str, datetime] = {}
    for src, dt in pairs:
        counts[src] += 1
        if src not in latest or dt > latest[src]:
            latest[src] = dt
    return counts, latest


def main():
    """Scan the manifests, print a summary, and (unless --dry-run) write the
    aggregated cursor values to the embeddings table in one transaction."""
    print(f"Mode: {'DRY-RUN' if DRY_RUN else 'APPLY'}")
    print(f"Scanning manifests in {DREAMS_DIR}")
    pairs = collect_manifest_records()
    print(f"Collected {len(pairs)} (source, manifest_date) pairs across all manifests")
    if not pairs:
        print("Nothing to backfill.")
        return

    # Aggregate per source: count + latest date
    counts, latest = _aggregate(pairs)
    print(f"Unique sources to update: {len(counts)}")

    # Sample what we'd write
    print("Sample (top 5 by appearance count):")
    for src, n in counts.most_common(5):
        print(f"  {n:>3} appearances — {src} → last_consolidated_at = {latest[src].date()}")

    if DRY_RUN:
        print("\nDry-run only. Re-run without --dry-run to apply.")
        return

    if PG_DSN is None:
        # Fail with a clear message instead of psycopg2's generic
        # "missing dsn" TypeError.
        sys.exit("PG_DSN is not set (expected in ~/aaronai/.env or the environment).")

    pg = get_pg()
    try:
        with pg.cursor() as cur:
            # Reset cursor for any sources we're about to backfill so
            # reruns are clean.
            print("\nResetting cursor for sources we'll touch...")
            sources = list(counts)
            cur.execute(
                "UPDATE embeddings SET last_consolidated_at = NULL, consolidation_count = 0 "
                "WHERE source = ANY(%s)",
                (sources,),
            )
            print(f"  reset {cur.rowcount} embeddings rows")

            # Apply per-source updates. For each source, set count and
            # latest date.
            print("Applying per-source backfill...")
            updated_rows = 0
            for src, n in counts.items():
                cur.execute(
                    "UPDATE embeddings "
                    "SET consolidation_count = %s, last_consolidated_at = %s "
                    "WHERE source = %s",
                    (n, latest[src], src),
                )
                updated_rows += cur.rowcount
        # Commit only after every statement succeeded; if anything above
        # raised, close() below discards the open transaction.
        pg.commit()
    finally:
        pg.close()
    print(f"Done. Updated {updated_rows} embeddings rows across {len(counts)} unique sources.")


if __name__ == "__main__":
    main()