From 91166367fa98fef62a36ec35aa211d984d4ebd91 Mon Sep 17 00:00:00 2001 From: Aaron Nelson Date: Thu, 30 Apr 2026 17:17:28 +0000 Subject: [PATCH] E3: add Graphiti retrieval branch to dream.py, E3 experiment script with blinding --- scripts/dream.py | 48 +++++ scripts/experiments/e3_dreamer_substrate.py | 222 ++++++++++++++++++++ 2 files changed, 270 insertions(+) create mode 100644 scripts/experiments/e3_dreamer_substrate.py diff --git a/scripts/dream.py b/scripts/dream.py index 0234e10..77f851e 100644 --- a/scripts/dream.py +++ b/scripts/dream.py @@ -110,7 +110,55 @@ def get_recent_conversation_topics(days=14): # ─── Stage 2: Retrieve ────────────────────────────────────────────────────── + +def retrieve_graphiti(mode, task=None, n_results=8): + """E3 experiment — Graphiti substrate retrieval. + Queries Graphiti /search endpoint instead of pgvector. + Returns chunks in same format as retrieve() for pipeline compatibility. + Note: content is Graphiti facts (synthesized relationships), not raw chunks. + """ + import requests as req_lib + if task: + query = task + elif mode == "late-rem": + delta = observe_corpus() + topics = delta.get("recent_topics", []) + query = topics[0] if topics else "practice place memory making" + elif mode == "early-rem": + query = "career decision personal change what matters next" + else: + query = "research fabrication teaching practice recent work" + + try: + resp = req_lib.get( + "http://localhost:8001/search", + params={"query": query, "limit": n_results, "group_id": "aaron"}, + timeout=30, + ) + resp.raise_for_status() + results = resp.json().get("results", []) + chunks = [] + for r in results: + fact = r.get("fact", "") + if not fact.strip(): + continue + chunks.append({ + "source": r.get("source", "graphiti"), + "content": fact, + "relevance": r.get("score", 0.5), + "similarity": r.get("score", 0.5), + }) + return chunks + except Exception as e: + print(f"[Graphiti retrieval error: {e}] — falling back to empty.") + return [] + def retrieve(mode, task=None, n_results=8, excluded_sources=None): + # E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search + # Default behavior: pgvector similarity search (unchanged) + substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector") + if substrate == "graphiti": + return retrieve_graphiti(mode, task=task, n_results=n_results) from sentence_transformers import SentenceTransformer embedder = SentenceTransformer("all-MiniLM-L6-v2") low, high = MODE_RANGES[mode] diff --git a/scripts/experiments/e3_dreamer_substrate.py b/scripts/experiments/e3_dreamer_substrate.py new file mode 100644 index 0000000..f274133 --- /dev/null +++ b/scripts/experiments/e3_dreamer_substrate.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +""" +E3 — Dreamer Substrate Comparison Experiment +Runs dream pipeline twice: once against pgvector (Condition A), +once against Graphiti (Condition B). Implements blinding per protocol. + +Usage: + python3 scripts/experiments/e3_dreamer_substrate.py + +Outputs: + Journal/Dreams/e3-dream-X-{mode}.md (one condition, identity concealed) + Journal/Dreams/e3-dream-Y-{mode}.md (other condition, identity concealed) + Journal/Dreams/e3-blind-mapping.json (kept unread until ratings complete) + Journal/Dreams/e3-manifest.json (quantitative metrics for both conditions) +""" + +import os +import sys +import json +import random +import requests +from pathlib import Path +from datetime import datetime +from dotenv import load_dotenv + +load_dotenv(Path.home() / "aaronai" / ".env", override=True) + +NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "https://nextcloud.aaronnelson.studio") +NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron") +NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "") +DREAMS_WEBDAV_BASE = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams" + +auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD) + +def webdav_put(url, content_str): + requests.put(url, data=content_str.encode("utf-8"), auth=auth, timeout=60) + +def webdav_mkcol(url): + requests.request("MKCOL", url, auth=auth, timeout=10) + +def run_condition(label, substrate, modes=("nrem", "early-rem", "late-rem", "synthesis")): + """Run the dream pipeline for one substrate condition. + Returns dict of {mode: dream_text} and metrics. + """ + print(f"\n{'='*60}") + print(f"Running Condition {label} — substrate: {substrate}") + print(f"{'='*60}") + + os.environ["DREAMER_SUBSTRATE"] = substrate + + # Import dream module fresh for this run + if "dream" in sys.modules: + del sys.modules["dream"] + sys.path.insert(0, str(Path.home() / "aaronai" / "scripts")) + import dream as d + + # Run pipeline stages manually to capture outputs + outputs = {} + metrics = {} + + # Observe corpus delta + delta = d.observe_corpus() + print(f"Corpus delta: {delta.get('new_chunks', 0)} new chunks") + + previously_retrieved = set() + session_retrieved = set() + nrem_high_sources = set() + + # NREM + print("\n[NREM] Retrieving...") + nrem_chunks = d.retrieve("nrem", excluded_sources=previously_retrieved | session_retrieved) + session_retrieved.update(c["source"] for c in nrem_chunks) + nrem_high_sources = {c["source"] for c in nrem_chunks if c.get("similarity", 0) > 0.55} + if not nrem_chunks: + print("[NREM] No chunks — aborting.") + return None, None + nrem_output = d.synthesize_nrem(nrem_chunks) + outputs["nrem"] = nrem_output + metrics["nrem"] = { + "chunks_retrieved": len(nrem_chunks), + "sources": [c["source"] for c in nrem_chunks], + "word_count": len(nrem_output.split()), + } + print(f"[NREM] Done. {len(nrem_chunks)} chunks, {metrics['nrem']['word_count']} words.") + + # Early REM + print("\n[Early REM] Retrieving...") + early_chunks = d.retrieve("early-rem", excluded_sources=previously_retrieved | nrem_high_sources) + session_retrieved.update(c["source"] for c in early_chunks) + if early_chunks: + early_output = d.synthesize_early_rem(early_chunks, nrem_output) + outputs["early-rem"] = early_output + metrics["early-rem"] = { + "chunks_retrieved": len(early_chunks), + "sources": [c["source"] for c in early_chunks], + "word_count": len(early_output.split()), + } + print(f"[Early REM] Done. {len(early_chunks)} chunks.") + else: + early_output = nrem_output + outputs["early-rem"] = None + metrics["early-rem"] = {"chunks_retrieved": 0, "sources": [], "word_count": 0} + print("[Early REM] No chunks — using NREM fallback.") + + # Late REM + print("\n[Late REM] Retrieving...") + late_chunks = d.retrieve("late-rem", excluded_sources=previously_retrieved | session_retrieved) + session_retrieved.update(c["source"] for c in late_chunks) + if late_chunks: + late_output = d.synthesize_late_rem(late_chunks, nrem_output, early_output) + outputs["late-rem"] = late_output + late_sources = [c["source"] for c in late_chunks] + metrics["late-rem"] = { + "chunks_retrieved": len(late_chunks), + "sources": late_sources, + "word_count": len(late_output.split()), + } + print(f"[Late REM] Done. {len(late_chunks)} chunks.") + else: + late_output = early_output + outputs["late-rem"] = None + metrics["late-rem"] = {"chunks_retrieved": 0, "sources": [], "word_count": 0} + print("[Late REM] No chunks — using fallback.") + + # Synthesis + print("\n[Synthesis] Integrating...") + synthesis_output = d.synthesize_final(nrem_output, early_output, late_output) + outputs["synthesis"] = synthesis_output + metrics["synthesis"] = {"word_count": len(synthesis_output.split())} + print(f"[Synthesis] Done. {metrics['synthesis']['word_count']} words.") + + return outputs, metrics + + +def main(): + date_str = datetime.now().strftime("%Y-%m-%d") + + # Randomize which condition gets label X vs Y for blinding + conditions = [("pgvector", "A"), ("graphiti", "B")] + labels = ["X", "Y"] + random.shuffle(labels) + assignment = {conditions[0][0]: labels[0], conditions[1][0]: labels[1]} + + mapping = { + "date": date_str, + "X": assignment["pgvector"] == "X" and "pgvector" or "graphiti", + "Y": assignment["pgvector"] == "Y" and "pgvector" or "graphiti", + "reveal_after_rating": True, + } + # Simpler mapping + mapping = { + "date": date_str, + "X_is": "pgvector" if assignment["pgvector"] == "X" else "graphiti", + "Y_is": "pgvector" if assignment["pgvector"] == "Y" else "graphiti", + "reveal_after_rating": True, + } + + print(f"E3 Dreamer Substrate Comparison — {date_str}") + print(f"Blind mapping written (do not read until ratings complete)") + + # Write blind mapping to Nextcloud immediately + webdav_mkcol(DREAMS_WEBDAV_BASE) + mapping_url = f"{DREAMS_WEBDAV_BASE}/e3-blind-mapping.json" + webdav_put(mapping_url, json.dumps(mapping, indent=2)) + print(f"Mapping written to Journal/Dreams/e3-blind-mapping.json") + + all_metrics = {} + + # Run both conditions + for substrate, condition_label in conditions: + blind_label = assignment[substrate] + outputs, metrics = run_condition(condition_label, substrate) + + if outputs is None: + print(f"Condition {condition_label} failed — aborting E3.") + return + + all_metrics[blind_label] = { + "substrate": "CONCEALED", # concealed until reveal + "stages": metrics, + } + + # Write dream files with blind labels + modes_map = { + "nrem": "nrem", + "early-rem": "early-rem", + "late-rem": "late-rem", + "synthesis": "synthesis", + } + for mode, text in outputs.items(): + if text is None: + continue + filename = f"e3-dream-{blind_label}-{mode}.md" + header = f"# E3 Dream — {blind_label} — {mode.upper()} — {date_str}\n" + header += f"*Substrate concealed until rating complete*\n\n---\n\n" + file_content = header + text + url = f"{DREAMS_WEBDAV_BASE}/{filename}" + webdav_put(url, file_content) + print(f"Written: Journal/Dreams/{filename}") + + # Write quantitative metrics (substrate concealed) + e3_manifest = { + "date": date_str, + "experiment": "E3", + "protocol": "E3-dreamer-substrate-comparison-protocol.md", + "blind_labels": ["X", "Y"], + "substrate_concealed": True, + "metrics": all_metrics, + "rating_dimensions": ["Texture", "Reach", "Specificity", "Forward-facing"], + "rating_scale": "1-5 per dimension per dream", + } + manifest_url = f"{DREAMS_WEBDAV_BASE}/e3-manifest.json" + webdav_put(manifest_url, json.dumps(e3_manifest, indent=2)) + print(f"\nE3 manifest written to Journal/Dreams/e3-manifest.json") + print(f"\n{'='*60}") + print("E3 runs complete. Eight dream files written.") + print("Rate all 8 dreams before reading e3-blind-mapping.json") + print(f"{'='*60}") + + +if __name__ == "__main__": + main()