""" Dreamer Stages 1 + 2 — Observe and Select. Implements `dreamer-design-spec.md`'s Stage 1 (observe_corpus) and Stage 2 (select_mode). These have been latent in dream.py — observe_corpus existed in skeletal form but its output was largely unused; select_mode did not exist at all. The dreamer always ran all stages with hardcoded queries. Per spec (lines 27–34 of dreamer-design-spec.md): delta = observe_corpus() selected_mode = select_mode(delta, task, project) if selected_mode is None: return # nothing worth dreaming The "returns None — dreamer goes quiet rather than manufacturing novelty" semantics (spec line 67) is the canonical answer to the repetition problem documented in birdai-dreamer-exclusion-finding-2026-05-02.md. Grounded in: - Active Inference (Friston 2010, 2017) — observe error, choose action that minimizes free energy. The dreamer is a prediction-error machine; observe what's diverged from the model, dream about that. - Sleep stages (Stickgold 2005; Walker 2017; Diekelberg & Born 2010) — NREM for replay of new traces, REM for associative cross-cluster integration. - Sharp-wave ripples (Buzsáki, Wilson) — biology tags WHAT to replay (under-processed chunks); not uniform. Implemented via the consolidation cursor on the embeddings table. """ import json import os import sqlite3 from datetime import datetime, timedelta from pathlib import Path from dotenv import load_dotenv import psycopg2 load_dotenv(Path.home() / "aaronai" / ".env", override=True) # ─── Paths ────────────────────────────────────────────────────────────────── PG_DSN = os.getenv("PG_DSN") CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db") WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json") DREAMER_STATE = str(Path.home() / "aaronai" / "dreamer_state.json") JOURNAL_DAILY = "/home/aaron/nextcloud/data/data/aaron/files/Journal/Daily" # ─── Thresholds ───────────────────────────────────────────────────────────── # Per spec, these become settings-panel controls eventually. For now they're # constants here; moving them to a config module is task #48. NEW_CHUNK_THRESHOLD = 5 # below this, NREM not warranted on novelty alone STALENESS_TRIGGER_DAYS = 3 # corpus quiet ≥3 days → Late REM ("shake things loose") QUESTION_LOOKBACK_DAYS = 14 # spec line 61: "the last 14 days" UNDERPROCESSED_PERCENTILE = 0.25 # bottom quartile of consolidation_count # ─── Helpers ──────────────────────────────────────────────────────────────── def _get_pg(): return psycopg2.connect(PG_DSN) def _load_json(path, default): try: return json.loads(Path(path).read_text()) except Exception: return default def _recent_user_questions(days=QUESTION_LOOKBACK_DAYS, limit=20): """Pull recent user-turn content from conversations.db. The spec calls these 'live questions' — what Aaron has been asking about. They become seed material for the REM modes.""" try: conn = sqlite3.connect(CONVERSATIONS_DB) cutoff = (datetime.now() - timedelta(days=days)).isoformat() cur = conn.cursor() cur.execute( """ SELECT m.content FROM messages m JOIN conversations c ON m.conversation_id = c.id WHERE m.role = 'user' AND c.updated_at > ? ORDER BY m.timestamp DESC LIMIT ? """, (cutoff, limit), ) rows = cur.fetchall() conn.close() return [r[0][:280] for r in rows] except Exception: return [] def _new_journal_entries(since_ts): """Files in Journal/Daily/ created or modified since the last dream. Journal entries with emotional/personal register route to Early REM per the spec (line 71).""" journal_path = Path(JOURNAL_DAILY) if not journal_path.exists(): return [] new = [] for p in journal_path.rglob("*.md"): try: if p.stat().st_mtime > since_ts: new.append(str(p.relative_to(journal_path))) except OSError: continue return new def _new_chunks_count(since_ts): """Files in the watcher state with mtime > last_dream. The spec calls this 'what changed' (line 58). Used as the NREM novelty signal.""" state = _load_json(WATCHER_STATE, {}) count = 0 for _path, mtime in state.items(): try: if float(mtime) > since_ts: count += 1 except (ValueError, TypeError): continue return count def _underprocessed_chunk_count(): """Chunks below the underprocessed percentile by consolidation_count. Biologically motivated: sharp-wave ripples bias replay toward novel / under-encoded experience, not uniform sampling. We give NREM a pool of 'least-replayed' chunks to draw from in Stage 3.""" try: pg = _get_pg() cur = pg.cursor() cur.execute( """ WITH t AS ( SELECT percentile_cont(%s) WITHIN GROUP (ORDER BY consolidation_count) AS threshold FROM embeddings ) SELECT COUNT(*) FROM embeddings, t WHERE consolidation_count <= t.threshold """, (UNDERPROCESSED_PERCENTILE,), ) result = cur.fetchone()[0] pg.close() return int(result or 0) except Exception: return 0 # ─── Stage 1: observe_corpus ──────────────────────────────────────────────── def observe_corpus(): """Build the signal vector consumed by select_mode and (downstream) by retrieve. Concrete observations only — no interpretation. Each key is a direct measurement from the corpus, watcher, journal, or conversation log. Returns a dict with: now_ts -- current Unix timestamp last_dream_ts -- last completed dream timestamp (0 if never) days_since_dream -- float; inf if never dreamed new_chunks -- count of files newer than last_dream new_journal_entries -- list of Journal/Daily/*.md filenames since last_dream recent_questions -- user-turn content from last 14 days underprocessed_count -- chunks in the bottom 25% by consolidation_count """ state = _load_json(DREAMER_STATE, {}) last_dream_ts = float(state.get("last_dream_timestamp", 0) or 0) now_ts = datetime.now().timestamp() return { "now_ts": now_ts, "last_dream_ts": last_dream_ts, "days_since_dream": (now_ts - last_dream_ts) / 86400 if last_dream_ts else float("inf"), "new_chunks": _new_chunks_count(last_dream_ts), "new_journal_entries": _new_journal_entries(last_dream_ts), "recent_questions": _recent_user_questions(), "underprocessed_count": _underprocessed_chunk_count(), } # ─── Stage 2: select_mode ─────────────────────────────────────────────────── def select_mode(signal, task=None, explicit_mode=None): """Return one of {'nrem', 'early-rem', 'late-rem', 'lucid'}. Never None. The dreamer fires every scheduled night. The earlier "go quiet on null delta" rule was a synthesis-doc invention that didn't match the actual desired UX — the original dreamer always dreamed, even if it repeated itself. The cure for repetition lives in the retrieve layer (LLM-generated queries from the observation signal, MMR diversity, cursor bias toward under-processed chunks), not in skipping nights. Routing logic: - explicit_mode argument wins - task supplied → 'lucid' (question-anchored) - days_since_dream ≥ STALENESS_TRIGGER_DAYS → 'late-rem' (shake loose via cross-domain pairs when nothing's been added in a while) - new journal entry → 'early-rem' (emotional/personal register) - default → 'nrem' (replay-and-consolidation; always has something to do because the corpus always has under-processed chunks) """ if explicit_mode: return explicit_mode if task: return "lucid" days_since = signal["days_since_dream"] new_journal = signal["new_journal_entries"] if days_since >= STALENESS_TRIGGER_DAYS: return "late-rem" if new_journal: return "early-rem" return "nrem" # ─── CLI for manual inspection ────────────────────────────────────────────── if __name__ == "__main__": signal = observe_corpus() short = {k: v for k, v in signal.items() if k != "recent_questions"} print("Signal (excluding recent_questions):") print(json.dumps(short, indent=2, default=str)) print(f"\nRecent user questions ({len(signal['recent_questions'])}):") for q in signal["recent_questions"][:5]: print(f" - {q[:140]}") mode = select_mode(signal) print(f"\nselect_mode() → {mode!r}")