Compare commits
8 Commits
313c0f0341
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 5582549321 | |||
| 3ec9a48151 | |||
| 9d09d3fa14 | |||
| f185ed60cb | |||
| a4735053c2 | |||
| f682d8c6a0 | |||
| 151c756b89 | |||
| e96bf40b2f |
+131
-1
@@ -141,6 +141,19 @@ consulting" not "my work." Results are unfiltered and ranked by
|
||||
semantic similarity; judge each chunk for relevance and ignore
|
||||
irrelevant hits rather than forcing them into the answer.
|
||||
|
||||
You also have a search_facts tool that queries a knowledge graph of
|
||||
atomic facts about Aaron's entities and their relationships. The graph
|
||||
was populated through early May 2026 and is not currently being
|
||||
updated; treat it as a *historical* layer that holds biographical
|
||||
content (career, projects, consulting), exhibition records, key
|
||||
people, dossier-era claims, and time-stamped facts with explicit
|
||||
validity windows. For biographical or relational questions ("write
|
||||
me a bio", "what's the FWN3D / HVAMC relationship", "who did I
|
||||
consult for at IBM"), call search_facts *in addition to*
|
||||
retrieve_documents — the two return complementary shapes (atomic
|
||||
facts vs. document passages). For current-state questions, the
|
||||
persistent memory file is more authoritative than the graph.
|
||||
|
||||
When Aaron asks for a document file — bio, cover letter, statement,
|
||||
CV section, anything he wants to send or edit outside chat — produce
|
||||
the full text as your chat reply first. NEVER call save_document on
|
||||
@@ -440,6 +453,111 @@ DRAFTS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Drafts"
|
||||
_FILENAME_SAFE_RE = re.compile(r"[^A-Za-z0-9_\-\. ]")
|
||||
|
||||
|
||||
GRAPHITI_URL = os.getenv("GRAPHITI_URL", "http://localhost:8001")
|
||||
GRAPHITI_GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron")
|
||||
|
||||
|
||||
SEARCH_FACTS_TOOL = {
|
||||
"name": "search_facts",
|
||||
"description": (
|
||||
"Search Aaron's knowledge graph for atomic facts about entities and "
|
||||
"their relationships. The graph holds time-stamped facts captured up "
|
||||
"to early May 2026 — biographical content (career, projects, "
|
||||
"consulting), exhibition history, key relationships, dossier-era "
|
||||
"claims. Returns short sentence-shaped facts with valid_at / "
|
||||
"invalid_at timestamps so you can distinguish current state from "
|
||||
"superseded history. Useful for: bios, 'who did I consult for', "
|
||||
"'what's the relationship between X and Y', any question shaped like "
|
||||
"a relational lookup. Complements retrieve_documents (which returns "
|
||||
"longer chunk passages). Call this *in addition to* retrieve_documents "
|
||||
"for biographical or relational questions — the two return "
|
||||
"different shapes of evidence. The graph hasn't been updated since "
|
||||
"early May 2026; for current-state questions, the persistent memory "
|
||||
"file or recent documents are more authoritative."
|
||||
),
|
||||
"input_schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {
|
||||
"type": "string",
|
||||
"description": "The fact-shaped query. Concrete entity names work best.",
|
||||
},
|
||||
},
|
||||
"required": ["query"],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _push_chat_turn_to_graphiti(conversation_id, user_message, assistant_message):
|
||||
"""Async fire-and-forget push of a chat turn into Graphiti. Single episode,
|
||||
default extraction, no custom_extraction_instructions. Takes ~20 min in
|
||||
the background against the current ~4,300-entity graph; the chat caller
|
||||
is not gated on this. Errors are logged, never raised."""
|
||||
if os.getenv("SKIP_GRAPHITI_CHAT_PUSH"):
|
||||
return
|
||||
if not (user_message or "").strip() and not (assistant_message or "").strip():
|
||||
return
|
||||
import threading
|
||||
from datetime import datetime as _dt
|
||||
|
||||
def _work():
|
||||
try:
|
||||
episode_name = f"chat-{conversation_id[:8]}-{_dt.now().strftime('%Y%m%dT%H%M%S')}"
|
||||
content = (
|
||||
f"User: {user_message}\n\n"
|
||||
f"Assistant: {assistant_message}"
|
||||
)
|
||||
payload = {
|
||||
"name": episode_name,
|
||||
"content": content,
|
||||
"source_description": f"chat turn (conversation {conversation_id})",
|
||||
"timestamp": _dt.now().isoformat(),
|
||||
"group_id": GRAPHITI_GROUP_ID,
|
||||
}
|
||||
# Long timeout — sidecar add_episode against the current graph
|
||||
# is empirically ~20 min wall-clock. We're patient; chat isn't.
|
||||
r = requests.post(f"{GRAPHITI_URL}/episodes", json=payload, timeout=1800)
|
||||
if r.status_code == 200:
|
||||
print(f"[graphiti-push] turn ingested: {episode_name}", flush=True)
|
||||
else:
|
||||
print(f"[graphiti-push] non-200 ({r.status_code}) for {episode_name}: {r.text[:200]}", flush=True)
|
||||
except requests.RequestException as e:
|
||||
print(f"[graphiti-push] request failed: {e}", flush=True)
|
||||
except Exception as e:
|
||||
print(f"[graphiti-push] unexpected error: {e}", flush=True)
|
||||
|
||||
threading.Thread(target=_work, daemon=True).start()
|
||||
|
||||
|
||||
def _execute_search_facts(tool_input):
|
||||
"""Hit Graphiti /search, format the results as text for Claude."""
|
||||
query = (tool_input or {}).get("query", "").strip()
|
||||
if not query:
|
||||
return "No query provided."
|
||||
try:
|
||||
r = requests.get(
|
||||
f"{GRAPHITI_URL}/search",
|
||||
params={"query": query, "limit": 8, "group_id": GRAPHITI_GROUP_ID},
|
||||
timeout=15,
|
||||
)
|
||||
except requests.RequestException as e:
|
||||
return f"search_facts: Graphiti unreachable ({e})."
|
||||
if r.status_code != 200:
|
||||
return f"search_facts: Graphiti returned {r.status_code}."
|
||||
results = r.json().get("results", [])
|
||||
if not results:
|
||||
return f"No facts found for {query!r}."
|
||||
lines = []
|
||||
for i, f in enumerate(results, 1):
|
||||
fact = f.get("fact", "").strip()
|
||||
valid_at = f.get("valid_at") or "?"
|
||||
invalid_at = f.get("invalid_at")
|
||||
validity = (f"valid {valid_at}" + (f" → superseded {invalid_at}"
|
||||
if invalid_at and invalid_at != "None" else ""))
|
||||
lines.append(f"[{i}] {fact} ({validity})")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
SAVE_DOCUMENT_TOOL = {
|
||||
"name": "save_document",
|
||||
"description": (
|
||||
@@ -633,7 +751,7 @@ def chat(user_message, conversation_id, settings, client_time=None):
|
||||
|
||||
messages = history + [{"role": "user", "content": full_message}]
|
||||
|
||||
tools = [RETRIEVE_DOCUMENTS_TOOL, SAVE_DOCUMENT_TOOL]
|
||||
tools = [RETRIEVE_DOCUMENTS_TOOL, SEARCH_FACTS_TOOL, SAVE_DOCUMENT_TOOL]
|
||||
if settings.get("web_search", True):
|
||||
tools.append({"type": "web_search_20250305", "name": "web_search"})
|
||||
|
||||
@@ -672,6 +790,13 @@ def chat(user_message, conversation_id, settings, client_time=None):
|
||||
"tool_use_id": block.id,
|
||||
"content": result_text,
|
||||
})
|
||||
elif block.name == "search_facts":
|
||||
result_text = _execute_search_facts(block.input)
|
||||
tool_results.append({
|
||||
"type": "tool_result",
|
||||
"tool_use_id": block.id,
|
||||
"content": result_text,
|
||||
})
|
||||
elif block.name == "save_document":
|
||||
result_text = _execute_save_document(block.input)
|
||||
tool_results.append({
|
||||
@@ -691,6 +816,11 @@ def chat(user_message, conversation_id, settings, client_time=None):
|
||||
for block in response.content:
|
||||
if hasattr(block, "text"):
|
||||
assistant_message += block.text
|
||||
# Async fire-and-forget into Graphiti so the turn lands in the
|
||||
# graph as a single episode for future search_facts queries to
|
||||
# find. Takes ~20 min wall-clock in the background; chat returns
|
||||
# immediately. Disable via SKIP_GRAPHITI_CHAT_PUSH=1 if needed.
|
||||
_push_chat_turn_to_graphiti(conversation_id, user_message, assistant_message)
|
||||
# Cap citations: accumulated_sources can grow large across multiple
|
||||
# retrieve_documents calls and not every chunk that came back was
|
||||
# actually used in the answer. Insertion order preserves rank
|
||||
|
||||
@@ -0,0 +1,128 @@
|
||||
"""One-off: backfill last_consolidated_at + consolidation_count on embeddings
|
||||
from the dream-manifest-*.json files already in Journal/Dreams/.
|
||||
|
||||
Why this exists: the consolidation cursor columns added by the dreamer
|
||||
redesign migration default to NULL / 0. Without history, the
|
||||
underprocessed-count signal in dream_observation.observe_corpus() reports
|
||||
"every chunk is underprocessed" (degenerate percentile), and NREM has no
|
||||
basis to bias replay toward least-recently-consolidated chunks.
|
||||
|
||||
We have ~25 historical dream manifests in Nextcloud/Journal/Dreams/, each
|
||||
listing the sources retrieved per stage. For each (manifest, source) pair
|
||||
this script:
|
||||
- finds matching embeddings rows by source (basename match)
|
||||
- increments consolidation_count by 1
|
||||
- updates last_consolidated_at to the manifest date (UTC midnight)
|
||||
|
||||
Idempotent: re-running will not double-count because we drop existing
|
||||
cursor values to NULL/0 before backfilling. Pass --dry-run to print what
|
||||
would change without writing.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
import psycopg2
|
||||
|
||||
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
|
||||
|
||||
PG_DSN = os.getenv("PG_DSN")
|
||||
DREAMS_DIR = Path("/home/aaron/nextcloud/data/data/aaron/files/Journal/Dreams")
|
||||
DRY_RUN = "--dry-run" in sys.argv
|
||||
|
||||
|
||||
def get_pg():
|
||||
return psycopg2.connect(PG_DSN)
|
||||
|
||||
|
||||
def collect_manifest_records():
|
||||
"""Return a list of (source_basename, manifest_date_utc) tuples from all
|
||||
dream-manifest-*.json files. One pair per (manifest, source) appearance."""
|
||||
pairs = []
|
||||
if not DREAMS_DIR.exists():
|
||||
return pairs
|
||||
for path in sorted(DREAMS_DIR.glob("dream-manifest-*.json")):
|
||||
try:
|
||||
m = json.loads(path.read_text())
|
||||
except Exception as e:
|
||||
print(f" skip {path.name}: {e}")
|
||||
continue
|
||||
date_str = m.get("date")
|
||||
if not date_str:
|
||||
continue
|
||||
try:
|
||||
dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
|
||||
except ValueError:
|
||||
continue
|
||||
stages = m.get("stages") or {}
|
||||
for stage_name in ("nrem", "early_rem", "late_rem", "synthesis"):
|
||||
stage = stages.get(stage_name) or {}
|
||||
for src in (stage.get("sources") or []):
|
||||
if src:
|
||||
pairs.append((src, dt))
|
||||
return pairs
|
||||
|
||||
|
||||
def main():
|
||||
print(f"Mode: {'DRY-RUN' if DRY_RUN else 'APPLY'}")
|
||||
print(f"Scanning manifests in {DREAMS_DIR}")
|
||||
pairs = collect_manifest_records()
|
||||
print(f"Collected {len(pairs)} (source, manifest_date) pairs across all manifests")
|
||||
if not pairs:
|
||||
print("Nothing to backfill.")
|
||||
return
|
||||
|
||||
# Aggregate per source: count + latest date
|
||||
from collections import defaultdict
|
||||
counts = defaultdict(int)
|
||||
latest = {}
|
||||
for src, dt in pairs:
|
||||
counts[src] += 1
|
||||
if src not in latest or dt > latest[src]:
|
||||
latest[src] = dt
|
||||
print(f"Unique sources to update: {len(counts)}")
|
||||
|
||||
# Sample what we'd write
|
||||
print("Sample (top 5 by appearance count):")
|
||||
for src, n in sorted(counts.items(), key=lambda kv: -kv[1])[:5]:
|
||||
print(f" {n:>3} appearances — {src} → last_consolidated_at = {latest[src].date()}")
|
||||
|
||||
if DRY_RUN:
|
||||
print("\nDry-run only. Re-run without --dry-run to apply.")
|
||||
return
|
||||
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
|
||||
# Reset cursor for any sources we're about to backfill so reruns are clean.
|
||||
print("\nResetting cursor for sources we'll touch...")
|
||||
sources = list(counts.keys())
|
||||
cur.execute(
|
||||
"UPDATE embeddings SET last_consolidated_at = NULL, consolidation_count = 0 "
|
||||
"WHERE source = ANY(%s)",
|
||||
(sources,),
|
||||
)
|
||||
print(f" reset {cur.rowcount} embeddings rows")
|
||||
|
||||
# Apply per-source updates. For each source, set count and latest date.
|
||||
print("Applying per-source backfill...")
|
||||
updated_rows = 0
|
||||
for src, n in counts.items():
|
||||
cur.execute(
|
||||
"UPDATE embeddings "
|
||||
"SET consolidation_count = %s, last_consolidated_at = %s "
|
||||
"WHERE source = %s",
|
||||
(n, latest[src], src),
|
||||
)
|
||||
updated_rows += cur.rowcount
|
||||
pg.commit()
|
||||
pg.close()
|
||||
print(f"Done. Updated {updated_rows} embeddings rows across {len(counts)} unique sources.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
+342
-64
@@ -23,6 +23,7 @@ from datetime import datetime, timedelta
|
||||
from dotenv import load_dotenv
|
||||
import psycopg2
|
||||
import hashlib
|
||||
import numpy as np
|
||||
|
||||
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
|
||||
|
||||
@@ -42,6 +43,26 @@ NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
|
||||
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
|
||||
DREAMS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"
|
||||
|
||||
# ─── Retrieval-window config (per dreamer-multimodal-design.md §2) ─────────
|
||||
# Biological grounding: NREM replays recent traces (24-72 hrs); REM links
|
||||
# across time on structural similarity, not temporal proximity. Synthesis
|
||||
# pulls from salience across the full corpus (no window). Spec calls for
|
||||
# these to be mutable rather than hardcoded — this is the mutable home.
|
||||
TIME_WINDOWS_HOURS = {
|
||||
"nrem": 72, # 24-72 hrs, take wider end
|
||||
"early-rem": 24 * 30, # 30 days
|
||||
"late-rem": 24 * 90, # 90 days
|
||||
"lucid": None, # no window
|
||||
}
|
||||
|
||||
# Maximal Marginal Relevance: λ=1 → pure relevance, λ=0 → pure diversity.
|
||||
# 0.5 is the standard balance; tune later if the dossier-cluster problem
|
||||
# isn't sufficiently broken up.
|
||||
MMR_LAMBDA = 0.5
|
||||
|
||||
# Fast/cheap model for query generation. Sonnet for synthesis (in synthesize_*).
|
||||
LLM_QUERY_MODEL = os.getenv("DREAMER_QUERY_MODEL", "claude-haiku-4-5-20251001")
|
||||
|
||||
# Similarity ranges calibrated for all-MiniLM-L6-v2
|
||||
MODE_RANGES = {
|
||||
"nrem": (0.48, 0.72),
|
||||
@@ -289,70 +310,293 @@ def _get_embedder():
|
||||
from sentence_transformers import SentenceTransformer
|
||||
return SentenceTransformer("all-MiniLM-L6-v2")
|
||||
|
||||
def retrieve(mode, task=None, n_results=8, excluded_sources=None, type_filter=None):
|
||||
# E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search
|
||||
# Default behavior: pgvector similarity search (unchanged)
|
||||
# type_filter is experimental and applies to pgvector retrieval only — Graphiti
|
||||
# facts are not embeddings rows and have no embeddings.type to filter on.
|
||||
substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
|
||||
if substrate == "graphiti":
|
||||
return retrieve_graphiti(mode, task=task, n_results=n_results, excluded_sources=excluded_sources)
|
||||
embedder = _get_embedder()
|
||||
low, high = MODE_RANGES[mode]
|
||||
def _llm_generate_queries(mode, signal, task=None, n_queries=4):
|
||||
"""Park et al. 2023 reflection-style query generation. Feeds the LLM the
|
||||
observation signal + a mode-specific framing; emits N retrieval queries
|
||||
that probe different corners of the recent corpus instead of the same
|
||||
hardcoded string every night. Sources cited in dream_observation.py.
|
||||
|
||||
Falls back to recent_questions from the signal if the LLM call fails."""
|
||||
import anthropic
|
||||
|
||||
if task:
|
||||
query = task
|
||||
elif mode == "late-rem":
|
||||
delta = observe_corpus()
|
||||
topics = delta.get("recent_topics", [])
|
||||
query = topics[0] if topics else "practice place memory making"
|
||||
elif mode == "early-rem":
|
||||
query = "career decision personal change what matters next"
|
||||
# Lucid mode: decompose the user's task into sub-queries
|
||||
prompt = (
|
||||
f"Decompose this user task into {n_queries} distinct sub-questions, "
|
||||
f"each suitable as a retrieval query against Aaron's personal corpus.\n\n"
|
||||
f"TASK: {task}\n\n"
|
||||
f'Output JSON ONLY: {{"queries": ["...", "...", ...]}}'
|
||||
)
|
||||
else:
|
||||
query = "research fabrication teaching practice recent work"
|
||||
mode_framings = {
|
||||
"nrem": (
|
||||
"NREM is replay-and-consolidation of RECENT traces. Generate queries "
|
||||
"that probe what Aaron has been working on or capturing in the last "
|
||||
"few days. Concrete entities — project names, course codes, named "
|
||||
"subjects. The dreamer is re-touching specific recent material to "
|
||||
"strengthen schema connections, not finding novel content."
|
||||
),
|
||||
"early-rem": (
|
||||
"Early REM is associative bridging with emotional/personal register. "
|
||||
"Generate queries that surface unresolved themes, career questions, "
|
||||
"ongoing personal threads — material that connects intellectual and "
|
||||
"emotional dimensions. Tone: thoughtful friend, not researcher."
|
||||
),
|
||||
"late-rem": (
|
||||
"Late REM tests novel connections across DISTANT material. Generate "
|
||||
"queries that pair concrete subjects from DIFFERENT domains of Aaron's "
|
||||
"work (e.g., one from academic teaching, one from consulting, one from "
|
||||
"creative practice) to probe for surprising structural similarity. "
|
||||
"Cross-domain is required."
|
||||
),
|
||||
}
|
||||
framing = mode_framings.get(mode, mode_framings["nrem"])
|
||||
questions_snippet = "\n".join(
|
||||
f" - {q[:200]}" for q in signal.get("recent_questions", [])[:8]
|
||||
) or " (no recent user questions)"
|
||||
journal_snippet = ", ".join(signal.get("new_journal_entries", [])[:5]) or "(none)"
|
||||
days_str = (
|
||||
f"{signal['days_since_dream']:.1f}"
|
||||
if signal.get("days_since_dream") not in (None, float("inf"))
|
||||
else "infinite (first dream)"
|
||||
)
|
||||
prompt = (
|
||||
f"You generate retrieval queries for an Active Inference dreamer. The "
|
||||
f"dreamer surfaces prediction errors — gaps between Aaron's model and "
|
||||
f"reality — not summaries or generic associations.\n\n"
|
||||
f"MODE: {mode}\n"
|
||||
f"FRAMING: {framing}\n\n"
|
||||
f"OBSERVATION SIGNAL:\n"
|
||||
f"- Days since last dream: {days_str}\n"
|
||||
f"- New chunks since last dream: {signal.get('new_chunks', 0)}\n"
|
||||
f"- New journal entries: {journal_snippet}\n"
|
||||
f"- Underprocessed chunks pool: {signal.get('underprocessed_count', 0):,}\n\n"
|
||||
f"RECENT USER QUESTIONS (last 14 days, top 8):\n{questions_snippet}\n\n"
|
||||
f"Generate {n_queries} retrieval queries. Requirements:\n"
|
||||
f"- Use concrete entities, named projects, course codes, specific topics "
|
||||
f"— NOT generic phrasing like 'research work practice'\n"
|
||||
f"- Each query probes a DIFFERENT corner of recent activity\n"
|
||||
f"- Match the {mode} framing\n"
|
||||
f"- 5-15 words each\n\n"
|
||||
f'Output JSON ONLY: {{"queries": ["...", "...", ...]}}'
|
||||
)
|
||||
|
||||
embedding = embedder.encode([query]).tolist()[0]
|
||||
chunks = []
|
||||
seen_sources = set()
|
||||
try:
|
||||
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
|
||||
resp = client.messages.create(
|
||||
model=LLM_QUERY_MODEL,
|
||||
max_tokens=512,
|
||||
messages=[{"role": "user", "content": prompt}],
|
||||
)
|
||||
text = "".join(b.text for b in resp.content if hasattr(b, "text")).strip()
|
||||
if text.startswith("```"):
|
||||
text = text.split("```", 2)[1]
|
||||
if text.startswith("json"):
|
||||
text = text[4:]
|
||||
text = text.strip()
|
||||
data = json.loads(text)
|
||||
queries = data.get("queries", [])
|
||||
if isinstance(queries, list) and queries:
|
||||
return [str(q).strip() for q in queries[:n_queries] if str(q).strip()]
|
||||
except Exception as e:
|
||||
print(f"[dream] LLM query generation failed ({e}); falling back to recent questions")
|
||||
|
||||
fallback = signal.get("recent_questions", [])[:n_queries] if signal else []
|
||||
return fallback or [task or "recent activity decisions thinking"]
|
||||
|
||||
|
||||
def _mmr_select(candidate_embeddings, query_embedding, n, lambda_=MMR_LAMBDA):
|
||||
"""Maximal Marginal Relevance — greedy selection that balances relevance
|
||||
against pairwise diversity. Carbonell & Goldstein 1998. Used to prevent
|
||||
cluster lock-in (e.g., 8 dossier-narrative variants filling all 8 slots).
|
||||
|
||||
candidate_embeddings: (N, D) numpy array
|
||||
query_embedding: (D,) numpy array
|
||||
Returns: list of indices into candidate_embeddings, len ≤ n."""
|
||||
if len(candidate_embeddings) == 0:
|
||||
return []
|
||||
n = min(n, len(candidate_embeddings))
|
||||
cands = candidate_embeddings / (np.linalg.norm(candidate_embeddings, axis=1, keepdims=True) + 1e-9)
|
||||
q = query_embedding / (np.linalg.norm(query_embedding) + 1e-9)
|
||||
relevance = cands @ q
|
||||
selected = []
|
||||
remaining = list(range(len(cands)))
|
||||
while len(selected) < n and remaining:
|
||||
if not selected:
|
||||
best = max(remaining, key=lambda i: relevance[i])
|
||||
else:
|
||||
sel = cands[selected]
|
||||
scores = {
|
||||
i: lambda_ * relevance[i] - (1 - lambda_) * float((cands[i] @ sel.T).max())
|
||||
for i in remaining
|
||||
}
|
||||
best = max(scores, key=scores.get)
|
||||
selected.append(best)
|
||||
remaining.remove(best)
|
||||
return selected
|
||||
|
||||
|
||||
def _bump_consolidation_cursor(chunks):
|
||||
"""Increment consolidation_count + set last_consolidated_at=NOW() for each
|
||||
source represented in chunks. Called from dream_pipeline after NREM
|
||||
completes. Per sharp-wave-ripples biology, NREM does the actual
|
||||
consolidation; REM is associative use, so we only bump on NREM."""
|
||||
if not chunks:
|
||||
return
|
||||
sources = list({c["source"] for c in chunks if c.get("source")})
|
||||
if not sources:
|
||||
return
|
||||
try:
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
excluded_sources = excluded_sources or set()
|
||||
where, params = [], []
|
||||
if excluded_sources:
|
||||
where.append("source NOT IN %s")
|
||||
params.append(tuple(excluded_sources))
|
||||
if type_filter:
|
||||
where.append("type = ANY(%s)")
|
||||
params.append(list(type_filter))
|
||||
where_clause = ("WHERE " + " AND ".join(where)) if where else ""
|
||||
cur.execute(f"""
|
||||
SELECT document, source, type, 1 - (embedding <=> %s::vector) as similarity
|
||||
FROM embeddings
|
||||
{where_clause}
|
||||
ORDER BY embedding <=> %s::vector
|
||||
LIMIT %s
|
||||
""", [embedding, *params, embedding, n_results * 3])
|
||||
|
||||
for doc, source, etype, similarity in cur.fetchall():
|
||||
if not (low <= similarity <= high):
|
||||
continue
|
||||
if source in seen_sources:
|
||||
continue
|
||||
chunks.append({
|
||||
"source": source or "unknown",
|
||||
"content": doc,
|
||||
"relevance": similarity,
|
||||
"similarity": similarity,
|
||||
"type": etype,
|
||||
})
|
||||
seen_sources.add(source)
|
||||
if len(chunks) >= n_results:
|
||||
break
|
||||
cur.execute(
|
||||
"UPDATE embeddings "
|
||||
"SET consolidation_count = consolidation_count + 1, "
|
||||
" last_consolidated_at = NOW() "
|
||||
"WHERE source = ANY(%s)",
|
||||
(sources,),
|
||||
)
|
||||
pg.commit()
|
||||
pg.close()
|
||||
except Exception as e:
|
||||
print(f"pgvector retrieval error: {e}")
|
||||
print(f"[dream] cursor bump failed (non-fatal): {e}")
|
||||
|
||||
|
||||
def retrieve(mode, task=None, n_results=8, excluded_sources=None,
|
||||
type_filter=None, signal=None):
|
||||
"""Refactored retrieval — see dreamer-design-spec.md Stage 3 + the
|
||||
external-literature prescription in birdai-dreamer-exclusion-finding-2026-05-02.md.
|
||||
|
||||
Changes from the prior hardcoded-query version:
|
||||
- Queries are LLM-generated from the observation signal (Park et al.
|
||||
reflection pattern) instead of fixed strings. Solves the "same 8 sources
|
||||
every night" failure where fixed seeds locked into one neighborhood.
|
||||
- Per-mode time windows (24-72hr NREM / 30d Early REM / 90d Late REM)
|
||||
filter candidates before vector search. Spec calls for these to be
|
||||
mutable; they live in TIME_WINDOWS_HOURS.
|
||||
- NREM biases toward under-processed chunks (low consolidation_count).
|
||||
Biologically motivated: sharp-wave ripples tag what to replay, not
|
||||
uniform sampling.
|
||||
- Multiple queries (4 by default) → over-fetch → MMR merge for
|
||||
within-night diversity. Prevents cluster domination.
|
||||
|
||||
signal is the observation-signal dict from dream_observation.observe_corpus().
|
||||
If None, observe_corpus is called inline (back-compat for ad-hoc invocation).
|
||||
"""
|
||||
# E3 substrate experiment unchanged
|
||||
substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
|
||||
if substrate == "graphiti":
|
||||
return retrieve_graphiti(mode, task=task, n_results=n_results,
|
||||
excluded_sources=excluded_sources)
|
||||
|
||||
if signal is None:
|
||||
from dream_observation import observe_corpus as _obs
|
||||
signal = _obs()
|
||||
|
||||
queries = _llm_generate_queries(mode, signal, task=task, n_queries=4)
|
||||
if not queries:
|
||||
print(f"[dream:{mode}] no queries generated; bailing")
|
||||
return []
|
||||
print(f"[dream:{mode}] generated queries: {queries}")
|
||||
|
||||
embedder = _get_embedder()
|
||||
excluded_sources = excluded_sources or set()
|
||||
window_hours = TIME_WINDOWS_HOURS.get(mode)
|
||||
per_query_n = 12 # over-fetch for MMR
|
||||
|
||||
candidates = []
|
||||
seen_ids = set()
|
||||
try:
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
for q in queries:
|
||||
q_emb = embedder.encode([q]).tolist()[0]
|
||||
where, params = [], []
|
||||
if excluded_sources:
|
||||
where.append("source NOT IN %s")
|
||||
params.append(tuple(excluded_sources))
|
||||
if type_filter:
|
||||
where.append("type = ANY(%s)")
|
||||
params.append(list(type_filter))
|
||||
if window_hours is not None:
|
||||
# created_at is TEXT (legacy); cast it. NULL created_at fails
|
||||
# the comparison so legacy rows are excluded from windowed
|
||||
# modes — correct: NULL means "indexed before cursor existed,"
|
||||
# which by definition is older than any window.
|
||||
where.append(
|
||||
f"(created_at IS NOT NULL AND "
|
||||
f"created_at::timestamptz > NOW() - INTERVAL '{int(window_hours)} hours')"
|
||||
)
|
||||
where_clause = ("WHERE " + " AND ".join(where)) if where else ""
|
||||
# NREM bias: order by consolidation_count ASC first (under-processed
|
||||
# chunks win the tiebreak before vector distance). Other modes:
|
||||
# vector distance only.
|
||||
order_clause = (
|
||||
"ORDER BY consolidation_count ASC, embedding <=> %s::vector"
|
||||
if mode == "nrem"
|
||||
else "ORDER BY embedding <=> %s::vector"
|
||||
)
|
||||
cur.execute(f"""
|
||||
SELECT id, document, source, type, embedding,
|
||||
1 - (embedding <=> %s::vector) as similarity
|
||||
FROM embeddings
|
||||
{where_clause}
|
||||
{order_clause}
|
||||
LIMIT %s
|
||||
""", [q_emb, *params, q_emb, per_query_n])
|
||||
for row in cur.fetchall():
|
||||
if row[0] in seen_ids:
|
||||
continue
|
||||
seen_ids.add(row[0])
|
||||
emb = row[4]
|
||||
# pgvector returns embeddings as string "[...]" by default
|
||||
if isinstance(emb, str):
|
||||
emb = np.array([float(x) for x in emb.strip("[]").split(",")])
|
||||
else:
|
||||
emb = np.array(emb)
|
||||
candidates.append({
|
||||
"id": row[0],
|
||||
"content": row[1],
|
||||
"source": row[2] or "unknown",
|
||||
"type": row[3],
|
||||
"embedding": emb,
|
||||
"similarity": float(row[5]),
|
||||
})
|
||||
pg.close()
|
||||
except Exception as e:
|
||||
import traceback
|
||||
print(f"[dream:{mode}] retrieval SQL error: {e}")
|
||||
traceback.print_exc()
|
||||
return []
|
||||
|
||||
if not candidates:
|
||||
print(f"[dream:{mode}] zero candidates after filters")
|
||||
return []
|
||||
|
||||
# MMR over the union, using the first query as pivot for the relevance term.
|
||||
# Averaging query embeddings would be theoretically cleaner but adds
|
||||
# complexity for marginal benefit at this scale.
|
||||
pivot_emb = np.array(embedder.encode([queries[0]]).tolist()[0])
|
||||
cand_embs = np.array([c["embedding"] for c in candidates])
|
||||
selected_idx = _mmr_select(cand_embs, pivot_emb, n=n_results * 2)
|
||||
|
||||
# Post-MMR source-level dedup (multi-chunk same source collapses to one).
|
||||
chunks = []
|
||||
seen_sources = set()
|
||||
for i in selected_idx:
|
||||
c = candidates[i]
|
||||
if c["source"] in seen_sources:
|
||||
continue
|
||||
seen_sources.add(c["source"])
|
||||
chunks.append({
|
||||
"source": c["source"],
|
||||
"content": c["content"],
|
||||
"relevance": c["similarity"],
|
||||
"similarity": c["similarity"],
|
||||
"type": c["type"],
|
||||
})
|
||||
if len(chunks) >= n_results:
|
||||
break
|
||||
|
||||
return chunks
|
||||
|
||||
@@ -496,6 +740,12 @@ def dream_pipeline(type_filter=None):
|
||||
"""
|
||||
Full nightly pipeline — interdependent stages.
|
||||
NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis.
|
||||
|
||||
Per dreamer-design-spec.md, this now runs Stage 1 (observe) and Stage 2
|
||||
(select) first. If select_mode returns None — corpus unchanged and no new
|
||||
journal entry — the dreamer goes quiet rather than manufacturing novelty.
|
||||
Otherwise NREM/Early-REM/Late-REM run with LLM-generated queries seeded
|
||||
from the observation signal.
|
||||
"""
|
||||
print(f"Dreamer pipeline starting — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
|
||||
|
||||
@@ -503,21 +753,47 @@ def dream_pipeline(type_filter=None):
|
||||
state.pop("retrieved_sources", None) # legacy key; session-scoped novelty now
|
||||
session_retrieved = set()
|
||||
|
||||
delta = observe_corpus()
|
||||
print(f"Corpus: {delta['new_chunks']} new chunks, {delta['days_since_dream']:.1f} days since last dream")
|
||||
print("Novelty: session-scoped (no across-night exclusion)")
|
||||
# ── Stage 1 + 2: Observe + Select ──────────────────────────────────────
|
||||
from dream_observation import observe_corpus as _obs, select_mode as _select
|
||||
signal = _obs()
|
||||
print(
|
||||
f"Signal: new_chunks={signal['new_chunks']}, "
|
||||
f"new_journal={len(signal['new_journal_entries'])}, "
|
||||
f"days_since={signal['days_since_dream']:.1f}, "
|
||||
f"underprocessed={signal['underprocessed_count']:,}"
|
||||
)
|
||||
selected = _select(signal)
|
||||
if selected is None:
|
||||
print("[select_mode] None — nothing worth dreaming about tonight (going quiet)")
|
||||
# Update last-dream-attempted-at but not last_dream — caller can distinguish
|
||||
# an actual dream from a skipped night by looking at last_dream_file or
|
||||
# checking the manifest dir.
|
||||
state["last_select_quiet_at"] = datetime.now().isoformat()
|
||||
save_dreamer_state(state)
|
||||
return None
|
||||
print(f"[select_mode] → {selected}")
|
||||
|
||||
# ── Stage 1: NREM ──────────────────────────────────────────────────────
|
||||
# The pipeline always runs all three modes for the manifest's continuity.
|
||||
# select_mode's choice signals the *primary* focus; the others still run
|
||||
# but draw from their own mode-appropriate windows.
|
||||
primary_mode = selected
|
||||
|
||||
# ── Stage 3: NREM ──────────────────────────────────────────────────────
|
||||
print("\n[NREM] Retrieving...")
|
||||
# NREM is replay-and-consolidation — does not exclude prior traces.
|
||||
# Late REM and Early REM exclude prior content for novelty; NREM does not.
|
||||
nrem_chunks = retrieve("nrem", excluded_sources=None, type_filter=type_filter)
|
||||
nrem_chunks = retrieve("nrem", excluded_sources=None,
|
||||
type_filter=type_filter, signal=signal)
|
||||
session_retrieved.update(c["source"] for c in nrem_chunks)
|
||||
# Track sources that scored above Early REM ceiling — these are the only ones Early REM should exclude
|
||||
nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > 0.55}
|
||||
if not nrem_chunks:
|
||||
print("[NREM] No suitable chunks — aborting pipeline")
|
||||
return None
|
||||
# Cursor bump: NREM is the consolidation stage. Each appearance increments
|
||||
# consolidation_count + updates last_consolidated_at, so the next dream's
|
||||
# observation sees these sources as less under-processed.
|
||||
_bump_consolidation_cursor(nrem_chunks)
|
||||
|
||||
print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...")
|
||||
nrem_output = synthesize_nrem(nrem_chunks)
|
||||
@@ -528,7 +804,7 @@ def dream_pipeline(type_filter=None):
|
||||
"nrem": {
|
||||
"chunks_retrieved": len(nrem_chunks),
|
||||
"avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3),
|
||||
"query": "research fabrication teaching practice recent work",
|
||||
"query": "[llm-generated from observation signal]",
|
||||
"word_count": len(nrem_output.split()),
|
||||
"sources": nrem_sources,
|
||||
"distinct_folders": nrem_folders,
|
||||
@@ -546,7 +822,8 @@ def dream_pipeline(type_filter=None):
|
||||
print("\n[Early REM] Retrieving...")
|
||||
# Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved)
|
||||
# Sources that scored in Early REM band during NREM remain available
|
||||
early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources, type_filter=type_filter)
|
||||
early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources,
|
||||
type_filter=type_filter, signal=signal)
|
||||
session_retrieved.update(c["source"] for c in early_chunks)
|
||||
if not early_chunks:
|
||||
print("[Early REM] No suitable chunks — skipping")
|
||||
@@ -560,7 +837,7 @@ def dream_pipeline(type_filter=None):
|
||||
stage_data["early_rem"] = {
|
||||
"chunks_retrieved": len(early_chunks),
|
||||
"avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3),
|
||||
"query": "career decision personal change what matters next",
|
||||
"query": "[llm-generated from observation signal]",
|
||||
"word_count": len(early_rem_output.split()),
|
||||
"sources": early_sources,
|
||||
"distinct_folders": early_folders,
|
||||
@@ -572,7 +849,8 @@ def dream_pipeline(type_filter=None):
|
||||
|
||||
# ── Stage 3: Late REM — informed by NREM + Early REM ──────────────────
|
||||
print("\n[Late REM] Retrieving...")
|
||||
late_chunks = retrieve("late-rem", excluded_sources=session_retrieved, type_filter=type_filter)
|
||||
late_chunks = retrieve("late-rem", excluded_sources=session_retrieved,
|
||||
type_filter=type_filter, signal=signal)
|
||||
session_retrieved.update(c["source"] for c in late_chunks)
|
||||
if not late_chunks:
|
||||
print("[Late REM] No suitable chunks — skipping")
|
||||
@@ -591,7 +869,7 @@ def dream_pipeline(type_filter=None):
|
||||
stage_data["late_rem"] = {
|
||||
"chunks_retrieved": len(late_chunks),
|
||||
"avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3),
|
||||
"query": "practice place memory making",
|
||||
"query": "[llm-generated from observation signal]",
|
||||
"word_count": len(late_rem_output.split()),
|
||||
"sources": late_sources,
|
||||
"distinct_folders": list(set(late_folders)),
|
||||
|
||||
@@ -0,0 +1,235 @@
|
||||
"""
|
||||
Dreamer Stages 1 + 2 — Observe and Select.
|
||||
|
||||
Implements `dreamer-design-spec.md`'s Stage 1 (observe_corpus) and Stage 2
|
||||
(select_mode). These have been latent in dream.py — observe_corpus existed
|
||||
in skeletal form but its output was largely unused; select_mode did not
|
||||
exist at all. The dreamer always ran all stages with hardcoded queries.
|
||||
|
||||
Per spec (lines 27–34 of dreamer-design-spec.md):
|
||||
delta = observe_corpus()
|
||||
selected_mode = select_mode(delta, task, project)
|
||||
if selected_mode is None:
|
||||
return # nothing worth dreaming
|
||||
|
||||
The "returns None — dreamer goes quiet rather than manufacturing novelty"
|
||||
semantics (spec line 67) is the canonical answer to the repetition problem
|
||||
documented in birdai-dreamer-exclusion-finding-2026-05-02.md.
|
||||
|
||||
Grounded in:
|
||||
- Active Inference (Friston 2010, 2017) — observe error, choose action that
|
||||
minimizes free energy. The dreamer is a prediction-error machine; observe
|
||||
what's diverged from the model, dream about that.
|
||||
- Sleep stages (Stickgold 2005; Walker 2017; Diekelberg & Born 2010) — NREM
|
||||
for replay of new traces, REM for associative cross-cluster integration.
|
||||
- Sharp-wave ripples (Buzsáki, Wilson) — biology tags WHAT to replay
|
||||
(under-processed chunks); not uniform. Implemented via the consolidation
|
||||
cursor on the embeddings table.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
import psycopg2
|
||||
|
||||
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
|
||||
|
||||
# ─── Paths ──────────────────────────────────────────────────────────────────
|
||||
|
||||
PG_DSN = os.getenv("PG_DSN")
|
||||
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
|
||||
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
|
||||
DREAMER_STATE = str(Path.home() / "aaronai" / "dreamer_state.json")
|
||||
JOURNAL_DAILY = "/home/aaron/nextcloud/data/data/aaron/files/Journal/Daily"
|
||||
|
||||
# ─── Thresholds ─────────────────────────────────────────────────────────────
|
||||
# Per spec, these become settings-panel controls eventually. For now they're
|
||||
# constants here; moving them to a config module is task #48.
|
||||
|
||||
NEW_CHUNK_THRESHOLD = 5 # below this, NREM not warranted on novelty alone
|
||||
STALENESS_TRIGGER_DAYS = 3 # corpus quiet ≥3 days → Late REM ("shake things loose")
|
||||
QUESTION_LOOKBACK_DAYS = 14 # spec line 61: "the last 14 days"
|
||||
UNDERPROCESSED_PERCENTILE = 0.25 # bottom quartile of consolidation_count
|
||||
|
||||
|
||||
# ─── Helpers ────────────────────────────────────────────────────────────────
|
||||
|
||||
def _get_pg():
|
||||
return psycopg2.connect(PG_DSN)
|
||||
|
||||
|
||||
def _load_json(path, default):
|
||||
try:
|
||||
return json.loads(Path(path).read_text())
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
|
||||
def _recent_user_questions(days=QUESTION_LOOKBACK_DAYS, limit=20):
|
||||
"""Pull recent user-turn content from conversations.db. The spec calls
|
||||
these 'live questions' — what Aaron has been asking about. They become
|
||||
seed material for the REM modes."""
|
||||
try:
|
||||
conn = sqlite3.connect(CONVERSATIONS_DB)
|
||||
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT m.content FROM messages m
|
||||
JOIN conversations c ON m.conversation_id = c.id
|
||||
WHERE m.role = 'user' AND c.updated_at > ?
|
||||
ORDER BY m.timestamp DESC LIMIT ?
|
||||
""",
|
||||
(cutoff, limit),
|
||||
)
|
||||
rows = cur.fetchall()
|
||||
conn.close()
|
||||
return [r[0][:280] for r in rows]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def _new_journal_entries(since_ts):
|
||||
"""Files in Journal/Daily/ created or modified since the last dream.
|
||||
Journal entries with emotional/personal register route to Early REM per
|
||||
the spec (line 71)."""
|
||||
journal_path = Path(JOURNAL_DAILY)
|
||||
if not journal_path.exists():
|
||||
return []
|
||||
new = []
|
||||
for p in journal_path.rglob("*.md"):
|
||||
try:
|
||||
if p.stat().st_mtime > since_ts:
|
||||
new.append(str(p.relative_to(journal_path)))
|
||||
except OSError:
|
||||
continue
|
||||
return new
|
||||
|
||||
|
||||
def _new_chunks_count(since_ts):
|
||||
"""Files in the watcher state with mtime > last_dream. The spec calls
|
||||
this 'what changed' (line 58). Used as the NREM novelty signal."""
|
||||
state = _load_json(WATCHER_STATE, {})
|
||||
count = 0
|
||||
for _path, mtime in state.items():
|
||||
try:
|
||||
if float(mtime) > since_ts:
|
||||
count += 1
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
return count
|
||||
|
||||
|
||||
def _underprocessed_chunk_count():
|
||||
"""Chunks below the underprocessed percentile by consolidation_count.
|
||||
Biologically motivated: sharp-wave ripples bias replay toward novel /
|
||||
under-encoded experience, not uniform sampling. We give NREM a pool of
|
||||
'least-replayed' chunks to draw from in Stage 3."""
|
||||
try:
|
||||
pg = _get_pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
WITH t AS (
|
||||
SELECT percentile_cont(%s) WITHIN GROUP (ORDER BY consolidation_count)
|
||||
AS threshold
|
||||
FROM embeddings
|
||||
)
|
||||
SELECT COUNT(*) FROM embeddings, t
|
||||
WHERE consolidation_count <= t.threshold
|
||||
""",
|
||||
(UNDERPROCESSED_PERCENTILE,),
|
||||
)
|
||||
result = cur.fetchone()[0]
|
||||
pg.close()
|
||||
return int(result or 0)
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
|
||||
# ─── Stage 1: observe_corpus ────────────────────────────────────────────────
|
||||
|
||||
def observe_corpus():
|
||||
"""Build the signal vector consumed by select_mode and (downstream) by
|
||||
retrieve. Concrete observations only — no interpretation. Each key is
|
||||
a direct measurement from the corpus, watcher, journal, or conversation
|
||||
log.
|
||||
|
||||
Returns a dict with:
|
||||
now_ts -- current Unix timestamp
|
||||
last_dream_ts -- last completed dream timestamp (0 if never)
|
||||
days_since_dream -- float; inf if never dreamed
|
||||
new_chunks -- count of files newer than last_dream
|
||||
new_journal_entries -- list of Journal/Daily/*.md filenames since last_dream
|
||||
recent_questions -- user-turn content from last 14 days
|
||||
underprocessed_count -- chunks in the bottom 25% by consolidation_count
|
||||
"""
|
||||
state = _load_json(DREAMER_STATE, {})
|
||||
last_dream_ts = float(state.get("last_dream_timestamp", 0) or 0)
|
||||
now_ts = datetime.now().timestamp()
|
||||
|
||||
return {
|
||||
"now_ts": now_ts,
|
||||
"last_dream_ts": last_dream_ts,
|
||||
"days_since_dream": (now_ts - last_dream_ts) / 86400 if last_dream_ts else float("inf"),
|
||||
"new_chunks": _new_chunks_count(last_dream_ts),
|
||||
"new_journal_entries": _new_journal_entries(last_dream_ts),
|
||||
"recent_questions": _recent_user_questions(),
|
||||
"underprocessed_count": _underprocessed_chunk_count(),
|
||||
}
|
||||
|
||||
|
||||
# ─── Stage 2: select_mode ───────────────────────────────────────────────────
|
||||
|
||||
def select_mode(signal, task=None, explicit_mode=None):
|
||||
"""Return one of {'nrem', 'early-rem', 'late-rem', 'lucid'}. Never None.
|
||||
|
||||
The dreamer fires every scheduled night. The earlier "go quiet on null
|
||||
delta" rule was a synthesis-doc invention that didn't match the actual
|
||||
desired UX — the original dreamer always dreamed, even if it repeated
|
||||
itself. The cure for repetition lives in the retrieve layer
|
||||
(LLM-generated queries from the observation signal, MMR diversity,
|
||||
cursor bias toward under-processed chunks), not in skipping nights.
|
||||
|
||||
Routing logic:
|
||||
- explicit_mode argument wins
|
||||
- task supplied → 'lucid' (question-anchored)
|
||||
- days_since_dream ≥ STALENESS_TRIGGER_DAYS → 'late-rem' (shake loose
|
||||
via cross-domain pairs when nothing's been added in a while)
|
||||
- new journal entry → 'early-rem' (emotional/personal register)
|
||||
- default → 'nrem' (replay-and-consolidation; always has something to
|
||||
do because the corpus always has under-processed chunks)
|
||||
"""
|
||||
if explicit_mode:
|
||||
return explicit_mode
|
||||
if task:
|
||||
return "lucid"
|
||||
|
||||
days_since = signal["days_since_dream"]
|
||||
new_journal = signal["new_journal_entries"]
|
||||
|
||||
if days_since >= STALENESS_TRIGGER_DAYS:
|
||||
return "late-rem"
|
||||
|
||||
if new_journal:
|
||||
return "early-rem"
|
||||
|
||||
return "nrem"
|
||||
|
||||
|
||||
# ─── CLI for manual inspection ──────────────────────────────────────────────
|
||||
|
||||
if __name__ == "__main__":
|
||||
signal = observe_corpus()
|
||||
short = {k: v for k, v in signal.items() if k != "recent_questions"}
|
||||
print("Signal (excluding recent_questions):")
|
||||
print(json.dumps(short, indent=2, default=str))
|
||||
print(f"\nRecent user questions ({len(signal['recent_questions'])}):")
|
||||
for q in signal["recent_questions"][:5]:
|
||||
print(f" - {q[:140]}")
|
||||
mode = select_mode(signal)
|
||||
print(f"\nselect_mode() → {mode!r}")
|
||||
@@ -0,0 +1,136 @@
|
||||
"""
|
||||
Orientation Indexer — feeds Stage 2's document-level orientations into pgvector
|
||||
so they're searchable alongside chunk text by the retrieve_documents tool.
|
||||
|
||||
Each completed row in stage_3_queue has an `orientation` string (active_frames
|
||||
+ frame_relationships + extraction_orientation + one_sentence_summary) that
|
||||
describes the document at a conceptual level. Indexing it as its own row in
|
||||
the embeddings table gives the cross-encoder a second surface to rank against
|
||||
— "what is this document about" rather than just "what does this chunk say."
|
||||
|
||||
This worker is part of the "read-only Graphiti + orientation-into-pgvector"
|
||||
plan B that replaced the Stage 3 → Graphiti write path. The graph layer is
|
||||
queried directly via the search_facts chat tool; orientations land here.
|
||||
|
||||
State tracking: a row is considered indexed if the embeddings table already
|
||||
holds a row with source=<source> and metadata->>'kind'='orientation'. The
|
||||
worker is idempotent — restart-safe, resumable.
|
||||
|
||||
Runs as systemd: aaronai-orientation-indexer.service
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
import psycopg2
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from encoding import write_embeddings_batch
|
||||
|
||||
PG_DSN = os.getenv("PG_DSN")
|
||||
EMBED_MODEL = "all-MiniLM-L6-v2"
|
||||
BATCH_SIZE = 25
|
||||
POLL_INTERVAL_SECS = 30
|
||||
LOG_FILE = "/var/log/aaronai/orientation-indexer.log"
|
||||
HEARTBEAT_FILE = "/var/log/aaronai/orientation-indexer-heartbeat"
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [orientation-indexer] %(levelname)s %(message)s",
|
||||
handlers=[logging.FileHandler(LOG_FILE, mode="a")],
|
||||
)
|
||||
log = logging.getLogger("orientation-indexer")
|
||||
|
||||
|
||||
def get_pg():
|
||||
return psycopg2.connect(PG_DSN)
|
||||
|
||||
|
||||
def fetch_unindexed(cur, limit):
|
||||
"""Pull stage_3_queue rows with a non-null orientation whose orientation
|
||||
hasn't been written to the embeddings table yet."""
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT s.source, s.orientation
|
||||
FROM stage_3_queue s
|
||||
WHERE s.orientation IS NOT NULL
|
||||
AND NOT EXISTS (
|
||||
SELECT 1 FROM embeddings e
|
||||
WHERE e.source = s.source
|
||||
AND e.metadata->>'kind' = 'orientation'
|
||||
)
|
||||
ORDER BY s.enqueued_at
|
||||
LIMIT %s
|
||||
""",
|
||||
(limit,),
|
||||
)
|
||||
return cur.fetchall()
|
||||
|
||||
|
||||
def _row_for(source: str, orientation: str, embedding) -> dict:
|
||||
"""Build an embeddings row for the orientation. id is deterministic so
|
||||
re-runs don't create duplicates if the unique check above ever races."""
|
||||
import hashlib
|
||||
chunk_id = hashlib.md5(f"orientation:{source}".encode()).hexdigest()[:8] + "_orient"
|
||||
return {
|
||||
"id": chunk_id,
|
||||
"document": orientation,
|
||||
"embedding": embedding,
|
||||
"source": source,
|
||||
"type": "document",
|
||||
"metadata": {
|
||||
"source": source,
|
||||
"kind": "orientation",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def write_heartbeat():
|
||||
try:
|
||||
Path(HEARTBEAT_FILE).write_text(str(time.time()))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
log.info("Orientation indexer starting...")
|
||||
log.info(f"Loading embedding model: {EMBED_MODEL}")
|
||||
embedder = SentenceTransformer(EMBED_MODEL)
|
||||
log.info("Embedding model ready.")
|
||||
|
||||
while True:
|
||||
write_heartbeat()
|
||||
try:
|
||||
pg = get_pg()
|
||||
try:
|
||||
cur = pg.cursor()
|
||||
rows = fetch_unindexed(cur, BATCH_SIZE)
|
||||
if not rows:
|
||||
pg.close()
|
||||
time.sleep(POLL_INTERVAL_SECS)
|
||||
continue
|
||||
|
||||
orientations = [r[1] for r in rows]
|
||||
embeddings = embedder.encode(orientations).tolist()
|
||||
batch = [
|
||||
_row_for(source, orient, emb)
|
||||
for (source, orient), emb in zip(rows, embeddings)
|
||||
]
|
||||
write_embeddings_batch(pg, batch)
|
||||
log.info(f"Indexed {len(batch)} orientation(s)")
|
||||
finally:
|
||||
pg.close()
|
||||
except Exception as e:
|
||||
log.error(f"Indexing loop iteration failed: {e}")
|
||||
time.sleep(POLL_INTERVAL_SECS)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user