"""
Aaron AI Dreamer — Active Inference Engine
Interdependent stage architecture grounded in sleep consolidation research.

Nightly pipeline: NREM → Early REM → Late REM → Synthesis
Each stage receives the previous stage's output as context.
Lucid mode is on-demand only (Dream Now from settings).

Research basis:
- Singh et al. PNAS 2022: alternating NREM/REM outperforms single-stage approaches
- Klinzing et al. Nature Neuroscience 2019: SO-spindle-ripple coupling is interdependent
- REM operates on what NREM produced — stages are not discrete alternatives
"""

import argparse
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta
from functools import lru_cache
from pathlib import Path

import psycopg2
from dotenv import load_dotenv

load_dotenv(Path.home() / "aaronai" / ".env")

# NOTE(review): the fallback DSN embeds a credential; prefer supplying PG_DSN
# through the .env file loaded above.
PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")


def get_pg():
    """Open a new PostgreSQL connection from PG_DSN. The caller must close it."""
    return psycopg2.connect(PG_DSN)


# ─── Paths ──────────────────────────────────────────────────────────────────

CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
DREAMER_STATE = str(Path.home() / "aaronai" / "dreamer_state.json")
JOURNAL_DIR = "/home/aaron/nextcloud/data/data/aaron/files/Journal/Daily"

NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "https://nextcloud.aaronnelson.studio")
NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
DREAMS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"

EMBEDDING_MODEL = "all-MiniLM-L6-v2"

# Similarity ranges calibrated for all-MiniLM-L6-v2
MODE_RANGES = {
    "nrem": (0.48, 0.72),
    "early-rem": (0.38, 0.55),
    "late-rem": (0.22, 0.42),
    "lucid": (0.32, 0.72),
}

# Question used when a lucid dream is requested without an explicit task.
DEFAULT_LUCID_QUESTION = "What should I be thinking about that I am not?"


# ─── Stage 1: Observe ───────────────────────────────────────────────────────

def _count_new_chunks(watcher_state, last_dream):
    """Count watcher entries whose mtime is newer than ``last_dream``.

    Entries whose mtime cannot be compared as a number are skipped, so one
    malformed value does not discard the rest of the count.
    """
    count = 0
    for mtime in watcher_state.values():
        try:
            if float(mtime) > last_dream:
                count += 1
        except (TypeError, ValueError):
            continue
    return count


def observe_corpus():
    """Summarise what changed since the last dream.

    Returns:
        dict with keys ``new_chunks`` (watcher entries newer than the last
        dream), ``days_since_dream``, ``recent_topics`` (see
        get_recent_conversation_topics) and ``last_dream`` (epoch seconds,
        0 when no dream has been recorded).
    """
    state = load_dreamer_state()
    last_dream = state.get("last_dream_timestamp", 0)

    # Best effort: a missing or unreadable watcher file means "no new chunks".
    try:
        watcher_state = json.loads(Path(WATCHER_STATE).read_text())
    except FileNotFoundError:
        watcher_state = {}
    except (OSError, ValueError) as e:
        print(f"Watcher state unreadable (non-critical): {e}")
        watcher_state = {}

    new_chunk_count = 0
    if isinstance(watcher_state, dict):
        new_chunk_count = _count_new_chunks(watcher_state, last_dream)

    days_since = (datetime.now().timestamp() - last_dream) / 86400
    recent_topics = get_recent_conversation_topics()

    return {
        "new_chunks": new_chunk_count,
        "days_since_dream": days_since,
        "recent_topics": recent_topics,
        "last_dream": last_dream,
    }


def get_recent_conversation_topics(days=14):
    """Return up to 20 recent user messages, newest first, truncated to 200 chars.

    Only messages from conversations updated within the last ``days`` days are
    considered. Returns an empty list when the conversations database cannot
    be queried.
    """
    cutoff = (datetime.now() - timedelta(days=days)).isoformat()
    try:
        # closing() guarantees the connection is released even when the query fails.
        with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
            rows = conn.execute("""
                SELECT m.content FROM messages m
                JOIN conversations c ON m.conversation_id = c.id
                WHERE m.role = 'user'
                AND c.updated_at > ?
                ORDER BY m.timestamp DESC
                LIMIT 20
            """, (cutoff,)).fetchall()
    except sqlite3.Error as e:
        print(f"Recent conversation topics unavailable (non-critical): {e}")
        return []
    # Skip NULL/empty/non-text rows instead of letting one bad row void the list.
    return [content[:200] for (content,) in rows if isinstance(content, str) and content]


# ─── Stage 2: Retrieve ──────────────────────────────────────────────────────

@lru_cache(maxsize=1)
def _get_embedder():
    """Load the sentence-transformer once per process and reuse it across stages."""
    from sentence_transformers import SentenceTransformer
    return SentenceTransformer(EMBEDDING_MODEL)


def _default_query(mode):
    """Pick the retrieval query for ``mode`` when no explicit task is given."""
    if mode == "late-rem":
        topics = get_recent_conversation_topics()
        return topics[0] if topics else "practice place memory making"
    if mode == "early-rem":
        return "career decision personal change what matters next"
    return "research fabrication teaching practice recent work"


def _select_chunks(rows, low, high, n_results):
    """Filter ``(document, source, similarity)`` rows down to the returned chunks.

    Keeps rows whose similarity lies in ``[low, high]``, at most one row per
    source, in the order given, stopping at ``n_results`` chunks.
    """
    chunks = []
    seen_sources = set()
    for doc, source, similarity in rows:
        if len(chunks) >= n_results:
            break
        if similarity is None or not (low <= similarity <= high):
            continue
        if source in seen_sources:
            continue
        chunks.append({
            "source": source or "unknown",
            "content": doc,
            "relevance": similarity,
        })
        seen_sources.add(source)
    return chunks


def retrieve(mode, task=None, n_results=8):
    """Fetch corpus chunks for a dream stage from the pgvector ``embeddings`` table.

    Args:
        mode: key of MODE_RANGES; selects the accepted similarity window.
        task: optional query text; when falsy a mode-specific default is used.
        n_results: maximum number of chunks to return.

    Returns:
        list of dicts with ``source``, ``content`` and ``relevance``; empty
        when nothing qualifies or the database query fails.

    Raises:
        KeyError: if ``mode`` is not a key of MODE_RANGES.
    """
    low, high = MODE_RANGES[mode]
    query = task if task else _default_query(mode)
    embedding = _get_embedder().encode([query]).tolist()[0]

    # NOTE(review): the similarity window is applied after LIMIT, so only the
    # n_results * 3 nearest rows are ever candidates; modes with a low window
    # may get few or no chunks. Confirm whether the window belongs in the SQL.
    rows = []
    try:
        with closing(get_pg()) as pg, closing(pg.cursor()) as cur:
            cur.execute("""
                SELECT document, source,
                       1 - (embedding <=> %s::vector) as similarity
                FROM embeddings
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (embedding, embedding, n_results * 3))
            rows = cur.fetchall()
    except Exception as e:
        # Best effort by design: a failed retrieval yields no chunks, not a crash.
        print(f"pgvector retrieval error: {e}")

    return _select_chunks(rows, low, high, n_results)


# ─── Stage 3: Synthesize ────────────────────────────────────────────────────

# NOTE(review): paragraph breaks inside the prompt templates below were
# restored from a whitespace-collapsed copy of this file; the wording is
# unchanged. Confirm the layout against the prompts in use.

def _format_chunks(chunks):
    """Render chunks as ``[source]`` + content blocks separated by ``---`` rules."""
    return "\n\n---\n\n".join([f"[{c['source']}]\n{c['content']}" for c in chunks])


def synthesize_nrem(chunks):
    """NREM stage: a document-grounded observation built from ``chunks`` alone."""
    chunk_text = _format_chunks(chunks)
    prompt = f"""You have read everything Aaron Nelson has written and published. You are a careful colleague who noticed something this week.

Here is material from his corpus:

{chunk_text}

Write to Aaron directly. Identify one specific connection between this material and something he wrote or worked on previously. Stay close to the documents — cite them specifically by name. Do not speculate beyond what the material supports.

Do not use headers or bullet points. Write one paragraph of 200-300 words that ends with a single concrete question he could act on."""
    return _call_claude(prompt)


def synthesize_early_rem(chunks, nrem_output):
    """Early REM stage: a personal-register piece informed by ``nrem_output``."""
    chunk_text = _format_chunks(chunks)
    prompt = f"""You have been thinking about Aaron's situation. You know his work intimately — his decade building HVAMC at New Paltz, the career decision he is facing, the Tulsa project he keeps returning to, the gap between what he has built and what he wants to build next.

Earlier tonight, while consolidating his recent work, you noticed this:

{nrem_output}

That observation has been with you. Now, here is material from his corpus that has been on your mind alongside it:

{chunk_text}

Write to him the way a close friend who has read everything he has ever written would write — someone who knows where the professional and personal are tangled together and is not afraid to say so. Let what was noticed earlier inform what you say now, but don't just repeat it — go further into the personal territory it opens up.

Personal register. Specific citations. Do not avoid the difficult thing. No headers, no bullet points. 200-350 words. End with something forward-facing — a question or an offer."""
    return _call_claude(prompt)


def synthesize_late_rem(chunks, nrem_output, early_rem_output):
    """Late REM stage: an associative piece given both earlier stage outputs."""
    chunk_text = _format_chunks(chunks)
    prompt = f"""You have been moving through Aaron Nelson's corpus all night.

First you found this, in the careful light of early consolidation:

{nrem_output}

Then, in the more personal territory that followed:

{early_rem_output}

Now it is late. The boundaries between things have loosened. Here is material pulled from opposite ends of his work:

{chunk_text}

Do not explain the connections between all of this. Do not resolve them. Do not summarize what came before. Something stranger is possible now — let the accumulated material from the night find its own shape.

Compressed, associative, slightly off. Let the strangeness stand. No headers. No bullet points. No hedging. 150-250 words. Something at the end that he could follow if he wanted to."""
    return _call_claude(prompt)


def synthesize_final(nrem_output, early_rem_output, late_rem_output):
    """Synthesis stage: integrate the three stage outputs into one piece."""
    prompt = f"""You have spent the night moving through Aaron Nelson's corpus in three passes, each building on the last.

The first pass — careful, close to the documents:

{nrem_output}

The second pass — more personal, following what the first opened:

{early_rem_output}

The third pass — associative, strange, letting things touch that don't normally touch:

{late_rem_output}

Now synthesize. Not a summary — a synthesis. Find what runs through all three that none of them said directly. The thing that only becomes visible when you hold all three passes together.

Write it as a single unbroken piece. No headers, no bullet points, no stage labels. 200-300 words. End with the one question that matters most right now."""
    return _call_claude(prompt, max_tokens=800)


def synthesize_lucid(chunks, task):
    """Lucid mode: answer ``task`` (or the default question) from ``chunks``."""
    chunk_text = _format_chunks(chunks)
    question = task or DEFAULT_LUCID_QUESTION
    prompt = f"""Aaron has a question he is sitting with:

{question}

You have searched his entire corpus and found material that speaks to this question from unexpected directions.

Here is what you found:

{chunk_text}

Do not summarize. Do not list. Pick the most interesting tension between what the corpus contains and what he is asking, and follow it through to its conclusion.

Cite specific documents by name. Be direct about what you think. No headers, no bullet points. 250-400 words. End with an offer to work on it together."""
    return _call_claude(prompt)


def _call_claude(prompt, max_tokens=1000):
    """Send ``prompt`` as a single user message and return the first content block's text."""
    import anthropic
    client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=max_tokens,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text


# ─── Stage 4: Deliver ───────────────────────────────────────────────────────

def _render_dream_document(dream_text, mode, task, now):
    """Build the markdown document: title line, optional task line, rule, body."""
    header = f"# Dream — {mode.upper()} — {now.strftime('%Y-%m-%d %H:%M')}\n\n"
    if task:
        header += f"*Task: {task}*\n\n"
    header += "---\n\n"
    return header + dream_text


def deliver(dream_text, mode, task=None):
    """Upload a dream to the Nextcloud Dreams folder over WebDAV.

    The file is named ``<date>-<mode>.md``; if that name is taken a numeric
    suffix (``-1``, ``-2``, …) is appended until a free name is found.

    Returns:
        The path of the stored file relative to the user's files,
        ``Journal/Dreams/<filename>``.

    Raises:
        requests.HTTPError: if the existence check or the upload is answered
            with an error status.
    """
    import requests

    # One timestamp for both filename and header, so they cannot disagree
    # when the call straddles midnight.
    now = datetime.now()
    date_str = now.strftime("%Y-%m-%d")
    content = _render_dream_document(dream_text, mode, task, now)
    auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)

    # Ensure the collection exists. The response is deliberately ignored,
    # since the folder normally exists already.
    requests.request("MKCOL", DREAMS_WEBDAV, auth=auth, timeout=10)

    filename = f"{date_str}-{mode}.md"
    counter = 1
    while True:
        url = f"{DREAMS_WEBDAV}/{filename}"
        check = requests.request(
            "PROPFIND", url, auth=auth, headers={"Depth": "0"}, timeout=10
        )
        if check.status_code == 404:
            break
        # Only a success status means "name taken". Without this check an
        # auth or server error (401, 5xx) would keep the loop spinning forever.
        check.raise_for_status()
        filename = f"{date_str}-{mode}-{counter}.md"
        counter += 1

    response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
    response.raise_for_status()
    print(f"Delivered: Journal/Dreams/{filename}")
    return f"Journal/Dreams/{filename}"


def notify_sse(mode, filename):
    """Post a ``dream`` event to the local notify endpoint; failures are only logged."""
    try:
        import requests
        requests.post("http://localhost:8000/api/events/notify", json={
            "type": "dream",
            "mode": mode,
            "filename": filename,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M"),
        }, timeout=3)
    except Exception as e:
        print(f"SSE notify failed (non-critical): {e}")


# ─── State ──────────────────────────────────────────────────────────────────

def load_dreamer_state():
    """Return the persisted dreamer state dict, or ``{}`` if absent or unusable."""
    try:
        state = json.loads(Path(DREAMER_STATE).read_text())
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as e:
        print(f"Dreamer state unreadable, starting fresh: {e}")
        return {}
    # Callers use .get(), so anything but a JSON object is treated as empty.
    return state if isinstance(state, dict) else {}


def save_dreamer_state(state):
    """Persist ``state`` as JSON, replacing the previous file atomically."""
    target = Path(DREAMER_STATE)
    tmp = target.with_name(target.name + ".tmp")
    tmp.write_text(json.dumps(state, indent=2))
    # os.replace swaps the file in one step, so an interrupted write cannot
    # leave a truncated state file behind.
    os.replace(tmp, target)


def _record_dream(mode, filepath):
    """Store the finished dream in the state file and emit the SSE notification.

    ``mode`` is used for the notification. The state file records "pipeline"
    for synthesis runs and the mode itself otherwise.
    """
    state = load_dreamer_state()
    state["last_dream_timestamp"] = datetime.now().timestamp()
    state["last_dream_mode"] = "pipeline" if mode == "synthesis" else mode
    state["last_dream_file"] = filepath
    save_dreamer_state(state)
    notify_sse(mode, filepath.split("/")[-1])


# ─── Orchestrators ───────────────────────────────────────────────────────────

def dream_pipeline():
    """
    Full nightly pipeline — interdependent stages.
    NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis.

    Returns the delivered synthesis path, or None when NREM retrieval finds
    nothing and the pipeline is aborted.
    """
    print(f"Dreamer pipeline starting — {datetime.now().strftime('%Y-%m-%d %H:%M')}")

    delta = observe_corpus()
    print(f"Corpus: {delta['new_chunks']} new chunks, {delta['days_since_dream']:.1f} days since last dream")

    # ── Stage 1: NREM ──────────────────────────────────────────────────────
    print("\n[NREM] Retrieving...")
    nrem_chunks = retrieve("nrem")
    if not nrem_chunks:
        print("[NREM] No suitable chunks — aborting pipeline")
        return None

    print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...")
    nrem_output = synthesize_nrem(nrem_chunks)
    deliver(nrem_output, "nrem")
    print(f"[NREM] Done.\n{nrem_output[:200]}...")

    # ── Stage 2: Early REM — informed by NREM ──────────────────────────────
    print("\n[Early REM] Retrieving...")
    early_chunks = retrieve("early-rem")
    if not early_chunks:
        print("[Early REM] No suitable chunks — skipping")
        early_rem_output = nrem_output  # fallback
    else:
        print(f"[Early REM] Retrieved {len(early_chunks)} chunks. Synthesizing with NREM context...")
        early_rem_output = synthesize_early_rem(early_chunks, nrem_output)
        deliver(early_rem_output, "early-rem")
        print(f"[Early REM] Done.\n{early_rem_output[:200]}...")

    # ── Stage 3: Late REM — informed by NREM + Early REM ──────────────────
    print("\n[Late REM] Retrieving...")
    late_chunks = retrieve("late-rem")
    if not late_chunks:
        print("[Late REM] No suitable chunks — skipping")
        late_rem_output = early_rem_output  # fallback
    else:
        print(f"[Late REM] Retrieved {len(late_chunks)} chunks. Synthesizing with full context...")
        late_rem_output = synthesize_late_rem(late_chunks, nrem_output, early_rem_output)
        deliver(late_rem_output, "late-rem")
        print(f"[Late REM] Done.\n{late_rem_output[:200]}...")

    # ── Stage 4: Synthesis — all three stages ─────────────────────────────
    print("\n[Synthesis] Integrating all stages...")
    synthesis_output = synthesize_final(nrem_output, early_rem_output, late_rem_output)
    synthesis_file = deliver(synthesis_output, "synthesis")

    print(f"\n{'='*60}")
    print("SYNTHESIS:")
    print(synthesis_output)
    print(f"{'='*60}")

    # Update state and notify
    _record_dream("synthesis", synthesis_file)

    print(f"\nPipeline complete. Synthesis: {synthesis_file}")
    return synthesis_file


def dream_lucid(task):
    """On-demand lucid dream — single mode, used by Dream Now in settings.

    Returns the delivered file path, or None when retrieval finds nothing.
    """
    print(f"Lucid dream starting — task: {task[:80] if task else 'none'}")
    chunks = retrieve("lucid", task=task)
    if not chunks:
        print("No suitable chunks — aborting")
        return None

    print(f"Retrieved {len(chunks)} chunks. Synthesizing...")
    output = synthesize_lucid(chunks, task)
    filepath = deliver(output, "lucid", task=task)

    _record_dream("lucid", filepath)

    print(f"\n{'='*60}")
    print(output)
    print(f"{'='*60}")
    print(f"\nDelivered to {filepath}")
    return filepath


def dream_single(mode, task=None):
    """
    Single mode — used by Dream Now for non-lucid modes.
    Runs one stage independently (for testing/tuning individual stages).

    The REM stages receive empty strings in place of earlier stage outputs.
    Returns the delivered file path, or None when retrieval finds nothing.
    """
    print(f"Single mode dream: {mode}")
    chunks = retrieve(mode, task=task)
    if not chunks:
        print("No suitable chunks — aborting")
        return None

    print(f"Retrieved {len(chunks)} chunks. Synthesizing...")
    if mode == "nrem":
        output = synthesize_nrem(chunks)
    elif mode == "early-rem":
        output = synthesize_early_rem(chunks, "")
    elif mode == "late-rem":
        output = synthesize_late_rem(chunks, "", "")
    else:
        output = synthesize_lucid(chunks, task)

    filepath = deliver(output, mode, task=task)

    _record_dream(mode, filepath)

    print(f"\n{'='*60}")
    print(output)
    print(f"{'='*60}")
    print(f"\nDelivered to {filepath}")
    return filepath


# ─── CLI ────────────────────────────────────────────────────────────────────

def main():
    """Parse CLI arguments and run the selected mode (default: full pipeline)."""
    parser = argparse.ArgumentParser(description="Aaron AI Dreamer")
    parser.add_argument("--mode", choices=["nrem", "early-rem", "late-rem", "lucid", "pipeline"])
    parser.add_argument("--task", type=str)
    args = parser.parse_args()

    if args.mode == "lucid":
        dream_lucid(args.task or DEFAULT_LUCID_QUESTION)
    elif args.mode and args.mode != "pipeline":
        dream_single(args.mode, args.task)
    else:
        # Default: full pipeline
        dream_pipeline()


if __name__ == "__main__":
    main()