Compare commits

..

10 Commits

Author SHA1 Message Date
aaron 5582549321 dream_observation: drop the 'go quiet' rule from select_mode
The earlier behavior never went quiet — it dreamed every night, even when
that meant repeating itself. The 'return None on null delta' rule was a
synthesis-doc invention (the dreamer-design-spec.md I treated as
authoritative is itself LLM-generated) that didn't match the actual
desired UX. Aaron called this out.

The repetition problem the quiet rule was claimed to solve is already
addressed in the retrieve layer:
- LLM-generated queries from the observation signal vary nightly
- MMR diversity prevents within-night cluster lock-in
- NREM bias toward under-processed chunks (low consolidation_count)
  ensures fresh material gets selected over recently-replayed material

So select_mode now always returns a mode. NREM is the default. Staleness
still routes to Late REM at 3+ days for cross-domain variety. Journal
entries still route to Early REM.
2026-05-22 23:49:27 +00:00
aaron 3ec9a48151 dream_observation: reorder select_mode so 3-day staleness wins over the quiet rule
Bug: the previous order checked the "nothing changed → return None" rule
first, so the spec's "corpus unchanged 3+ days → Late REM (shake things
loose)" branch could never fire. Stasis was permanent — quiet would just
keep returning None forever as long as no new chunks or journals appeared,
regardless of how stale the corpus got.

Fix: check staleness first. Quiet remains the default within the 1-2-day
window the spec implicitly grants for the dreamer to "go quiet rather than
manufacturing novelty." At day 3+, Late REM fires automatically — the
spec's mechanism for breaking out of the silence when the corpus isn't
delivering new material.

Observed symptom that triggered this: dreamer fired 2026-05-21 08:00 and
2026-05-22 08:00, both went quiet. Real cause was no new content (which
is correct quiet behavior for days 1-2), but the bug would have made it
stay quiet indefinitely had we not fixed it before day 3.
2026-05-22 23:18:00 +00:00
aaron 9d09d3fa14 api.py: flush=True on graphiti-push log lines
The background daemon thread that pushes chat turns to Graphiti was using
default-buffered print(), so the success/failure lines never reached the
systemd journal — buffer never flushed because the thread keeps the
interpreter alive. The push itself worked (verified by Episodic nodes
appearing in the graph), just the log was silent.

Surgical fix: pass flush=True on the four print() calls inside _push_chat_turn_
to_graphiti's background worker. Now every push result lands in the journal
as it happens, giving real-time visibility into whether pushes are
succeeding, failing on non-200, hitting a network error, or raising
unexpectedly.

If we add more background-thread logging later, PYTHONUNBUFFERED=1 in the
service environment would solve it globally — but that's overkill for this
one site.
2026-05-20 22:41:02 +00:00
aaron f185ed60cb dream.py: Stage 3+ refactor — LLM-generated queries, MMR, mutable windows, consolidation cursor
Implements the rest of dreamer-design-spec.md's Stage 3 alongside the
prescriptions from the external literature review:

- Hardcoded seed query strings are gone. _llm_generate_queries() produces
  4 mode-appropriate retrieval queries per call from the observation signal
  (Park et al. 2023 reflection pattern). NREM queries probe RECENT additions;
  Early REM bridges associative/emotional threads; Late REM forces cross-
  domain pairs; Lucid decomposes the task. Empirical first-run output:
  queries like "SUNY New Paltz Fall 2026 registration moratorium" instead of
  the fixed "research fabrication teaching practice recent work" — vector
  neighborhood now drifts with what the user has been actually doing.

- TIME_WINDOWS_HOURS makes per-mode retrieval windows mutable
  (dreamer-multimodal-design.md §2's tech-debt item): NREM 72hr / Early REM
  30d / Late REM 90d / Lucid no-window. NULL created_at rows are excluded
  from windowed modes — correct since they predate the cursor by definition.

- NREM bias toward under-processed chunks via "ORDER BY consolidation_count
  ASC" before vector distance. Biologically motivated: sharp-wave-ripple
  replay is tagged/biased, not uniform. Chunks that haven't been replayed
  recently win the tiebreak.

- MMR merge (Carbonell & Goldstein 1998) over the union of all queries'
  candidates. λ=0.5. Directly attacks the cluster-dominance failure mode
  where 8 dossier-narrative variants filled all 8 slots in 5 consecutive
  nights.

- _bump_consolidation_cursor() called after NREM completes. Each source
  used gets consolidation_count += 1 and last_consolidated_at = NOW().
  Tomorrow's signal sees these as more-processed, less under-processed.

- dream_pipeline now runs observe_corpus + select_mode at the top per spec
  lines 27-34. If select_mode returns None — corpus unchanged + no new
  journal entry — pipeline exits with no dream rather than manufacturing
  novelty (spec line 67's "dreamer goes quiet").

Back-compat preserved:
- retrieve()'s signature gains `signal` as optional kwarg; default behavior
  calls observe_corpus() inline so dream_single / dream_lucid keep working
  unchanged.
- Graphiti substrate (E3 experiment) path untouched.
- Manifest schema keeps the "query" field; value is now
  "[llm-generated from observation signal]" so historical manifest
  consumers don't break.
2026-05-20 18:11:07 +00:00
aaron a4735053c2 backfill_consolidation_cursor.py: populate cursor from historical dream manifests
One-off script. Walks Journal/Dreams/dream-manifest-*.json and increments
consolidation_count + sets last_consolidated_at for every (manifest, source)
pair. Idempotent — resets the cursor for any touched sources before
backfilling, so reruns don't double-count.

First run: 7547 embeddings rows updated across 105 unique sources, 416
(source, manifest_date) pairs across all manifests. Distribution now: 422
chunks at count=18 (the dominant dossier-narrative cluster that fills every
NREM in the last 18 days), long tail down to count=1, 12,011 still at 0.

This makes dream_observation.underprocessed_count meaningful — before, all
counts were 0 so the bottom-quartile percentile was 0 and the signal was
degenerate. After, the signal correctly identifies the 12k chunks that have
never been replayed.
2026-05-20 18:04:43 +00:00
aaron f682d8c6a0 dream_observation.py: Stage 1 + 2 of the design spec — observe and select
Implements `dreamer-design-spec.md` lines 27-74: observe_corpus() returns a
signal vector (new_chunks delta, new_journal_entries, recent_questions over
14-day window, days_since_dream, underprocessed_count derived from the new
consolidation cursor); select_mode() returns one of {nrem, early-rem,
late-rem, lucid} or None per the spec's rules. The None return is the spec's
canonical answer to the repetition problem (line 67) — "dreamer goes quiet
rather than manufacturing novelty."

Standalone for now. Not wired into dream_pipeline yet — that happens in the
retrieve() refactor (task #46). dream.py is unchanged in this commit.

Grounded sources cited in module docstring: Friston Active Inference, sleep
research (Stickgold/Walker/Diekelberg & Born), sharp-wave ripples (Buzsáki).
All three appear in BirdAI-Bibliography.md.

Migration prerequisite (already shipped in the prior commit): consolidation
cursor columns last_consolidated_at + consolidation_count added to
embeddings. Backfill from dream-manifest history is task #49.
2026-05-20 17:57:38 +00:00
aaron 151c756b89 api.py: async chat-turn push to Graphiti
After chat() returns, fire-and-forget background thread POSTs the (user
message + assistant response) as one episode to /episodes. Default extraction
(Sonnet). Errors logged, never raised — chat is not gated on the write.

Wall-clock cost in the background is ~20 min per episode against the
current ~4,300-entity graph. The chat experience is unaffected; the graph
catches up with a delay. Search_facts queries reflect new turns once the
sidecar has finished processing them.

Kill-switch: SKIP_GRAPHITI_CHAT_PUSH=1 in the api service environment
disables the push without code changes. Useful if dedup contention surfaces
under sustained load.

Companions to this commit: search_facts tool (e96bf40), orientation indexer
worker (e96bf40), FalkorDB vector index patches (d2ec20e, 313c0f0).
2026-05-20 05:08:07 +00:00
aaron e96bf40b2f plan B: search_facts chat tool + orientation indexer (read-only Graphiti)
After establishing that single-episode Graphiti writes take ~20 min against
the existing graph (the dedup loop is structurally slow regardless of the
patches, the bridge, or the LLM model), the salvage plan is to stop trying
to write to Graphiti and instead:

  1. Use the existing 4,300-entity graph as a read-only fact layer at chat
     time via a new search_facts tool. Graphiti's /search endpoint is fast
     (~15ms direct, ~400ms over HTTP); the graph is stale-as-of-early-May
     but covers most biographical / relational content that "write me a bio"
     and similar queries care about.

  2. Pipe Stage 2's document-level orientations into pgvector via a new
     orientation_indexer worker. Stage 2 already runs and writes orientation
     text to stage_3_queue for every Mistral-processed document; the worker
     reads those, embeds them, and writes one row per source to embeddings
     with metadata->>'kind'='orientation'. retrieve_documents now ranks
     against both chunk text and document-level concept summaries.

Idempotent: the indexer's "is this already indexed" check is an EXISTS
subquery against embeddings, so restarts and partial runs are safe.

Out of scope (deliberately): no Graphiti writes from chat, no Stage 2 ->
Graphiti bridge, no draining the 711-item stage_3_queue backlog into
Graphiti. Rich-extraction posture stays a BirdAI concern.
2026-05-20 05:00:03 +00:00
aaron 313c0f0341 graphiti_service.py: bridge driver._search_ops to driver.search_interface
graphiti-core 0.29.0 builds FalkorSearchOperations as driver._search_ops in
FalkorDriver.__init__ but never assigns it to driver.search_interface.
search_utils.py dispatches on search_interface; without this one-line bridge
it falls back to interpreted-Cypher cosine math doing full table scans for
every entity dedup similarity check.

Combined with the vendored patches in graphiti_patches/ (restored in the
previous commit d2ec20e), this activates FalkorDB's native vector index for
the dedup similarity path. Empirical impact (per the original f645b74 commit
message): single-episode add_episode against a ~4,277-entity graph went from
indefinite hang to ~8.2 seconds.

Surgical restore: cherry-picks only the bridge code from f645b74 — not the
Pattern 1 async job model, not the v2.4 extraction instructions, neither of
which we want. Default extraction posture (taxonomy-naïve) stays the
operating mode. Rich-extraction story remains a BirdAI concern.
2026-05-20 04:06:46 +00:00
aaron d2ec20e373 graphiti_patches: vendored FalkorDB vector index support for graphiti-core 0.29.0
Adds native FalkorDB vector index support to graphiti-core's FalkorDB
driver. Three patched files (graph_queries.py, falkordb_driver.py,
falkordb/operations/search_ops.py) plus apply.sh that backs up venv
files and copies patches over.

Why this exists: graphiti-core 0.29.0 builds similarity queries using
interpreted Cypher cosine math (vec.cosineDistance) which produces a
full-table scan over Entity/RELATES_TO/Community nodes for every search.
At ~4,000+ entities, single-episode add_episode took 8+ minutes for the
resolve-against-existing-graph step and bulk ingest hung indefinitely.
FalkorDB itself supports db.idx.vector.queryNodes and queryRelationships
procedures backed by HNSW indexes; the driver just doesn't use them.

Patches:

1. graph_queries.py — adds get_vector_indices() returning CREATE VECTOR
   INDEX statements for FalkorDB (Entity.name_embedding,
   RELATES_TO.fact_embedding, Community.name_embedding). HNSW with
   cosine similarity. Adds VECTOR_INDEX_CANDIDATE_MULTIPLIER for
   over-fetch when WHERE filters reject some top-k results. Original
   get_vector_cosine_func_query preserved for fallback.

2. falkordb_driver.py — extends build_indices_and_constraints() to call
   get_vector_indices() alongside range and fulltext. Adds cache
   invalidation hook so the search_ops dispatcher re-probes for indexes
   after they're built.

3. falkordb/operations/search_ops.py — adds vector-index dispatcher
   helpers (_falkordb_vector_index_exists with module-level cache,
   _falkordb_vector_node_search_cypher, _falkordb_vector_edge_search_cypher).
   Rewrites the three vector-similarity call sites (Entity.name_embedding,
   RELATES_TO.fact_embedding, Community.name_embedding) to use
   db.idx.vector.queryNodes / queryRelationships when available, fall
   back to interpreted-Cypher cosine math when not. Index existence
   probed once per (label, attribute, entity_type) and cached.

Empirical result: single-episode add_episode against a 4,277-entity
graph went from indefinite hang to 8.2 seconds. Bulk re-ingest of
already-known content (worst case for entity dedup) committed in 60ms.

Activation requires bridging driver._search_ops to driver.search_interface
in the sidecar (see graphiti_service.py). graphiti-core declares
search_interface as the dispatcher attribute but never assigns the
per-driver implementation to it — naming mismatch in their internal
refactor. The bridge is one line in our sidecar's lifespan.

Upstream candidate: this is a known gap (referenced indirectly in
upstream issue #1263 RFC for external vector store overlay). Maintainers'
attention is on Milvus/Qdrant/Pinecone overlay; this is the FalkorDB-
native alternative for users who don't want to run a separate vector DB.
PR after empirical validation in production. Apache-2.0 graphiti-core
source is NOT vendored — backups/ is gitignored to keep the upstream
source out of this repo.
2026-05-20 04:04:24 +00:00
12 changed files with 2712 additions and 65 deletions
+4
View File
@@ -0,0 +1,4 @@
# Local backups created by apply.sh — environment state, not source.
# Keeping these out of version control prevents repo bloat and avoids
# checking in graphiti-core's Apache-2.0 source under our repo's tree.
backups/
+58
View File
@@ -0,0 +1,58 @@
# graphiti-core Patches — FalkorDB Vector Index Support
Vendored patches against graphiti-core 0.29.0 adding native FalkorDB
vector index support. Three files modified, all under
`graphiti_core/driver/falkordb/` and `graphiti_core/graph_queries.py`.
No changes to Neo4j or Kuzu code paths.
## Why this exists
graphiti-core's FalkorDB driver uses interpreted Cypher cosine math
(`vec.cosineDistance(...)`) for similarity search. Each query becomes a
full table scan over Entity/RELATES_TO/Community nodes. At ~4,000+
entities, single-episode ingest's resolve-against-existing-graph step
takes 8+ minutes and bulk ingest hangs FalkorDB. FalkorDB itself
supports `db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`
procedures backed by HNSW indexes; graphiti-core's driver doesn't use
them.
These patches:
1. Add `get_vector_indices()` to `graph_queries.py` returning CREATE
VECTOR INDEX statements for FalkorDB on Entity.name_embedding,
RELATES_TO.fact_embedding, and Community.name_embedding.
2. Extend `falkordb_driver.py:build_indices_and_constraints()` to create
the vector indexes alongside range and fulltext indexes.
3. Rewrite the three vector-similarity call sites in
`falkordb/operations/search_ops.py` to use
`db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`
instead of full-scan cosine math. Over-fetches by a configurable
multiplier to handle filter rejections.
## Files
| Patched file | Source |
|---|---|
| `graphiti_core/graph_queries.py` | Adds `get_vector_indices()` |
| `graphiti_core/driver/falkordb/falkordb_driver.py` | Extends `build_indices_and_constraints` |
| `graphiti_core/driver/falkordb/operations/search_ops.py` | Three query rewrites |
## How to apply
`./apply.sh` — backs up the originals into `./backups/<timestamp>/`
and copies the patched files over.
## How to revert
Move the timestamped backup back over the venv:
cp backups/<ts>/graph_queries.py /home/aaron/aaronai/venv/lib/python3.12/site-packages/graphiti_core/graph_queries.py
# ...etc
## Upstream candidate
Documented gap (issue #1263 references it indirectly via vector store
overlay RFC). Maintainers' attention is on Milvus/external vector DB
overlay; this patch is the FalkorDB-native alternative for users who
don't want a separate vector DB. Consider PR after empirical validation
in production.
+77
View File
@@ -0,0 +1,77 @@
#!/usr/bin/env bash
# apply.sh — Apply the BirdAI vendored graphiti-core patches.
#
# Backs up the original venv files into ./backups/<timestamp>/ before
# overwriting. The backup directory layout mirrors the venv layout so a
# revert is just a tree copy back.
#
# Usage: ./apply.sh
set -euo pipefail
PATCH_DIR="$(cd "$(dirname "$0")" && pwd)"
VENV_BASE="/home/aaron/aaronai/venv/lib/python3.12/site-packages"
TIMESTAMP="$(date +%Y%m%d-%H%M%S)"
BACKUP_DIR="$PATCH_DIR/backups/$TIMESTAMP"
# Files to patch — paths relative to graphiti_core/.
FILES=(
"graph_queries.py"
"driver/falkordb_driver.py"
"driver/falkordb/operations/search_ops.py"
)
echo "graphiti-core vendored patch apply — BirdAI"
echo "Patch directory: $PATCH_DIR"
echo "Venv target: $VENV_BASE/graphiti_core/"
echo "Backup to: $BACKUP_DIR"
echo
# Pre-flight: confirm all source patch files exist.
for rel in "${FILES[@]}"; do
if [ ! -f "$PATCH_DIR/graphiti_core/$rel" ]; then
echo "ERROR: missing patch file: $PATCH_DIR/graphiti_core/$rel" >&2
exit 1
fi
done
# Pre-flight: confirm all target venv files exist.
for rel in "${FILES[@]}"; do
if [ ! -f "$VENV_BASE/graphiti_core/$rel" ]; then
echo "ERROR: missing venv file: $VENV_BASE/graphiti_core/$rel" >&2
echo " graphiti-core may not be installed, or version differs from 0.29.0." >&2
exit 1
fi
done
# Backup originals.
echo "[1/3] Backing up originals..."
for rel in "${FILES[@]}"; do
backup_path="$BACKUP_DIR/graphiti_core/$rel"
mkdir -p "$(dirname "$backup_path")"
cp "$VENV_BASE/graphiti_core/$rel" "$backup_path"
echo " backed up: $rel"
done
echo
# Apply patches by copying.
echo "[2/3] Applying patches..."
for rel in "${FILES[@]}"; do
cp "$PATCH_DIR/graphiti_core/$rel" "$VENV_BASE/graphiti_core/$rel"
echo " patched: $rel"
done
echo
# Sanity check: confirm patched files have the marker.
echo "[3/3] Verifying patched files..."
for rel in "${FILES[@]}"; do
if grep -q "PATCHED 2026-05-02" "$VENV_BASE/graphiti_core/$rel"; then
echo " OK: $rel contains patch marker"
else
echo " WARNING: $rel missing patch marker (may be expected for graph_queries.py — its docstring uses the marker only in the module header)"
fi
done
echo
echo "Done. Backup: $BACKUP_DIR"
echo "Restart the sidecar to pick up changes:"
echo " sudo systemctl restart aaronai-graphiti.service"
@@ -0,0 +1,904 @@
"""
Copyright 2024, Zep Software, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
from typing import Any
from graphiti_core.driver.driver import GraphProvider
from graphiti_core.driver.falkordb import STOPWORDS
from graphiti_core.driver.operations.search_ops import SearchOperations
from graphiti_core.driver.query_executor import QueryExecutor
from graphiti_core.driver.record_parsers import (
community_node_from_record,
entity_edge_from_record,
entity_node_from_record,
episodic_node_from_record,
)
from graphiti_core.edges import EntityEdge
from graphiti_core.graph_queries import (
get_nodes_query,
get_relationships_query,
get_vector_cosine_func_query,
)
from graphiti_core.models.edges.edge_db_queries import get_entity_edge_return_query
from graphiti_core.models.nodes.node_db_queries import (
COMMUNITY_NODE_RETURN,
EPISODIC_NODE_RETURN,
get_entity_node_return_query,
)
from graphiti_core.nodes import CommunityNode, EntityNode, EpisodicNode
from graphiti_core.search.search_filters import (
SearchFilters,
edge_search_filter_query_constructor,
node_search_filter_query_constructor,
)
logger = logging.getLogger(__name__)
MAX_QUERY_LENGTH = 128
# ---------------------------------------------------------------------------
# Vector index dispatcher (PATCHED 2026-05-02, BirdAI vendored patch).
#
# graphiti-core's FalkorDB driver historically composed similarity queries
# using `vec.cosineDistance(...)` in interpreted Cypher, which produces a
# full-table scan for every search. FalkorDB supports native vector indexes
# via `db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`;
# this dispatcher uses them when present and falls back to the cosine math
# otherwise.
#
# Index existence is checked once per (label, attribute, entity_type) and
# cached at module scope. The cache should be invalidated whenever
# `build_indices_and_constraints` runs (since indexes may have been created
# or dropped). FalkorDriver.build_indices_and_constraints is patched to
# call `_invalidate_falkordb_vector_index_cache()` after building.
#
# Over-fetch factor (VECTOR_INDEX_CANDIDATE_MULTIPLIER from graph_queries)
# preserves recall when WHERE filters reject some of the top-k candidates.
# ---------------------------------------------------------------------------
from graphiti_core.graph_queries import (
VECTOR_INDEX_CANDIDATE_MULTIPLIER,
get_vector_cosine_func_query,
)
# Cache: key = (label, attribute, entity_type), value = bool
# entity_type is 'NODE' or 'RELATIONSHIP'.
_FALKORDB_VECTOR_INDEX_CACHE: dict[tuple[str, str, str], bool] = {}
def _invalidate_falkordb_vector_index_cache() -> None:
"""Clear the vector-index existence cache. Call after build_indices_and_constraints."""
_FALKORDB_VECTOR_INDEX_CACHE.clear()
async def _falkordb_vector_index_exists(
executor: QueryExecutor,
label: str,
attribute: str,
entity_type: str,
) -> bool:
"""Check whether a FalkorDB vector index exists for the given target.
entity_type is 'NODE' for node-label indexes, 'RELATIONSHIP' for edge-type indexes.
Result is cached at module scope; call _invalidate_falkordb_vector_index_cache()
after building or dropping indexes.
"""
key = (label, attribute, entity_type)
if key in _FALKORDB_VECTOR_INDEX_CACHE:
return _FALKORDB_VECTOR_INDEX_CACHE[key]
try:
records, _, _ = await executor.execute_query(
"CALL db.indexes() YIELD label, properties, types, entitytype "
"RETURN label, properties, types, entitytype"
)
except Exception as e:
# If we cannot enumerate indexes, fall back to "no index" rather than
# propagating the error. The fallback cosine-math path is correct,
# just slower.
logger.warning(f"FalkorDB vector index probe failed; assuming none exist: {e}")
_FALKORDB_VECTOR_INDEX_CACHE[key] = False
return False
found = False
for r in records:
# Records come back as dict-like rows keyed by column name (not
# tuples). Access by string keys matching the YIELD clause above.
rec_label = r.get('label') if hasattr(r, 'get') else r['label']
rec_props = r.get('properties') if hasattr(r, 'get') else r['properties']
rec_types = r.get('types') if hasattr(r, 'get') else r['types']
rec_entitytype = r.get('entitytype') if hasattr(r, 'get') else r['entitytype']
if rec_props is None:
rec_props = []
if rec_types is None:
rec_types = {}
if rec_label != label:
continue
if rec_entitytype is not None and rec_entitytype != entity_type:
continue
if attribute not in rec_props:
continue
# rec_types is a dict like {attribute: ['VECTOR', ...], ...} or sometimes
# a flat list — handle both shapes.
if isinstance(rec_types, dict):
attr_types = rec_types.get(attribute, [])
else:
attr_types = rec_types
if 'VECTOR' in attr_types:
found = True
break
_FALKORDB_VECTOR_INDEX_CACHE[key] = found
return found
def _falkordb_vector_node_search_cypher(
label: str,
embedding_attr: str,
search_vector_param: str,
use_index: bool,
) -> tuple[str, str]:
"""Build the cypher prefix and node-binding for a node-vector search.
Returns (prefix, node_var) where:
- prefix is the Cypher fragment that binds the node variable and a
`score` variable. With index, it's a CALL ... YIELD; without, it's
a MATCH plus WITH cosine math.
- node_var is the variable name the caller's downstream Cypher should
reference (always 'n' here for parity with the existing code).
The caller appends WHERE filters and RETURN/ORDER BY/LIMIT as usual.
The over-fetch parameter `$candidate_k` must be passed by the caller
when use_index is True.
"""
if use_index:
return (
f"CALL db.idx.vector.queryNodes("
f"'{label}', '{embedding_attr}', $candidate_k, vecf32({search_vector_param})"
f") YIELD node, score "
f"WITH node AS n, score "
), "n"
# Fallback: original cosine math path
cosine = get_vector_cosine_func_query(
f"n.{embedding_attr}", search_vector_param, GraphProvider.FALKORDB
)
return (
f"MATCH (n:{label}) "
f"WITH n, {cosine} AS score "
), "n"
def _falkordb_vector_edge_search_cypher(
relationship_type: str,
embedding_attr: str,
search_vector_param: str,
use_index: bool,
) -> tuple[str, str]:
"""Build the cypher prefix and edge-binding for an edge-vector search.
Returns (prefix, edge_var). With the index, the procedure binds the
relationship variable; we then MATCH source and target via the existing
edge to recover (n)-[e]->(m). Without the index, it's the original
MATCH-and-cosine path.
Variable name is 'e' for parity with existing code; source/target are
'n' and 'm' respectively, also for parity.
"""
if use_index:
return (
f"CALL db.idx.vector.queryRelationships("
f"'{relationship_type}', '{embedding_attr}', $candidate_k, vecf32({search_vector_param})"
f") YIELD relationship, score "
f"MATCH (n:Entity)-[e:{relationship_type}]->(m:Entity) "
f"WHERE e = relationship "
f"WITH DISTINCT e, n, m, score "
), "e"
# Fallback
cosine = get_vector_cosine_func_query(
f"e.{embedding_attr}", search_vector_param, GraphProvider.FALKORDB
)
return (
f"MATCH (n:Entity)-[e:{relationship_type}]->(m:Entity) "
f"WITH DISTINCT e, n, m, {cosine} AS score "
), "e"
# FalkorDB separator characters that break text into tokens
_SEPARATOR_MAP = str.maketrans(
{
',': ' ',
'.': ' ',
'<': ' ',
'>': ' ',
'{': ' ',
'}': ' ',
'[': ' ',
']': ' ',
'"': ' ',
"'": ' ',
':': ' ',
';': ' ',
'!': ' ',
'@': ' ',
'#': ' ',
'$': ' ',
'%': ' ',
'^': ' ',
'&': ' ',
'*': ' ',
'(': ' ',
')': ' ',
'-': ' ',
'+': ' ',
'=': ' ',
'~': ' ',
'?': ' ',
'|': ' ',
'/': ' ',
'\\': ' ',
}
)
def _sanitize(query: str) -> str:
"""Replace FalkorDB special characters with whitespace."""
sanitized = query.translate(_SEPARATOR_MAP)
return ' '.join(sanitized.split())
def _build_falkor_fulltext_query(
query: str,
group_ids: list[str] | None = None,
max_query_length: int = MAX_QUERY_LENGTH,
) -> str:
"""Build a fulltext query string for FalkorDB using RedisSearch syntax."""
if group_ids is None or len(group_ids) == 0:
group_filter = ''
else:
escaped_group_ids = [f'"{gid}"' for gid in group_ids]
group_values = '|'.join(escaped_group_ids)
group_filter = f'(@group_id:{group_values})'
sanitized_query = _sanitize(query)
# Remove stopwords and empty tokens
query_words = sanitized_query.split()
filtered_words = [word for word in query_words if word and word.lower() not in STOPWORDS]
sanitized_query = ' | '.join(filtered_words)
if len(sanitized_query.split(' ')) + len(group_ids or '') >= max_query_length:
return ''
full_query = group_filter + ' (' + sanitized_query + ')'
return full_query
class FalkorSearchOperations(SearchOperations):
# --- Node search ---
async def node_fulltext_search(
self,
executor: QueryExecutor,
query: str,
search_filter: SearchFilters,
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[EntityNode]:
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
if fuzzy_query == '':
return []
filter_queries, filter_params = node_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('n.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
filter_query = ''
if filter_queries:
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
cypher = (
get_nodes_query(
'node_name_and_summary', '$query', limit=limit, provider=GraphProvider.FALKORDB
)
+ 'YIELD node AS n, score'
+ filter_query
+ """
WITH n, score
ORDER BY score DESC
LIMIT $limit
RETURN
"""
+ get_entity_node_return_query(GraphProvider.FALKORDB)
)
records, _, _ = await executor.execute_query(
cypher,
query=fuzzy_query,
limit=limit,
**filter_params,
)
return [entity_node_from_record(r) for r in records]
async def node_similarity_search(
self,
executor: QueryExecutor,
search_vector: list[float],
search_filter: SearchFilters,
group_ids: list[str] | None = None,
limit: int = 10,
min_score: float = 0.6,
) -> list[EntityNode]:
filter_queries, filter_params = node_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('n.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
filter_query = ''
if filter_queries:
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
# index when available; fall back to interpreted-Cypher cosine math
# otherwise. The filter clause's position changes between paths
# (after MATCH for fallback, after YIELD for index path), but the
# filter expressions themselves are identical because they reference
# the bound variable `n` either way.
use_index = await _falkordb_vector_index_exists(
executor, 'Entity', 'name_embedding', 'NODE'
)
prefix, _ = _falkordb_vector_node_search_cypher(
'Entity', 'name_embedding', '$search_vector', use_index
)
where_clauses = []
if filter_query:
where_clauses.append(filter_query.replace(' WHERE ', '', 1).strip())
where_clauses.append('score > $min_score')
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
cypher = (
prefix
+ unified_where
+ """
RETURN
"""
+ get_entity_node_return_query(GraphProvider.FALKORDB)
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
params = dict(
search_vector=search_vector,
limit=limit,
min_score=min_score,
**filter_params,
)
if use_index:
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
records, _, _ = await executor.execute_query(cypher, **params)
return [entity_node_from_record(r) for r in records]
async def node_bfs_search(
self,
executor: QueryExecutor,
origin_uuids: list[str],
search_filter: SearchFilters,
max_depth: int,
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[EntityNode]:
if not origin_uuids or max_depth < 1:
return []
filter_queries, filter_params = node_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('n.group_id IN $group_ids')
filter_queries.append('origin.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
filter_query = ''
if filter_queries:
filter_query = ' AND ' + (' AND '.join(filter_queries))
cypher = (
f"""
UNWIND $bfs_origin_node_uuids AS origin_uuid
MATCH (origin {{uuid: origin_uuid}})-[:RELATES_TO|MENTIONS*1..{max_depth}]->(n:Entity)
WHERE n.group_id = origin.group_id
"""
+ filter_query
+ """
RETURN
"""
+ get_entity_node_return_query(GraphProvider.FALKORDB)
+ """
LIMIT $limit
"""
)
records, _, _ = await executor.execute_query(
cypher,
bfs_origin_node_uuids=origin_uuids,
limit=limit,
**filter_params,
)
return [entity_node_from_record(r) for r in records]
# --- Edge search ---
async def edge_fulltext_search(
self,
executor: QueryExecutor,
query: str,
search_filter: SearchFilters,
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[EntityEdge]:
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
if fuzzy_query == '':
return []
filter_queries, filter_params = edge_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('e.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
filter_query = ''
if filter_queries:
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
cypher = (
get_relationships_query(
'edge_name_and_fact', limit=limit, provider=GraphProvider.FALKORDB
)
+ """
YIELD relationship AS rel, score
MATCH (n:Entity)-[e:RELATES_TO {uuid: rel.uuid}]->(m:Entity)
"""
+ filter_query
+ """
WITH e, score, n, m
RETURN
"""
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
records, _, _ = await executor.execute_query(
cypher,
query=fuzzy_query,
limit=limit,
**filter_params,
)
return [entity_edge_from_record(r) for r in records]
async def edge_similarity_search(
self,
executor: QueryExecutor,
search_vector: list[float],
source_node_uuid: str | None,
target_node_uuid: str | None,
search_filter: SearchFilters,
group_ids: list[str] | None = None,
limit: int = 10,
min_score: float = 0.6,
) -> list[EntityEdge]:
filter_queries, filter_params = edge_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('e.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
if source_node_uuid is not None:
filter_params['source_uuid'] = source_node_uuid
filter_queries.append('n.uuid = $source_uuid')
if target_node_uuid is not None:
filter_params['target_uuid'] = target_node_uuid
filter_queries.append('m.uuid = $target_uuid')
filter_query = ''
if filter_queries:
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
# index on RELATES_TO.fact_embedding when available. The unindexed
# fallback is the same MATCH-and-cosine math that previously hung
# for 6+ minutes on a 4,000-entity graph; this is the load-bearing
# call site that motivated the patch.
use_index = await _falkordb_vector_index_exists(
executor, 'RELATES_TO', 'fact_embedding', 'RELATIONSHIP'
)
prefix, _ = _falkordb_vector_edge_search_cypher(
'RELATES_TO', 'fact_embedding', '$search_vector', use_index
)
where_clauses = []
if filter_query:
where_clauses.append(filter_query.replace(' WHERE ', '', 1).strip())
where_clauses.append('score > $min_score')
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
cypher = (
prefix
+ unified_where
+ """
RETURN
"""
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
params = dict(
search_vector=search_vector,
limit=limit,
min_score=min_score,
**filter_params,
)
if use_index:
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
records, _, _ = await executor.execute_query(cypher, **params)
return [entity_edge_from_record(r) for r in records]
async def edge_bfs_search(
self,
executor: QueryExecutor,
origin_uuids: list[str],
max_depth: int,
search_filter: SearchFilters,
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[EntityEdge]:
if not origin_uuids:
return []
filter_queries, filter_params = edge_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('e.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
filter_query = ''
if filter_queries:
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
cypher = (
f"""
UNWIND $bfs_origin_node_uuids AS origin_uuid
MATCH path = (origin {{uuid: origin_uuid}})-[:RELATES_TO|MENTIONS*1..{max_depth}]->(:Entity)
UNWIND relationships(path) AS rel
MATCH (n:Entity)-[e:RELATES_TO {{uuid: rel.uuid}}]-(m:Entity)
"""
+ filter_query
+ """
RETURN DISTINCT
"""
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
+ """
LIMIT $limit
"""
)
records, _, _ = await executor.execute_query(
cypher,
bfs_origin_node_uuids=origin_uuids,
depth=max_depth,
limit=limit,
**filter_params,
)
return [entity_edge_from_record(r) for r in records]
# --- Episode search ---
async def episode_fulltext_search(
self,
executor: QueryExecutor,
query: str,
search_filter: SearchFilters, # noqa: ARG002
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[EpisodicNode]:
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
if fuzzy_query == '':
return []
filter_params: dict[str, Any] = {}
group_filter_query = ''
if group_ids is not None:
group_filter_query += '\nAND e.group_id IN $group_ids'
filter_params['group_ids'] = group_ids
cypher = (
get_nodes_query(
'episode_content', '$query', limit=limit, provider=GraphProvider.FALKORDB
)
+ """
YIELD node AS episode, score
MATCH (e:Episodic)
WHERE e.uuid = episode.uuid
"""
+ group_filter_query
+ """
RETURN
"""
+ EPISODIC_NODE_RETURN
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
records, _, _ = await executor.execute_query(
cypher, query=fuzzy_query, limit=limit, **filter_params
)
return [episodic_node_from_record(r) for r in records]
# --- Community search ---
async def community_fulltext_search(
self,
executor: QueryExecutor,
query: str,
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[CommunityNode]:
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
if fuzzy_query == '':
return []
filter_params: dict[str, Any] = {}
group_filter_query = ''
if group_ids is not None:
group_filter_query = 'WHERE c.group_id IN $group_ids'
filter_params['group_ids'] = group_ids
cypher = (
get_nodes_query(
'community_name', '$query', limit=limit, provider=GraphProvider.FALKORDB
)
+ """
YIELD node AS c, score
WITH c, score
"""
+ group_filter_query
+ """
RETURN
"""
+ COMMUNITY_NODE_RETURN
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
records, _, _ = await executor.execute_query(
cypher, query=fuzzy_query, limit=limit, **filter_params
)
return [community_node_from_record(r) for r in records]
async def community_similarity_search(
self,
executor: QueryExecutor,
search_vector: list[float],
group_ids: list[str] | None = None,
limit: int = 10,
min_score: float = 0.6,
) -> list[CommunityNode]:
query_params: dict[str, Any] = {}
group_filter_query = ''
if group_ids is not None:
group_filter_query += ' WHERE c.group_id IN $group_ids'
query_params['group_ids'] = group_ids
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
# index on Community.name_embedding when available. Note: the existing
# filter is built into `group_filter_query` (already prefixed with
# ' WHERE ' if non-empty) and uses variable `c`. The dispatcher binds
# the node as `n` for parity with the helper signature, then we
# re-bind to `c` via WITH so the rest of the query is unchanged.
use_index = await _falkordb_vector_index_exists(
executor, 'Community', 'name_embedding', 'NODE'
)
prefix, _ = _falkordb_vector_node_search_cypher(
'Community', 'name_embedding', '$search_vector', use_index
)
prefix = prefix + ' WITH n AS c, score '
where_clauses = []
if group_filter_query:
where_clauses.append(group_filter_query.replace(' WHERE ', '', 1).strip())
where_clauses.append('score > $min_score')
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
cypher = (
prefix
+ unified_where
+ """
RETURN
"""
+ COMMUNITY_NODE_RETURN
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
params = dict(
search_vector=search_vector,
limit=limit,
min_score=min_score,
**query_params,
)
if use_index:
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
records, _, _ = await executor.execute_query(cypher, **params)
return [community_node_from_record(r) for r in records]
# --- Rerankers ---
async def node_distance_reranker(
self,
executor: QueryExecutor,
node_uuids: list[str],
center_node_uuid: str,
min_score: float = 0,
) -> list[EntityNode]:
filtered_uuids = [u for u in node_uuids if u != center_node_uuid]
scores: dict[str, float] = {center_node_uuid: 0.0}
cypher = """
UNWIND $node_uuids AS node_uuid
MATCH (center:Entity {uuid: $center_uuid})-[:RELATES_TO]-(n:Entity {uuid: node_uuid})
RETURN 1 AS score, node_uuid AS uuid
"""
results, _, _ = await executor.execute_query(
cypher,
node_uuids=filtered_uuids,
center_uuid=center_node_uuid,
)
for result in results:
scores[result['uuid']] = result['score']
for uuid in filtered_uuids:
if uuid not in scores:
scores[uuid] = float('inf')
filtered_uuids.sort(key=lambda cur_uuid: scores[cur_uuid])
if center_node_uuid in node_uuids:
scores[center_node_uuid] = 0.1
filtered_uuids = [center_node_uuid] + filtered_uuids
reranked_uuids = [u for u in filtered_uuids if (1 / scores[u]) >= min_score]
if not reranked_uuids:
return []
get_query = """
MATCH (n:Entity)
WHERE n.uuid IN $uuids
RETURN
""" + get_entity_node_return_query(GraphProvider.FALKORDB)
records, _, _ = await executor.execute_query(get_query, uuids=reranked_uuids)
node_map = {r['uuid']: entity_node_from_record(r) for r in records}
return [node_map[u] for u in reranked_uuids if u in node_map]
async def episode_mentions_reranker(
self,
executor: QueryExecutor,
node_uuids: list[str],
min_score: float = 0,
) -> list[EntityNode]:
if not node_uuids:
return []
scores: dict[str, float] = {}
results, _, _ = await executor.execute_query(
"""
UNWIND $node_uuids AS node_uuid
MATCH (episode:Episodic)-[r:MENTIONS]->(n:Entity {uuid: node_uuid})
RETURN count(*) AS score, n.uuid AS uuid
""",
node_uuids=node_uuids,
)
for result in results:
scores[result['uuid']] = result['score']
for uuid in node_uuids:
if uuid not in scores:
scores[uuid] = float('inf')
sorted_uuids = list(node_uuids)
sorted_uuids.sort(key=lambda cur_uuid: scores[cur_uuid])
reranked_uuids = [u for u in sorted_uuids if scores[u] >= min_score]
if not reranked_uuids:
return []
get_query = """
MATCH (n:Entity)
WHERE n.uuid IN $uuids
RETURN
""" + get_entity_node_return_query(GraphProvider.FALKORDB)
records, _, _ = await executor.execute_query(get_query, uuids=reranked_uuids)
node_map = {r['uuid']: entity_node_from_record(r) for r in records}
return [node_map[u] for u in reranked_uuids if u in node_map]
# --- Filter builders ---
def build_node_search_filters(self, search_filters: SearchFilters) -> Any:
filter_queries, filter_params = node_search_filter_query_constructor(
search_filters, GraphProvider.FALKORDB
)
return {'filter_queries': filter_queries, 'filter_params': filter_params}
def build_edge_search_filters(self, search_filters: SearchFilters) -> Any:
filter_queries, filter_params = edge_search_filter_query_constructor(
search_filters, GraphProvider.FALKORDB
)
return {'filter_queries': filter_queries, 'filter_params': filter_params}
# --- Fulltext query builder ---
def build_fulltext_query(
self,
query: str,
group_ids: list[str] | None = None,
max_query_length: int = MAX_QUERY_LENGTH,
) -> str:
return _build_falkor_fulltext_query(query, group_ids, max_query_length)
@@ -0,0 +1,444 @@
"""
Copyright 2024, Zep Software, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import asyncio
import datetime
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from falkordb import Graph as FalkorGraph
from falkordb.asyncio import FalkorDB
else:
try:
from falkordb import Graph as FalkorGraph
from falkordb.asyncio import FalkorDB
except ImportError:
# If falkordb is not installed, raise an ImportError
raise ImportError(
'falkordb is required for FalkorDriver. '
'Install it with: pip install graphiti-core[falkordb]'
) from None
from graphiti_core.driver.driver import GraphDriver, GraphDriverSession, GraphProvider
from graphiti_core.driver.falkordb import STOPWORDS as STOPWORDS
from graphiti_core.driver.falkordb.operations.community_edge_ops import (
FalkorCommunityEdgeOperations,
)
from graphiti_core.driver.falkordb.operations.community_node_ops import (
FalkorCommunityNodeOperations,
)
from graphiti_core.driver.falkordb.operations.entity_edge_ops import FalkorEntityEdgeOperations
from graphiti_core.driver.falkordb.operations.entity_node_ops import FalkorEntityNodeOperations
from graphiti_core.driver.falkordb.operations.episode_node_ops import FalkorEpisodeNodeOperations
from graphiti_core.driver.falkordb.operations.episodic_edge_ops import FalkorEpisodicEdgeOperations
from graphiti_core.driver.falkordb.operations.graph_ops import FalkorGraphMaintenanceOperations
from graphiti_core.driver.falkordb.operations.has_episode_edge_ops import (
FalkorHasEpisodeEdgeOperations,
)
from graphiti_core.driver.falkordb.operations.next_episode_edge_ops import (
FalkorNextEpisodeEdgeOperations,
)
from graphiti_core.driver.falkordb.operations.saga_node_ops import FalkorSagaNodeOperations
from graphiti_core.driver.falkordb.operations.search_ops import FalkorSearchOperations
from graphiti_core.driver.operations.community_edge_ops import CommunityEdgeOperations
from graphiti_core.driver.operations.community_node_ops import CommunityNodeOperations
from graphiti_core.driver.operations.entity_edge_ops import EntityEdgeOperations
from graphiti_core.driver.operations.entity_node_ops import EntityNodeOperations
from graphiti_core.driver.operations.episode_node_ops import EpisodeNodeOperations
from graphiti_core.driver.operations.episodic_edge_ops import EpisodicEdgeOperations
from graphiti_core.driver.operations.graph_ops import GraphMaintenanceOperations
from graphiti_core.driver.operations.has_episode_edge_ops import HasEpisodeEdgeOperations
from graphiti_core.driver.operations.next_episode_edge_ops import NextEpisodeEdgeOperations
from graphiti_core.driver.operations.saga_node_ops import SagaNodeOperations
from graphiti_core.driver.operations.search_ops import SearchOperations
from graphiti_core.graph_queries import get_fulltext_indices, get_range_indices, get_vector_indices
from graphiti_core.helpers import validate_group_ids
from graphiti_core.utils.datetime_utils import convert_datetimes_to_strings
logger = logging.getLogger(__name__)
class FalkorDriverSession(GraphDriverSession):
provider = GraphProvider.FALKORDB
def __init__(self, graph: FalkorGraph):
self.graph = graph
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
# No cleanup needed for Falkor, but method must exist
pass
async def close(self):
# No explicit close needed for FalkorDB, but method must exist
pass
async def execute_write(self, func, *args, **kwargs):
# Directly await the provided async function with `self` as the transaction/session
return await func(self, *args, **kwargs)
async def run(self, query: str | list, **kwargs: Any) -> Any:
# FalkorDB does not support argument for Label Set, so it's converted into an array of queries
if isinstance(query, list):
for cypher, params in query:
params = convert_datetimes_to_strings(params)
await self.graph.query(str(cypher), params) # type: ignore[reportUnknownArgumentType]
else:
params = dict(kwargs)
params = convert_datetimes_to_strings(params)
await self.graph.query(str(query), params) # type: ignore[reportUnknownArgumentType]
# Assuming `graph.query` is async (ideal); otherwise, wrap in executor
return None
class FalkorDriver(GraphDriver):
provider = GraphProvider.FALKORDB
default_group_id: str = '\\_'
fulltext_syntax: str = '@' # FalkorDB uses a redisearch-like syntax for fulltext queries
aoss_client: None = None
def __init__(
self,
host: str = 'localhost',
port: int = 6379,
username: str | None = None,
password: str | None = None,
falkor_db: FalkorDB | None = None,
database: str = 'default_db',
):
"""
Initialize the FalkorDB driver.
FalkorDB is a multi-tenant graph database.
To connect, provide the host and port.
The default parameters assume a local (on-premises) FalkorDB instance.
Args:
host (str): The host where FalkorDB is running.
port (int): The port on which FalkorDB is listening.
username (str | None): The username for authentication (if required).
password (str | None): The password for authentication (if required).
falkor_db (FalkorDB | None): An existing FalkorDB instance to use instead of creating a new one.
database (str): The name of the database to connect to. Defaults to 'default_db'.
"""
super().__init__()
self._database = database
if falkor_db is not None:
# If a FalkorDB instance is provided, use it directly
self.client = falkor_db
else:
self.client = FalkorDB(host=host, port=port, username=username, password=password)
# Instantiate FalkorDB operations
self._entity_node_ops = FalkorEntityNodeOperations()
self._episode_node_ops = FalkorEpisodeNodeOperations()
self._community_node_ops = FalkorCommunityNodeOperations()
self._saga_node_ops = FalkorSagaNodeOperations()
self._entity_edge_ops = FalkorEntityEdgeOperations()
self._episodic_edge_ops = FalkorEpisodicEdgeOperations()
self._community_edge_ops = FalkorCommunityEdgeOperations()
self._has_episode_edge_ops = FalkorHasEpisodeEdgeOperations()
self._next_episode_edge_ops = FalkorNextEpisodeEdgeOperations()
self._search_ops = FalkorSearchOperations()
self._graph_ops = FalkorGraphMaintenanceOperations()
# Schedule the indices and constraints to be built
try:
# Try to get the current event loop
loop = asyncio.get_running_loop()
# Schedule the build_indices_and_constraints to run
loop.create_task(self.build_indices_and_constraints())
except RuntimeError:
# No event loop running, this will be handled later
pass
# --- Operations properties ---
@property
def entity_node_ops(self) -> EntityNodeOperations:
return self._entity_node_ops
@property
def episode_node_ops(self) -> EpisodeNodeOperations:
return self._episode_node_ops
@property
def community_node_ops(self) -> CommunityNodeOperations:
return self._community_node_ops
@property
def saga_node_ops(self) -> SagaNodeOperations:
return self._saga_node_ops
@property
def entity_edge_ops(self) -> EntityEdgeOperations:
return self._entity_edge_ops
@property
def episodic_edge_ops(self) -> EpisodicEdgeOperations:
return self._episodic_edge_ops
@property
def community_edge_ops(self) -> CommunityEdgeOperations:
return self._community_edge_ops
@property
def has_episode_edge_ops(self) -> HasEpisodeEdgeOperations:
return self._has_episode_edge_ops
@property
def next_episode_edge_ops(self) -> NextEpisodeEdgeOperations:
return self._next_episode_edge_ops
@property
def search_ops(self) -> SearchOperations:
return self._search_ops
@property
def graph_ops(self) -> GraphMaintenanceOperations:
return self._graph_ops
def _get_graph(self, graph_name: str | None) -> FalkorGraph:
# FalkorDB requires a non-None database name for multi-tenant graphs; the default is "default_db"
if graph_name is None:
graph_name = self._database
return self.client.select_graph(graph_name)
async def execute_query(self, cypher_query_, **kwargs: Any):
graph = self._get_graph(self._database)
# Convert datetime objects to ISO strings (FalkorDB does not support datetime objects directly)
params = convert_datetimes_to_strings(dict(kwargs))
try:
result = await graph.query(cypher_query_, params) # type: ignore[reportUnknownArgumentType]
except Exception as e:
if 'already indexed' in str(e):
# check if index already exists
logger.info(f'Index already exists: {e}')
return None
logger.error(f'Error executing FalkorDB query: {e}\n{cypher_query_}\n{params}')
raise
# Convert the result header to a list of strings
header = [h[1] for h in result.header]
# Convert FalkorDB's result format (list of lists) to the format expected by Graphiti (list of dicts)
records = []
for row in result.result_set:
record = {}
for i, field_name in enumerate(header):
if i < len(row):
record[field_name] = row[i]
else:
# If there are more fields in header than values in row, set to None
record[field_name] = None
records.append(record)
return records, header, None
def session(self, database: str | None = None) -> GraphDriverSession:
return FalkorDriverSession(self._get_graph(database))
async def close(self) -> None:
"""Close the driver connection."""
if hasattr(self.client, 'aclose'):
await self.client.aclose() # type: ignore[reportUnknownMemberType]
elif hasattr(self.client.connection, 'aclose'):
await self.client.connection.aclose()
elif hasattr(self.client.connection, 'close'):
await self.client.connection.close()
async def delete_all_indexes(self) -> None:
result = await self.execute_query('CALL db.indexes()')
if not result:
return
records, _, _ = result
drop_tasks = []
for record in records:
label = record['label']
entity_type = record['entitytype']
for field_name, index_type in record['types'].items():
if 'RANGE' in index_type:
drop_tasks.append(self.execute_query(f'DROP INDEX ON :{label}({field_name})'))
elif 'FULLTEXT' in index_type:
if entity_type == 'NODE':
drop_tasks.append(
self.execute_query(
f'DROP FULLTEXT INDEX FOR (n:{label}) ON (n.{field_name})'
)
)
elif entity_type == 'RELATIONSHIP':
drop_tasks.append(
self.execute_query(
f'DROP FULLTEXT INDEX FOR ()-[e:{label}]-() ON (e.{field_name})'
)
)
if drop_tasks:
await asyncio.gather(*drop_tasks)
async def build_indices_and_constraints(self, delete_existing=False):
if delete_existing:
await self.delete_all_indexes()
# PATCHED 2026-05-02 (BirdAI vendored patch): add vector indexes alongside
# range and fulltext. FalkorDB supports native vector indexes via
# db.idx.vector.queryNodes / queryRelationships; without these, similarity
# search runs as full-table-scan cosine math in interpreted Cypher.
index_queries = (
get_range_indices(self.provider)
+ get_fulltext_indices(self.provider)
+ get_vector_indices(self.provider)
)
for query in index_queries:
await self.execute_query(query)
# Invalidate the search_ops vector-index existence cache so subsequent
# similarity queries re-probe and discover the indexes we just built.
try:
from graphiti_core.driver.falkordb.operations.search_ops import (
_invalidate_falkordb_vector_index_cache,
)
_invalidate_falkordb_vector_index_cache()
except ImportError:
# search_ops module not yet imported (cold start); cache is empty
# by default, so no invalidation needed.
pass
def clone(self, database: str) -> 'GraphDriver':
"""
Returns a shallow copy of this driver with a different default database.
Reuses the same connection (e.g. FalkorDB, Neo4j).
"""
if database == self._database:
cloned = self
elif database == self.default_group_id:
cloned = FalkorDriver(falkor_db=self.client)
else:
# Create a new instance of FalkorDriver with the same connection but a different database
cloned = FalkorDriver(falkor_db=self.client, database=database)
return cloned
async def health_check(self) -> None:
"""Check FalkorDB connectivity by running a simple query."""
try:
await self.execute_query('MATCH (n) RETURN 1 LIMIT 1')
return None
except Exception as e:
print(f'FalkorDB health check failed: {e}')
raise
@staticmethod
def convert_datetimes_to_strings(obj):
if isinstance(obj, dict):
return {k: FalkorDriver.convert_datetimes_to_strings(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [FalkorDriver.convert_datetimes_to_strings(item) for item in obj]
elif isinstance(obj, tuple):
return tuple(FalkorDriver.convert_datetimes_to_strings(item) for item in obj)
elif isinstance(obj, datetime):
return obj.isoformat()
else:
return obj
def sanitize(self, query: str) -> str:
"""
Replace FalkorDB special characters with whitespace.
Based on FalkorDB tokenization rules: ,.<>{}[]"':;!@#$%^&*()-+=~
"""
# FalkorDB separator characters that break text into tokens
separator_map = str.maketrans(
{
',': ' ',
'.': ' ',
'<': ' ',
'>': ' ',
'{': ' ',
'}': ' ',
'[': ' ',
']': ' ',
'"': ' ',
"'": ' ',
':': ' ',
';': ' ',
'!': ' ',
'@': ' ',
'#': ' ',
'$': ' ',
'%': ' ',
'^': ' ',
'&': ' ',
'*': ' ',
'(': ' ',
')': ' ',
'-': ' ',
'+': ' ',
'=': ' ',
'~': ' ',
'?': ' ',
'|': ' ',
'/': ' ',
'\\': ' ',
}
)
sanitized = query.translate(separator_map)
# Clean up multiple spaces
sanitized = ' '.join(sanitized.split())
return sanitized
def build_fulltext_query(
self, query: str, group_ids: list[str] | None = None, max_query_length: int = 128
) -> str:
"""
Build a fulltext query string for FalkorDB using RedisSearch syntax.
FalkorDB uses RedisSearch-like syntax where:
- Field queries use @ prefix: @field:value
- Multiple values for same field: (@field:value1|value2)
- Text search doesn't need @ prefix for content fields
- AND is implicit with space: (@group_id:value) (text)
- OR uses pipe within parentheses: (@group_id:value1|value2)
"""
validate_group_ids(group_ids)
if group_ids is None or len(group_ids) == 0:
group_filter = ''
else:
# Escape group_ids with quotes to prevent RediSearch syntax errors
# with reserved words like "main" or special characters like hyphens
escaped_group_ids = [f'"{gid}"' for gid in group_ids]
group_values = '|'.join(escaped_group_ids)
group_filter = f'(@group_id:{group_values})'
sanitized_query = self.sanitize(query)
# Remove stopwords and empty tokens from the sanitized query
query_words = sanitized_query.split()
filtered_words = [word for word in query_words if word and word.lower() not in STOPWORDS]
sanitized_query = ' | '.join(filtered_words)
# If the query is too long return no query
if len(sanitized_query.split(' ')) + len(group_ids or '') >= max_query_length:
return ''
full_query = group_filter + ' (' + sanitized_query + ')'
return full_query
@@ -0,0 +1,242 @@
"""
Database query utilities for different graph database backends.
This module provides database-agnostic query generation for Neo4j and FalkorDB,
supporting index creation, fulltext search, and bulk operations.
PATCHED for FalkorDB native vector index support (BirdAI vendored patch,
2026-05-02). Adds:
- get_vector_indices(): CREATE VECTOR INDEX statements for FalkorDB
- get_vector_search_query(): Cypher fragment for vector similarity using
FalkorDB's db.idx.vector procedures, with fallback to cosine math when
the index does not yet exist
- VECTOR_INDEX_CANDIDATE_MULTIPLIER: over-fetch factor for vector index
queries to handle filter rejections after index lookup
No changes to Neo4j or Kuzu code paths.
"""
from typing_extensions import LiteralString
from graphiti_core.driver.driver import GraphProvider
# Mapping from Neo4j fulltext index names to FalkorDB node labels
NEO4J_TO_FALKORDB_MAPPING = {
'node_name_and_summary': 'Entity',
'community_name': 'Community',
'episode_content': 'Episodic',
'edge_name_and_fact': 'RELATES_TO',
}
# Mapping from fulltext index names to Kuzu node labels
INDEX_TO_LABEL_KUZU_MAPPING = {
'node_name_and_summary': 'Entity',
'community_name': 'Community',
'episode_content': 'Episodic',
'edge_name_and_fact': 'RelatesToNode_',
}
# Vector index over-fetch multiplier. When a vector index search is
# combined with WHERE filters (group_id, source_uuid, etc.), some of
# the top-k index results may be filtered out. Over-fetching by this
# factor preserves recall against the final LIMIT after filtering.
# Conservative default; tunable per-deployment by editing this constant
# or via environment-variable override at the driver level (future).
VECTOR_INDEX_CANDIDATE_MULTIPLIER = 5
def get_range_indices(provider: GraphProvider) -> list[LiteralString]:
if provider == GraphProvider.FALKORDB:
return [
# Entity node
'CREATE INDEX FOR (n:Entity) ON (n.uuid, n.group_id, n.name, n.created_at)',
# Episodic node
'CREATE INDEX FOR (n:Episodic) ON (n.uuid, n.group_id, n.created_at, n.valid_at)',
# Community node
'CREATE INDEX FOR (n:Community) ON (n.uuid)',
# Saga node
'CREATE INDEX FOR (n:Saga) ON (n.uuid, n.group_id, n.name)',
# RELATES_TO edge
'CREATE INDEX FOR ()-[e:RELATES_TO]-() ON (e.uuid, e.group_id, e.name, e.created_at, e.expired_at, e.valid_at, e.invalid_at)',
# MENTIONS edge
'CREATE INDEX FOR ()-[e:MENTIONS]-() ON (e.uuid, e.group_id)',
# HAS_MEMBER edge
'CREATE INDEX FOR ()-[e:HAS_MEMBER]-() ON (e.uuid)',
# HAS_EPISODE edge
'CREATE INDEX FOR ()-[e:HAS_EPISODE]-() ON (e.uuid, e.group_id)',
# NEXT_EPISODE edge
'CREATE INDEX FOR ()-[e:NEXT_EPISODE]-() ON (e.uuid, e.group_id)',
]
if provider == GraphProvider.KUZU:
return []
return [
'CREATE INDEX entity_uuid IF NOT EXISTS FOR (n:Entity) ON (n.uuid)',
'CREATE INDEX episode_uuid IF NOT EXISTS FOR (n:Episodic) ON (n.uuid)',
'CREATE INDEX community_uuid IF NOT EXISTS FOR (n:Community) ON (n.uuid)',
'CREATE INDEX saga_uuid IF NOT EXISTS FOR (n:Saga) ON (n.uuid)',
'CREATE INDEX relation_uuid IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.uuid)',
'CREATE INDEX mention_uuid IF NOT EXISTS FOR ()-[e:MENTIONS]-() ON (e.uuid)',
'CREATE INDEX has_member_uuid IF NOT EXISTS FOR ()-[e:HAS_MEMBER]-() ON (e.uuid)',
'CREATE INDEX has_episode_uuid IF NOT EXISTS FOR ()-[e:HAS_EPISODE]-() ON (e.uuid)',
'CREATE INDEX next_episode_uuid IF NOT EXISTS FOR ()-[e:NEXT_EPISODE]-() ON (e.uuid)',
'CREATE INDEX entity_group_id IF NOT EXISTS FOR (n:Entity) ON (n.group_id)',
'CREATE INDEX episode_group_id IF NOT EXISTS FOR (n:Episodic) ON (n.group_id)',
'CREATE INDEX community_group_id IF NOT EXISTS FOR (n:Community) ON (n.group_id)',
'CREATE INDEX saga_group_id IF NOT EXISTS FOR (n:Saga) ON (n.group_id)',
'CREATE INDEX relation_group_id IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.group_id)',
'CREATE INDEX mention_group_id IF NOT EXISTS FOR ()-[e:MENTIONS]-() ON (e.group_id)',
'CREATE INDEX has_episode_group_id IF NOT EXISTS FOR ()-[e:HAS_EPISODE]-() ON (e.group_id)',
'CREATE INDEX next_episode_group_id IF NOT EXISTS FOR ()-[e:NEXT_EPISODE]-() ON (e.group_id)',
'CREATE INDEX name_entity_index IF NOT EXISTS FOR (n:Entity) ON (n.name)',
'CREATE INDEX saga_name IF NOT EXISTS FOR (n:Saga) ON (n.name)',
'CREATE INDEX created_at_entity_index IF NOT EXISTS FOR (n:Entity) ON (n.created_at)',
'CREATE INDEX created_at_episodic_index IF NOT EXISTS FOR (n:Episodic) ON (n.created_at)',
'CREATE INDEX valid_at_episodic_index IF NOT EXISTS FOR (n:Episodic) ON (n.valid_at)',
'CREATE INDEX name_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.name)',
'CREATE INDEX created_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.created_at)',
'CREATE INDEX expired_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.expired_at)',
'CREATE INDEX valid_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.valid_at)',
'CREATE INDEX invalid_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.invalid_at)',
]
def get_fulltext_indices(provider: GraphProvider) -> list[LiteralString]:
if provider == GraphProvider.FALKORDB:
from typing import cast
from graphiti_core.driver.falkordb import STOPWORDS
# Convert to string representation for embedding in queries
stopwords_str = str(STOPWORDS)
# Use type: ignore to satisfy LiteralString requirement while maintaining single source of truth
return cast(
list[LiteralString],
[
f"""CALL db.idx.fulltext.createNodeIndex(
{{
label: 'Episodic',
stopwords: {stopwords_str}
}},
'content', 'source', 'source_description', 'group_id'
)""",
f"""CALL db.idx.fulltext.createNodeIndex(
{{
label: 'Entity',
stopwords: {stopwords_str}
}},
'name', 'summary', 'group_id'
)""",
f"""CALL db.idx.fulltext.createNodeIndex(
{{
label: 'Community',
stopwords: {stopwords_str}
}},
'name', 'group_id'
)""",
"""CREATE FULLTEXT INDEX FOR ()-[e:RELATES_TO]-() ON (e.name, e.fact, e.group_id)""",
],
)
if provider == GraphProvider.KUZU:
return [
"CALL CREATE_FTS_INDEX('Episodic', 'episode_content', ['content', 'source', 'source_description']);",
"CALL CREATE_FTS_INDEX('Entity', 'node_name_and_summary', ['name', 'summary']);",
"CALL CREATE_FTS_INDEX('Community', 'community_name', ['name']);",
"CALL CREATE_FTS_INDEX('RelatesToNode_', 'edge_name_and_fact', ['name', 'fact']);",
]
return [
"""CREATE FULLTEXT INDEX episode_content IF NOT EXISTS
FOR (e:Episodic) ON EACH [e.content, e.source, e.source_description, e.group_id]""",
"""CREATE FULLTEXT INDEX node_name_and_summary IF NOT EXISTS
FOR (n:Entity) ON EACH [n.name, n.summary, n.group_id]""",
"""CREATE FULLTEXT INDEX community_name IF NOT EXISTS
FOR (n:Community) ON EACH [n.name, n.group_id]""",
"""CREATE FULLTEXT INDEX edge_name_and_fact IF NOT EXISTS
FOR ()-[e:RELATES_TO]-() ON EACH [e.name, e.fact, e.group_id]""",
]
def get_vector_indices(provider: GraphProvider, dimension: int = 384) -> list[LiteralString]:
"""Return CREATE VECTOR INDEX statements for the given provider.
For FalkorDB: creates HNSW vector indexes on Entity.name_embedding,
RELATES_TO.fact_embedding, and Community.name_embedding. Backed by
FalkorDB's native vector index (db.idx.vector.queryNodes /
queryRelationships).
For Neo4j and Kuzu: returns an empty list. Those backends create vector
indexes via different mechanisms (Neo4j auto-creates them when needed
via its vector.similarity.cosine function; Kuzu uses array_cosine_similarity
and does not require pre-built vector indexes for graphiti-core's usage).
Args:
provider: The graph database provider.
dimension: Embedding dimension. Defaults to 384 (all-MiniLM-L6-v2).
Embedders with different dimensions should pass their own value
through driver configuration. graphiti-core's default embedder
is 1536 (OpenAI ada-002); BirdAI uses 384 (sentence-transformers).
Returns:
List of CREATE VECTOR INDEX statements. Idempotent at FalkorDB level
if the index already exists with matching options.
"""
if provider == GraphProvider.FALKORDB:
from typing import cast
return cast(
list[LiteralString],
[
f"CREATE VECTOR INDEX FOR (n:Entity) ON (n.name_embedding) "
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
f"CREATE VECTOR INDEX FOR ()-[e:RELATES_TO]-() ON (e.fact_embedding) "
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
f"CREATE VECTOR INDEX FOR (n:Community) ON (n.name_embedding) "
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
],
)
return []
def get_nodes_query(name: str, query: str, limit: int, provider: GraphProvider) -> str:
if provider == GraphProvider.FALKORDB:
label = NEO4J_TO_FALKORDB_MAPPING[name]
return f"CALL db.idx.fulltext.queryNodes('{label}', {query})"
if provider == GraphProvider.KUZU:
label = INDEX_TO_LABEL_KUZU_MAPPING[name]
return f"CALL QUERY_FTS_INDEX('{label}', '{name}', {query}, TOP := $limit)"
return f'CALL db.index.fulltext.queryNodes("{name}", {query}, {{limit: $limit}})'
def get_vector_cosine_func_query(vec1, vec2, provider: GraphProvider) -> str:
"""Return a Cypher fragment for cosine similarity score in [0, 1].
PRESERVED for backward compatibility and as fallback when vector indexes
do not yet exist on the FalkorDB backend. New code paths should prefer
get_vector_search_query() which uses the native vector index when
available.
"""
if provider == GraphProvider.FALKORDB:
# FalkorDB uses a different syntax for regular cosine similarity and Neo4j uses normalized cosine similarity
return f'(2 - vec.cosineDistance({vec1}, vecf32({vec2})))/2'
if provider == GraphProvider.KUZU:
return f'array_cosine_similarity({vec1}, {vec2})'
return f'vector.similarity.cosine({vec1}, {vec2})'
def get_relationships_query(name: str, limit: int, provider: GraphProvider) -> str:
if provider == GraphProvider.FALKORDB:
label = NEO4J_TO_FALKORDB_MAPPING[name]
return f"CALL db.idx.fulltext.queryRelationships('{label}', $query)"
if provider == GraphProvider.KUZU:
label = INDEX_TO_LABEL_KUZU_MAPPING[name]
return f"CALL QUERY_FTS_INDEX('{label}', '{name}', cast($query AS STRING), TOP := $limit)"
return f'CALL db.index.fulltext.queryRelationships("{name}", $query, {{limit: $limit}})'
+131 -1
View File
@@ -141,6 +141,19 @@ consulting" not "my work." Results are unfiltered and ranked by
semantic similarity; judge each chunk for relevance and ignore
irrelevant hits rather than forcing them into the answer.
You also have a search_facts tool that queries a knowledge graph of
atomic facts about Aaron's entities and their relationships. The graph
was populated through early May 2026 and is not currently being
updated; treat it as a *historical* layer that holds biographical
content (career, projects, consulting), exhibition records, key
people, dossier-era claims, and time-stamped facts with explicit
validity windows. For biographical or relational questions ("write
me a bio", "what's the FWN3D / HVAMC relationship", "who did I
consult for at IBM"), call search_facts *in addition to*
retrieve_documents — the two return complementary shapes (atomic
facts vs. document passages). For current-state questions, the
persistent memory file is more authoritative than the graph.
When Aaron asks for a document file — bio, cover letter, statement,
CV section, anything he wants to send or edit outside chat — produce
the full text as your chat reply first. NEVER call save_document on
@@ -440,6 +453,111 @@ DRAFTS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Drafts"
_FILENAME_SAFE_RE = re.compile(r"[^A-Za-z0-9_\-\. ]")
GRAPHITI_URL = os.getenv("GRAPHITI_URL", "http://localhost:8001")
GRAPHITI_GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron")
SEARCH_FACTS_TOOL = {
"name": "search_facts",
"description": (
"Search Aaron's knowledge graph for atomic facts about entities and "
"their relationships. The graph holds time-stamped facts captured up "
"to early May 2026 — biographical content (career, projects, "
"consulting), exhibition history, key relationships, dossier-era "
"claims. Returns short sentence-shaped facts with valid_at / "
"invalid_at timestamps so you can distinguish current state from "
"superseded history. Useful for: bios, 'who did I consult for', "
"'what's the relationship between X and Y', any question shaped like "
"a relational lookup. Complements retrieve_documents (which returns "
"longer chunk passages). Call this *in addition to* retrieve_documents "
"for biographical or relational questions — the two return "
"different shapes of evidence. The graph hasn't been updated since "
"early May 2026; for current-state questions, the persistent memory "
"file or recent documents are more authoritative."
),
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The fact-shaped query. Concrete entity names work best.",
},
},
"required": ["query"],
},
}
def _push_chat_turn_to_graphiti(conversation_id, user_message, assistant_message):
"""Async fire-and-forget push of a chat turn into Graphiti. Single episode,
default extraction, no custom_extraction_instructions. Takes ~20 min in
the background against the current ~4,300-entity graph; the chat caller
is not gated on this. Errors are logged, never raised."""
if os.getenv("SKIP_GRAPHITI_CHAT_PUSH"):
return
if not (user_message or "").strip() and not (assistant_message or "").strip():
return
import threading
from datetime import datetime as _dt
def _work():
try:
episode_name = f"chat-{conversation_id[:8]}-{_dt.now().strftime('%Y%m%dT%H%M%S')}"
content = (
f"User: {user_message}\n\n"
f"Assistant: {assistant_message}"
)
payload = {
"name": episode_name,
"content": content,
"source_description": f"chat turn (conversation {conversation_id})",
"timestamp": _dt.now().isoformat(),
"group_id": GRAPHITI_GROUP_ID,
}
# Long timeout — sidecar add_episode against the current graph
# is empirically ~20 min wall-clock. We're patient; chat isn't.
r = requests.post(f"{GRAPHITI_URL}/episodes", json=payload, timeout=1800)
if r.status_code == 200:
print(f"[graphiti-push] turn ingested: {episode_name}", flush=True)
else:
print(f"[graphiti-push] non-200 ({r.status_code}) for {episode_name}: {r.text[:200]}", flush=True)
except requests.RequestException as e:
print(f"[graphiti-push] request failed: {e}", flush=True)
except Exception as e:
print(f"[graphiti-push] unexpected error: {e}", flush=True)
threading.Thread(target=_work, daemon=True).start()
def _execute_search_facts(tool_input):
"""Hit Graphiti /search, format the results as text for Claude."""
query = (tool_input or {}).get("query", "").strip()
if not query:
return "No query provided."
try:
r = requests.get(
f"{GRAPHITI_URL}/search",
params={"query": query, "limit": 8, "group_id": GRAPHITI_GROUP_ID},
timeout=15,
)
except requests.RequestException as e:
return f"search_facts: Graphiti unreachable ({e})."
if r.status_code != 200:
return f"search_facts: Graphiti returned {r.status_code}."
results = r.json().get("results", [])
if not results:
return f"No facts found for {query!r}."
lines = []
for i, f in enumerate(results, 1):
fact = f.get("fact", "").strip()
valid_at = f.get("valid_at") or "?"
invalid_at = f.get("invalid_at")
validity = (f"valid {valid_at}" + (f" → superseded {invalid_at}"
if invalid_at and invalid_at != "None" else ""))
lines.append(f"[{i}] {fact} ({validity})")
return "\n".join(lines)
SAVE_DOCUMENT_TOOL = {
"name": "save_document",
"description": (
@@ -633,7 +751,7 @@ def chat(user_message, conversation_id, settings, client_time=None):
messages = history + [{"role": "user", "content": full_message}]
tools = [RETRIEVE_DOCUMENTS_TOOL, SAVE_DOCUMENT_TOOL]
tools = [RETRIEVE_DOCUMENTS_TOOL, SEARCH_FACTS_TOOL, SAVE_DOCUMENT_TOOL]
if settings.get("web_search", True):
tools.append({"type": "web_search_20250305", "name": "web_search"})
@@ -672,6 +790,13 @@ def chat(user_message, conversation_id, settings, client_time=None):
"tool_use_id": block.id,
"content": result_text,
})
elif block.name == "search_facts":
result_text = _execute_search_facts(block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": result_text,
})
elif block.name == "save_document":
result_text = _execute_save_document(block.input)
tool_results.append({
@@ -691,6 +816,11 @@ def chat(user_message, conversation_id, settings, client_time=None):
for block in response.content:
if hasattr(block, "text"):
assistant_message += block.text
# Async fire-and-forget into Graphiti so the turn lands in the
# graph as a single episode for future search_facts queries to
# find. Takes ~20 min wall-clock in the background; chat returns
# immediately. Disable via SKIP_GRAPHITI_CHAT_PUSH=1 if needed.
_push_chat_turn_to_graphiti(conversation_id, user_message, assistant_message)
# Cap citations: accumulated_sources can grow large across multiple
# retrieve_documents calls and not every chunk that came back was
# actually used in the answer. Insertion order preserves rank
+128
View File
@@ -0,0 +1,128 @@
"""One-off: backfill last_consolidated_at + consolidation_count on embeddings
from the dream-manifest-*.json files already in Journal/Dreams/.
Why this exists: the consolidation cursor columns added by the dreamer
redesign migration default to NULL / 0. Without history, the
underprocessed-count signal in dream_observation.observe_corpus() reports
"every chunk is underprocessed" (degenerate percentile), and NREM has no
basis to bias replay toward least-recently-consolidated chunks.
We have ~25 historical dream manifests in Nextcloud/Journal/Dreams/, each
listing the sources retrieved per stage. For each (manifest, source) pair
this script:
- finds matching embeddings rows by source (basename match)
- increments consolidation_count by 1
- updates last_consolidated_at to the manifest date (UTC midnight)
Idempotent: re-running will not double-count because we drop existing
cursor values to NULL/0 before backfilling. Pass --dry-run to print what
would change without writing.
"""
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
PG_DSN = os.getenv("PG_DSN")
DREAMS_DIR = Path("/home/aaron/nextcloud/data/data/aaron/files/Journal/Dreams")
DRY_RUN = "--dry-run" in sys.argv
def get_pg():
return psycopg2.connect(PG_DSN)
def collect_manifest_records():
"""Return a list of (source_basename, manifest_date_utc) tuples from all
dream-manifest-*.json files. One pair per (manifest, source) appearance."""
pairs = []
if not DREAMS_DIR.exists():
return pairs
for path in sorted(DREAMS_DIR.glob("dream-manifest-*.json")):
try:
m = json.loads(path.read_text())
except Exception as e:
print(f" skip {path.name}: {e}")
continue
date_str = m.get("date")
if not date_str:
continue
try:
dt = datetime.fromisoformat(date_str).replace(tzinfo=timezone.utc)
except ValueError:
continue
stages = m.get("stages") or {}
for stage_name in ("nrem", "early_rem", "late_rem", "synthesis"):
stage = stages.get(stage_name) or {}
for src in (stage.get("sources") or []):
if src:
pairs.append((src, dt))
return pairs
def main():
print(f"Mode: {'DRY-RUN' if DRY_RUN else 'APPLY'}")
print(f"Scanning manifests in {DREAMS_DIR}")
pairs = collect_manifest_records()
print(f"Collected {len(pairs)} (source, manifest_date) pairs across all manifests")
if not pairs:
print("Nothing to backfill.")
return
# Aggregate per source: count + latest date
from collections import defaultdict
counts = defaultdict(int)
latest = {}
for src, dt in pairs:
counts[src] += 1
if src not in latest or dt > latest[src]:
latest[src] = dt
print(f"Unique sources to update: {len(counts)}")
# Sample what we'd write
print("Sample (top 5 by appearance count):")
for src, n in sorted(counts.items(), key=lambda kv: -kv[1])[:5]:
print(f" {n:>3} appearances — {src} → last_consolidated_at = {latest[src].date()}")
if DRY_RUN:
print("\nDry-run only. Re-run without --dry-run to apply.")
return
pg = get_pg()
cur = pg.cursor()
# Reset cursor for any sources we're about to backfill so reruns are clean.
print("\nResetting cursor for sources we'll touch...")
sources = list(counts.keys())
cur.execute(
"UPDATE embeddings SET last_consolidated_at = NULL, consolidation_count = 0 "
"WHERE source = ANY(%s)",
(sources,),
)
print(f" reset {cur.rowcount} embeddings rows")
# Apply per-source updates. For each source, set count and latest date.
print("Applying per-source backfill...")
updated_rows = 0
for src, n in counts.items():
cur.execute(
"UPDATE embeddings "
"SET consolidation_count = %s, last_consolidated_at = %s "
"WHERE source = %s",
(n, latest[src], src),
)
updated_rows += cur.rowcount
pg.commit()
pg.close()
print(f"Done. Updated {updated_rows} embeddings rows across {len(counts)} unique sources.")
if __name__ == "__main__":
main()
+342 -64
View File
@@ -23,6 +23,7 @@ from datetime import datetime, timedelta
from dotenv import load_dotenv
import psycopg2
import hashlib
import numpy as np
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
@@ -42,6 +43,26 @@ NEXTCLOUD_USER = os.getenv("NEXTCLOUD_USER", "aaron")
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD", "")
DREAMS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Journal/Dreams"
# ─── Retrieval-window config (per dreamer-multimodal-design.md §2) ─────────
# Biological grounding: NREM replays recent traces (24-72 hrs); REM links
# across time on structural similarity, not temporal proximity. Synthesis
# pulls from salience across the full corpus (no window). Spec calls for
# these to be mutable rather than hardcoded — this is the mutable home.
TIME_WINDOWS_HOURS = {
"nrem": 72, # 24-72 hrs, take wider end
"early-rem": 24 * 30, # 30 days
"late-rem": 24 * 90, # 90 days
"lucid": None, # no window
}
# Maximal Marginal Relevance: λ=1 → pure relevance, λ=0 → pure diversity.
# 0.5 is the standard balance; tune later if the dossier-cluster problem
# isn't sufficiently broken up.
MMR_LAMBDA = 0.5
# Fast/cheap model for query generation. Sonnet for synthesis (in synthesize_*).
LLM_QUERY_MODEL = os.getenv("DREAMER_QUERY_MODEL", "claude-haiku-4-5-20251001")
# Similarity ranges calibrated for all-MiniLM-L6-v2
MODE_RANGES = {
"nrem": (0.48, 0.72),
@@ -289,70 +310,293 @@ def _get_embedder():
from sentence_transformers import SentenceTransformer
return SentenceTransformer("all-MiniLM-L6-v2")
def retrieve(mode, task=None, n_results=8, excluded_sources=None, type_filter=None):
# E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search
# Default behavior: pgvector similarity search (unchanged)
# type_filter is experimental and applies to pgvector retrieval only — Graphiti
# facts are not embeddings rows and have no embeddings.type to filter on.
substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
if substrate == "graphiti":
return retrieve_graphiti(mode, task=task, n_results=n_results, excluded_sources=excluded_sources)
embedder = _get_embedder()
low, high = MODE_RANGES[mode]
def _llm_generate_queries(mode, signal, task=None, n_queries=4):
"""Park et al. 2023 reflection-style query generation. Feeds the LLM the
observation signal + a mode-specific framing; emits N retrieval queries
that probe different corners of the recent corpus instead of the same
hardcoded string every night. Sources cited in dream_observation.py.
Falls back to recent_questions from the signal if the LLM call fails."""
import anthropic
if task:
query = task
elif mode == "late-rem":
delta = observe_corpus()
topics = delta.get("recent_topics", [])
query = topics[0] if topics else "practice place memory making"
elif mode == "early-rem":
query = "career decision personal change what matters next"
# Lucid mode: decompose the user's task into sub-queries
prompt = (
f"Decompose this user task into {n_queries} distinct sub-questions, "
f"each suitable as a retrieval query against Aaron's personal corpus.\n\n"
f"TASK: {task}\n\n"
f'Output JSON ONLY: {{"queries": ["...", "...", ...]}}'
)
else:
query = "research fabrication teaching practice recent work"
mode_framings = {
"nrem": (
"NREM is replay-and-consolidation of RECENT traces. Generate queries "
"that probe what Aaron has been working on or capturing in the last "
"few days. Concrete entities — project names, course codes, named "
"subjects. The dreamer is re-touching specific recent material to "
"strengthen schema connections, not finding novel content."
),
"early-rem": (
"Early REM is associative bridging with emotional/personal register. "
"Generate queries that surface unresolved themes, career questions, "
"ongoing personal threads — material that connects intellectual and "
"emotional dimensions. Tone: thoughtful friend, not researcher."
),
"late-rem": (
"Late REM tests novel connections across DISTANT material. Generate "
"queries that pair concrete subjects from DIFFERENT domains of Aaron's "
"work (e.g., one from academic teaching, one from consulting, one from "
"creative practice) to probe for surprising structural similarity. "
"Cross-domain is required."
),
}
framing = mode_framings.get(mode, mode_framings["nrem"])
questions_snippet = "\n".join(
f" - {q[:200]}" for q in signal.get("recent_questions", [])[:8]
) or " (no recent user questions)"
journal_snippet = ", ".join(signal.get("new_journal_entries", [])[:5]) or "(none)"
days_str = (
f"{signal['days_since_dream']:.1f}"
if signal.get("days_since_dream") not in (None, float("inf"))
else "infinite (first dream)"
)
prompt = (
f"You generate retrieval queries for an Active Inference dreamer. The "
f"dreamer surfaces prediction errors — gaps between Aaron's model and "
f"reality — not summaries or generic associations.\n\n"
f"MODE: {mode}\n"
f"FRAMING: {framing}\n\n"
f"OBSERVATION SIGNAL:\n"
f"- Days since last dream: {days_str}\n"
f"- New chunks since last dream: {signal.get('new_chunks', 0)}\n"
f"- New journal entries: {journal_snippet}\n"
f"- Underprocessed chunks pool: {signal.get('underprocessed_count', 0):,}\n\n"
f"RECENT USER QUESTIONS (last 14 days, top 8):\n{questions_snippet}\n\n"
f"Generate {n_queries} retrieval queries. Requirements:\n"
f"- Use concrete entities, named projects, course codes, specific topics "
f"— NOT generic phrasing like 'research work practice'\n"
f"- Each query probes a DIFFERENT corner of recent activity\n"
f"- Match the {mode} framing\n"
f"- 5-15 words each\n\n"
f'Output JSON ONLY: {{"queries": ["...", "...", ...]}}'
)
embedding = embedder.encode([query]).tolist()[0]
chunks = []
seen_sources = set()
try:
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
resp = client.messages.create(
model=LLM_QUERY_MODEL,
max_tokens=512,
messages=[{"role": "user", "content": prompt}],
)
text = "".join(b.text for b in resp.content if hasattr(b, "text")).strip()
if text.startswith("```"):
text = text.split("```", 2)[1]
if text.startswith("json"):
text = text[4:]
text = text.strip()
data = json.loads(text)
queries = data.get("queries", [])
if isinstance(queries, list) and queries:
return [str(q).strip() for q in queries[:n_queries] if str(q).strip()]
except Exception as e:
print(f"[dream] LLM query generation failed ({e}); falling back to recent questions")
fallback = signal.get("recent_questions", [])[:n_queries] if signal else []
return fallback or [task or "recent activity decisions thinking"]
def _mmr_select(candidate_embeddings, query_embedding, n, lambda_=MMR_LAMBDA):
"""Maximal Marginal Relevance — greedy selection that balances relevance
against pairwise diversity. Carbonell & Goldstein 1998. Used to prevent
cluster lock-in (e.g., 8 dossier-narrative variants filling all 8 slots).
candidate_embeddings: (N, D) numpy array
query_embedding: (D,) numpy array
Returns: list of indices into candidate_embeddings, len ≤ n."""
if len(candidate_embeddings) == 0:
return []
n = min(n, len(candidate_embeddings))
cands = candidate_embeddings / (np.linalg.norm(candidate_embeddings, axis=1, keepdims=True) + 1e-9)
q = query_embedding / (np.linalg.norm(query_embedding) + 1e-9)
relevance = cands @ q
selected = []
remaining = list(range(len(cands)))
while len(selected) < n and remaining:
if not selected:
best = max(remaining, key=lambda i: relevance[i])
else:
sel = cands[selected]
scores = {
i: lambda_ * relevance[i] - (1 - lambda_) * float((cands[i] @ sel.T).max())
for i in remaining
}
best = max(scores, key=scores.get)
selected.append(best)
remaining.remove(best)
return selected
def _bump_consolidation_cursor(chunks):
"""Increment consolidation_count + set last_consolidated_at=NOW() for each
source represented in chunks. Called from dream_pipeline after NREM
completes. Per sharp-wave-ripples biology, NREM does the actual
consolidation; REM is associative use, so we only bump on NREM."""
if not chunks:
return
sources = list({c["source"] for c in chunks if c.get("source")})
if not sources:
return
try:
pg = get_pg()
cur = pg.cursor()
excluded_sources = excluded_sources or set()
where, params = [], []
if excluded_sources:
where.append("source NOT IN %s")
params.append(tuple(excluded_sources))
if type_filter:
where.append("type = ANY(%s)")
params.append(list(type_filter))
where_clause = ("WHERE " + " AND ".join(where)) if where else ""
cur.execute(f"""
SELECT document, source, type, 1 - (embedding <=> %s::vector) as similarity
FROM embeddings
{where_clause}
ORDER BY embedding <=> %s::vector
LIMIT %s
""", [embedding, *params, embedding, n_results * 3])
for doc, source, etype, similarity in cur.fetchall():
if not (low <= similarity <= high):
continue
if source in seen_sources:
continue
chunks.append({
"source": source or "unknown",
"content": doc,
"relevance": similarity,
"similarity": similarity,
"type": etype,
})
seen_sources.add(source)
if len(chunks) >= n_results:
break
cur.execute(
"UPDATE embeddings "
"SET consolidation_count = consolidation_count + 1, "
" last_consolidated_at = NOW() "
"WHERE source = ANY(%s)",
(sources,),
)
pg.commit()
pg.close()
except Exception as e:
print(f"pgvector retrieval error: {e}")
print(f"[dream] cursor bump failed (non-fatal): {e}")
def retrieve(mode, task=None, n_results=8, excluded_sources=None,
type_filter=None, signal=None):
"""Refactored retrieval — see dreamer-design-spec.md Stage 3 + the
external-literature prescription in birdai-dreamer-exclusion-finding-2026-05-02.md.
Changes from the prior hardcoded-query version:
- Queries are LLM-generated from the observation signal (Park et al.
reflection pattern) instead of fixed strings. Solves the "same 8 sources
every night" failure where fixed seeds locked into one neighborhood.
- Per-mode time windows (24-72hr NREM / 30d Early REM / 90d Late REM)
filter candidates before vector search. Spec calls for these to be
mutable; they live in TIME_WINDOWS_HOURS.
- NREM biases toward under-processed chunks (low consolidation_count).
Biologically motivated: sharp-wave ripples tag what to replay, not
uniform sampling.
- Multiple queries (4 by default) → over-fetch → MMR merge for
within-night diversity. Prevents cluster domination.
signal is the observation-signal dict from dream_observation.observe_corpus().
If None, observe_corpus is called inline (back-compat for ad-hoc invocation).
"""
# E3 substrate experiment unchanged
substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
if substrate == "graphiti":
return retrieve_graphiti(mode, task=task, n_results=n_results,
excluded_sources=excluded_sources)
if signal is None:
from dream_observation import observe_corpus as _obs
signal = _obs()
queries = _llm_generate_queries(mode, signal, task=task, n_queries=4)
if not queries:
print(f"[dream:{mode}] no queries generated; bailing")
return []
print(f"[dream:{mode}] generated queries: {queries}")
embedder = _get_embedder()
excluded_sources = excluded_sources or set()
window_hours = TIME_WINDOWS_HOURS.get(mode)
per_query_n = 12 # over-fetch for MMR
candidates = []
seen_ids = set()
try:
pg = get_pg()
cur = pg.cursor()
for q in queries:
q_emb = embedder.encode([q]).tolist()[0]
where, params = [], []
if excluded_sources:
where.append("source NOT IN %s")
params.append(tuple(excluded_sources))
if type_filter:
where.append("type = ANY(%s)")
params.append(list(type_filter))
if window_hours is not None:
# created_at is TEXT (legacy); cast it. NULL created_at fails
# the comparison so legacy rows are excluded from windowed
# modes — correct: NULL means "indexed before cursor existed,"
# which by definition is older than any window.
where.append(
f"(created_at IS NOT NULL AND "
f"created_at::timestamptz > NOW() - INTERVAL '{int(window_hours)} hours')"
)
where_clause = ("WHERE " + " AND ".join(where)) if where else ""
# NREM bias: order by consolidation_count ASC first (under-processed
# chunks win the tiebreak before vector distance). Other modes:
# vector distance only.
order_clause = (
"ORDER BY consolidation_count ASC, embedding <=> %s::vector"
if mode == "nrem"
else "ORDER BY embedding <=> %s::vector"
)
cur.execute(f"""
SELECT id, document, source, type, embedding,
1 - (embedding <=> %s::vector) as similarity
FROM embeddings
{where_clause}
{order_clause}
LIMIT %s
""", [q_emb, *params, q_emb, per_query_n])
for row in cur.fetchall():
if row[0] in seen_ids:
continue
seen_ids.add(row[0])
emb = row[4]
# pgvector returns embeddings as string "[...]" by default
if isinstance(emb, str):
emb = np.array([float(x) for x in emb.strip("[]").split(",")])
else:
emb = np.array(emb)
candidates.append({
"id": row[0],
"content": row[1],
"source": row[2] or "unknown",
"type": row[3],
"embedding": emb,
"similarity": float(row[5]),
})
pg.close()
except Exception as e:
import traceback
print(f"[dream:{mode}] retrieval SQL error: {e}")
traceback.print_exc()
return []
if not candidates:
print(f"[dream:{mode}] zero candidates after filters")
return []
# MMR over the union, using the first query as pivot for the relevance term.
# Averaging query embeddings would be theoretically cleaner but adds
# complexity for marginal benefit at this scale.
pivot_emb = np.array(embedder.encode([queries[0]]).tolist()[0])
cand_embs = np.array([c["embedding"] for c in candidates])
selected_idx = _mmr_select(cand_embs, pivot_emb, n=n_results * 2)
# Post-MMR source-level dedup (multi-chunk same source collapses to one).
chunks = []
seen_sources = set()
for i in selected_idx:
c = candidates[i]
if c["source"] in seen_sources:
continue
seen_sources.add(c["source"])
chunks.append({
"source": c["source"],
"content": c["content"],
"relevance": c["similarity"],
"similarity": c["similarity"],
"type": c["type"],
})
if len(chunks) >= n_results:
break
return chunks
@@ -496,6 +740,12 @@ def dream_pipeline(type_filter=None):
"""
Full nightly pipeline — interdependent stages.
NREM output feeds Early REM. Both feed Late REM. All three feed Synthesis.
Per dreamer-design-spec.md, this now runs Stage 1 (observe) and Stage 2
(select) first. If select_mode returns None — corpus unchanged and no new
journal entry — the dreamer goes quiet rather than manufacturing novelty.
Otherwise NREM/Early-REM/Late-REM run with LLM-generated queries seeded
from the observation signal.
"""
print(f"Dreamer pipeline starting — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
@@ -503,21 +753,47 @@ def dream_pipeline(type_filter=None):
state.pop("retrieved_sources", None) # legacy key; session-scoped novelty now
session_retrieved = set()
delta = observe_corpus()
print(f"Corpus: {delta['new_chunks']} new chunks, {delta['days_since_dream']:.1f} days since last dream")
print("Novelty: session-scoped (no across-night exclusion)")
# ── Stage 1 + 2: Observe + Select ──────────────────────────────────────
from dream_observation import observe_corpus as _obs, select_mode as _select
signal = _obs()
print(
f"Signal: new_chunks={signal['new_chunks']}, "
f"new_journal={len(signal['new_journal_entries'])}, "
f"days_since={signal['days_since_dream']:.1f}, "
f"underprocessed={signal['underprocessed_count']:,}"
)
selected = _select(signal)
if selected is None:
print("[select_mode] None — nothing worth dreaming about tonight (going quiet)")
# Update last-dream-attempted-at but not last_dream — caller can distinguish
# an actual dream from a skipped night by looking at last_dream_file or
# checking the manifest dir.
state["last_select_quiet_at"] = datetime.now().isoformat()
save_dreamer_state(state)
return None
print(f"[select_mode] → {selected}")
# ── Stage 1: NREM ──────────────────────────────────────────────────────
# The pipeline always runs all three modes for the manifest's continuity.
# select_mode's choice signals the *primary* focus; the others still run
# but draw from their own mode-appropriate windows.
primary_mode = selected
# ── Stage 3: NREM ──────────────────────────────────────────────────────
print("\n[NREM] Retrieving...")
# NREM is replay-and-consolidation — does not exclude prior traces.
# Late REM and Early REM exclude prior content for novelty; NREM does not.
nrem_chunks = retrieve("nrem", excluded_sources=None, type_filter=type_filter)
nrem_chunks = retrieve("nrem", excluded_sources=None,
type_filter=type_filter, signal=signal)
session_retrieved.update(c["source"] for c in nrem_chunks)
# Track sources that scored above Early REM ceiling — these are the only ones Early REM should exclude
nrem_high_sources = {c["source"] for c in nrem_chunks if c["similarity"] > 0.55}
if not nrem_chunks:
print("[NREM] No suitable chunks — aborting pipeline")
return None
# Cursor bump: NREM is the consolidation stage. Each appearance increments
# consolidation_count + updates last_consolidated_at, so the next dream's
# observation sees these sources as less under-processed.
_bump_consolidation_cursor(nrem_chunks)
print(f"[NREM] Retrieved {len(nrem_chunks)} chunks. Synthesizing...")
nrem_output = synthesize_nrem(nrem_chunks)
@@ -528,7 +804,7 @@ def dream_pipeline(type_filter=None):
"nrem": {
"chunks_retrieved": len(nrem_chunks),
"avg_similarity": round(sum(c["relevance"] for c in nrem_chunks) / len(nrem_chunks), 3),
"query": "research fabrication teaching practice recent work",
"query": "[llm-generated from observation signal]",
"word_count": len(nrem_output.split()),
"sources": nrem_sources,
"distinct_folders": nrem_folders,
@@ -546,7 +822,8 @@ def dream_pipeline(type_filter=None):
print("\n[Early REM] Retrieving...")
# Early REM excludes previously retrieved + NREM high-scorers only (not full session_retrieved)
# Sources that scored in Early REM band during NREM remain available
early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources, type_filter=type_filter)
early_chunks = retrieve("early-rem", excluded_sources=nrem_high_sources,
type_filter=type_filter, signal=signal)
session_retrieved.update(c["source"] for c in early_chunks)
if not early_chunks:
print("[Early REM] No suitable chunks — skipping")
@@ -560,7 +837,7 @@ def dream_pipeline(type_filter=None):
stage_data["early_rem"] = {
"chunks_retrieved": len(early_chunks),
"avg_similarity": round(sum(c["relevance"] for c in early_chunks) / len(early_chunks), 3),
"query": "career decision personal change what matters next",
"query": "[llm-generated from observation signal]",
"word_count": len(early_rem_output.split()),
"sources": early_sources,
"distinct_folders": early_folders,
@@ -572,7 +849,8 @@ def dream_pipeline(type_filter=None):
# ── Stage 3: Late REM — informed by NREM + Early REM ──────────────────
print("\n[Late REM] Retrieving...")
late_chunks = retrieve("late-rem", excluded_sources=session_retrieved, type_filter=type_filter)
late_chunks = retrieve("late-rem", excluded_sources=session_retrieved,
type_filter=type_filter, signal=signal)
session_retrieved.update(c["source"] for c in late_chunks)
if not late_chunks:
print("[Late REM] No suitable chunks — skipping")
@@ -591,7 +869,7 @@ def dream_pipeline(type_filter=None):
stage_data["late_rem"] = {
"chunks_retrieved": len(late_chunks),
"avg_similarity": round(sum(c["relevance"] for c in late_chunks) / len(late_chunks), 3),
"query": "practice place memory making",
"query": "[llm-generated from observation signal]",
"word_count": len(late_rem_output.split()),
"sources": late_sources,
"distinct_folders": list(set(late_folders)),
+235
View File
@@ -0,0 +1,235 @@
"""
Dreamer Stages 1 + 2 — Observe and Select.
Implements `dreamer-design-spec.md`'s Stage 1 (observe_corpus) and Stage 2
(select_mode). These have been latent in dream.py — observe_corpus existed
in skeletal form but its output was largely unused; select_mode did not
exist at all. The dreamer always ran all stages with hardcoded queries.
Per spec (lines 2734 of dreamer-design-spec.md):
delta = observe_corpus()
selected_mode = select_mode(delta, task, project)
if selected_mode is None:
return # nothing worth dreaming
The "returns None — dreamer goes quiet rather than manufacturing novelty"
semantics (spec line 67) is the canonical answer to the repetition problem
documented in birdai-dreamer-exclusion-finding-2026-05-02.md.
Grounded in:
- Active Inference (Friston 2010, 2017) — observe error, choose action that
minimizes free energy. The dreamer is a prediction-error machine; observe
what's diverged from the model, dream about that.
- Sleep stages (Stickgold 2005; Walker 2017; Diekelberg & Born 2010) — NREM
for replay of new traces, REM for associative cross-cluster integration.
- Sharp-wave ripples (Buzsáki, Wilson) — biology tags WHAT to replay
(under-processed chunks); not uniform. Implemented via the consolidation
cursor on the embeddings table.
"""
import json
import os
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
# ─── Paths ──────────────────────────────────────────────────────────────────
PG_DSN = os.getenv("PG_DSN")
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
DREAMER_STATE = str(Path.home() / "aaronai" / "dreamer_state.json")
JOURNAL_DAILY = "/home/aaron/nextcloud/data/data/aaron/files/Journal/Daily"
# ─── Thresholds ─────────────────────────────────────────────────────────────
# Per spec, these become settings-panel controls eventually. For now they're
# constants here; moving them to a config module is task #48.
NEW_CHUNK_THRESHOLD = 5 # below this, NREM not warranted on novelty alone
STALENESS_TRIGGER_DAYS = 3 # corpus quiet ≥3 days → Late REM ("shake things loose")
QUESTION_LOOKBACK_DAYS = 14 # spec line 61: "the last 14 days"
UNDERPROCESSED_PERCENTILE = 0.25 # bottom quartile of consolidation_count
# ─── Helpers ────────────────────────────────────────────────────────────────
def _get_pg():
return psycopg2.connect(PG_DSN)
def _load_json(path, default):
try:
return json.loads(Path(path).read_text())
except Exception:
return default
def _recent_user_questions(days=QUESTION_LOOKBACK_DAYS, limit=20):
"""Pull recent user-turn content from conversations.db. The spec calls
these 'live questions' — what Aaron has been asking about. They become
seed material for the REM modes."""
try:
conn = sqlite3.connect(CONVERSATIONS_DB)
cutoff = (datetime.now() - timedelta(days=days)).isoformat()
cur = conn.cursor()
cur.execute(
"""
SELECT m.content FROM messages m
JOIN conversations c ON m.conversation_id = c.id
WHERE m.role = 'user' AND c.updated_at > ?
ORDER BY m.timestamp DESC LIMIT ?
""",
(cutoff, limit),
)
rows = cur.fetchall()
conn.close()
return [r[0][:280] for r in rows]
except Exception:
return []
def _new_journal_entries(since_ts):
"""Files in Journal/Daily/ created or modified since the last dream.
Journal entries with emotional/personal register route to Early REM per
the spec (line 71)."""
journal_path = Path(JOURNAL_DAILY)
if not journal_path.exists():
return []
new = []
for p in journal_path.rglob("*.md"):
try:
if p.stat().st_mtime > since_ts:
new.append(str(p.relative_to(journal_path)))
except OSError:
continue
return new
def _new_chunks_count(since_ts):
"""Files in the watcher state with mtime > last_dream. The spec calls
this 'what changed' (line 58). Used as the NREM novelty signal."""
state = _load_json(WATCHER_STATE, {})
count = 0
for _path, mtime in state.items():
try:
if float(mtime) > since_ts:
count += 1
except (ValueError, TypeError):
continue
return count
def _underprocessed_chunk_count():
"""Chunks below the underprocessed percentile by consolidation_count.
Biologically motivated: sharp-wave ripples bias replay toward novel /
under-encoded experience, not uniform sampling. We give NREM a pool of
'least-replayed' chunks to draw from in Stage 3."""
try:
pg = _get_pg()
cur = pg.cursor()
cur.execute(
"""
WITH t AS (
SELECT percentile_cont(%s) WITHIN GROUP (ORDER BY consolidation_count)
AS threshold
FROM embeddings
)
SELECT COUNT(*) FROM embeddings, t
WHERE consolidation_count <= t.threshold
""",
(UNDERPROCESSED_PERCENTILE,),
)
result = cur.fetchone()[0]
pg.close()
return int(result or 0)
except Exception:
return 0
# ─── Stage 1: observe_corpus ────────────────────────────────────────────────
def observe_corpus():
"""Build the signal vector consumed by select_mode and (downstream) by
retrieve. Concrete observations only — no interpretation. Each key is
a direct measurement from the corpus, watcher, journal, or conversation
log.
Returns a dict with:
now_ts -- current Unix timestamp
last_dream_ts -- last completed dream timestamp (0 if never)
days_since_dream -- float; inf if never dreamed
new_chunks -- count of files newer than last_dream
new_journal_entries -- list of Journal/Daily/*.md filenames since last_dream
recent_questions -- user-turn content from last 14 days
underprocessed_count -- chunks in the bottom 25% by consolidation_count
"""
state = _load_json(DREAMER_STATE, {})
last_dream_ts = float(state.get("last_dream_timestamp", 0) or 0)
now_ts = datetime.now().timestamp()
return {
"now_ts": now_ts,
"last_dream_ts": last_dream_ts,
"days_since_dream": (now_ts - last_dream_ts) / 86400 if last_dream_ts else float("inf"),
"new_chunks": _new_chunks_count(last_dream_ts),
"new_journal_entries": _new_journal_entries(last_dream_ts),
"recent_questions": _recent_user_questions(),
"underprocessed_count": _underprocessed_chunk_count(),
}
# ─── Stage 2: select_mode ───────────────────────────────────────────────────
def select_mode(signal, task=None, explicit_mode=None):
"""Return one of {'nrem', 'early-rem', 'late-rem', 'lucid'}. Never None.
The dreamer fires every scheduled night. The earlier "go quiet on null
delta" rule was a synthesis-doc invention that didn't match the actual
desired UX — the original dreamer always dreamed, even if it repeated
itself. The cure for repetition lives in the retrieve layer
(LLM-generated queries from the observation signal, MMR diversity,
cursor bias toward under-processed chunks), not in skipping nights.
Routing logic:
- explicit_mode argument wins
- task supplied → 'lucid' (question-anchored)
- days_since_dream ≥ STALENESS_TRIGGER_DAYS → 'late-rem' (shake loose
via cross-domain pairs when nothing's been added in a while)
- new journal entry → 'early-rem' (emotional/personal register)
- default → 'nrem' (replay-and-consolidation; always has something to
do because the corpus always has under-processed chunks)
"""
if explicit_mode:
return explicit_mode
if task:
return "lucid"
days_since = signal["days_since_dream"]
new_journal = signal["new_journal_entries"]
if days_since >= STALENESS_TRIGGER_DAYS:
return "late-rem"
if new_journal:
return "early-rem"
return "nrem"
# ─── CLI for manual inspection ──────────────────────────────────────────────
if __name__ == "__main__":
signal = observe_corpus()
short = {k: v for k, v in signal.items() if k != "recent_questions"}
print("Signal (excluding recent_questions):")
print(json.dumps(short, indent=2, default=str))
print(f"\nRecent user questions ({len(signal['recent_questions'])}):")
for q in signal["recent_questions"][:5]:
print(f" - {q[:140]}")
mode = select_mode(signal)
print(f"\nselect_mode() → {mode!r}")
+11
View File
@@ -75,6 +75,17 @@ async def lifespan(app: FastAPI):
max_coroutines=2,
)
await graphiti_instance.build_indices_and_constraints()
# Bridge driver._search_ops to driver.search_interface — graphiti-core 0.29.0
# builds FalkorSearchOperations as driver._search_ops in FalkorDriver.__init__
# but never assigns it to driver.search_interface. search_utils.py dispatches
# on driver.search_interface; without this assignment it falls back to
# interpreted-Cypher cosine math (full table scans). Together with the
# vendored patches in graphiti_patches/, this activates FalkorDB's native
# vector index for entity dedup similarity search.
if (hasattr(graphiti_instance.driver, "_search_ops")
and graphiti_instance.driver.search_interface is None):
graphiti_instance.driver.search_interface = graphiti_instance.driver._search_ops
log.info("Wired driver.search_interface = driver._search_ops (vector index path active)")
log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
yield
await graphiti_instance.close()
+136
View File
@@ -0,0 +1,136 @@
"""
Orientation Indexer — feeds Stage 2's document-level orientations into pgvector
so they're searchable alongside chunk text by the retrieve_documents tool.
Each completed row in stage_3_queue has an `orientation` string (active_frames
+ frame_relationships + extraction_orientation + one_sentence_summary) that
describes the document at a conceptual level. Indexing it as its own row in
the embeddings table gives the cross-encoder a second surface to rank against
"what is this document about" rather than just "what does this chunk say."
This worker is part of the "read-only Graphiti + orientation-into-pgvector"
plan B that replaced the Stage 3 → Graphiti write path. The graph layer is
queried directly via the search_facts chat tool; orientations land here.
State tracking: a row is considered indexed if the embeddings table already
holds a row with source=<source> and metadata->>'kind'='orientation'. The
worker is idempotent — restart-safe, resumable.
Runs as systemd: aaronai-orientation-indexer.service
"""
import logging
import os
import sys
import time
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
from sentence_transformers import SentenceTransformer
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
sys.path.insert(0, str(Path(__file__).parent))
from encoding import write_embeddings_batch
PG_DSN = os.getenv("PG_DSN")
EMBED_MODEL = "all-MiniLM-L6-v2"
BATCH_SIZE = 25
POLL_INTERVAL_SECS = 30
LOG_FILE = "/var/log/aaronai/orientation-indexer.log"
HEARTBEAT_FILE = "/var/log/aaronai/orientation-indexer-heartbeat"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [orientation-indexer] %(levelname)s %(message)s",
handlers=[logging.FileHandler(LOG_FILE, mode="a")],
)
log = logging.getLogger("orientation-indexer")
def get_pg():
return psycopg2.connect(PG_DSN)
def fetch_unindexed(cur, limit):
"""Pull stage_3_queue rows with a non-null orientation whose orientation
hasn't been written to the embeddings table yet."""
cur.execute(
"""
SELECT s.source, s.orientation
FROM stage_3_queue s
WHERE s.orientation IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM embeddings e
WHERE e.source = s.source
AND e.metadata->>'kind' = 'orientation'
)
ORDER BY s.enqueued_at
LIMIT %s
""",
(limit,),
)
return cur.fetchall()
def _row_for(source: str, orientation: str, embedding) -> dict:
"""Build an embeddings row for the orientation. id is deterministic so
re-runs don't create duplicates if the unique check above ever races."""
import hashlib
chunk_id = hashlib.md5(f"orientation:{source}".encode()).hexdigest()[:8] + "_orient"
return {
"id": chunk_id,
"document": orientation,
"embedding": embedding,
"source": source,
"type": "document",
"metadata": {
"source": source,
"kind": "orientation",
},
}
def write_heartbeat():
try:
Path(HEARTBEAT_FILE).write_text(str(time.time()))
except Exception:
pass
def main():
log.info("Orientation indexer starting...")
log.info(f"Loading embedding model: {EMBED_MODEL}")
embedder = SentenceTransformer(EMBED_MODEL)
log.info("Embedding model ready.")
while True:
write_heartbeat()
try:
pg = get_pg()
try:
cur = pg.cursor()
rows = fetch_unindexed(cur, BATCH_SIZE)
if not rows:
pg.close()
time.sleep(POLL_INTERVAL_SECS)
continue
orientations = [r[1] for r in rows]
embeddings = embedder.encode(orientations).tolist()
batch = [
_row_for(source, orient, emb)
for (source, orient), emb in zip(rows, embeddings)
]
write_embeddings_batch(pg, batch)
log.info(f"Indexed {len(batch)} orientation(s)")
finally:
pg.close()
except Exception as e:
log.error(f"Indexing loop iteration failed: {e}")
time.sleep(POLL_INTERVAL_SECS)
if __name__ == "__main__":
main()