Files
aaronAI/scripts/experiments/e3_dreamer_substrate.py
T

223 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""
E3 — Dreamer Substrate Comparison Experiment
Runs dream pipeline twice: once against pgvector (Condition A),
once against Graphiti (Condition B). Implements blinding per protocol.
Usage:
    python3 scripts/experiments/e3_dreamer_substrate.py

Outputs:
    Journal/Dreams/e3-dream-X-{mode}.md   (one condition, identity concealed)
    Journal/Dreams/e3-dream-Y-{mode}.md   (other condition, identity concealed)
    Journal/Dreams/e3-blind-mapping.json  (kept unread until ratings complete)
    Journal/Dreams/e3-manifest.json       (quantitative metrics for both conditions)
"""
import os
import sys
import json
import random
import requests
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
# Load settings from ~/aaronai/.env. override=True makes values in that file
# take precedence over variables already set in the process environment.
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# Nextcloud connection settings, read from the environment with fallbacks.
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL", "https://nextcloud.aaronnelson.studio")
NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
# NOTE(review): falls back to an empty password when the variable is unset;
# presumably the server then rejects the requests — confirm this is intended.
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
# WebDAV URL of the Journal/Dreams collection that receives all E3 artifacts.
DREAMS_WEBDAV_BASE = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"
# (user, password) pair passed as `auth=` to the requests calls in this file.
auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
def webdav_put(url, content_str):
    """Upload text to a WebDAV URL with an HTTP PUT.

    Args:
        url: Full WebDAV URL of the file to create or overwrite.
        content_str: Text to store; it is encoded as UTF-8 before sending.

    Returns:
        The ``requests.Response`` of the PUT request.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or when the
            60-second timeout expires.
    """
    response = requests.put(
        url,
        data=content_str.encode("utf-8"),
        auth=auth,
        timeout=60,
    )
    # Previously the response was discarded, so a rejected upload (bad
    # credentials, missing parent collection, quota exceeded, ...) looked
    # exactly like a successful one to the caller. Surface it instead.
    response.raise_for_status()
    return response
def webdav_mkcol(url):
    """Create a WebDAV collection (directory) at ``url``, best-effort.

    The request stays best-effort: an unexpected HTTP status is reported on
    stdout but does not raise, so callers that previously ignored the
    outcome keep working.

    Args:
        url: Full WebDAV URL of the collection to create.

    Returns:
        True if the server reports the collection was created (201) or
        answers 405, which WebDAV servers return when the URL is already
        mapped (i.e. the collection exists); False for any other status.

    Raises:
        requests.RequestException: On connection failure or when the
            10-second timeout expires (unchanged from the original).
    """
    response = requests.request("MKCOL", url, auth=auth, timeout=10)
    # 201 = created; 405 = MKCOL on an already-existing resource (RFC 4918).
    if response.status_code in (201, 405):
        return True
    # Anything else (401, 403, 409 missing parent, 5xx, ...) means later
    # uploads into this collection are likely to fail — say so.
    print(f"WARNING: MKCOL {url} returned HTTP {response.status_code}")
    return False
def _stage_metrics(chunks, text):
    """Return the metrics record for one stage: chunk count, sources, words."""
    return {
        "chunks_retrieved": len(chunks),
        "sources": [c["source"] for c in chunks],
        "word_count": len(text.split()),
    }


def run_condition(label, substrate, modes=("nrem", "early-rem", "late-rem", "synthesis")):
    """Run the dream pipeline once for a single substrate condition.

    Sets the ``DREAMER_SUBSTRATE`` environment variable, re-imports the
    ``dream`` module, and drives its stages in order: NREM, early REM,
    late REM, final synthesis.

    Args:
        label: Condition label used only in console output (e.g. "A").
        substrate: Value written to ``DREAMER_SUBSTRATE`` before ``dream``
            is imported.
        modes: Accepted for interface compatibility; the body does not read
            it and always runs all four stages.

    Returns:
        ``(outputs, metrics)``. ``outputs`` maps each of "nrem",
        "early-rem", "late-rem" and "synthesis" to the generated text, or to
        None for an early/late REM stage that retrieved no chunks.
        ``metrics`` maps the same keys to dicts with "chunks_retrieved",
        "sources" and "word_count" ("synthesis" only has "word_count").
        Returns ``(None, None)`` when the NREM retrieval yields no chunks.
    """
    print(f"\n{'='*60}")
    print(f"Running Condition {label} — substrate: {substrate}")
    print(f"{'='*60}")

    os.environ["DREAMER_SUBSTRATE"] = substrate

    # Drop any cached copy so the import below executes dream's module body
    # again with the new environment.
    # NOTE(review): modules that dream itself imports are NOT evicted; if any
    # of them caches the substrate at import time, both conditions would
    # share it — confirm against dream.py.
    sys.modules.pop("dream", None)
    scripts_dir = str(Path.home() / "aaronai" / "scripts")
    # Keep the scripts directory first on sys.path without stacking a new
    # duplicate entry on every call (the original inserted unconditionally).
    if not sys.path or sys.path[0] != scripts_dir:
        sys.path.insert(0, scripts_dir)
    import dream as d

    outputs = {}
    metrics = {}

    delta = d.observe_corpus()
    print(f"Corpus delta: {delta.get('new_chunks', 0)} new chunks")

    # Never populated in this script, so the unions below reduce to the
    # session-level sets; kept so the exclusion expressions stay explicit.
    previously_retrieved = set()
    session_retrieved = set()

    # --- NREM -------------------------------------------------------------
    print("\n[NREM] Retrieving...")
    nrem_chunks = d.retrieve("nrem", excluded_sources=previously_retrieved | session_retrieved)
    session_retrieved.update(c["source"] for c in nrem_chunks)
    # Only NREM sources scoring above 0.55 are withheld from early REM;
    # lower-similarity NREM sources may be retrieved again there.
    nrem_high_sources = {c["source"] for c in nrem_chunks if c.get("similarity", 0) > 0.55}
    if not nrem_chunks:
        print("[NREM] No chunks — aborting.")
        return None, None
    nrem_output = d.synthesize_nrem(nrem_chunks)
    outputs["nrem"] = nrem_output
    metrics["nrem"] = _stage_metrics(nrem_chunks, nrem_output)
    print(f"[NREM] Done. {len(nrem_chunks)} chunks, {metrics['nrem']['word_count']} words.")

    # --- Early REM --------------------------------------------------------
    print("\n[Early REM] Retrieving...")
    early_chunks = d.retrieve("early-rem", excluded_sources=previously_retrieved | nrem_high_sources)
    session_retrieved.update(c["source"] for c in early_chunks)
    if early_chunks:
        early_output = d.synthesize_early_rem(early_chunks, nrem_output)
        outputs["early-rem"] = early_output
        metrics["early-rem"] = _stage_metrics(early_chunks, early_output)
        print(f"[Early REM] Done. {len(early_chunks)} chunks.")
    else:
        # Later stages still need an "early" text, so reuse the NREM output;
        # the stage itself is recorded as empty (None text, zero metrics).
        early_output = nrem_output
        outputs["early-rem"] = None
        metrics["early-rem"] = _stage_metrics([], "")
        print("[Early REM] No chunks — using NREM fallback.")

    # --- Late REM ---------------------------------------------------------
    print("\n[Late REM] Retrieving...")
    late_chunks = d.retrieve("late-rem", excluded_sources=previously_retrieved | session_retrieved)
    session_retrieved.update(c["source"] for c in late_chunks)
    if late_chunks:
        late_output = d.synthesize_late_rem(late_chunks, nrem_output, early_output)
        outputs["late-rem"] = late_output
        metrics["late-rem"] = _stage_metrics(late_chunks, late_output)
        print(f"[Late REM] Done. {len(late_chunks)} chunks.")
    else:
        # Same fallback pattern: synthesis receives the early text instead.
        late_output = early_output
        outputs["late-rem"] = None
        metrics["late-rem"] = _stage_metrics([], "")
        print("[Late REM] No chunks — using fallback.")

    # --- Synthesis --------------------------------------------------------
    print("\n[Synthesis] Integrating...")
    synthesis_output = d.synthesize_final(nrem_output, early_output, late_output)
    outputs["synthesis"] = synthesis_output
    metrics["synthesis"] = {"word_count": len(synthesis_output.split())}
    print(f"[Synthesis] Done. {metrics['synthesis']['word_count']} words.")

    return outputs, metrics
def main():
    """Run both substrate conditions and upload the blinded E3 artifacts.

    Steps:
      1. Randomly assign the blind labels "X" and "Y" to the two substrates
         and upload that mapping as e3-blind-mapping.json.
      2. Run the pgvector condition, then the graphiti condition, via
         ``run_condition``. If either returns no outputs, print a message
         and return without uploading any dream files.
      3. Upload one markdown file per non-empty stage per blind label, then
         e3-manifest.json with the per-stage metrics keyed by blind label.

    Dream files and manifest entries are emitted in blind-label order (X,
    then Y) only after both conditions have finished. Conditions always run
    pgvector-first, so uploading right after each run would let console
    order, upload timestamps and manifest key order reveal the mapping.
    """
    date_str = datetime.now().strftime("%Y-%m-%d")

    # Randomize which substrate is presented as X and which as Y.
    conditions = [("pgvector", "A"), ("graphiti", "B")]
    labels = ["X", "Y"]
    random.shuffle(labels)
    assignment = {substrate: blind for (substrate, _), blind in zip(conditions, labels)}
    substrate_for = {blind: substrate for substrate, blind in assignment.items()}
    mapping = {
        "date": date_str,
        "X_is": substrate_for["X"],
        "Y_is": substrate_for["Y"],
        "reveal_after_rating": True,
    }

    print(f"E3 Dreamer Substrate Comparison — {date_str}")
    print("Blind mapping written (do not read until ratings complete)")

    # Persist the mapping before any dream is generated so it cannot be lost
    # if a later stage crashes.
    webdav_mkcol(DREAMS_WEBDAV_BASE)
    mapping_url = f"{DREAMS_WEBDAV_BASE}/e3-blind-mapping.json"
    webdav_put(mapping_url, json.dumps(mapping, indent=2))
    print("Mapping written to Journal/Dreams/e3-blind-mapping.json")

    # Run both conditions first; keep results keyed by blind label only.
    results = {}
    for substrate, condition_label in conditions:
        outputs, metrics = run_condition(condition_label, substrate)
        if outputs is None:
            print(f"Condition {condition_label} failed — aborting E3.")
            return
        results[assignment[substrate]] = (outputs, metrics)

    # Upload in label order so nothing about ordering tracks the substrate.
    all_metrics = {}
    files_written = 0
    for blind_label in sorted(results):
        outputs, metrics = results[blind_label]
        all_metrics[blind_label] = {
            "substrate": "CONCEALED",  # concealed until reveal
            "stages": metrics,
        }
        for mode, text in outputs.items():
            if text is None:
                # Stage retrieved no chunks; there is no dream to rate.
                continue
            filename = f"e3-dream-{blind_label}-{mode}.md"
            # Separators added: the original header ran label, mode and
            # date together (e.g. "XNREM2025-01-01").
            header = f"# E3 Dream — {blind_label} — {mode.upper()} — {date_str}\n"
            header += "*Substrate concealed until rating complete*\n\n---\n\n"
            webdav_put(f"{DREAMS_WEBDAV_BASE}/{filename}", header + text)
            print(f"Written: Journal/Dreams/{filename}")
            files_written += 1

    # Quantitative metrics, keyed by blind label (substrate concealed).
    e3_manifest = {
        "date": date_str,
        "experiment": "E3",
        "protocol": "E3-dreamer-substrate-comparison-protocol.md",
        "blind_labels": ["X", "Y"],
        "substrate_concealed": True,
        "metrics": all_metrics,
        "rating_dimensions": ["Texture", "Reach", "Specificity", "Forward-facing"],
        "rating_scale": "1-5 per dimension per dream",
    }
    manifest_url = f"{DREAMS_WEBDAV_BASE}/e3-manifest.json"
    webdav_put(manifest_url, json.dumps(e3_manifest, indent=2))
    print("\nE3 manifest written to Journal/Dreams/e3-manifest.json")

    print(f"\n{'='*60}")
    # Report the real count: stages without chunks produce no file, so the
    # total is not always eight.
    print(f"E3 runs complete. {files_written} dream files written.")
    print(f"Rate all {files_written} dreams before reading e3-blind-mapping.json")
    print(f"{'='*60}")


# Run the experiment only when executed as a script, not when imported.
if __name__ == "__main__":
    main()