diff --git a/scripts/dream.py b/scripts/dream.py index 373a3f3..45c4d02 100644 --- a/scripts/dream.py +++ b/scripts/dream.py @@ -16,6 +16,7 @@ import os import json import sqlite3 import argparse +from collections import Counter from pathlib import Path from datetime import datetime, timedelta from dotenv import load_dotenv @@ -282,9 +283,11 @@ def retrieve_graphiti(mode, task=None, n_results=8, excluded_sources=None): print(f"[Graphiti retrieval error: {e}] — falling back to empty.") return [] -def retrieve(mode, task=None, n_results=8, excluded_sources=None): +def retrieve(mode, task=None, n_results=8, excluded_sources=None, type_filter=None): # E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search # Default behavior: pgvector similarity search (unchanged) + # type_filter is experimental and applies to pgvector retrieval only — Graphiti + # facts are not embeddings rows and have no embeddings.type to filter on. substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector") if substrate == "graphiti": return retrieve_graphiti(mode, task=task, n_results=n_results, excluded_sources=excluded_sources) @@ -311,23 +314,23 @@ def retrieve(mode, task=None, n_results=8, excluded_sources=None): pg = get_pg() cur = pg.cursor() excluded_sources = excluded_sources or set() + where, params = [], [] if excluded_sources: - cur.execute(""" - SELECT document, source, 1 - (embedding <=> %s::vector) as similarity - FROM embeddings - WHERE source NOT IN %s - ORDER BY embedding <=> %s::vector - LIMIT %s - """, (embedding, tuple(excluded_sources), embedding, n_results * 3)) - else: - cur.execute(""" - SELECT document, source, 1 - (embedding <=> %s::vector) as similarity - FROM embeddings - ORDER BY embedding <=> %s::vector - LIMIT %s - """, (embedding, embedding, n_results * 3)) + where.append("source NOT IN %s") + params.append(tuple(excluded_sources)) + if type_filter: + where.append("type = ANY(%s)") + params.append(list(type_filter)) + where_clause = ("WHERE " + " 
AND ".join(where)) if where else "" + cur.execute(f""" + SELECT document, source, type, 1 - (embedding <=> %s::vector) as similarity + FROM embeddings + {where_clause} + ORDER BY embedding <=> %s::vector + LIMIT %s + """, [embedding, *params, embedding, n_results * 3]) - for doc, source, similarity in cur.fetchall(): + for doc, source, etype, similarity in cur.fetchall(): if not (low <= similarity <= high): continue if source in seen_sources: @@ -337,6 +340,7 @@ def retrieve(mode, task=None, n_results=8, excluded_sources=None): "content": doc, "relevance": similarity, "similarity": similarity, + "type": etype, }) seen_sources.add(source) if len(chunks) >= n_results: @@ -482,7 +486,7 @@ def write_manifest(date_str, stage_data, corpus_data): print(f"Manifest write failed (non-critical): {e}") -def dream_pipeline(): +def dream_pipeline(type_filter=None): """ Full nightly pipeline — interdependent stages. NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis. @@ -501,7 +505,7 @@ def dream_pipeline(): print("\n[NREM] Retrieving...") # NREM is replay-and-consolidation — does not exclude prior traces. # Late REM and Early REM exclude prior content for novelty; NREM does not. - nrem_chunks = retrieve("nrem", excluded_sources=None) + nrem_chunks = retrieve("nrem", excluded_sources=None, type_filter=type_filter) session_retrieved.update(c["source"] for c in nrem_chunks) # Track sources that scored above Early REM ceiling — these are the only ones Early REM should exclude nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > 0.55} @@ -523,6 +527,10 @@ def dream_pipeline(): "sources": nrem_sources, "distinct_folders": nrem_folders, "folder_count": len(nrem_folders), + # The `if c.get("type")` guard (not Counter itself) skips untyped chunks: Graphiti chunks + # lack `type` (facts, not embeddings rows). Pgvector chunks always carry type post-Improvement-#2 + # backfill; an untyped one means the backfill or writer enforcement regressed and is silently omitted here. 
+ "type_distribution": dict(Counter(c.get("type") for c in nrem_chunks if c.get("type"))), "status": "ok", } } @@ -532,7 +540,7 @@ def dream_pipeline(): print("\n[Early REM] Retrieving...") # Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved) # Sources that scored in Early REM band during NREM remain available - early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources) + early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources, type_filter=type_filter) session_retrieved.update(c["source"] for c in early_chunks) if not early_chunks: print("[Early REM] No suitable chunks — skipping") @@ -551,13 +559,14 @@ def dream_pipeline(): "sources": early_sources, "distinct_folders": early_folders, "folder_count": len(early_folders), + "type_distribution": dict(Counter(c.get("type") for c in early_chunks if c.get("type"))), "status": "ok", } print(f"[Early REM] Done.\n{early_rem_output[:200]}...") # ── Stage 3: Late REM — informed by NREM + Early REM ────────────────── print("\n[Late REM] Retrieving...") - late_chunks = retrieve("late-rem", excluded_sources=session_retrieved) + late_chunks = retrieve("late-rem", excluded_sources=session_retrieved, type_filter=type_filter) session_retrieved.update(c["source"] for c in late_chunks) if not late_chunks: print("[Late REM] No suitable chunks — skipping") @@ -582,6 +591,7 @@ def dream_pipeline(): "distinct_folders": list(set(late_folders)), "folder_count": len(set(late_folders)), "cross_domain_pairs": cross_domain_pairs, + "type_distribution": dict(Counter(c.get("type") for c in late_chunks if c.get("type"))), "status": "ok", } print(f"[Late REM] Done.\n{late_rem_output[:200]}...") @@ -628,10 +638,10 @@ def dream_pipeline(): return synthesis_file -def dream_lucid(task): +def dream_lucid(task, type_filter=None): """On-demand lucid dream — single mode, used by Dream Now in settings.""" print(f"Lucid dream starting — task: {task[:80] if task else 'none'}") - chunks = 
retrieve("lucid", task=task) + chunks = retrieve("lucid", task=task, type_filter=type_filter) if not chunks: print("No suitable chunks — aborting") return None @@ -653,13 +663,13 @@ def dream_lucid(task): return filepath -def dream_single(mode, task=None): +def dream_single(mode, task=None, type_filter=None): """ Single mode — used by Dream Now for non-lucid modes. Runs one stage independently (for testing/tuning individual stages). """ print(f"Single mode dream: {mode}") - chunks = retrieve(mode, task=task) + chunks = retrieve(mode, task=task, type_filter=type_filter) if not chunks: print("No suitable chunks — aborting") return None @@ -696,12 +706,19 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="Aaron AI Dreamer") parser.add_argument("--mode", choices=["nrem", "early-rem", "late-rem", "lucid", "pipeline"]) parser.add_argument("--task", type=str) + parser.add_argument( + "--type-filter", type=str, default=None, + help="Comma-separated embeddings.type allowlist (e.g. 'document,aaronai_conversation'). " + "Applies to pgvector retrieval only; Graphiti chunks are not filtered. 
" + "Experimental — default is no filter, no behavior change.", + ) args = parser.parse_args() + type_filter = [t.strip() for t in args.type_filter.split(",")] if args.type_filter else None if args.mode == "lucid": - dream_lucid(args.task or "What should I be thinking about that I am not?") + dream_lucid(args.task or "What should I be thinking about that I am not?", type_filter=type_filter) elif args.mode and args.mode != "pipeline": - dream_single(args.mode, args.task) + dream_single(args.mode, args.task, type_filter=type_filter) else: # Default: full pipeline - dream_pipeline() + dream_pipeline(type_filter=type_filter) diff --git a/scripts/encoding.py b/scripts/encoding.py index 7b91fa4..1c0c7ff 100644 --- a/scripts/encoding.py +++ b/scripts/encoding.py @@ -101,11 +101,24 @@ def chunk_and_embed(text: str, def write_embeddings_batch(conn, batch: list[dict]) -> int: - """Single canonical INSERT. Sets created_at = NOW() server-side. Commits.""" + """Single canonical INSERT. Sets created_at = NOW() server-side. Commits. + + Every row dict must supply 'type'. created_at is SQL-supplied (NOW()), so + callers do not need to provide it. The application-layer assertion is the + primary enforcement point for type — the column lacks NOT NULL because + historical NULLs were resolved by the Improvement #2 backfill, and a + Python-level raise gives a faster, more debuggable failure than a + Postgres constraint error. 
+ """ if not batch: return 0 cur = conn.cursor() for row in batch: + if not row.get("type"): + raise ValueError( + f"row {row.get('id')!r} missing 'type'; writers must supply it " + f"(see Improvement #2 in docs/birdai-component-inventory)" + ) cur.execute(""" INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s) @@ -113,6 +126,8 @@ def write_embeddings_batch(conn, batch: list[dict]) -> int: document = EXCLUDED.document, embedding = EXCLUDED.embedding, source = EXCLUDED.source, + type = EXCLUDED.type, + created_at = COALESCE(embeddings.created_at, EXCLUDED.created_at), metadata = EXCLUDED.metadata """, (row["id"], row["document"], row["embedding"], row["source"], row["type"], json.dumps(row["metadata"]))) diff --git a/scripts/ingest_conversations.py b/scripts/ingest_conversations.py index 2bd51cb..3ad7100 100644 --- a/scripts/ingest_conversations.py +++ b/scripts/ingest_conversations.py @@ -126,6 +126,15 @@ def run(): embeddings = embedder.encode(texts, show_progress_bar=False).tolist() for (chunk_id, chunk_text, meta), embedding in zip(new_chunks, embeddings): + if not meta.get("type"): + raise ValueError( + f"chunk {chunk_id!r} missing 'type'; writers must supply it " + f"(see Improvement #2 in docs/birdai-component-inventory)" + ) + # ON CONFLICT below intentionally overwrites created_at (unlike encoding.py's + # COALESCE): an Aaron-AI conversation's created_at tracks convo.updated_at, + # which advances on activity. Re-running this script on an active conv + # should refresh the timestamp, not preserve the first-seen one. cur.execute(""" INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) VALUES (%s, %s, %s::vector, %s, %s, %s, %s)