Files
aaronAI/scripts/dream.py
T
aaron f185ed60cb dream.py: Stage 3+ refactor — LLM-generated queries, MMR, mutable windows, consolidation cursor
Implements the rest of dreamer-design-spec.md's Stage 3 alongside the
prescriptions from the external literature review:

- Hardcoded seed query strings are gone. _llm_generate_queries() produces
  4 mode-appropriate retrieval queries per call from the observation signal
  (Park et al. 2023 reflection pattern). NREM queries probe RECENT additions;
  Early REM bridges associative/emotional threads; Late REM forces cross-
  domain pairs; Lucid decomposes the task. Empirical first-run output:
  queries like "SUNY New Paltz Fall 2026 registration moratorium" instead of
  the fixed "research fabrication teaching practice recent work" — vector
  neighborhood now drifts with what the user has been actually doing.

- TIME_WINDOWS_HOURS makes per-mode retrieval windows mutable
  (dreamer-multimodal-design.md §2's tech-debt item): NREM 72hr / Early REM
  30d / Late REM 90d / Lucid no-window. NULL created_at rows are excluded
  from windowed modes — correct since they predate the cursor by definition.

- NREM bias toward under-processed chunks via "ORDER BY consolidation_count
  ASC" before vector distance. Biologically motivated: sharp-wave-ripple
  replay is tagged/biased, not uniform. Chunks that haven't been replayed
  recently win the tiebreak.

- MMR merge (Carbonell & Goldstein 1998) over the union of all queries'
  candidates. λ=0.5. Directly attacks the cluster-dominance failure mode
  where 8 dossier-narrative variants filled all 8 slots in 5 consecutive
  nights.

- _bump_consolidation_cursor() called after NREM completes. Each source
  used gets consolidation_count += 1 and last_consolidated_at = NOW().
  Tomorrow's signal sees these as more-processed, less under-processed.

- dream_pipeline now runs observe_corpus + select_mode at the top per spec
  lines 27-34. If select_mode returns None — corpus unchanged + no new
  journal entry — pipeline exits with no dream rather than manufacturing
  novelty (spec line 67's "dreamer goes quiet").

Back-compat preserved:
- retrieve()'s signature gains `signal` as optional kwarg; default behavior
  calls observe_corpus() inline so dream_single / dream_lucid keep working
  unchanged.
- Graphiti substrate (E3 experiment) path untouched.
- Manifest schema keeps the "query" field; value is now
  "[llm-generated from observation signal]" so historical manifest
  consumers don't break.
2026-05-20 18:11:07 +00:00

1021 lines
43 KiB
Python

"""
Aaron AI Dreamer — Active Inference Engine
Interdependent stage architecture grounded in sleep consolidation research.
Nightly pipeline: NREM → Early REM → Late REM → Synthesis
Each stage receives the previous stage's output as context.
Lucid mode is on-demand only (Dream Now from settings).
Research basis:
- Singh et al. PNAS 2022: alternating NREM/REM outperforms single-stage approaches
- Klinzing et al. Nature Neuroscience 2019: SO-spindle-ripple coupling is interdependent
- REM operates on what NREM produced — stages are not discrete alternatives
"""
import os
import json
import sqlite3
import argparse
from functools import lru_cache
from collections import Counter
from pathlib import Path
from datetime import datetime, timedelta
from dotenv import load_dotenv
import psycopg2
import hashlib
import numpy as np
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
PG_DSN = os.getenv("PG_DSN")
def get_pg():
return psycopg2.connect(PG_DSN)
# ─── Paths ──────────────────────────────────────────────────────────────────
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
DREAMER_STATE = str(Path.home() / "aaronai" / "dreamer_state.json")
JOURNAL_DIR = "/home/aaron/nextcloud/data/data/aaron/files/Journal/Daily"
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "https://nextcloud.aaronnelson.studio")
NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
DREAMS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"
# ─── Retrieval-window config (per dreamer-multimodal-design.md §2) ─────────
# Biological grounding: NREM replays recent traces (24-72 hrs); REM links
# across time on structural similarity, not temporal proximity. Synthesis
# pulls from salience across the full corpus (no window). Spec calls for
# these to be mutable rather than hardcoded — this is the mutable home.
TIME_WINDOWS_HOURS = {
"nrem": 72, # 24-72 hrs, take wider end
"early-rem": 24 * 30, # 30 days
"late-rem": 24 * 90, # 90 days
"lucid": None, # no window
}
# Maximal Marginal Relevance: λ=1 → pure relevance, λ=0 → pure diversity.
# 0.5 is the standard balance; tune later if the dossier-cluster problem
# isn't sufficiently broken up.
MMR_LAMBDA = 0.5
# Fast/cheap model for query generation. Sonnet for synthesis (in synthesize_*).
LLM_QUERY_MODEL = os.getenv("DREAMER_QUERY_MODEL", "claude-haiku-4-5-20251001")
# Similarity ranges calibrated for all-MiniLM-L6-v2
MODE_RANGES = {
"nrem": (0.48, 0.72),
"early-rem": (0.38, 0.55),
"late-rem": (0.22, 0.42),
"lucid": (0.32, 0.72),
}
DREAMER_VERSION = "1.1" # 1.0=original exclusion logic; 1.1=score-band exclusion
# ─── Prompt versioning ──────────────────────────────────────────────────────
# Bump the relevant constant manually when changing a prompt.
PROMPT_VERSION_NREM = "1.0"
PROMPT_VERSION_EREM = "1.1"
PROMPT_VERSION_LREM = "1.2"
PROMPT_VERSION_SYN = "1.0"
def prompt_signature():
return (f"nrem={PROMPT_VERSION_NREM}|erem={PROMPT_VERSION_EREM}"
f"|lrem={PROMPT_VERSION_LREM}|syn={PROMPT_VERSION_SYN}")
def prompt_hash(prompts: list[str]) -> str:
combined = "".join(prompts)
return hashlib.md5(combined.encode()).hexdigest()[:8]
# ─── Prompt templates ───────────────────────────────────────────────────────
# Module-level so prompt_hash() can hash actual prompt content. Any change to
# any template — even a single character — flips the manifest's prompt_hash.
# Templates use str.format() placeholders ({chunk_text}, {nrem_output}, ...);
# do not switch back to f-strings (the constant must be hashable independent
# of variable values). Literal { or } in template text would need to be
# doubled ({{, }}) — currently no template contains literal braces.
NREM_PROMPT_TEMPLATE = """You have read everything Aaron Nelson has written and published.
You are a careful colleague who noticed something this week.
Here is material from his corpus:
{chunk_text}
Write to Aaron directly. Identify one specific connection between
this material and something he wrote or worked on previously.
Stay close to the documents — cite them specifically by name.
Do not speculate beyond what the material supports. Do not use
headers or bullet points. Write one paragraph of 200-300 words
that ends with a single concrete question he could act on."""
EARLY_REM_PROMPT_TEMPLATE = """Something was noticed earlier tonight, moving through Aaron's recent work:
{nrem_output}
That observation is still with you. Now here is material from a different
time — pulled from further back, from different parts of his corpus:
{chunk_text}
You are not analyzing. You are recognizing.
Something in the earlier observation and something in this older material
are the same thing wearing different clothes. Find it. Don't explain why
they're connected — just let the connection speak. Write from inside the
recognition, not from above it.
The emotional register underneath the career logic is more interesting
than the career logic. The pattern that has been repeating longer than
he has been aware of it is more interesting than the current instance.
Write directly to Aaron. No citations, no references, no analysis.
First person, present tense. Let what you noticed arrive rather than
be delivered. 150-250 words. End with one thing that is true that
he probably already knows but hasn't said out loud yet."""
LATE_REM_PROMPT_TEMPLATE = """You have been moving through Aaron Nelson's corpus all night.
First you found this, in the careful light of early consolidation:
{nrem_output}
Then, in the more personal territory that followed:
{early_rem_output}
Now it is late. The boundaries between things have loosened.
Here is material pulled from opposite ends of his work:
{chunk_text}
Do not explain the connections between all of this.
Do not resolve them. Do not summarize what came before.
Something stranger is possible now — let the accumulated
material from the night find its own shape. Compressed,
associative, slightly off. Let the strangeness stand.
No headers. No bullet points. No hedging. No resolution.
No offer. End mid-thought if that is where the material ends.
150-250 words."""
SYNTHESIS_PROMPT_TEMPLATE = """You have spent the night moving through Aaron Nelson's corpus
in three passes, each building on the last.
The first pass — careful, close to the documents:
{nrem_output}
The second pass — more personal, following what the first opened:
{early_rem_output}
The third pass — associative, strange, letting things touch that
don't normally touch:
{late_rem_output}
Now synthesize. Not a summary — a synthesis. Find what runs through
all three that none of them said directly. The thing that only becomes
visible when you hold all three passes together.
Write it as a single unbroken piece. No headers, no bullet points,
no stage labels. 200-300 words. End with the one question that
matters most right now."""
LUCID_PROMPT_TEMPLATE = """Aaron has a question he is sitting with:
{task}
You have searched his entire corpus and found material that
speaks to this question from unexpected directions. Here is
what you found:
{chunk_text}
Do not summarize. Do not list. Pick the most interesting
tension between what the corpus contains and what he is
asking, and follow it through to its conclusion. Cite
specific documents by name. Be direct about what you think.
No headers, no bullet points. 250-400 words.
End with an offer to work on it together."""
LUCID_DEFAULT_TASK = "What should I be thinking about that I am not?"
def extract_folder(source_path):
"""Extract top-level Nextcloud folder from source path."""
parts = source_path.replace("\\", "/").split("/")
return parts[0] if parts else "unknown"
# ─── Stage 1: Observe ───────────────────────────────────────────────────────
def observe_corpus():
state = load_dreamer_state()
last_dream = state.get("last_dream_timestamp", 0)
new_chunk_count = 0
try:
watcher_state = json.loads(Path(WATCHER_STATE).read_text())
for path, mtime in watcher_state.items():
if float(mtime) > last_dream:
new_chunk_count += 1
except:
pass
days_since = (datetime.now().timestamp() - last_dream) / 86400
recent_topics = get_recent_conversation_topics()
return {
"new_chunks": new_chunk_count,
"days_since_dream": days_since,
"recent_topics": recent_topics,
"last_dream": last_dream,
}
def get_recent_conversation_topics(days=14):
try:
conn = sqlite3.connect(CONVERSATIONS_DB)
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
c = conn.cursor()
c.execute("""
SELECT m.content FROM messages m
JOIN conversations c ON m.conversation_id = c.id
WHERE m.role = 'user' AND c.updated_at > ?
ORDER BY m.timestamp DESC LIMIT 20
""", (cutoff,))
rows = c.fetchall()
conn.close()
return [r[0][:200] for r in rows]
except:
return []
# ─── Stage 2: Retrieve ──────────────────────────────────────────────────────
def retrieve_graphiti(mode, task=None, n_results=8, excluded_sources=None):
"""E3 experiment — Graphiti substrate retrieval.
Queries Graphiti /search endpoint instead of pgvector.
Returns chunks in same format as retrieve() for pipeline compatibility.
Note: content is Graphiti facts (synthesized relationships), not raw chunks.
Over-fetches by 3x to allow in-process filtering against excluded_sources,
matching the cross-stage exclusion mechanism the pgvector branch uses.
Without this filter, NREM/Early REM/Late REM would see overlapping content
and the score-band Early REM exclusion (v1.1) would not apply in Graphiti mode.
"""
import requests as req_lib
if task:
query = task
elif mode == "late-rem":
delta = observe_corpus()
topics = delta.get("recent_topics", [])
query = topics[0] if topics else "practice place memory making"
elif mode == "early-rem":
query = "career decision personal change what matters next"
else:
query = "research fabrication teaching practice recent work"
excluded_sources = excluded_sources or set()
# Over-fetch so in-process exclusion still leaves enough results
fetch_limit = n_results * 3 if excluded_sources else n_results
try:
resp = req_lib.get(
"http://localhost:8001/search",
params={"query": query, "limit": fetch_limit, "group_id": "aaron"},
timeout=30,
)
resp.raise_for_status()
results = resp.json().get("results", [])
chunks = []
seen_sources = set()
for r in results:
fact = r.get("fact", "")
if not fact.strip():
continue
source = r.get("source", "graphiti")
if source in excluded_sources:
continue
if source in seen_sources:
continue
chunks.append({
"source": source,
"content": fact,
"relevance": r.get("score", 0.5),
"similarity": r.get("score", 0.5),
})
seen_sources.add(source)
if len(chunks) >= n_results:
break
return chunks
except Exception as e:
print(f"[Graphiti retrieval error: {e}] — falling back to empty.")
return []
@lru_cache(maxsize=1)
def _get_embedder():
from sentence_transformers import SentenceTransformer
return SentenceTransformer("all-MiniLM-L6-v2")
def _llm_generate_queries(mode, signal, task=None, n_queries=4):
"""Park et al. 2023 reflection-style query generation. Feeds the LLM the
observation signal + a mode-specific framing; emits N retrieval queries
that probe different corners of the recent corpus instead of the same
hardcoded string every night. Sources cited in dream_observation.py.
Falls back to recent_questions from the signal if the LLM call fails."""
import anthropic
if task:
# Lucid mode: decompose the user's task into sub-queries
prompt = (
f"Decompose this user task into {n_queries} distinct sub-questions, "
f"each suitable as a retrieval query against Aaron's personal corpus.\n\n"
f"TASK: {task}\n\n"
f'Output JSON ONLY: {{"queries": ["...", "...", ...]}}'
)
else:
mode_framings = {
"nrem": (
"NREM is replay-and-consolidation of RECENT traces. Generate queries "
"that probe what Aaron has been working on or capturing in the last "
"few days. Concrete entities — project names, course codes, named "
"subjects. The dreamer is re-touching specific recent material to "
"strengthen schema connections, not finding novel content."
),
"early-rem": (
"Early REM is associative bridging with emotional/personal register. "
"Generate queries that surface unresolved themes, career questions, "
"ongoing personal threads — material that connects intellectual and "
"emotional dimensions. Tone: thoughtful friend, not researcher."
),
"late-rem": (
"Late REM tests novel connections across DISTANT material. Generate "
"queries that pair concrete subjects from DIFFERENT domains of Aaron's "
"work (e.g., one from academic teaching, one from consulting, one from "
"creative practice) to probe for surprising structural similarity. "
"Cross-domain is required."
),
}
framing = mode_framings.get(mode, mode_framings["nrem"])
questions_snippet = "\n".join(
f" - {q[:200]}" for q in signal.get("recent_questions", [])[:8]
) or " (no recent user questions)"
journal_snippet = ", ".join(signal.get("new_journal_entries", [])[:5]) or "(none)"
days_str = (
f"{signal['days_since_dream']:.1f}"
if signal.get("days_since_dream") not in (None, float("inf"))
else "infinite (first dream)"
)
prompt = (
f"You generate retrieval queries for an Active Inference dreamer. The "
f"dreamer surfaces prediction errors — gaps between Aaron's model and "
f"reality — not summaries or generic associations.\n\n"
f"MODE: {mode}\n"
f"FRAMING: {framing}\n\n"
f"OBSERVATION SIGNAL:\n"
f"- Days since last dream: {days_str}\n"
f"- New chunks since last dream: {signal.get('new_chunks', 0)}\n"
f"- New journal entries: {journal_snippet}\n"
f"- Underprocessed chunks pool: {signal.get('underprocessed_count', 0):,}\n\n"
f"RECENT USER QUESTIONS (last 14 days, top 8):\n{questions_snippet}\n\n"
f"Generate {n_queries} retrieval queries. Requirements:\n"
f"- Use concrete entities, named projects, course codes, specific topics "
f"— NOT generic phrasing like 'research work practice'\n"
f"- Each query probes a DIFFERENT corner of recent activity\n"
f"- Match the {mode} framing\n"
f"- 5-15 words each\n\n"
f'Output JSON ONLY: {{"queries": ["...", "...", ...]}}'
)
try:
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
resp = client.messages.create(
model=LLM_QUERY_MODEL,
max_tokens=512,
messages=[{"role": "user", "content": prompt}],
)
text = "".join(b.text for b in resp.content if hasattr(b, "text")).strip()
if text.startswith("```"):
text = text.split("```", 2)[1]
if text.startswith("json"):
text = text[4:]
text = text.strip()
data = json.loads(text)
queries = data.get("queries", [])
if isinstance(queries, list) and queries:
return [str(q).strip() for q in queries[:n_queries] if str(q).strip()]
except Exception as e:
print(f"[dream] LLM query generation failed ({e}); falling back to recent questions")
fallback = signal.get("recent_questions", [])[:n_queries] if signal else []
return fallback or [task or "recent activity decisions thinking"]
def _mmr_select(candidate_embeddings, query_embedding, n, lambda_=MMR_LAMBDA):
"""Maximal Marginal Relevance — greedy selection that balances relevance
against pairwise diversity. Carbonell & Goldstein 1998. Used to prevent
cluster lock-in (e.g., 8 dossier-narrative variants filling all 8 slots).
candidate_embeddings: (N, D) numpy array
query_embedding: (D,) numpy array
Returns: list of indices into candidate_embeddings, len ≤ n."""
if len(candidate_embeddings) == 0:
return []
n = min(n, len(candidate_embeddings))
cands = candidate_embeddings / (np.linalg.norm(candidate_embeddings, axis=1, keepdims=True) + 1e-9)
q = query_embedding / (np.linalg.norm(query_embedding) + 1e-9)
relevance = cands @ q
selected = []
remaining = list(range(len(cands)))
while len(selected) < n and remaining:
if not selected:
best = max(remaining, key=lambda i: relevance[i])
else:
sel = cands[selected]
scores = {
i: lambda_ * relevance[i] - (1 - lambda_) * float((cands[i] @ sel.T).max())
for i in remaining
}
best = max(scores, key=scores.get)
selected.append(best)
remaining.remove(best)
return selected
def _bump_consolidation_cursor(chunks):
"""Increment consolidation_count + set last_consolidated_at=NOW() for each
source represented in chunks. Called from dream_pipeline after NREM
completes. Per sharp-wave-ripples biology, NREM does the actual
consolidation; REM is associative use, so we only bump on NREM."""
if not chunks:
return
sources = list({c["source"] for c in chunks if c.get("source")})
if not sources:
return
try:
pg = get_pg()
cur = pg.cursor()
cur.execute(
"UPDATE embeddings "
"SET consolidation_count = consolidation_count + 1, "
" last_consolidated_at = NOW() "
"WHERE source = ANY(%s)",
(sources,),
)
pg.commit()
pg.close()
except Exception as e:
print(f"[dream] cursor bump failed (non-fatal): {e}")
def retrieve(mode, task=None, n_results=8, excluded_sources=None,
type_filter=None, signal=None):
"""Refactored retrieval — see dreamer-design-spec.md Stage 3 + the
external-literature prescription in birdai-dreamer-exclusion-finding-2026-05-02.md.
Changes from the prior hardcoded-query version:
- Queries are LLM-generated from the observation signal (Park et al.
reflection pattern) instead of fixed strings. Solves the "same 8 sources
every night" failure where fixed seeds locked into one neighborhood.
- Per-mode time windows (24-72hr NREM / 30d Early REM / 90d Late REM)
filter candidates before vector search. Spec calls for these to be
mutable; they live in TIME_WINDOWS_HOURS.
- NREM biases toward under-processed chunks (low consolidation_count).
Biologically motivated: sharp-wave ripples tag what to replay, not
uniform sampling.
- Multiple queries (4 by default) → over-fetch → MMR merge for
within-night diversity. Prevents cluster domination.
signal is the observation-signal dict from dream_observation.observe_corpus().
If None, observe_corpus is called inline (back-compat for ad-hoc invocation).
"""
# E3 substrate experiment unchanged
substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
if substrate == "graphiti":
return retrieve_graphiti(mode, task=task, n_results=n_results,
excluded_sources=excluded_sources)
if signal is None:
from dream_observation import observe_corpus as _obs
signal = _obs()
queries = _llm_generate_queries(mode, signal, task=task, n_queries=4)
if not queries:
print(f"[dream:{mode}] no queries generated; bailing")
return []
print(f"[dream:{mode}] generated queries: {queries}")
embedder = _get_embedder()
excluded_sources = excluded_sources or set()
window_hours = TIME_WINDOWS_HOURS.get(mode)
per_query_n = 12 # over-fetch for MMR
candidates = []
seen_ids = set()
try:
pg = get_pg()
cur = pg.cursor()
for q in queries:
q_emb = embedder.encode([q]).tolist()[0]
where, params = [], []
if excluded_sources:
where.append("source NOT IN %s")
params.append(tuple(excluded_sources))
if type_filter:
where.append("type = ANY(%s)")
params.append(list(type_filter))
if window_hours is not None:
# created_at is TEXT (legacy); cast it. NULL created_at fails
# the comparison so legacy rows are excluded from windowed
# modes — correct: NULL means "indexed before cursor existed,"
# which by definition is older than any window.
where.append(
f"(created_at IS NOT NULL AND "
f"created_at::timestamptz > NOW() - INTERVAL '{int(window_hours)} hours')"
)
where_clause = ("WHERE " + " AND ".join(where)) if where else ""
# NREM bias: order by consolidation_count ASC first (under-processed
# chunks win the tiebreak before vector distance). Other modes:
# vector distance only.
order_clause = (
"ORDER BY consolidation_count ASC, embedding <=> %s::vector"
if mode == "nrem"
else "ORDER BY embedding <=> %s::vector"
)
cur.execute(f"""
SELECT id, document, source, type, embedding,
1 - (embedding <=> %s::vector) as similarity
FROM embeddings
{where_clause}
{order_clause}
LIMIT %s
""", [q_emb, *params, q_emb, per_query_n])
for row in cur.fetchall():
if row[0] in seen_ids:
continue
seen_ids.add(row[0])
emb = row[4]
# pgvector returns embeddings as string "[...]" by default
if isinstance(emb, str):
emb = np.array([float(x) for x in emb.strip("[]").split(",")])
else:
emb = np.array(emb)
candidates.append({
"id": row[0],
"content": row[1],
"source": row[2] or "unknown",
"type": row[3],
"embedding": emb,
"similarity": float(row[5]),
})
pg.close()
except Exception as e:
import traceback
print(f"[dream:{mode}] retrieval SQL error: {e}")
traceback.print_exc()
return []
if not candidates:
print(f"[dream:{mode}] zero candidates after filters")
return []
# MMR over the union, using the first query as pivot for the relevance term.
# Averaging query embeddings would be theoretically cleaner but adds
# complexity for marginal benefit at this scale.
pivot_emb = np.array(embedder.encode([queries[0]]).tolist()[0])
cand_embs = np.array([c["embedding"] for c in candidates])
selected_idx = _mmr_select(cand_embs, pivot_emb, n=n_results * 2)
# Post-MMR source-level dedup (multi-chunk same source collapses to one).
chunks = []
seen_sources = set()
for i in selected_idx:
c = candidates[i]
if c["source"] in seen_sources:
continue
seen_sources.add(c["source"])
chunks.append({
"source": c["source"],
"content": c["content"],
"relevance": c["similarity"],
"similarity": c["similarity"],
"type": c["type"],
})
if len(chunks) >= n_results:
break
return chunks
# ─── Stage 3: Synthesize ────────────────────────────────────────────────────
def synthesize_nrem(chunks):
chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
return _call_claude(NREM_PROMPT_TEMPLATE.format(chunk_text=chunk_text))
def synthesize_early_rem(chunks, nrem_output):
# v1.1 — removed citation instruction, removed close-friend persona,
# shifted register from analysis to recognition.
chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
return _call_claude(EARLY_REM_PROMPT_TEMPLATE.format(
nrem_output=nrem_output, chunk_text=chunk_text))
def synthesize_late_rem(chunks, nrem_output, early_rem_output):
chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
return _call_claude(LATE_REM_PROMPT_TEMPLATE.format(
nrem_output=nrem_output,
early_rem_output=early_rem_output,
chunk_text=chunk_text))
def synthesize_final(nrem_output, early_rem_output, late_rem_output):
return _call_claude(
SYNTHESIS_PROMPT_TEMPLATE.format(
nrem_output=nrem_output,
early_rem_output=early_rem_output,
late_rem_output=late_rem_output),
max_tokens=800)
def synthesize_lucid(chunks, task):
chunk_text = "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])
resolved_task = task or LUCID_DEFAULT_TASK
return _call_claude(LUCID_PROMPT_TEMPLATE.format(
task=resolved_task, chunk_text=chunk_text))
def _call_claude(prompt, max_tokens=1000):
import anthropic
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
# ─── Stage 4: Deliver ───────────────────────────────────────────────────────
def deliver(dream_text, mode, task=None):
import requests
date_str = datetime.now().strftime("%Y-%m-%d")
filename = f"{date_str}-{mode}.md"
header = f"# Dream — {mode.upper()}{datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
header += f"*prompt_sig: {prompt_signature()}*\n\n"
if task:
header += f"*Task: {task}*\n\n"
header += "---\n\n"
content = header + dream_text
auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
requests.request("MKCOL", DREAMS_WEBDAV, auth=auth, timeout=10)
url = f"{DREAMS_WEBDAV}/{filename}"
counter = 1
while True:
check = requests.request("PROPFIND", url, auth=auth, timeout=10)
if check.status_code == 404:
break
filename = f"{date_str}-{mode}-{counter}.md"
url = f"{DREAMS_WEBDAV}/{filename}"
counter += 1
response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
response.raise_for_status()
print(f"Delivered: Journal/Dreams/{filename}")
return f"Journal/Dreams/{filename}"
def notify_sse(mode, filename):
try:
import requests
requests.post("http://localhost:8000/api/events/notify", json={
"type": "dream",
"mode": mode,
"filename": filename,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
}, timeout=3)
except Exception as e:
print(f"SSE notify failed (non-critical): {e}")
# ─── State ──────────────────────────────────────────────────────────────────
def load_dreamer_state():
p = Path(DREAMER_STATE)
if p.exists():
try:
return json.loads(p.read_text())
except:
pass
return {}
def save_dreamer_state(state):
Path(DREAMER_STATE).write_text(json.dumps(state, indent=2))
# ─── Orchestrators ───────────────────────────────────────────────────────────
def write_manifest(date_str, stage_data, corpus_data):
import requests
manifest = {
"date": date_str,
"prompt_sig": prompt_signature(),
"dreamer_version": DREAMER_VERSION,
"prompt_hash": prompt_hash([
NREM_PROMPT_TEMPLATE,
EARLY_REM_PROMPT_TEMPLATE,
LATE_REM_PROMPT_TEMPLATE,
SYNTHESIS_PROMPT_TEMPLATE,
]),
"stages": stage_data,
"corpus": corpus_data,
"rating": None,
"notes": "",
}
content = json.dumps(manifest, indent=2)
auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
url = f"{DREAMS_WEBDAV}/dream-manifest-{date_str}.json"
try:
response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
response.raise_for_status()
print(f"Manifest written: Journal/Dreams/dream-manifest-{date_str}.json")
except Exception as e:
print(f"Manifest write failed — manifest not persisted: {e}")
def dream_pipeline(type_filter=None):
"""
Full nightly pipeline — interdependent stages.
NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis.
Per dreamer-design-spec.md, this now runs Stage 1 (observe) and Stage 2
(select) first. If select_mode returns None — corpus unchanged and no new
journal entry — the dreamer goes quiet rather than manufacturing novelty.
Otherwise NREM/Early-REM/Late-REM run with LLM-generated queries seeded
from the observation signal.
"""
print(f"Dreamer pipeline starting — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
state = load_dreamer_state()
state.pop("retrieved_sources", None) # legacy key; session-scoped novelty now
session_retrieved = set()
# ── Stage 1 + 2: Observe + Select ──────────────────────────────────────
from dream_observation import observe_corpus as _obs, select_mode as _select
signal = _obs()
print(
f"Signal: new_chunks={signal['new_chunks']}, "
f"new_journal={len(signal['new_journal_entries'])}, "
f"days_since={signal['days_since_dream']:.1f}, "
f"underprocessed={signal['underprocessed_count']:,}"
)
selected = _select(signal)
if selected is None:
print("[select_mode] None — nothing worth dreaming about tonight (going quiet)")
# Update last-dream-attempted-at but not last_dream — caller can distinguish
# an actual dream from a skipped night by looking at last_dream_file or
# checking the manifest dir.
state["last_select_quiet_at"] = datetime.now().isoformat()
save_dreamer_state(state)
return None
print(f"[select_mode] → {selected}")
# The pipeline always runs all three modes for the manifest's continuity.
# select_mode's choice signals the *primary* focus; the others still run
# but draw from their own mode-appropriate windows.
primary_mode = selected
# ── Stage 3: NREM ──────────────────────────────────────────────────────
print("\n[NREM] Retrieving...")
# NREM is replay-and-consolidation — does not exclude prior traces.
# Late REM and Early REM exclude prior content for novelty; NREM does not.
nrem_chunks = retrieve("nrem", excluded_sources=None,
type_filter=type_filter, signal=signal)
session_retrieved.update(c["source"] for c in nrem_chunks)
# Track sources that scored above Early REM ceiling — these are the only ones Early REM should exclude
nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > 0.55}
if not nrem_chunks:
print("[NREM] No suitable chunks — aborting pipeline")
return None
# Cursor bump: NREM is the consolidation stage. Each appearance increments
# consolidation_count + updates last_consolidated_at, so the next dream's
# observation sees these sources as less under-processed.
_bump_consolidation_cursor(nrem_chunks)
print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...")
nrem_output = synthesize_nrem(nrem_chunks)
nrem_file = deliver(nrem_output, "nrem")
nrem_sources = [c["source"] for c in nrem_chunks]
nrem_folders = list({extract_folder(s) for s in nrem_sources})
stage_data = {
"nrem": {
"chunks_retrieved": len(nrem_chunks),
"avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3),
"query": "[llm-generated from observation signal]",
"word_count": len(nrem_output.split()),
"sources": nrem_sources,
"distinct_folders": nrem_folders,
"folder_count": len(nrem_folders),
# Counter filters None: Graphiti chunks lack `type` (facts, not embeddings rows).
# Pgvector chunks always carry type post-Improvement-#2 backfill. If type
# ever appears as None here, the backfill or writer enforcement has regressed.
"type_distribution": dict(Counter(c.get("type") for c in nrem_chunks if c.get("type"))),
"status": "ok",
}
}
print(f"[NREM] Done.\n{nrem_output[:200]}...")
# ── Stage 2: Early REM — informed by NREM ──────────────────────────────
print("\n[Early REM] Retrieving...")
# Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved)
# Sources that scored in Early REM band during NREM remain available
early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources,
type_filter=type_filter, signal=signal)
session_retrieved.update(c["source"] for c in early_chunks)
if not early_chunks:
print("[Early REM] No suitable chunks — skipping")
early_rem_output = nrem_output # fallback
else:
print(f"[Early REM] Retrieved {len(early_chunks)} chunks. Synthesizing with NREM context...")
early_rem_output = synthesize_early_rem(early_chunks, nrem_output)
deliver(early_rem_output, "early-rem")
early_sources = [c["source"] for c in early_chunks]
early_folders = list({extract_folder(s) for s in early_sources})
stage_data["early_rem"] = {
"chunks_retrieved": len(early_chunks),
"avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3),
"query": "[llm-generated from observation signal]",
"word_count": len(early_rem_output.split()),
"sources": early_sources,
"distinct_folders": early_folders,
"folder_count": len(early_folders),
"type_distribution": dict(Counter(c.get("type") for c in early_chunks if c.get("type"))),
"status": "ok",
}
print(f"[Early REM] Done.\n{early_rem_output[:200]}...")
# ── Stage 3: Late REM — informed by NREM + Early REM ──────────────────
print("\n[Late REM] Retrieving...")
late_chunks = retrieve("late-rem", excluded_sources=session_retrieved,
type_filter=type_filter, signal=signal)
session_retrieved.update(c["source"] for c in late_chunks)
if not late_chunks:
print("[Late REM] No suitable chunks — skipping")
late_rem_output = early_rem_output # fallback
else:
print(f"[Late REM] Retrieved {len(late_chunks)} chunks. Synthesizing with full context...")
late_rem_output = synthesize_late_rem(late_chunks, nrem_output, early_rem_output)
deliver(late_rem_output, "late-rem")
late_sources = [c["source"] for c in late_chunks]
late_folders = [extract_folder(s) for s in late_sources]
cross_domain_pairs = sum(
1 for i in range(len(late_folders))
for j in range(i+1, len(late_folders))
if late_folders[i] != late_folders[j]
)
stage_data["late_rem"] = {
"chunks_retrieved": len(late_chunks),
"avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3),
"query": "[llm-generated from observation signal]",
"word_count": len(late_rem_output.split()),
"sources": late_sources,
"distinct_folders": list(set(late_folders)),
"folder_count": len(set(late_folders)),
"cross_domain_pairs": cross_domain_pairs,
"type_distribution": dict(Counter(c.get("type") for c in late_chunks if c.get("type"))),
"status": "ok",
}
print(f"[Late REM] Done.\n{late_rem_output[:200]}...")
# ── Stage 4: Synthesis — all three stages ─────────────────────────────
print("\n[Synthesis] Integrating all stages...")
synthesis_output = synthesize_final(nrem_output, early_rem_output, late_rem_output)
synthesis_file = deliver(synthesis_output, "synthesis")
stage_data["synthesis"] = {
"word_count": len(synthesis_output.split()),
"status": "ok",
}
print(f"\n{'='*60}")
print("SYNTHESIS:")
print(synthesis_output)
print(f"{'='*60}")
# Write manifest
all_session_sources = list(session_retrieved)
all_session_folders = list({extract_folder(s) for s in all_session_sources})
total_chunks = 0
pg = None
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("SELECT COUNT(*) FROM embeddings")
total_chunks = cur.fetchone()[0]
except Exception as e:
print(f"total_chunks query failed (non-critical): {e}")
finally:
if pg is not None:
pg.close()
corpus_data = {
"total_chunks": total_chunks,
"new_chunks_since_last_dream": delta.get("new_chunks", 0),
"days_since_last_dream": round(delta.get("days_since_dream", 0), 2),
"substrate": "pgvector",
"aggregate": {
"total_distinct_sources": len(all_session_sources),
"total_distinct_folders": len(all_session_folders),
"folders_touched": all_session_folders,
}
}
write_manifest(datetime.now().strftime("%Y-%m-%d"), stage_data, corpus_data)
# Update state and notify (reuse state from start of pipeline; legacy key already popped)
state["last_dream_timestamp"] = datetime.now().timestamp()
state["last_dream_mode"] = "pipeline"
state["last_dream_file"] = synthesis_file
save_dreamer_state(state)
notify_sse("synthesis", synthesis_file.split("/")[-1])
print(f"\nPipeline complete. Synthesis: {synthesis_file}")
return synthesis_file
def dream_lucid(task, type_filter=None):
"""On-demand lucid dream — single mode, used by Dream Now in settings."""
print(f"Lucid dream starting — task: {task[:80] if task else 'none'}")
chunks = retrieve("lucid", task=task, type_filter=type_filter)
if not chunks:
print("No suitable chunks — aborting")
return None
print(f"Retrieved {len(chunks)} chunks. Synthesizing...")
output = synthesize_lucid(chunks, task)
filepath = deliver(output, "lucid", task=task)
state = load_dreamer_state()
state["last_dream_timestamp"] = datetime.now().timestamp()
state["last_dream_mode"] = "lucid"
state["last_dream_file"] = filepath
save_dreamer_state(state)
notify_sse("lucid", filepath.split("/")[-1])
print(f"\n{'='*60}")
print(output)
print(f"{'='*60}")
print(f"\nDelivered to {filepath}")
return filepath
def dream_single(mode, task=None, type_filter=None):
"""
Single mode — used by Dream Now for non-lucid modes.
Runs one stage independently (for testing/tuning individual stages).
"""
print(f"Single mode dream: {mode}")
chunks = retrieve(mode, task=task, type_filter=type_filter)
if not chunks:
print("No suitable chunks — aborting")
return None
print(f"Retrieved {len(chunks)} chunks. Synthesizing...")
if mode == "nrem":
output = synthesize_nrem(chunks)
elif mode == "early-rem":
output = synthesize_early_rem(chunks, "")
elif mode == "late-rem":
output = synthesize_late_rem(chunks, "", "")
else:
output = synthesize_lucid(chunks, task)
filepath = deliver(output, mode, task=task)
state = load_dreamer_state()
state["last_dream_timestamp"] = datetime.now().timestamp()
state["last_dream_mode"] = mode
state["last_dream_file"] = filepath
save_dreamer_state(state)
notify_sse(mode, filepath.split("/")[-1])
print(f"\n{'='*60}")
print(output)
print(f"{'='*60}")
print(f"\nDelivered to {filepath}")
return filepath
# ─── CLI ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Aaron AI Dreamer")
parser.add_argument("--mode", choices=["nrem", "early-rem", "late-rem", "lucid", "pipeline"])
parser.add_argument("--task", type=str)
parser.add_argument(
"--type-filter", type=str, default=None,
help="Comma-separated embeddings.type allowlist (e.g. 'document,aaronai_conversation'). "
"Applies to pgvector retrieval only; Graphiti chunks are not filtered. "
"Experimental — default is no filter, no behavior change.",
)
args = parser.parse_args()
type_filter = [t.strip() for t in args.type_filter.split(",")] if args.type_filter else None
if args.mode == "lucid":
dream_lucid(args.task or "What should I be thinking about that I am not?", type_filter=type_filter)
elif args.mode and args.mode != "pipeline":
dream_single(args.mode, args.task, type_filter=type_filter)
else:
# Default: full pipeline
dream_pipeline(type_filter=type_filter)