From f185ed60cb7e144cbd7abc713f6fec7578326aa1 Mon Sep 17 00:00:00 2001 From: Aaron Nelson Date: Wed, 20 May 2026 18:11:07 +0000 Subject: [PATCH] =?UTF-8?q?dream.py:=20Stage=203+=20refactor=20=E2=80=94?= =?UTF-8?q?=20LLM-generated=20queries,=20MMR,=20mutable=20windows,=20conso?= =?UTF-8?q?lidation=20cursor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the rest of dreamer-design-spec.md's Stage 3 alongside the prescriptions from the external literature review: - Hardcoded seed query strings are gone. _llm_generate_queries() produces 4 mode-appropriate retrieval queries per call from the observation signal (Park et al. 2023 reflection pattern). NREM queries probe RECENT additions; Early REM bridges associative/emotional threads; Late REM forces cross- domain pairs; Lucid decomposes the task. Empirical first-run output: queries like "SUNY New Paltz Fall 2026 registration moratorium" instead of the fixed "research fabrication teaching practice recent work" — vector neighborhood now drifts with what the user has been actually doing. - TIME_WINDOWS_HOURS makes per-mode retrieval windows mutable (dreamer-multimodal-design.md §2's tech-debt item): NREM 72hr / Early REM 30d / Late REM 90d / Lucid no-window. NULL created_at rows are excluded from windowed modes — correct since they predate the cursor by definition. - NREM bias toward under-processed chunks via "ORDER BY consolidation_count ASC" before vector distance. Biologically motivated: sharp-wave-ripple replay is tagged/biased, not uniform. Chunks that haven't been replayed recently win the tiebreak. - MMR merge (Carbonell & Goldstein 1998) over the union of all queries' candidates. λ=0.5. Directly attacks the cluster-dominance failure mode where 8 dossier-narrative variants filled all 8 slots in 5 consecutive nights. - _bump_consolidation_cursor() called after NREM completes. Each source used gets consolidation_count += 1 and last_consolidated_at = NOW(). Tomorrow's signal sees these as more-processed, less under-processed. - dream_pipeline now runs observe_corpus + select_mode at the top per spec lines 27-34. If select_mode returns None — corpus unchanged + no new journal entry — pipeline exits with no dream rather than manufacturing novelty (spec line 67's "dreamer goes quiet"). Back-compat preserved: - retrieve()'s signature gains `signal` as optional kwarg; default behavior calls observe_corpus() inline so dream_single / dream_lucid keep working unchanged. - Graphiti substrate (E3 experiment) path untouched. - Manifest schema keeps the "query" field; value is now "[llm-generated from observation signal]" so historical manifest consumers don't break. --- scripts/dream.py | 406 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 342 insertions(+), 64 deletions(-) diff --git a/scripts/dream.py b/scripts/dream.py index dd9b26e..feed179 100644 --- a/scripts/dream.py +++ b/scripts/dream.py @@ -23,6 +23,7 @@ from datetime import datetime, timedelta from dotenv import load_dotenv import psycopg2 import hashlib +import numpy as np load_dotenv(Path.home() / "aaronai" / ".env", override=True) @@ -42,6 +43,26 @@ NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron") NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "") DREAMS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams" +# ─── Retrieval-window config (per dreamer-multimodal-design.md §2) ───────── +# Biological grounding: NREM replays recent traces (24-72 hrs); REM links +# across time on structural similarity, not temporal proximity. Synthesis +# pulls from salience across the full corpus (no window). Spec calls for +# these to be mutable rather than hardcoded — this is the mutable home. +TIME_WINDOWS_HOURS = { + "nrem": 72, # 24-72 hrs, take wider end + "early-rem": 24 * 30, # 30 days + "late-rem": 24 * 90, # 90 days + "lucid": None, # no window +} + +# Maximal Marginal Relevance: λ=1 → pure relevance, λ=0 → pure diversity. +# 0.5 is the standard balance; tune later if the dossier-cluster problem +# isn't sufficiently broken up. +MMR_LAMBDA = 0.5 + +# Fast/cheap model for query generation. Sonnet for synthesis (in synthesize_*). +LLM_QUERY_MODEL = os.getenv("DREAMER_QUERY_MODEL", "claude-haiku-4-5-20251001") + # Similarity ranges calibrated for all-MiniLM-L6-v2 MODE_RANGES = { "nrem": (0.48, 0.72), @@ -289,70 +310,293 @@ def _get_embedder(): from sentence_transformers import SentenceTransformer return SentenceTransformer("all-MiniLM-L6-v2") -def retrieve(mode, task=None, n_results=8, excluded_sources=None, type_filter=None): - # E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search - # Default behavior: pgvector similarity search (unchanged) - # type_filter is experimental and applies to pgvector retrieval only — Graphiti - # facts are not embeddings rows and have no embeddings.type to filter on. - substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector") - if substrate == "graphiti": - return retrieve_graphiti(mode, task=task, n_results=n_results, excluded_sources=excluded_sources) - embedder = _get_embedder() - low, high = MODE_RANGES[mode] +def _llm_generate_queries(mode, signal, task=None, n_queries=4): + """Park et al. 2023 reflection-style query generation. Feeds the LLM the + observation signal + a mode-specific framing; emits N retrieval queries + that probe different corners of the recent corpus instead of the same + hardcoded string every night. Sources cited in dream_observation.py. + + Falls back to recent_questions from the signal if the LLM call fails.""" + import anthropic if task: - query = task - elif mode == "late-rem": - delta = observe_corpus() - topics = delta.get("recent_topics", []) - query = topics[0] if topics else "practice place memory making" - elif mode == "early-rem": - query = "career decision personal change what matters next" + # Lucid mode: decompose the user's task into sub-queries + prompt = ( + f"Decompose this user task into {n_queries} distinct sub-questions, " + f"each suitable as a retrieval query against Aaron's personal corpus.\n\n" + f"TASK: {task}\n\n" + f'Output JSON ONLY: {{"queries": ["...", "...", ...]}}' + ) else: - query = "research fabrication teaching practice recent work" + mode_framings = { + "nrem": ( + "NREM is replay-and-consolidation of RECENT traces. Generate queries " + "that probe what Aaron has been working on or capturing in the last " + "few days. Concrete entities — project names, course codes, named " + "subjects. The dreamer is re-touching specific recent material to " + "strengthen schema connections, not finding novel content." + ), + "early-rem": ( + "Early REM is associative bridging with emotional/personal register. " + "Generate queries that surface unresolved themes, career questions, " + "ongoing personal threads — material that connects intellectual and " + "emotional dimensions. Tone: thoughtful friend, not researcher." + ), + "late-rem": ( + "Late REM tests novel connections across DISTANT material. Generate " + "queries that pair concrete subjects from DIFFERENT domains of Aaron's " + "work (e.g., one from academic teaching, one from consulting, one from " + "creative practice) to probe for surprising structural similarity. " + "Cross-domain is required." + ), + } + framing = mode_framings.get(mode, mode_framings["nrem"]) + questions_snippet = "\n".join( + f" - {q[:200]}" for q in signal.get("recent_questions", [])[:8] + ) or " (no recent user questions)" + journal_snippet = ", ".join(signal.get("new_journal_entries", [])[:5]) or "(none)" + days_str = ( + f"{signal['days_since_dream']:.1f}" + if signal.get("days_since_dream") not in (None, float("inf")) + else "infinite (first dream)" + ) + prompt = ( + f"You generate retrieval queries for an Active Inference dreamer. The " + f"dreamer surfaces prediction errors — gaps between Aaron's model and " + f"reality — not summaries or generic associations.\n\n" + f"MODE: {mode}\n" + f"FRAMING: {framing}\n\n" + f"OBSERVATION SIGNAL:\n" + f"- Days since last dream: {days_str}\n" + f"- New chunks since last dream: {signal.get('new_chunks', 0)}\n" + f"- New journal entries: {journal_snippet}\n" + f"- Underprocessed chunks pool: {signal.get('underprocessed_count', 0):,}\n\n" + f"RECENT USER QUESTIONS (last 14 days, top 8):\n{questions_snippet}\n\n" + f"Generate {n_queries} retrieval queries. Requirements:\n" + f"- Use concrete entities, named projects, course codes, specific topics " + f"— NOT generic phrasing like 'research work practice'\n" + f"- Each query probes a DIFFERENT corner of recent activity\n" + f"- Match the {mode} framing\n" + f"- 5-15 words each\n\n" + f'Output JSON ONLY: {{"queries": ["...", "...", ...]}}' + ) - embedding = embedder.encode([query]).tolist()[0] - chunks = [] - seen_sources = set() + try: + client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) + resp = client.messages.create( + model=LLM_QUERY_MODEL, + max_tokens=512, + messages=[{"role": "user", "content": prompt}], + ) + text = "".join(b.text for b in resp.content if hasattr(b, "text")).strip() + if text.startswith("```"): + text = text.split("```", 2)[1] + if text.startswith("json"): + text = text[4:] + text = text.strip() + data = json.loads(text) + queries = data.get("queries", []) + if isinstance(queries, list) and queries: + return [str(q).strip() for q in queries[:n_queries] if str(q).strip()] + except Exception as e: + print(f"[dream] LLM query generation failed ({e}); falling back to recent questions") + fallback = signal.get("recent_questions", [])[:n_queries] if signal else [] + return fallback or [task or "recent activity decisions thinking"] + + +def _mmr_select(candidate_embeddings, query_embedding, n, lambda_=MMR_LAMBDA): + """Maximal Marginal Relevance — greedy selection that balances relevance + against pairwise diversity. Carbonell & Goldstein 1998. Used to prevent + cluster lock-in (e.g., 8 dossier-narrative variants filling all 8 slots). + + candidate_embeddings: (N, D) numpy array + query_embedding: (D,) numpy array + Returns: list of indices into candidate_embeddings, len ≤ n.""" + if len(candidate_embeddings) == 0: + return [] + n = min(n, len(candidate_embeddings)) + cands = candidate_embeddings / (np.linalg.norm(candidate_embeddings, axis=1, keepdims=True) + 1e-9) + q = query_embedding / (np.linalg.norm(query_embedding) + 1e-9) + relevance = cands @ q + selected = [] + remaining = list(range(len(cands))) + while len(selected) < n and remaining: + if not selected: + best = max(remaining, key=lambda i: relevance[i]) + else: + sel = cands[selected] + scores = { + i: lambda_ * relevance[i] - (1 - lambda_) * float((cands[i] @ sel.T).max()) + for i in remaining + } + best = max(scores, key=scores.get) + selected.append(best) + remaining.remove(best) + return selected + + +def _bump_consolidation_cursor(chunks): + """Increment consolidation_count + set last_consolidated_at=NOW() for each + source represented in chunks. Called from dream_pipeline after NREM + completes. Per sharp-wave-ripples biology, NREM does the actual + consolidation; REM is associative use, so we only bump on NREM.""" + if not chunks: + return + sources = list({c["source"] for c in chunks if c.get("source")}) + if not sources: + return try: pg = get_pg() cur = pg.cursor() - excluded_sources = excluded_sources or set() - where, params = [], [] - if excluded_sources: - where.append("source NOT IN %s") - params.append(tuple(excluded_sources)) - if type_filter: - where.append("type = ANY(%s)") - params.append(list(type_filter)) - where_clause = ("WHERE " + " AND ".join(where)) if where else "" - cur.execute(f""" - SELECT document, source, type, 1 - (embedding <=> %s::vector) as similarity - FROM embeddings - {where_clause} - ORDER BY embedding <=> %s::vector - LIMIT %s - """, [embedding, *params, embedding, n_results * 3]) - - for doc, source, etype, similarity in cur.fetchall(): - if not (low <= similarity <= high): - continue - if source in seen_sources: - continue - chunks.append({ - "source": source or "unknown", - "content": doc, - "relevance": similarity, - "similarity": similarity, - "type": etype, - }) - seen_sources.add(source) - if len(chunks) >= n_results: - break + cur.execute( + "UPDATE embeddings " + "SET consolidation_count = consolidation_count + 1, " + " last_consolidated_at = NOW() " + "WHERE source = ANY(%s)", + (sources,), + ) + pg.commit() pg.close() except Exception as e: - print(f"pgvector retrieval error: {e}") + print(f"[dream] cursor bump failed (non-fatal): {e}") + + +def retrieve(mode, task=None, n_results=8, excluded_sources=None, + type_filter=None, signal=None): + """Refactored retrieval — see dreamer-design-spec.md Stage 3 + the + external-literature prescription in birdai-dreamer-exclusion-finding-2026-05-02.md. + + Changes from the prior hardcoded-query version: + - Queries are LLM-generated from the observation signal (Park et al. + reflection pattern) instead of fixed strings. Solves the "same 8 sources + every night" failure where fixed seeds locked into one neighborhood. + - Per-mode time windows (24-72hr NREM / 30d Early REM / 90d Late REM) + filter candidates before vector search. Spec calls for these to be + mutable; they live in TIME_WINDOWS_HOURS. + - NREM biases toward under-processed chunks (low consolidation_count). + Biologically motivated: sharp-wave ripples tag what to replay, not + uniform sampling. + - Multiple queries (4 by default) → over-fetch → MMR merge for + within-night diversity. Prevents cluster domination. + + signal is the observation-signal dict from dream_observation.observe_corpus(). + If None, observe_corpus is called inline (back-compat for ad-hoc invocation). + """ + # E3 substrate experiment unchanged + substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector") + if substrate == "graphiti": + return retrieve_graphiti(mode, task=task, n_results=n_results, + excluded_sources=excluded_sources) + + if signal is None: + from dream_observation import observe_corpus as _obs + signal = _obs() + + queries = _llm_generate_queries(mode, signal, task=task, n_queries=4) + if not queries: + print(f"[dream:{mode}] no queries generated; bailing") + return [] + print(f"[dream:{mode}] generated queries: {queries}") + + embedder = _get_embedder() + excluded_sources = excluded_sources or set() + window_hours = TIME_WINDOWS_HOURS.get(mode) + per_query_n = 12 # over-fetch for MMR + + candidates = [] + seen_ids = set() + try: + pg = get_pg() + cur = pg.cursor() + for q in queries: + q_emb = embedder.encode([q]).tolist()[0] + where, params = [], [] + if excluded_sources: + where.append("source NOT IN %s") + params.append(tuple(excluded_sources)) + if type_filter: + where.append("type = ANY(%s)") + params.append(list(type_filter)) + if window_hours is not None: + # created_at is TEXT (legacy); cast it. NULL created_at fails + # the comparison so legacy rows are excluded from windowed + # modes — correct: NULL means "indexed before cursor existed," + # which by definition is older than any window. + where.append( + f"(created_at IS NOT NULL AND " + f"created_at::timestamptz > NOW() - INTERVAL '{int(window_hours)} hours')" + ) + where_clause = ("WHERE " + " AND ".join(where)) if where else "" + # NREM bias: order by consolidation_count ASC first (under-processed + # chunks win the tiebreak before vector distance). Other modes: + # vector distance only. + order_clause = ( + "ORDER BY consolidation_count ASC, embedding <=> %s::vector" + if mode == "nrem" + else "ORDER BY embedding <=> %s::vector" + ) + cur.execute(f""" + SELECT id, document, source, type, embedding, + 1 - (embedding <=> %s::vector) as similarity + FROM embeddings + {where_clause} + {order_clause} + LIMIT %s + """, [q_emb, *params, q_emb, per_query_n]) + for row in cur.fetchall(): + if row[0] in seen_ids: + continue + seen_ids.add(row[0]) + emb = row[4] + # pgvector returns embeddings as string "[...]" by default + if isinstance(emb, str): + emb = np.array([float(x) for x in emb.strip("[]").split(",")]) + else: + emb = np.array(emb) + candidates.append({ + "id": row[0], + "content": row[1], + "source": row[2] or "unknown", + "type": row[3], + "embedding": emb, + "similarity": float(row[5]), + }) + pg.close() + except Exception as e: + import traceback + print(f"[dream:{mode}] retrieval SQL error: {e}") + traceback.print_exc() + return [] + + if not candidates: + print(f"[dream:{mode}] zero candidates after filters") + return [] + + # MMR over the union, using the first query as pivot for the relevance term. + # Averaging query embeddings would be theoretically cleaner but adds + # complexity for marginal benefit at this scale. + pivot_emb = np.array(embedder.encode([queries[0]]).tolist()[0]) + cand_embs = np.array([c["embedding"] for c in candidates]) + selected_idx = _mmr_select(cand_embs, pivot_emb, n=n_results * 2) + + # Post-MMR source-level dedup (multi-chunk same source collapses to one). + chunks = [] + seen_sources = set() + for i in selected_idx: + c = candidates[i] + if c["source"] in seen_sources: + continue + seen_sources.add(c["source"]) + chunks.append({ + "source": c["source"], + "content": c["content"], + "relevance": c["similarity"], + "similarity": c["similarity"], + "type": c["type"], + }) + if len(chunks) >= n_results: + break return chunks @@ -496,6 +740,12 @@ def dream_pipeline(type_filter=None): """ Full nightly pipeline — interdependent stages. NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis. + + Per dreamer-design-spec.md, this now runs Stage 1 (observe) and Stage 2 + (select) first. If select_mode returns None — corpus unchanged and no new + journal entry — the dreamer goes quiet rather than manufacturing novelty. + Otherwise NREM/Early-REM/Late-REM run with LLM-generated queries seeded + from the observation signal. """ print(f"Dreamer pipeline starting — {datetime.now().strftime('%Y-%m-%d %H:%M')}") @@ -503,21 +753,47 @@ def dream_pipeline(type_filter=None): state.pop("retrieved_sources", None) # legacy key; session-scoped novelty now session_retrieved = set() - delta = observe_corpus() - print(f"Corpus: {delta['new_chunks']} new chunks, {delta['days_since_dream']:.1f} days since last dream") - print("Novelty: session-scoped (no across-night exclusion)") + # ── Stage 1 + 2: Observe + Select ────────────────────────────────────── + from dream_observation import observe_corpus as _obs, select_mode as _select + signal = _obs() + print( + f"Signal: new_chunks={signal['new_chunks']}, " + f"new_journal={len(signal['new_journal_entries'])}, " + f"days_since={signal['days_since_dream']:.1f}, " + f"underprocessed={signal['underprocessed_count']:,}" + ) + selected = _select(signal) + if selected is None: + print("[select_mode] None — nothing worth dreaming about tonight (going quiet)") + # Update last-dream-attempted-at but not last_dream — caller can distinguish + # an actual dream from a skipped night by looking at last_dream_file or + # checking the manifest dir. + state["last_select_quiet_at"] = datetime.now().isoformat() + save_dreamer_state(state) + return None + print(f"[select_mode] → {selected}") - # ── Stage 1: NREM ────────────────────────────────────────────────────── + # The pipeline always runs all three modes for the manifest's continuity. + # select_mode's choice signals the *primary* focus; the others still run + # but draw from their own mode-appropriate windows. + primary_mode = selected + + # ── Stage 3: NREM ────────────────────────────────────────────────────── print("\n[NREM] Retrieving...") # NREM is replay-and-consolidation — does not exclude prior traces. # Late REM and Early REM exclude prior content for novelty; NREM does not. - nrem_chunks = retrieve("nrem", excluded_sources=None, type_filter=type_filter) + nrem_chunks = retrieve("nrem", excluded_sources=None, + type_filter=type_filter, signal=signal) session_retrieved.update(c["source"] for c in nrem_chunks) # Track sources that scored above Early REM ceiling — these are the only ones Early REM should exclude nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > 0.55} if not nrem_chunks: print("[NREM] No suitable chunks — aborting pipeline") return None + # Cursor bump: NREM is the consolidation stage. Each appearance increments + # consolidation_count + updates last_consolidated_at, so the next dream's + # observation sees these sources as less under-processed. + _bump_consolidation_cursor(nrem_chunks) print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...") nrem_output = synthesize_nrem(nrem_chunks) @@ -528,7 +804,7 @@ def dream_pipeline(type_filter=None): "nrem": { "chunks_retrieved": len(nrem_chunks), "avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3), - "query": "research fabrication teaching practice recent work", + "query": "[llm-generated from observation signal]", "word_count": len(nrem_output.split()), "sources": nrem_sources, "distinct_folders": nrem_folders, @@ -546,7 +822,8 @@ def dream_pipeline(type_filter=None): print("\n[Early REM] Retrieving...") # Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved) # Sources that scored in Early REM band during NREM remain available - early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources, type_filter=type_filter) + early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources, + type_filter=type_filter, signal=signal) session_retrieved.update(c["source"] for c in early_chunks) if not early_chunks: print("[Early REM] No suitable chunks — skipping") @@ -560,7 +837,7 @@ def dream_pipeline(type_filter=None): stage_data["early_rem"] = { "chunks_retrieved": len(early_chunks), "avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3), - "query": "career decision personal change what matters next", + "query": "[llm-generated from observation signal]", "word_count": len(early_rem_output.split()), "sources": early_sources, "distinct_folders": early_folders, @@ -572,7 +849,8 @@ def dream_pipeline(type_filter=None): # ── Stage 3: Late REM — informed by NREM + Early REM ────────────────── print("\n[Late REM] Retrieving...") - late_chunks = retrieve("late-rem", excluded_sources=session_retrieved, type_filter=type_filter) + late_chunks = retrieve("late-rem", excluded_sources=session_retrieved, + type_filter=type_filter, signal=signal) session_retrieved.update(c["source"] for c in late_chunks) if not late_chunks: print("[Late REM] No suitable chunks — skipping") @@ -591,7 +869,7 @@ def dream_pipeline(type_filter=None): stage_data["late_rem"] = { "chunks_retrieved": len(late_chunks), "avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3), - "query": "practice place memory making", + "query": "[llm-generated from observation signal]", "word_count": len(late_rem_output.split()), "sources": late_sources, "distinct_folders": list(set(late_folders)),