backfill_consolidation_cursor.py: populate cursor from historical dream manifests
One-off script. Walks Journal/Dreams/dream-manifest-*.json and increments consolidation_count + sets last_consolidated_at for every (manifest, source) pair. Idempotent — resets the cursor for any touched sources before backfilling, so reruns don't double-count. First run: 7547 embeddings rows updated across 105 unique sources, 416 (source, manifest_date) pairs across all manifests. Distribution now: 422 chunks at count=18 (the dominant dossier-narrative cluster that fills every NREM in the last 18 days), long tail down to count=1, 12,011 still at 0. This makes dream_observation.underprocessed_count meaningful — before, all counts were 0 so the bottom-quartile percentile was 0 and the signal was degenerate. After, the signal correctly identifies the 12k chunks that have never been replayed.
This commit is contained in:
@@ -0,0 +1,128 @@
|
||||
"""One-off: backfill last_consolidated_at + consolidation_count on embeddings
|
||||
from the dream-manifest-*.json files already in Journal/Dreams/.
|
||||
|
||||
Why this exists: the consolidation cursor columns added by the dreamer
|
||||
redesign migration default to NULL / 0. Without history, the
|
||||
underprocessed-count signal in dream_observation.observe_corpus() reports
|
||||
"every chunk is underprocessed" (degenerate percentile), and NREM has no
|
||||
basis to bias replay toward least-recently-consolidated chunks.
|
||||
|
||||
We have ~25 historical dream manifests in Nextcloud/Journal/Dreams/, each
|
||||
listing the sources retrieved per stage. For each (manifest, source) pair
|
||||
this script:
|
||||
- finds matching embeddings rows by source (basename match)
|
||||
- increments consolidation_count by 1
|
||||
- updates last_consolidated_at to the manifest date (UTC midnight)
|
||||
|
||||
Idempotent: re-running will not double-count because we drop existing
|
||||
cursor values to NULL/0 before backfilling. Pass --dry-run to print what
|
||||
would change without writing.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
import psycopg2
|
||||
|
||||
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
|
||||
|
||||
PG_DSN = os.getenv("PG_DSN")
|
||||
DREAMS_DIR = Path("/home/aaron/nextcloud/data/data/aaron/files/Journal/Dreams")
|
||||
DRY_RUN = "--dry-run" in sys.argv
|
||||
|
||||
|
||||
def get_pg():
|
||||
return psycopg2.connect(PG_DSN)
|
||||
|
||||
|
||||
def collect_manifest_records():
|
||||
"""Return a list of (source_basename, manifest_date_utc) tuples from all
|
||||
dream-manifest-*.json files. One pair per (manifest, source) appearance."""
|
||||
pairs = []
|
||||
if not DREAMS_DIR.exists():
|
||||
return pairs
|
||||
for path in sorted(DREAMS_DIR.glob("dream-manifest-*.json")):
|
||||
try:
|
||||
m = json.loads(path.read_text())
|
||||
except Exception as e:
|
||||
print(f" skip {path.name}: {e}")
|
||||
continue
|
||||
date_str = m.get("date")
|
||||
if not date_str:
|
||||
continue
|
||||
try:
|
||||
dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
continue
|
||||
stages = m.get("stages") or {}
|
||||
for stage_name in ("nrem", "early_rem", "late_rem", "synthesis"):
|
||||
stage = stages.get(stage_name) or {}
|
||||
for src in (stage.get("sources") or []):
|
||||
if src:
|
||||
pairs.append((src, dt))
|
||||
return pairs
|
||||
|
||||
|
||||
def main():
|
||||
print(f"Mode: {'DRY-RUN' if DRY_RUN else 'APPLY'}")
|
||||
print(f"Scanning manifests in {DREAMS_DIR}")
|
||||
pairs = collect_manifest_records()
|
||||
print(f"Collected {len(pairs)} (source, manifest_date) pairs across all manifests")
|
||||
if not pairs:
|
||||
print("Nothing to backfill.")
|
||||
return
|
||||
|
||||
# Aggregate per source: count + latest date
|
||||
from collections import defaultdict
|
||||
counts = defaultdict(int)
|
||||
latest = {}
|
||||
for src, dt in pairs:
|
||||
counts[src] += 1
|
||||
if src not in latest or dt > latest[src]:
|
||||
latest[src] = dt
|
||||
print(f"Unique sources to update: {len(counts)}")
|
||||
|
||||
# Sample what we'd write
|
||||
print("Sample (top 5 by appearance count):")
|
||||
for src, n in sorted(counts.items(), key=lambda kv: -kv[1])[:5]:
|
||||
print(f" {n:>3} appearances — {src} → last_consolidated_at = {latest[src].date()}")
|
||||
|
||||
if DRY_RUN:
|
||||
print("\nDry-run only. Re-run without --dry-run to apply.")
|
||||
return
|
||||
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
|
||||
# Reset cursor for any sources we're about to backfill so reruns are clean.
|
||||
print("\nResetting cursor for sources we'll touch...")
|
||||
sources = list(counts.keys())
|
||||
cur.execute(
|
||||
"UPDATE embeddings SET last_consolidated_at = NULL, consolidation_count = 0 "
|
||||
"WHERE source = ANY(%s)",
|
||||
(sources,),
|
||||
)
|
||||
print(f" reset {cur.rowcount} embeddings rows")
|
||||
|
||||
# Apply per-source updates. For each source, set count and latest date.
|
||||
print("Applying per-source backfill...")
|
||||
updated_rows = 0
|
||||
for src, n in counts.items():
|
||||
cur.execute(
|
||||
"UPDATE embeddings "
|
||||
"SET consolidation_count = %s, last_consolidated_at = %s "
|
||||
"WHERE source = %s",
|
||||
(n, latest[src], src),
|
||||
)
|
||||
updated_rows += cur.rowcount
|
||||
pg.commit()
|
||||
pg.close()
|
||||
print(f"Done. Updated {updated_rows} embeddings rows across {len(counts)} unique sources.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user