Files
aaronAI/scripts/backfill_consolidation_cursor.py
aaron a4735053c2 backfill_consolidation_cursor.py: populate cursor from historical dream manifests
One-off script. Walks Journal/Dreams/dream-manifest-*.json and increments
consolidation_count + sets last_consolidated_at for every (manifest, source)
pair. Idempotent — resets the cursor for any touched sources before
backfilling, so reruns don't double-count.

First run: 7547 embeddings rows updated across 105 unique sources, 416
(source, manifest_date) pairs across all manifests. Distribution now: 422
chunks at count=18 (the dominant dossier-narrative cluster that fills every
NREM in the last 18 days), long tail down to count=1, 12,011 still at 0.

This makes dream_observation.underprocessed_count meaningful — before, all
counts were 0 so the bottom-quartile percentile was 0 and the signal was
degenerate. After, the signal correctly identifies the 12k chunks that have
never been replayed.
2026-05-20 18:04:43 +00:00

129 lines
4.3 KiB
Python

"""One-off: backfill last_consolidated_at + consolidation_count on embeddings
from the dream-manifest-*.json files already in Journal/Dreams/.
Why this exists: the consolidation cursor columns added by the dreamer
redesign migration default to NULL / 0. Without history, the
underprocessed-count signal in dream_observation.observe_corpus() reports
"every chunk is underprocessed" (degenerate percentile), and NREM has no
basis to bias replay toward least-recently-consolidated chunks.
We have ~25 historical dream manifests in Nextcloud/Journal/Dreams/, each
listing the sources retrieved per stage. For each (manifest, source) pair
this script:
- finds matching embeddings rows by source (basename match)
- increments consolidation_count by 1
- updates last_consolidated_at to the manifest date (UTC midnight)
Idempotent: re-running will not double-count because we drop existing
cursor values to NULL/0 before backfilling. Pass --dry-run to print what
would change without writing.
"""
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
PG_DSN = os.getenv("PG_DSN")
DREAMS_DIR = Path("/home/aaron/nextcloud/data/data/aaron/files/Journal/Dreams")
DRY_RUN = "--dry-run" in sys.argv
def get_pg():
return psycopg2.connect(PG_DSN)
def collect_manifest_records():
"""Return a list of (source_basename, manifest_date_utc) tuples from all
dream-manifest-*.json files. One pair per (manifest, source) appearance."""
pairs = []
if not DREAMS_DIR.exists():
return pairs
for path in sorted(DREAMS_DIR.glob("dream-manifest-*.json")):
try:
m = json.loads(path.read_text())
except Exception as e:
print(f" skip {path.name}: {e}")
continue
date_str = m.get("date")
if not date_str:
continue
try:
dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
except ValueError:
continue
stages = m.get("stages") or {}
for stage_name in ("nrem", "early_rem", "late_rem", "synthesis"):
stage = stages.get(stage_name) or {}
for src in (stage.get("sources") or []):
if src:
pairs.append((src, dt))
return pairs
def main():
print(f"Mode: {'DRY-RUN' if DRY_RUN else 'APPLY'}")
print(f"Scanning manifests in {DREAMS_DIR}")
pairs = collect_manifest_records()
print(f"Collected {len(pairs)} (source, manifest_date) pairs across all manifests")
if not pairs:
print("Nothing to backfill.")
return
# Aggregate per source: count + latest date
from collections import defaultdict
counts = defaultdict(int)
latest = {}
for src, dt in pairs:
counts[src] += 1
if src not in latest or dt > latest[src]:
latest[src] = dt
print(f"Unique sources to update: {len(counts)}")
# Sample what we'd write
print("Sample (top 5 by appearance count):")
for src, n in sorted(counts.items(), key=lambda kv: -kv[1])[:5]:
print(f" {n:>3} appearances — {src} → last_consolidated_at = {latest[src].date()}")
if DRY_RUN:
print("\nDry-run only. Re-run without --dry-run to apply.")
return
pg = get_pg()
cur = pg.cursor()
# Reset cursor for any sources we're about to backfill so reruns are clean.
print("\nResetting cursor for sources we'll touch...")
sources = list(counts.keys())
cur.execute(
"UPDATE embeddings SET last_consolidated_at = NULL, consolidation_count = 0 "
"WHERE source = ANY(%s)",
(sources,),
)
print(f" reset {cur.rowcount} embeddings rows")
# Apply per-source updates. For each source, set count and latest date.
print("Applying per-source backfill...")
updated_rows = 0
for src, n in counts.items():
cur.execute(
"UPDATE embeddings "
"SET consolidation_count = %s, last_consolidated_at = %s "
"WHERE source = %s",
(n, latest[src], src),
)
updated_rows += cur.rowcount
pg.commit()
pg.close()
print(f"Done. Updated {updated_rows} embeddings rows across {len(counts)} unique sources.")
if __name__ == "__main__":
main()