"""One-off: backfill last_consolidated_at + consolidation_count on embeddings
from the dream-manifest-*.json files already in Journal/Dreams/.

Why this exists: the consolidation cursor columns added by the dreamer
redesign migration default to NULL / 0. Without history, the
underprocessed-count signal in dream_observation.observe_corpus() reports
"every chunk is underprocessed" (degenerate percentile), and NREM has no basis
to bias replay toward least-recently-consolidated chunks.

We have ~25 historical dream manifests in Nextcloud/Journal/Dreams/, each
listing the sources retrieved per stage. Every (manifest, source) appearance
counts once. For each distinct source this script:

- finds embeddings rows whose ``source`` column equals the manifest's source
  string exactly
- sets consolidation_count to the number of appearances across all manifests
- sets last_consolidated_at to the latest manifest date the source appears in
  (a date-only manifest date becomes UTC midnight; an offset-aware timestamp
  is converted to UTC)

NOTE(review): an earlier description of this script said rows are matched by
basename. The SQL below does an exact ``source = %s`` comparison and no
basename is derived here, so confirm that manifest sources and
embeddings.source are stored in the same form.

Idempotent: re-running will not double-count because the cursor values of every
source we touch are reset to NULL/0 first and then written as absolute totals.

Pass --dry-run to print what would change without writing.
"""
import json
import os
import sys
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path

from dotenv import load_dotenv
import psycopg2

load_dotenv(Path.home() / "aaronai" / ".env", override=True)

PG_DSN = os.getenv("PG_DSN")
DREAMS_DIR = Path("/home/aaron/nextcloud/data/data/aaron/files/Journal/Dreams")
DRY_RUN = "--dry-run" in sys.argv

# Manifest stages whose "sources" lists contribute to the backfill.
_STAGE_NAMES = ("nrem", "early_rem", "late_rem", "synthesis")


def get_pg():
    """Open a new PostgreSQL connection using the PG_DSN environment setting.

    Raises:
        RuntimeError: if PG_DSN is not set, so the failure names the missing
            variable instead of surfacing as an opaque driver error.
    """
    if PG_DSN is None:
        raise RuntimeError("PG_DSN is not set; cannot connect to PostgreSQL")
    return psycopg2.connect(PG_DSN)


def _parse_manifest_date(date_str):
    """Parse a manifest "date" value into a UTC-aware datetime, or None if invalid."""
    if not isinstance(date_str, str) or not date_str:
        return None
    try:
        parsed = datetime.fromisoformat(date_str)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # Naive values (e.g. a bare YYYY-MM-DD) are taken to be UTC.
        return parsed.replace(tzinfo=timezone.utc)
    # Aware values are converted; relabelling them would shift the instant.
    return parsed.astimezone(timezone.utc)


def _manifest_sources(manifest):
    """Return every non-empty string source listed under the known stages."""
    stages = manifest.get("stages") or {}
    if not isinstance(stages, dict):
        return []
    found = []
    for stage_name in _STAGE_NAMES:
        stage = stages.get(stage_name) or {}
        if not isinstance(stage, dict):
            continue
        sources = stage.get("sources") or []
        if not isinstance(sources, list):
            continue
        # Non-string entries cannot match a source value and may be unhashable.
        found.extend(src for src in sources if isinstance(src, str) and src)
    return found


def collect_manifest_records():
    """Return a list of (source, manifest_date_utc) tuples from all
    dream-manifest-*.json files in DREAMS_DIR.

    One pair per (manifest, source) appearance, so a source listed in several
    stages or manifests appears several times. Manifests that cannot be read,
    are not JSON objects, or have no parseable "date" are skipped with a
    printed message. Returns an empty list when DREAMS_DIR does not exist.
    """
    pairs = []
    if not DREAMS_DIR.exists():
        return pairs
    for path in sorted(DREAMS_DIR.glob("dream-manifest-*.json")):
        try:
            manifest = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as e:
            # ValueError covers both JSON syntax errors and bad UTF-8.
            print(f" skip {path.name}: {e}")
            continue
        if not isinstance(manifest, dict):
            print(f" skip {path.name}: top-level JSON value is not an object")
            continue
        dt = _parse_manifest_date(manifest.get("date"))
        if dt is None:
            print(f" skip {path.name}: missing or unparseable 'date'")
            continue
        pairs.extend((src, dt) for src in _manifest_sources(manifest))
    return pairs


def main():
    """Aggregate manifest appearances per source and write them to embeddings.

    Prints a summary and a sample of the planned changes. With --dry-run
    nothing is written and no database connection is opened. Otherwise the
    reset and the per-source updates run in one transaction: it is committed
    only if every statement succeeds and rolled back on any error.
    """
    print(f"Mode: {'DRY-RUN' if DRY_RUN else 'APPLY'}")
    print(f"Scanning manifests in {DREAMS_DIR}")
    pairs = collect_manifest_records()
    print(f"Collected {len(pairs)} (source, manifest_date) pairs across all manifests")
    if not pairs:
        print("Nothing to backfill.")
        return

    # Aggregate per source: count + latest date
    counts = Counter(src for src, _ in pairs)
    latest = {}
    for src, dt in pairs:
        if src not in latest or dt > latest[src]:
            latest[src] = dt
    print(f"Unique sources to update: {len(counts)}")

    # Sample what we'd write
    print("Sample (top 5 by appearance count):")
    for src, n in sorted(counts.items(), key=lambda kv: -kv[1])[:5]:
        print(f" {n:>3} appearances — {src} → last_consolidated_at = {latest[src].date()}")

    if DRY_RUN:
        print("\nDry-run only. Re-run without --dry-run to apply.")
        return

    pg = get_pg()
    updated_rows = 0
    try:
        # `with pg` commits on success and rolls back if anything raises;
        # it does not close the connection, hence the finally below.
        with pg, pg.cursor() as cur:
            # Reset cursor for any sources we're about to backfill so reruns are clean.
            print("\nResetting cursor for sources we'll touch...")
            cur.execute(
                "UPDATE embeddings SET last_consolidated_at = NULL, consolidation_count = 0 "
                "WHERE source = ANY(%s)",
                (list(counts),),
            )
            print(f" reset {cur.rowcount} embeddings rows")

            # Apply per-source updates. For each source, set count and latest date.
            print("Applying per-source backfill...")
            for src, n in counts.items():
                cur.execute(
                    "UPDATE embeddings "
                    "SET consolidation_count = %s, last_consolidated_at = %s "
                    "WHERE source = %s",
                    (n, latest[src], src),
                )
                updated_rows += cur.rowcount
    finally:
        pg.close()
    print(f"Done. Updated {updated_rows} embeddings rows across {len(counts)} unique sources.")


if __name__ == "__main__":
    main()