embeddings: enforce type/created_at on writers; manifests carry type_distribution (Improvement #2 part B+C)

Writers now enforce type and created_at:
  - encoding.py: ValueError raised at write_embeddings_batch if row dict lacks
    'type'. created_at remains SQL-supplied (NOW() server-side). ON CONFLICT
    DO UPDATE now also rewrites type=EXCLUDED.type and preserves the original
    created_at via COALESCE(embeddings.created_at, EXCLUDED.created_at) — a
    re-ingest re-classifies type but does not overwrite a backfilled mtime.
  - ingest_conversations.py: same ValueError check. ON CONFLICT intentionally keeps
    EXCLUDED.created_at semantics (Aaron-AI conversation created_at tracks
    convo.updated_at; re-runs should refresh).
  - Column-level NOT NULL is not added; application-layer raise gives a
    faster, more debuggable failure than a Postgres constraint error.

Retrieval propagates type into chunks:
  - retrieve() SELECT now includes type; chunk dicts carry "type": etype.
  - WHERE clause built dynamically from excluded_sources and the new
    --type-filter CLI arg (experimental, default None, pgvector retrieval
    only — Graphiti chunks have no embeddings.type to filter on).
  - retrieve_graphiti unchanged; its chunks lack the type field.

Manifests carry type_distribution per stage:
  - dream_pipeline writes stage_data[<stage>]["type_distribution"] for nrem,
    early_rem, late_rem — a Counter over chunk types, filtering None so
    Graphiti chunks (when DREAMER_SUBSTRATE=graphiti) don't pollute the
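    distribution. Pgvector chunks always carry type post-backfill; if a
    pgvector chunk ever has type None, the backfill or writer enforcement
    has regressed. Because None is filtered out, the symptom is a
    distribution whose counts sum to fewer chunks than the stage retrieved.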

Verification:
  B1 force re-ingest of "Finite and infinite games -- James Carse.pdf":
       all 84 chunks preserved created_at=2026-04-27T06:11:55Z
  B2 missing-type assertion raises ValueError, no row leaked to embeddings
  B3 ast.parse(*) clean; EXPLAIN renders for {no excl/no filter,
       type_filter only, excl 2 elems, excl 1 elem edge case, both};
       all five plans use HNSW index scan with correct Filter clauses
  C1 retrieve("nrem") returns 8 chunks each carrying "type" key
  C2 type_distribution = {'document': 5, 'chatgpt_conversation': 3} —
       2 distinct types, 62.5/37.5 split (looser bar: >=2 types,
       no single type >=90%)

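The type and created_at fields are now load-bearing: every dream manifest
emits type_distribution per stage. Reverting the backfill makes NULL-typed
chunks drop out of the distribution (the Counter filters None), so per-stage
counts undercount on any dream run that retrieves such chunks.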
This commit is contained in:
2026-05-04 00:15:43 +00:00
parent 3c7c228db0
commit 7c7b649775
3 changed files with 69 additions and 28 deletions
+44 -27
View File
@@ -16,6 +16,7 @@ import os
import json import json
import sqlite3 import sqlite3
import argparse import argparse
from collections import Counter
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta from datetime import datetime, timedelta
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -282,9 +283,11 @@ def retrieve_graphiti(mode, task=None, n_results=8, excluded_sources=None):
print(f"[Graphiti retrieval error: {e}] — falling back to empty.") print(f"[Graphiti retrieval error: {e}] — falling back to empty.")
return [] return []
def retrieve(mode, task=None, n_results=8, excluded_sources=None): def retrieve(mode, task=None, n_results=8, excluded_sources=None, type_filter=None):
# E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search # E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search
# Default behavior: pgvector similarity search (unchanged) # Default behavior: pgvector similarity search (unchanged)
# type_filter is experimental and applies to pgvector retrieval only — Graphiti
# facts are not embeddings rows and have no embeddings.type to filter on.
substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector") substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
if substrate == "graphiti": if substrate == "graphiti":
return retrieve_graphiti(mode, task=task, n_results=n_results, excluded_sources=excluded_sources) return retrieve_graphiti(mode, task=task, n_results=n_results, excluded_sources=excluded_sources)
@@ -311,23 +314,23 @@ def retrieve(mode, task=None, n_results=8, excluded_sources=None):
pg = get_pg() pg = get_pg()
cur = pg.cursor() cur = pg.cursor()
excluded_sources = excluded_sources or set() excluded_sources = excluded_sources or set()
where, params = [], []
if excluded_sources: if excluded_sources:
cur.execute(""" where.append("source NOT IN %s")
SELECT document, source, 1 - (embedding <=> %s::vector) as similarity params.append(tuple(excluded_sources))
FROM embeddings if type_filter:
WHERE source NOT IN %s where.append("type = ANY(%s)")
ORDER BY embedding <=> %s::vector params.append(list(type_filter))
LIMIT %s where_clause = ("WHERE " + " AND ".join(where)) if where else ""
""", (embedding, tuple(excluded_sources), embedding, n_results * 3)) cur.execute(f"""
else: SELECT document, source, type, 1 - (embedding <=> %s::vector) as similarity
cur.execute(""" FROM embeddings
SELECT document, source, 1 - (embedding <=> %s::vector) as similarity {where_clause}
FROM embeddings ORDER BY embedding <=> %s::vector
ORDER BY embedding <=> %s::vector LIMIT %s
LIMIT %s """, [embedding, *params, embedding, n_results * 3])
""", (embedding, embedding, n_results * 3))
for doc, source, similarity in cur.fetchall(): for doc, source, etype, similarity in cur.fetchall():
if not (low <= similarity <= high): if not (low <= similarity <= high):
continue continue
if source in seen_sources: if source in seen_sources:
@@ -337,6 +340,7 @@ def retrieve(mode, task=None, n_results=8, excluded_sources=None):
"content": doc, "content": doc,
"relevance": similarity, "relevance": similarity,
"similarity": similarity, "similarity": similarity,
"type": etype,
}) })
seen_sources.add(source) seen_sources.add(source)
if len(chunks) >= n_results: if len(chunks) >= n_results:
@@ -482,7 +486,7 @@ def write_manifest(date_str, stage_data, corpus_data):
print(f"Manifest write failed (non-critical): {e}") print(f"Manifest write failed (non-critical): {e}")
def dream_pipeline(): def dream_pipeline(type_filter=None):
""" """
Full nightly pipeline — interdependent stages. Full nightly pipeline — interdependent stages.
NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis. NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis.
@@ -501,7 +505,7 @@ def dream_pipeline():
print("\n[NREM] Retrieving...") print("\n[NREM] Retrieving...")
# NREM is replay-and-consolidation — does not exclude prior traces. # NREM is replay-and-consolidation — does not exclude prior traces.
# Late REM and Early REM exclude prior content for novelty; NREM does not. # Late REM and Early REM exclude prior content for novelty; NREM does not.
nrem_chunks = retrieve("nrem", excluded_sources=None) nrem_chunks = retrieve("nrem", excluded_sources=None, type_filter=type_filter)
session_retrieved.update(c["source"] for c in nrem_chunks) session_retrieved.update(c["source"] for c in nrem_chunks)
# Track sources that scored above Early REM ceiling — these are the only ones Early REM should exclude # Track sources that scored above Early REM ceiling — these are the only ones Early REM should exclude
nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > 0.55} nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > 0.55}
@@ -523,6 +527,10 @@ def dream_pipeline():
"sources": nrem_sources, "sources": nrem_sources,
"distinct_folders": nrem_folders, "distinct_folders": nrem_folders,
"folder_count": len(nrem_folders), "folder_count": len(nrem_folders),
# Counter filters None: Graphiti chunks lack `type` (facts, not embeddings rows).
# Pgvector chunks always carry type post-Improvement-#2 backfill. If type
# ever appears as None here, the backfill or writer enforcement has regressed.
"type_distribution": dict(Counter(c.get("type") for c in nrem_chunks if c.get("type"))),
"status": "ok", "status": "ok",
} }
} }
@@ -532,7 +540,7 @@ def dream_pipeline():
print("\n[Early REM] Retrieving...") print("\n[Early REM] Retrieving...")
# Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved) # Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved)
# Sources that scored in Early REM band during NREM remain available # Sources that scored in Early REM band during NREM remain available
early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources) early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources, type_filter=type_filter)
session_retrieved.update(c["source"] for c in early_chunks) session_retrieved.update(c["source"] for c in early_chunks)
if not early_chunks: if not early_chunks:
print("[Early REM] No suitable chunks — skipping") print("[Early REM] No suitable chunks — skipping")
@@ -551,13 +559,14 @@ def dream_pipeline():
"sources": early_sources, "sources": early_sources,
"distinct_folders": early_folders, "distinct_folders": early_folders,
"folder_count": len(early_folders), "folder_count": len(early_folders),
"type_distribution": dict(Counter(c.get("type") for c in early_chunks if c.get("type"))),
"status": "ok", "status": "ok",
} }
print(f"[Early REM] Done.\n{early_rem_output[:200]}...") print(f"[Early REM] Done.\n{early_rem_output[:200]}...")
# ── Stage 3: Late REM — informed by NREM + Early REM ────────────────── # ── Stage 3: Late REM — informed by NREM + Early REM ──────────────────
print("\n[Late REM] Retrieving...") print("\n[Late REM] Retrieving...")
late_chunks = retrieve("late-rem", excluded_sources=session_retrieved) late_chunks = retrieve("late-rem", excluded_sources=session_retrieved, type_filter=type_filter)
session_retrieved.update(c["source"] for c in late_chunks) session_retrieved.update(c["source"] for c in late_chunks)
if not late_chunks: if not late_chunks:
print("[Late REM] No suitable chunks — skipping") print("[Late REM] No suitable chunks — skipping")
@@ -582,6 +591,7 @@ def dream_pipeline():
"distinct_folders": list(set(late_folders)), "distinct_folders": list(set(late_folders)),
"folder_count": len(set(late_folders)), "folder_count": len(set(late_folders)),
"cross_domain_pairs": cross_domain_pairs, "cross_domain_pairs": cross_domain_pairs,
"type_distribution": dict(Counter(c.get("type") for c in late_chunks if c.get("type"))),
"status": "ok", "status": "ok",
} }
print(f"[Late REM] Done.\n{late_rem_output[:200]}...") print(f"[Late REM] Done.\n{late_rem_output[:200]}...")
@@ -628,10 +638,10 @@ def dream_pipeline():
return synthesis_file return synthesis_file
def dream_lucid(task): def dream_lucid(task, type_filter=None):
"""On-demand lucid dream — single mode, used by Dream Now in settings.""" """On-demand lucid dream — single mode, used by Dream Now in settings."""
print(f"Lucid dream starting — task: {task[:80] if task else 'none'}") print(f"Lucid dream starting — task: {task[:80] if task else 'none'}")
chunks = retrieve("lucid", task=task) chunks = retrieve("lucid", task=task, type_filter=type_filter)
if not chunks: if not chunks:
print("No suitable chunks — aborting") print("No suitable chunks — aborting")
return None return None
@@ -653,13 +663,13 @@ def dream_lucid(task):
return filepath return filepath
def dream_single(mode, task=None): def dream_single(mode, task=None, type_filter=None):
""" """
Single mode — used by Dream Now for non-lucid modes. Single mode — used by Dream Now for non-lucid modes.
Runs one stage independently (for testing/tuning individual stages). Runs one stage independently (for testing/tuning individual stages).
""" """
print(f"Single mode dream: {mode}") print(f"Single mode dream: {mode}")
chunks = retrieve(mode, task=task) chunks = retrieve(mode, task=task, type_filter=type_filter)
if not chunks: if not chunks:
print("No suitable chunks — aborting") print("No suitable chunks — aborting")
return None return None
@@ -696,12 +706,19 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Aaron AI Dreamer") parser = argparse.ArgumentParser(description="Aaron AI Dreamer")
parser.add_argument("--mode", choices=["nrem", "early-rem", "late-rem", "lucid", "pipeline"]) parser.add_argument("--mode", choices=["nrem", "early-rem", "late-rem", "lucid", "pipeline"])
parser.add_argument("--task", type=str) parser.add_argument("--task", type=str)
parser.add_argument(
"--type-filter", type=str, default=None,
help="Comma-separated embeddings.type allowlist (e.g. 'document,aaronai_conversation'). "
"Applies to pgvector retrieval only; Graphiti chunks are not filtered. "
"Experimental — default is no filter, no behavior change.",
)
args = parser.parse_args() args = parser.parse_args()
type_filter = [t.strip() for t in args.type_filter.split(",")] if args.type_filter else None
if args.mode == "lucid": if args.mode == "lucid":
dream_lucid(args.task or "What should I be thinking about that I am not?") dream_lucid(args.task or "What should I be thinking about that I am not?", type_filter=type_filter)
elif args.mode and args.mode != "pipeline": elif args.mode and args.mode != "pipeline":
dream_single(args.mode, args.task) dream_single(args.mode, args.task, type_filter=type_filter)
else: else:
# Default: full pipeline # Default: full pipeline
dream_pipeline() dream_pipeline(type_filter=type_filter)
+16 -1
View File
@@ -101,11 +101,24 @@ def chunk_and_embed(text: str,
def write_embeddings_batch(conn, batch: list[dict]) -> int: def write_embeddings_batch(conn, batch: list[dict]) -> int:
"""Single canonical INSERT. Sets created_at = NOW() server-side. Commits.""" """Single canonical INSERT. Sets created_at = NOW() server-side. Commits.
Every row dict must supply 'type'. created_at is SQL-supplied (NOW()), so
callers do not need to provide it. The application-layer assertion is the
primary enforcement point for type — the column lacks NOT NULL because
historical NULLs were resolved by the Improvement #2 backfill, and a
Python-level raise gives a faster, more debuggable failure than a
Postgres constraint error.
"""
if not batch: if not batch:
return 0 return 0
cur = conn.cursor() cur = conn.cursor()
for row in batch: for row in batch:
if not row.get("type"):
raise ValueError(
f"row {row.get('id')!r} missing 'type'; writers must supply it "
f"(see Improvement #2 in docs/birdai-component-inventory)"
)
cur.execute(""" cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s) VALUES (%s, %s, %s::vector, %s, %s, NOW(), %s)
@@ -113,6 +126,8 @@ def write_embeddings_batch(conn, batch: list[dict]) -> int:
document = EXCLUDED.document, document = EXCLUDED.document,
embedding = EXCLUDED.embedding, embedding = EXCLUDED.embedding,
source = EXCLUDED.source, source = EXCLUDED.source,
type = EXCLUDED.type,
created_at = COALESCE(embeddings.created_at, EXCLUDED.created_at),
metadata = EXCLUDED.metadata metadata = EXCLUDED.metadata
""", (row["id"], row["document"], row["embedding"], """, (row["id"], row["document"], row["embedding"],
row["source"], row["type"], json.dumps(row["metadata"]))) row["source"], row["type"], json.dumps(row["metadata"])))
+9
View File
@@ -126,6 +126,15 @@ def run():
embeddings = embedder.encode(texts, show_progress_bar=False).tolist() embeddings = embedder.encode(texts, show_progress_bar=False).tolist()
for (chunk_id, chunk_text, meta), embedding in zip(new_chunks, embeddings): for (chunk_id, chunk_text, meta), embedding in zip(new_chunks, embeddings):
if not meta.get("type"):
raise ValueError(
f"chunk {chunk_id!r} missing 'type'; writers must supply it "
f"(see Improvement #2 in docs/birdai-component-inventory)"
)
# ON CONFLICT below intentionally overwrites created_at (unlike encoding.py's
# COALESCE): an Aaron-AI conversation's created_at tracks convo.updated_at,
# which advances on activity. Re-running this script on an active conv
# should refresh the timestamp, not preserve the first-seen one.
cur.execute(""" cur.execute("""
INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata) INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
VALUES (%s, %s, %s::vector, %s, %s, %s, %s) VALUES (%s, %s, %s::vector, %s, %s, %s, %s)