dream_observation.py: Stage 1 + 2 of the design spec — observe and select

Implements `dreamer-design-spec.md` lines 27-74: observe_corpus() returns a
signal vector (new_chunks delta, new_journal_entries, recent_questions over
14-day window, days_since_dream, underprocessed_count derived from the new
consolidation cursor); select_mode() returns one of {nrem, early-rem,
late-rem, lucid} or None per the spec's rules. The None return is the spec's
canonical answer to the repetition problem (line 67) — "dreamer goes quiet
rather than manufacturing novelty."

Standalone for now. Not wired into dream_pipeline yet — that happens in the
retrieve() refactor (task #46). dream.py is unchanged in this commit.

Grounded sources cited in module docstring: Friston Active Inference, sleep
research (Stickgold/Walker/Diekelberg & Born), sharp-wave ripples (Buzsáki).
All three appear in BirdAI-Bibliography.md.

Migration prerequisite (already shipped in the prior commit): consolidation
cursor columns last_consolidated_at + consolidation_count added to
embeddings. Backfill from dream-manifest history is task #49.
This commit is contained in:
2026-05-20 17:57:38 +00:00
parent 151c756b89
commit f682d8c6a0
+242
View File
@@ -0,0 +1,242 @@
"""
Dreamer Stages 1 + 2 — Observe and Select.
Implements `dreamer-design-spec.md`'s Stage 1 (observe_corpus) and Stage 2
(select_mode). These have been latent in dream.py — observe_corpus existed
in skeletal form but its output was largely unused; select_mode did not
exist at all. The dreamer always ran all stages with hardcoded queries.
Per spec (lines 2734 of dreamer-design-spec.md):
delta = observe_corpus()
selected_mode = select_mode(delta, task, project)
if selected_mode is None:
return # nothing worth dreaming
The "returns None — dreamer goes quiet rather than manufacturing novelty"
semantics (spec line 67) is the canonical answer to the repetition problem
documented in birdai-dreamer-exclusion-finding-2026-05-02.md.
Grounded in:
- Active Inference (Friston 2010, 2017) — observe error, choose action that
minimizes free energy. The dreamer is a prediction-error machine; observe
what's diverged from the model, dream about that.
- Sleep stages (Stickgold 2005; Walker 2017; Diekelberg & Born 2010) — NREM
for replay of new traces, REM for associative cross-cluster integration.
- Sharp-wave ripples (Buzsáki, Wilson) — biology tags WHAT to replay
(under-processed chunks); not uniform. Implemented via the consolidation
cursor on the embeddings table.
"""
import json
import os
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# ─── Paths ──────────────────────────────────────────────────────────────────
PG_DSN = os.getenv("PG_DSN")
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
DREAMER_STATE = str(Path.home() / "aaronai" / "dreamer_state.json")
JOURNAL_DAILY = "/home/aaron/nextcloud/data/data/aaron/files/Journal/Daily"
# ─── Thresholds ─────────────────────────────────────────────────────────────
# Per spec, these become settings-panel controls eventually. For now they're
# constants here; moving them to a config module is task #48.
NEW_CHUNK_THRESHOLD = 5 # below this, NREM not warranted on novelty alone
STALENESS_TRIGGER_DAYS = 3 # corpus quiet ≥3 days → Late REM ("shake things loose")
QUESTION_LOOKBACK_DAYS = 14 # spec line 61: "the last 14 days"
UNDERPROCESSED_PERCENTILE = 0.25 # bottom quartile of consolidation_count
# ─── Helpers ────────────────────────────────────────────────────────────────
def _get_pg():
return psycopg2.connect(PG_DSN)
def _load_json(path, default):
try:
return json.loads(Path(path).read_text())
except Exception:
return default
def _recent_user_questions(days=QUESTION_LOOKBACK_DAYS, limit=20):
"""Pull recent user-turn content from conversations.db. The spec calls
these 'live questions' — what Aaron has been asking about. They become
seed material for the REM modes."""
try:
conn = sqlite3.connect(CONVERSATIONS_DB)
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
cur = conn.cursor()
cur.execute(
"""
SELECT m.content FROM messages m
JOIN conversations c ON m.conversation_id = c.id
WHERE m.role = 'user' AND c.updated_at > ?
ORDER BY m.timestamp DESC LIMIT ?
""",
(cutoff, limit),
)
rows = cur.fetchall()
conn.close()
return [r[0][:280] for r in rows]
except Exception:
return []
def _new_journal_entries(since_ts):
"""Files in Journal/Daily/ created or modified since the last dream.
Journal entries with emotional/personal register route to Early REM per
the spec (line 71)."""
journal_path = Path(JOURNAL_DAILY)
if not journal_path.exists():
return []
new = []
for p in journal_path.rglob("*.md"):
try:
if p.stat().st_mtime > since_ts:
new.append(str(p.relative_to(journal_path)))
except OSError:
continue
return new
def _new_chunks_count(since_ts):
"""Files in the watcher state with mtime > last_dream. The spec calls
this 'what changed' (line 58). Used as the NREM novelty signal."""
state = _load_json(WATCHER_STATE, {})
count = 0
for _path, mtime in state.items():
try:
if float(mtime) > since_ts:
count += 1
except (ValueError, TypeError):
continue
return count
def _underprocessed_chunk_count():
"""Chunks below the underprocessed percentile by consolidation_count.
Biologically motivated: sharp-wave ripples bias replay toward novel /
under-encoded experience, not uniform sampling. We give NREM a pool of
'least-replayed' chunks to draw from in Stage 3."""
try:
pg = _get_pg()
cur = pg.cursor()
cur.execute(
"""
WITH t AS (
SELECT percentile_cont(%s) WITHIN GROUP (ORDER BY consolidation_count)
AS threshold
FROM embeddings
)
SELECT COUNT(*) FROM embeddings, t
WHERE consolidation_count <= t.threshold
""",
(UNDERPROCESSED_PERCENTILE,),
)
result = cur.fetchone()[0]
pg.close()
return int(result or 0)
except Exception:
return 0
# ─── Stage 1: observe_corpus ────────────────────────────────────────────────
def observe_corpus():
"""Build the signal vector consumed by select_mode and (downstream) by
retrieve. Concrete observations only — no interpretation. Each key is
a direct measurement from the corpus, watcher, journal, or conversation
log.
Returns a dict with:
now_ts -- current Unix timestamp
last_dream_ts -- last completed dream timestamp (0 if never)
days_since_dream -- float; inf if never dreamed
new_chunks -- count of files newer than last_dream
new_journal_entries -- list of Journal/Daily/*.md filenames since last_dream
recent_questions -- user-turn content from last 14 days
underprocessed_count -- chunks in the bottom 25% by consolidation_count
"""
state = _load_json(DREAMER_STATE, {})
last_dream_ts = float(state.get("last_dream_timestamp", 0) or 0)
now_ts = datetime.now().timestamp()
return {
"now_ts": now_ts,
"last_dream_ts": last_dream_ts,
"days_since_dream": (now_ts - last_dream_ts) / 86400 if last_dream_ts else float("inf"),
"new_chunks": _new_chunks_count(last_dream_ts),
"new_journal_entries": _new_journal_entries(last_dream_ts),
"recent_questions": _recent_user_questions(),
"underprocessed_count": _underprocessed_chunk_count(),
}
# ─── Stage 2: select_mode ───────────────────────────────────────────────────
def select_mode(signal, task=None, explicit_mode=None):
"""Return one of {'nrem', 'early-rem', 'late-rem', 'lucid'} or None.
Selection logic from spec (lines 6974):
- Explicit mode argument → use that mode
- task supplied → 'lucid' (question-anchored)
- Active journal entry → Early REM (emotional/personal register)
- Corpus unchanged ≥ STALENESS_TRIGGER_DAYS → Late REM (shake loose)
- New chunks above threshold → NREM
- Otherwise → None ("dreamer goes quiet rather than manufacturing novelty")
The None return is load-bearing. Per spec line 67, it's the canonical
answer to the repetition problem: when nothing has changed, do not
manufacture a dream just to fill the schedule slot.
"""
if explicit_mode:
return explicit_mode
if task:
return "lucid"
new_chunks = signal["new_chunks"]
new_journal = signal["new_journal_entries"]
days_since = signal["days_since_dream"]
# Spec line 67: nothing changed → quiet
if new_chunks < NEW_CHUNK_THRESHOLD and not new_journal:
return None
# Spec line 71: journal entry → Early REM
# We treat "any new journal entry" as the trigger. Refining to detect
# emotional/personal register requires LLM sentiment tagging, which
# is left for a later iteration.
if new_journal:
return "early-rem"
# Spec line 72: corpus unchanged 3+ days → Late REM
if days_since >= STALENESS_TRIGGER_DAYS:
return "late-rem"
# Default: new chunks above threshold → NREM
return "nrem"
# ─── CLI for manual inspection ──────────────────────────────────────────────
if __name__ == "__main__":
signal = observe_corpus()
short = {k: v for k, v in signal.items() if k != "recent_questions"}
print("Signal (excluding recent_questions):")
print(json.dumps(short, indent=2, default=str))
print(f"\nRecent user questions ({len(signal['recent_questions'])}):")
for q in signal["recent_questions"][:5]:
print(f" - {q[:140]}")
mode = select_mode(signal)
print(f"\nselect_mode() → {mode!r}")