Compare commits
5 Commits
30beeb3a26
...
7f07972109
| Author | SHA1 | Date | |
|---|---|---|---|
| 7f07972109 | |||
| f645b74b1c | |||
| c0e6159b5e | |||
| d7b2a850c4 | |||
| a0bf280075 |
@@ -0,0 +1,4 @@
|
||||
# Local backups created by apply.sh — environment state, not source.
|
||||
# Keeping these out of version control prevents repo bloat and avoids
|
||||
# checking in graphiti-core's Apache-2.0 source under our repo's tree.
|
||||
backups/
|
||||
@@ -0,0 +1,58 @@
|
||||
# graphiti-core Patches — FalkorDB Vector Index Support
|
||||
|
||||
Vendored patches against graphiti-core 0.29.0 adding native FalkorDB
|
||||
vector index support. Three files modified, all under
|
||||
`graphiti_core/driver/falkordb/` and `graphiti_core/graph_queries.py`.
|
||||
No changes to Neo4j or Kuzu code paths.
|
||||
|
||||
## Why this exists
|
||||
|
||||
graphiti-core's FalkorDB driver uses interpreted Cypher cosine math
|
||||
(`vec.cosineDistance(...)`) for similarity search. Each query becomes a
|
||||
full table scan over Entity/RELATES_TO/Community nodes. At ~4,000+
|
||||
entities, single-episode ingest's resolve-against-existing-graph step
|
||||
takes 8+ minutes and bulk ingest hangs FalkorDB. FalkorDB itself
|
||||
supports `db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`
|
||||
procedures backed by HNSW indexes; graphiti-core's driver doesn't use
|
||||
them.
|
||||
|
||||
These patches:
|
||||
|
||||
1. Add `get_vector_indices()` to `graph_queries.py` returning CREATE
|
||||
VECTOR INDEX statements for FalkorDB on Entity.name_embedding,
|
||||
RELATES_TO.fact_embedding, and Community.name_embedding.
|
||||
2. Extend `falkordb_driver.py:build_indices_and_constraints()` to create
|
||||
the vector indexes alongside range and fulltext indexes.
|
||||
3. Rewrite the three vector-similarity call sites in
|
||||
`falkordb/operations/search_ops.py` to use
|
||||
`db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`
|
||||
instead of full-scan cosine math. Over-fetches by a configurable
|
||||
multiplier to handle filter rejections.
|
||||
|
||||
## Files
|
||||
|
||||
| Patched file | Source |
|
||||
|---|---|
|
||||
| `graphiti_core/graph_queries.py` | Adds `get_vector_indices()` |
|
||||
| `graphiti_core/driver/falkordb/falkordb_driver.py` | Extends `build_indices_and_constraints` |
|
||||
| `graphiti_core/driver/falkordb/operations/search_ops.py` | Three query rewrites |
|
||||
|
||||
## How to apply
|
||||
|
||||
`./apply.sh` — backs up the originals into `./backups/<timestamp>/`
|
||||
and copies the patched files over.
|
||||
|
||||
## How to revert
|
||||
|
||||
Move the timestamped backup back over the venv:
|
||||
|
||||
cp backups/<ts>/graph_queries.py /home/aaron/aaronai/venv/lib/python3.12/site-packages/graphiti_core/graph_queries.py
|
||||
# ...etc
|
||||
|
||||
## Upstream candidate
|
||||
|
||||
Documented gap (issue #1263 references it indirectly via vector store
|
||||
overlay RFC). Maintainers' attention is on Milvus/external vector DB
|
||||
overlay; this patch is the FalkorDB-native alternative for users who
|
||||
don't want a separate vector DB. Consider PR after empirical validation
|
||||
in production.
|
||||
Executable
+77
@@ -0,0 +1,77 @@
|
||||
#!/usr/bin/env bash
|
||||
# apply.sh — Apply the BirdAI vendored graphiti-core patches.
|
||||
#
|
||||
# Backs up the original venv files into ./backups/<timestamp>/ before
|
||||
# overwriting. The backup directory layout mirrors the venv layout so a
|
||||
# revert is just a tree copy back.
|
||||
#
|
||||
# Usage: ./apply.sh
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
PATCH_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||
VENV_BASE="/home/aaron/aaronai/venv/lib/python3.12/site-packages"
|
||||
TIMESTAMP="$(date +%Y%m%d-%H%M%S)"
|
||||
BACKUP_DIR="$PATCH_DIR/backups/$TIMESTAMP"
|
||||
|
||||
# Files to patch — paths relative to graphiti_core/.
|
||||
FILES=(
|
||||
"graph_queries.py"
|
||||
"driver/falkordb_driver.py"
|
||||
"driver/falkordb/operations/search_ops.py"
|
||||
)
|
||||
|
||||
echo "graphiti-core vendored patch apply — BirdAI"
|
||||
echo "Patch directory: $PATCH_DIR"
|
||||
echo "Venv target: $VENV_BASE/graphiti_core/"
|
||||
echo "Backup to: $BACKUP_DIR"
|
||||
echo
|
||||
|
||||
# Pre-flight: confirm all source patch files exist.
|
||||
for rel in "${FILES[@]}"; do
|
||||
if [ ! -f "$PATCH_DIR/graphiti_core/$rel" ]; then
|
||||
echo "ERROR: missing patch file: $PATCH_DIR/graphiti_core/$rel" >&2
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
# Pre-flight: confirm all target venv files exist.
|
||||
for rel in "${FILES[@]}"; do
|
||||
if [ ! -f "$VENV_BASE/graphiti_core/$rel" ]; then
|
||||
echo "ERROR: missing venv file: $VENV_BASE/graphiti_core/$rel" >&2
|
||||
echo " graphiti-core may not be installed, or version differs from 0.29.0." >&2
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
# Backup originals.
|
||||
echo "[1/3] Backing up originals..."
|
||||
for rel in "${FILES[@]}"; do
|
||||
backup_path="$BACKUP_DIR/graphiti_core/$rel"
|
||||
mkdir -p "$(dirname "$backup_path")"
|
||||
cp "$VENV_BASE/graphiti_core/$rel" "$backup_path"
|
||||
echo " backed up: $rel"
|
||||
done
|
||||
echo
|
||||
|
||||
# Apply patches by copying.
|
||||
echo "[2/3] Applying patches..."
|
||||
for rel in "${FILES[@]}"; do
|
||||
cp "$PATCH_DIR/graphiti_core/$rel" "$VENV_BASE/graphiti_core/$rel"
|
||||
echo " patched: $rel"
|
||||
done
|
||||
echo
|
||||
|
||||
# Sanity check: confirm patched files have the marker.
|
||||
echo "[3/3] Verifying patched files..."
|
||||
for rel in "${FILES[@]}"; do
|
||||
if grep -q "PATCHED 2026-05-02" "$VENV_BASE/graphiti_core/$rel"; then
|
||||
echo " OK: $rel contains patch marker"
|
||||
else
|
||||
echo " WARNING: $rel missing patch marker (may be expected for graph_queries.py — its docstring uses the marker only in the module header)"
|
||||
fi
|
||||
done
|
||||
echo
|
||||
echo "Done. Backup: $BACKUP_DIR"
|
||||
echo "Restart the sidecar to pick up changes:"
|
||||
echo " sudo systemctl restart aaronai-graphiti.service"
|
||||
@@ -0,0 +1,904 @@
|
||||
"""
|
||||
Copyright 2024, Zep Software, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from graphiti_core.driver.driver import GraphProvider
|
||||
from graphiti_core.driver.falkordb import STOPWORDS
|
||||
from graphiti_core.driver.operations.search_ops import SearchOperations
|
||||
from graphiti_core.driver.query_executor import QueryExecutor
|
||||
from graphiti_core.driver.record_parsers import (
|
||||
community_node_from_record,
|
||||
entity_edge_from_record,
|
||||
entity_node_from_record,
|
||||
episodic_node_from_record,
|
||||
)
|
||||
from graphiti_core.edges import EntityEdge
|
||||
from graphiti_core.graph_queries import (
|
||||
get_nodes_query,
|
||||
get_relationships_query,
|
||||
get_vector_cosine_func_query,
|
||||
)
|
||||
from graphiti_core.models.edges.edge_db_queries import get_entity_edge_return_query
|
||||
from graphiti_core.models.nodes.node_db_queries import (
|
||||
COMMUNITY_NODE_RETURN,
|
||||
EPISODIC_NODE_RETURN,
|
||||
get_entity_node_return_query,
|
||||
)
|
||||
from graphiti_core.nodes import CommunityNode, EntityNode, EpisodicNode
|
||||
from graphiti_core.search.search_filters import (
|
||||
SearchFilters,
|
||||
edge_search_filter_query_constructor,
|
||||
node_search_filter_query_constructor,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MAX_QUERY_LENGTH = 128
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Vector index dispatcher (PATCHED 2026-05-02, BirdAI vendored patch).
|
||||
#
|
||||
# graphiti-core's FalkorDB driver historically composed similarity queries
|
||||
# using `vec.cosineDistance(...)` in interpreted Cypher, which produces a
|
||||
# full-table scan for every search. FalkorDB supports native vector indexes
|
||||
# via `db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`;
|
||||
# this dispatcher uses them when present and falls back to the cosine math
|
||||
# otherwise.
|
||||
#
|
||||
# Index existence is checked once per (label, attribute, entity_type) and
|
||||
# cached at module scope. The cache should be invalidated whenever
|
||||
# `build_indices_and_constraints` runs (since indexes may have been created
|
||||
# or dropped). FalkorDriver.build_indices_and_constraints is patched to
|
||||
# call `_invalidate_falkordb_vector_index_cache()` after building.
|
||||
#
|
||||
# Over-fetch factor (VECTOR_INDEX_CANDIDATE_MULTIPLIER from graph_queries)
|
||||
# preserves recall when WHERE filters reject some of the top-k candidates.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
from graphiti_core.graph_queries import (
|
||||
VECTOR_INDEX_CANDIDATE_MULTIPLIER,
|
||||
get_vector_cosine_func_query,
|
||||
)
|
||||
|
||||
# Cache: key = (label, attribute, entity_type), value = bool
|
||||
# entity_type is 'NODE' or 'RELATIONSHIP'.
|
||||
_FALKORDB_VECTOR_INDEX_CACHE: dict[tuple[str, str, str], bool] = {}
|
||||
|
||||
|
||||
def _invalidate_falkordb_vector_index_cache() -> None:
|
||||
"""Clear the vector-index existence cache. Call after build_indices_and_constraints."""
|
||||
_FALKORDB_VECTOR_INDEX_CACHE.clear()
|
||||
|
||||
|
||||
async def _falkordb_vector_index_exists(
|
||||
executor: QueryExecutor,
|
||||
label: str,
|
||||
attribute: str,
|
||||
entity_type: str,
|
||||
) -> bool:
|
||||
"""Check whether a FalkorDB vector index exists for the given target.
|
||||
|
||||
entity_type is 'NODE' for node-label indexes, 'RELATIONSHIP' for edge-type indexes.
|
||||
Result is cached at module scope; call _invalidate_falkordb_vector_index_cache()
|
||||
after building or dropping indexes.
|
||||
"""
|
||||
key = (label, attribute, entity_type)
|
||||
if key in _FALKORDB_VECTOR_INDEX_CACHE:
|
||||
return _FALKORDB_VECTOR_INDEX_CACHE[key]
|
||||
|
||||
try:
|
||||
records, _, _ = await executor.execute_query(
|
||||
"CALL db.indexes() YIELD label, properties, types, entitytype "
|
||||
"RETURN label, properties, types, entitytype"
|
||||
)
|
||||
except Exception as e:
|
||||
# If we cannot enumerate indexes, fall back to "no index" rather than
|
||||
# propagating the error. The fallback cosine-math path is correct,
|
||||
# just slower.
|
||||
logger.warning(f"FalkorDB vector index probe failed; assuming none exist: {e}")
|
||||
_FALKORDB_VECTOR_INDEX_CACHE[key] = False
|
||||
return False
|
||||
|
||||
found = False
|
||||
for r in records:
|
||||
# Records come back as dict-like rows keyed by column name (not
|
||||
# tuples). Access by string keys matching the YIELD clause above.
|
||||
rec_label = r.get('label') if hasattr(r, 'get') else r['label']
|
||||
rec_props = r.get('properties') if hasattr(r, 'get') else r['properties']
|
||||
rec_types = r.get('types') if hasattr(r, 'get') else r['types']
|
||||
rec_entitytype = r.get('entitytype') if hasattr(r, 'get') else r['entitytype']
|
||||
if rec_props is None:
|
||||
rec_props = []
|
||||
if rec_types is None:
|
||||
rec_types = {}
|
||||
|
||||
if rec_label != label:
|
||||
continue
|
||||
if rec_entitytype is not None and rec_entitytype != entity_type:
|
||||
continue
|
||||
if attribute not in rec_props:
|
||||
continue
|
||||
|
||||
# rec_types is a dict like {attribute: ['VECTOR', ...], ...} or sometimes
|
||||
# a flat list — handle both shapes.
|
||||
if isinstance(rec_types, dict):
|
||||
attr_types = rec_types.get(attribute, [])
|
||||
else:
|
||||
attr_types = rec_types
|
||||
if 'VECTOR' in attr_types:
|
||||
found = True
|
||||
break
|
||||
|
||||
_FALKORDB_VECTOR_INDEX_CACHE[key] = found
|
||||
return found
|
||||
|
||||
|
||||
def _falkordb_vector_node_search_cypher(
|
||||
label: str,
|
||||
embedding_attr: str,
|
||||
search_vector_param: str,
|
||||
use_index: bool,
|
||||
) -> tuple[str, str]:
|
||||
"""Build the cypher prefix and node-binding for a node-vector search.
|
||||
|
||||
Returns (prefix, node_var) where:
|
||||
- prefix is the Cypher fragment that binds the node variable and a
|
||||
`score` variable. With index, it's a CALL ... YIELD; without, it's
|
||||
a MATCH plus WITH cosine math.
|
||||
- node_var is the variable name the caller's downstream Cypher should
|
||||
reference (always 'n' here for parity with the existing code).
|
||||
|
||||
The caller appends WHERE filters and RETURN/ORDER BY/LIMIT as usual.
|
||||
The over-fetch parameter `$candidate_k` must be passed by the caller
|
||||
when use_index is True.
|
||||
"""
|
||||
if use_index:
|
||||
return (
|
||||
f"CALL db.idx.vector.queryNodes("
|
||||
f"'{label}', '{embedding_attr}', $candidate_k, vecf32({search_vector_param})"
|
||||
f") YIELD node, score "
|
||||
f"WITH node AS n, score "
|
||||
), "n"
|
||||
# Fallback: original cosine math path
|
||||
cosine = get_vector_cosine_func_query(
|
||||
f"n.{embedding_attr}", search_vector_param, GraphProvider.FALKORDB
|
||||
)
|
||||
return (
|
||||
f"MATCH (n:{label}) "
|
||||
f"WITH n, {cosine} AS score "
|
||||
), "n"
|
||||
|
||||
|
||||
def _falkordb_vector_edge_search_cypher(
|
||||
relationship_type: str,
|
||||
embedding_attr: str,
|
||||
search_vector_param: str,
|
||||
use_index: bool,
|
||||
) -> tuple[str, str]:
|
||||
"""Build the cypher prefix and edge-binding for an edge-vector search.
|
||||
|
||||
Returns (prefix, edge_var). With the index, the procedure binds the
|
||||
relationship variable; we then MATCH source and target via the existing
|
||||
edge to recover (n)-[e]->(m). Without the index, it's the original
|
||||
MATCH-and-cosine path.
|
||||
|
||||
Variable name is 'e' for parity with existing code; source/target are
|
||||
'n' and 'm' respectively, also for parity.
|
||||
"""
|
||||
if use_index:
|
||||
return (
|
||||
f"CALL db.idx.vector.queryRelationships("
|
||||
f"'{relationship_type}', '{embedding_attr}', $candidate_k, vecf32({search_vector_param})"
|
||||
f") YIELD relationship, score "
|
||||
f"MATCH (n:Entity)-[e:{relationship_type}]->(m:Entity) "
|
||||
f"WHERE e = relationship "
|
||||
f"WITH DISTINCT e, n, m, score "
|
||||
), "e"
|
||||
# Fallback
|
||||
cosine = get_vector_cosine_func_query(
|
||||
f"e.{embedding_attr}", search_vector_param, GraphProvider.FALKORDB
|
||||
)
|
||||
return (
|
||||
f"MATCH (n:Entity)-[e:{relationship_type}]->(m:Entity) "
|
||||
f"WITH DISTINCT e, n, m, {cosine} AS score "
|
||||
), "e"
|
||||
|
||||
|
||||
|
||||
# FalkorDB separator characters that break text into tokens
|
||||
_SEPARATOR_MAP = str.maketrans(
|
||||
{
|
||||
',': ' ',
|
||||
'.': ' ',
|
||||
'<': ' ',
|
||||
'>': ' ',
|
||||
'{': ' ',
|
||||
'}': ' ',
|
||||
'[': ' ',
|
||||
']': ' ',
|
||||
'"': ' ',
|
||||
"'": ' ',
|
||||
':': ' ',
|
||||
';': ' ',
|
||||
'!': ' ',
|
||||
'@': ' ',
|
||||
'#': ' ',
|
||||
'$': ' ',
|
||||
'%': ' ',
|
||||
'^': ' ',
|
||||
'&': ' ',
|
||||
'*': ' ',
|
||||
'(': ' ',
|
||||
')': ' ',
|
||||
'-': ' ',
|
||||
'+': ' ',
|
||||
'=': ' ',
|
||||
'~': ' ',
|
||||
'?': ' ',
|
||||
'|': ' ',
|
||||
'/': ' ',
|
||||
'\\': ' ',
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _sanitize(query: str) -> str:
|
||||
"""Replace FalkorDB special characters with whitespace."""
|
||||
sanitized = query.translate(_SEPARATOR_MAP)
|
||||
return ' '.join(sanitized.split())
|
||||
|
||||
|
||||
def _build_falkor_fulltext_query(
|
||||
query: str,
|
||||
group_ids: list[str] | None = None,
|
||||
max_query_length: int = MAX_QUERY_LENGTH,
|
||||
) -> str:
|
||||
"""Build a fulltext query string for FalkorDB using RedisSearch syntax."""
|
||||
if group_ids is None or len(group_ids) == 0:
|
||||
group_filter = ''
|
||||
else:
|
||||
escaped_group_ids = [f'"{gid}"' for gid in group_ids]
|
||||
group_values = '|'.join(escaped_group_ids)
|
||||
group_filter = f'(@group_id:{group_values})'
|
||||
|
||||
sanitized_query = _sanitize(query)
|
||||
|
||||
# Remove stopwords and empty tokens
|
||||
query_words = sanitized_query.split()
|
||||
filtered_words = [word for word in query_words if word and word.lower() not in STOPWORDS]
|
||||
sanitized_query = ' | '.join(filtered_words)
|
||||
|
||||
if len(sanitized_query.split(' ')) + len(group_ids or '') >= max_query_length:
|
||||
return ''
|
||||
|
||||
full_query = group_filter + ' (' + sanitized_query + ')'
|
||||
return full_query
|
||||
|
||||
|
||||
class FalkorSearchOperations(SearchOperations):
|
||||
# --- Node search ---
|
||||
|
||||
async def node_fulltext_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
query: str,
|
||||
search_filter: SearchFilters,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[EntityNode]:
|
||||
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
|
||||
if fuzzy_query == '':
|
||||
return []
|
||||
|
||||
filter_queries, filter_params = node_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('n.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
|
||||
|
||||
cypher = (
|
||||
get_nodes_query(
|
||||
'node_name_and_summary', '$query', limit=limit, provider=GraphProvider.FALKORDB
|
||||
)
|
||||
+ 'YIELD node AS n, score'
|
||||
+ filter_query
|
||||
+ """
|
||||
WITH n, score
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
RETURN
|
||||
"""
|
||||
+ get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher,
|
||||
query=fuzzy_query,
|
||||
limit=limit,
|
||||
**filter_params,
|
||||
)
|
||||
|
||||
return [entity_node_from_record(r) for r in records]
|
||||
|
||||
async def node_similarity_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
search_vector: list[float],
|
||||
search_filter: SearchFilters,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
min_score: float = 0.6,
|
||||
) -> list[EntityNode]:
|
||||
filter_queries, filter_params = node_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('n.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
|
||||
|
||||
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
|
||||
# index when available; fall back to interpreted-Cypher cosine math
|
||||
# otherwise. The filter clause's position changes between paths
|
||||
# (after MATCH for fallback, after YIELD for index path), but the
|
||||
# filter expressions themselves are identical because they reference
|
||||
# the bound variable `n` either way.
|
||||
use_index = await _falkordb_vector_index_exists(
|
||||
executor, 'Entity', 'name_embedding', 'NODE'
|
||||
)
|
||||
prefix, _ = _falkordb_vector_node_search_cypher(
|
||||
'Entity', 'name_embedding', '$search_vector', use_index
|
||||
)
|
||||
where_clauses = []
|
||||
if filter_query:
|
||||
where_clauses.append(filter_query.replace(' WHERE ', '', 1).strip())
|
||||
where_clauses.append('score > $min_score')
|
||||
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
|
||||
|
||||
cypher = (
|
||||
prefix
|
||||
+ unified_where
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
params = dict(
|
||||
search_vector=search_vector,
|
||||
limit=limit,
|
||||
min_score=min_score,
|
||||
**filter_params,
|
||||
)
|
||||
if use_index:
|
||||
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
|
||||
records, _, _ = await executor.execute_query(cypher, **params)
|
||||
|
||||
return [entity_node_from_record(r) for r in records]
|
||||
|
||||
async def node_bfs_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
origin_uuids: list[str],
|
||||
search_filter: SearchFilters,
|
||||
max_depth: int,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[EntityNode]:
|
||||
if not origin_uuids or max_depth < 1:
|
||||
return []
|
||||
|
||||
filter_queries, filter_params = node_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('n.group_id IN $group_ids')
|
||||
filter_queries.append('origin.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' AND ' + (' AND '.join(filter_queries))
|
||||
|
||||
cypher = (
|
||||
f"""
|
||||
UNWIND $bfs_origin_node_uuids AS origin_uuid
|
||||
MATCH (origin {{uuid: origin_uuid}})-[:RELATES_TO|MENTIONS*1..{max_depth}]->(n:Entity)
|
||||
WHERE n.group_id = origin.group_id
|
||||
"""
|
||||
+ filter_query
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||
+ """
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher,
|
||||
bfs_origin_node_uuids=origin_uuids,
|
||||
limit=limit,
|
||||
**filter_params,
|
||||
)
|
||||
|
||||
return [entity_node_from_record(r) for r in records]
|
||||
|
||||
# --- Edge search ---
|
||||
|
||||
async def edge_fulltext_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
query: str,
|
||||
search_filter: SearchFilters,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[EntityEdge]:
|
||||
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
|
||||
if fuzzy_query == '':
|
||||
return []
|
||||
|
||||
filter_queries, filter_params = edge_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('e.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
|
||||
|
||||
cypher = (
|
||||
get_relationships_query(
|
||||
'edge_name_and_fact', limit=limit, provider=GraphProvider.FALKORDB
|
||||
)
|
||||
+ """
|
||||
YIELD relationship AS rel, score
|
||||
MATCH (n:Entity)-[e:RELATES_TO {uuid: rel.uuid}]->(m:Entity)
|
||||
"""
|
||||
+ filter_query
|
||||
+ """
|
||||
WITH e, score, n, m
|
||||
RETURN
|
||||
"""
|
||||
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher,
|
||||
query=fuzzy_query,
|
||||
limit=limit,
|
||||
**filter_params,
|
||||
)
|
||||
|
||||
return [entity_edge_from_record(r) for r in records]
|
||||
|
||||
async def edge_similarity_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
search_vector: list[float],
|
||||
source_node_uuid: str | None,
|
||||
target_node_uuid: str | None,
|
||||
search_filter: SearchFilters,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
min_score: float = 0.6,
|
||||
) -> list[EntityEdge]:
|
||||
filter_queries, filter_params = edge_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('e.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
if source_node_uuid is not None:
|
||||
filter_params['source_uuid'] = source_node_uuid
|
||||
filter_queries.append('n.uuid = $source_uuid')
|
||||
|
||||
if target_node_uuid is not None:
|
||||
filter_params['target_uuid'] = target_node_uuid
|
||||
filter_queries.append('m.uuid = $target_uuid')
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
|
||||
|
||||
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
|
||||
# index on RELATES_TO.fact_embedding when available. The unindexed
|
||||
# fallback is the same MATCH-and-cosine math that previously hung
|
||||
# for 6+ minutes on a 4,000-entity graph; this is the load-bearing
|
||||
# call site that motivated the patch.
|
||||
use_index = await _falkordb_vector_index_exists(
|
||||
executor, 'RELATES_TO', 'fact_embedding', 'RELATIONSHIP'
|
||||
)
|
||||
prefix, _ = _falkordb_vector_edge_search_cypher(
|
||||
'RELATES_TO', 'fact_embedding', '$search_vector', use_index
|
||||
)
|
||||
where_clauses = []
|
||||
if filter_query:
|
||||
where_clauses.append(filter_query.replace(' WHERE ', '', 1).strip())
|
||||
where_clauses.append('score > $min_score')
|
||||
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
|
||||
|
||||
cypher = (
|
||||
prefix
|
||||
+ unified_where
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
params = dict(
|
||||
search_vector=search_vector,
|
||||
limit=limit,
|
||||
min_score=min_score,
|
||||
**filter_params,
|
||||
)
|
||||
if use_index:
|
||||
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
|
||||
records, _, _ = await executor.execute_query(cypher, **params)
|
||||
|
||||
return [entity_edge_from_record(r) for r in records]
|
||||
|
||||
async def edge_bfs_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
origin_uuids: list[str],
|
||||
max_depth: int,
|
||||
search_filter: SearchFilters,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[EntityEdge]:
|
||||
if not origin_uuids:
|
||||
return []
|
||||
|
||||
filter_queries, filter_params = edge_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('e.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
|
||||
|
||||
cypher = (
|
||||
f"""
|
||||
UNWIND $bfs_origin_node_uuids AS origin_uuid
|
||||
MATCH path = (origin {{uuid: origin_uuid}})-[:RELATES_TO|MENTIONS*1..{max_depth}]->(:Entity)
|
||||
UNWIND relationships(path) AS rel
|
||||
MATCH (n:Entity)-[e:RELATES_TO {{uuid: rel.uuid}}]-(m:Entity)
|
||||
"""
|
||||
+ filter_query
|
||||
+ """
|
||||
RETURN DISTINCT
|
||||
"""
|
||||
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
|
||||
+ """
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher,
|
||||
bfs_origin_node_uuids=origin_uuids,
|
||||
depth=max_depth,
|
||||
limit=limit,
|
||||
**filter_params,
|
||||
)
|
||||
|
||||
return [entity_edge_from_record(r) for r in records]
|
||||
|
||||
# --- Episode search ---
|
||||
|
||||
async def episode_fulltext_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
query: str,
|
||||
search_filter: SearchFilters, # noqa: ARG002
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[EpisodicNode]:
|
||||
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
|
||||
if fuzzy_query == '':
|
||||
return []
|
||||
|
||||
filter_params: dict[str, Any] = {}
|
||||
group_filter_query = ''
|
||||
if group_ids is not None:
|
||||
group_filter_query += '\nAND e.group_id IN $group_ids'
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
cypher = (
|
||||
get_nodes_query(
|
||||
'episode_content', '$query', limit=limit, provider=GraphProvider.FALKORDB
|
||||
)
|
||||
+ """
|
||||
YIELD node AS episode, score
|
||||
MATCH (e:Episodic)
|
||||
WHERE e.uuid = episode.uuid
|
||||
"""
|
||||
+ group_filter_query
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ EPISODIC_NODE_RETURN
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher, query=fuzzy_query, limit=limit, **filter_params
|
||||
)
|
||||
|
||||
return [episodic_node_from_record(r) for r in records]
|
||||
|
||||
# --- Community search ---
|
||||
|
||||
async def community_fulltext_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
query: str,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[CommunityNode]:
|
||||
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
|
||||
if fuzzy_query == '':
|
||||
return []
|
||||
|
||||
filter_params: dict[str, Any] = {}
|
||||
group_filter_query = ''
|
||||
if group_ids is not None:
|
||||
group_filter_query = 'WHERE c.group_id IN $group_ids'
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
cypher = (
|
||||
get_nodes_query(
|
||||
'community_name', '$query', limit=limit, provider=GraphProvider.FALKORDB
|
||||
)
|
||||
+ """
|
||||
YIELD node AS c, score
|
||||
WITH c, score
|
||||
"""
|
||||
+ group_filter_query
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ COMMUNITY_NODE_RETURN
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher, query=fuzzy_query, limit=limit, **filter_params
|
||||
)
|
||||
|
||||
return [community_node_from_record(r) for r in records]
|
||||
|
||||
async def community_similarity_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
search_vector: list[float],
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
min_score: float = 0.6,
|
||||
) -> list[CommunityNode]:
|
||||
query_params: dict[str, Any] = {}
|
||||
|
||||
group_filter_query = ''
|
||||
if group_ids is not None:
|
||||
group_filter_query += ' WHERE c.group_id IN $group_ids'
|
||||
query_params['group_ids'] = group_ids
|
||||
|
||||
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
|
||||
# index on Community.name_embedding when available. Note: the existing
|
||||
# filter is built into `group_filter_query` (already prefixed with
|
||||
# ' WHERE ' if non-empty) and uses variable `c`. The dispatcher binds
|
||||
# the node as `n` for parity with the helper signature, then we
|
||||
# re-bind to `c` via WITH so the rest of the query is unchanged.
|
||||
use_index = await _falkordb_vector_index_exists(
|
||||
executor, 'Community', 'name_embedding', 'NODE'
|
||||
)
|
||||
prefix, _ = _falkordb_vector_node_search_cypher(
|
||||
'Community', 'name_embedding', '$search_vector', use_index
|
||||
)
|
||||
prefix = prefix + ' WITH n AS c, score '
|
||||
where_clauses = []
|
||||
if group_filter_query:
|
||||
where_clauses.append(group_filter_query.replace(' WHERE ', '', 1).strip())
|
||||
where_clauses.append('score > $min_score')
|
||||
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
|
||||
|
||||
cypher = (
|
||||
prefix
|
||||
+ unified_where
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ COMMUNITY_NODE_RETURN
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
params = dict(
|
||||
search_vector=search_vector,
|
||||
limit=limit,
|
||||
min_score=min_score,
|
||||
**query_params,
|
||||
)
|
||||
if use_index:
|
||||
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
|
||||
records, _, _ = await executor.execute_query(cypher, **params)
|
||||
|
||||
return [community_node_from_record(r) for r in records]
|
||||
|
||||
# --- Rerankers ---
|
||||
|
||||
async def node_distance_reranker(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
node_uuids: list[str],
|
||||
center_node_uuid: str,
|
||||
min_score: float = 0,
|
||||
) -> list[EntityNode]:
|
||||
filtered_uuids = [u for u in node_uuids if u != center_node_uuid]
|
||||
scores: dict[str, float] = {center_node_uuid: 0.0}
|
||||
|
||||
cypher = """
|
||||
UNWIND $node_uuids AS node_uuid
|
||||
MATCH (center:Entity {uuid: $center_uuid})-[:RELATES_TO]-(n:Entity {uuid: node_uuid})
|
||||
RETURN 1 AS score, node_uuid AS uuid
|
||||
"""
|
||||
|
||||
results, _, _ = await executor.execute_query(
|
||||
cypher,
|
||||
node_uuids=filtered_uuids,
|
||||
center_uuid=center_node_uuid,
|
||||
)
|
||||
|
||||
for result in results:
|
||||
scores[result['uuid']] = result['score']
|
||||
|
||||
for uuid in filtered_uuids:
|
||||
if uuid not in scores:
|
||||
scores[uuid] = float('inf')
|
||||
|
||||
filtered_uuids.sort(key=lambda cur_uuid: scores[cur_uuid])
|
||||
|
||||
if center_node_uuid in node_uuids:
|
||||
scores[center_node_uuid] = 0.1
|
||||
filtered_uuids = [center_node_uuid] + filtered_uuids
|
||||
|
||||
reranked_uuids = [u for u in filtered_uuids if (1 / scores[u]) >= min_score]
|
||||
|
||||
if not reranked_uuids:
|
||||
return []
|
||||
|
||||
get_query = """
|
||||
MATCH (n:Entity)
|
||||
WHERE n.uuid IN $uuids
|
||||
RETURN
|
||||
""" + get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||
|
||||
records, _, _ = await executor.execute_query(get_query, uuids=reranked_uuids)
|
||||
|
||||
node_map = {r['uuid']: entity_node_from_record(r) for r in records}
|
||||
return [node_map[u] for u in reranked_uuids if u in node_map]
|
||||
|
||||
async def episode_mentions_reranker(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
node_uuids: list[str],
|
||||
min_score: float = 0,
|
||||
) -> list[EntityNode]:
|
||||
if not node_uuids:
|
||||
return []
|
||||
|
||||
scores: dict[str, float] = {}
|
||||
|
||||
results, _, _ = await executor.execute_query(
|
||||
"""
|
||||
UNWIND $node_uuids AS node_uuid
|
||||
MATCH (episode:Episodic)-[r:MENTIONS]->(n:Entity {uuid: node_uuid})
|
||||
RETURN count(*) AS score, n.uuid AS uuid
|
||||
""",
|
||||
node_uuids=node_uuids,
|
||||
)
|
||||
|
||||
for result in results:
|
||||
scores[result['uuid']] = result['score']
|
||||
|
||||
for uuid in node_uuids:
|
||||
if uuid not in scores:
|
||||
scores[uuid] = float('inf')
|
||||
|
||||
sorted_uuids = list(node_uuids)
|
||||
sorted_uuids.sort(key=lambda cur_uuid: scores[cur_uuid])
|
||||
|
||||
reranked_uuids = [u for u in sorted_uuids if scores[u] >= min_score]
|
||||
|
||||
if not reranked_uuids:
|
||||
return []
|
||||
|
||||
get_query = """
|
||||
MATCH (n:Entity)
|
||||
WHERE n.uuid IN $uuids
|
||||
RETURN
|
||||
""" + get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||
|
||||
records, _, _ = await executor.execute_query(get_query, uuids=reranked_uuids)
|
||||
|
||||
node_map = {r['uuid']: entity_node_from_record(r) for r in records}
|
||||
return [node_map[u] for u in reranked_uuids if u in node_map]
|
||||
|
||||
# --- Filter builders ---
|
||||
|
||||
def build_node_search_filters(self, search_filters: SearchFilters) -> Any:
|
||||
filter_queries, filter_params = node_search_filter_query_constructor(
|
||||
search_filters, GraphProvider.FALKORDB
|
||||
)
|
||||
return {'filter_queries': filter_queries, 'filter_params': filter_params}
|
||||
|
||||
def build_edge_search_filters(self, search_filters: SearchFilters) -> Any:
|
||||
filter_queries, filter_params = edge_search_filter_query_constructor(
|
||||
search_filters, GraphProvider.FALKORDB
|
||||
)
|
||||
return {'filter_queries': filter_queries, 'filter_params': filter_params}
|
||||
|
||||
# --- Fulltext query builder ---
|
||||
|
||||
def build_fulltext_query(
|
||||
self,
|
||||
query: str,
|
||||
group_ids: list[str] | None = None,
|
||||
max_query_length: int = MAX_QUERY_LENGTH,
|
||||
) -> str:
|
||||
return _build_falkor_fulltext_query(query, group_ids, max_query_length)
|
||||
@@ -0,0 +1,444 @@
|
||||
"""
|
||||
Copyright 2024, Zep Software, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from falkordb import Graph as FalkorGraph
|
||||
from falkordb.asyncio import FalkorDB
|
||||
else:
|
||||
try:
|
||||
from falkordb import Graph as FalkorGraph
|
||||
from falkordb.asyncio import FalkorDB
|
||||
except ImportError:
|
||||
# If falkordb is not installed, raise an ImportError
|
||||
raise ImportError(
|
||||
'falkordb is required for FalkorDriver. '
|
||||
'Install it with: pip install graphiti-core[falkordb]'
|
||||
) from None
|
||||
|
||||
from graphiti_core.driver.driver import GraphDriver, GraphDriverSession, GraphProvider
|
||||
from graphiti_core.driver.falkordb import STOPWORDS as STOPWORDS
|
||||
from graphiti_core.driver.falkordb.operations.community_edge_ops import (
|
||||
FalkorCommunityEdgeOperations,
|
||||
)
|
||||
from graphiti_core.driver.falkordb.operations.community_node_ops import (
|
||||
FalkorCommunityNodeOperations,
|
||||
)
|
||||
from graphiti_core.driver.falkordb.operations.entity_edge_ops import FalkorEntityEdgeOperations
|
||||
from graphiti_core.driver.falkordb.operations.entity_node_ops import FalkorEntityNodeOperations
|
||||
from graphiti_core.driver.falkordb.operations.episode_node_ops import FalkorEpisodeNodeOperations
|
||||
from graphiti_core.driver.falkordb.operations.episodic_edge_ops import FalkorEpisodicEdgeOperations
|
||||
from graphiti_core.driver.falkordb.operations.graph_ops import FalkorGraphMaintenanceOperations
|
||||
from graphiti_core.driver.falkordb.operations.has_episode_edge_ops import (
|
||||
FalkorHasEpisodeEdgeOperations,
|
||||
)
|
||||
from graphiti_core.driver.falkordb.operations.next_episode_edge_ops import (
|
||||
FalkorNextEpisodeEdgeOperations,
|
||||
)
|
||||
from graphiti_core.driver.falkordb.operations.saga_node_ops import FalkorSagaNodeOperations
|
||||
from graphiti_core.driver.falkordb.operations.search_ops import FalkorSearchOperations
|
||||
from graphiti_core.driver.operations.community_edge_ops import CommunityEdgeOperations
|
||||
from graphiti_core.driver.operations.community_node_ops import CommunityNodeOperations
|
||||
from graphiti_core.driver.operations.entity_edge_ops import EntityEdgeOperations
|
||||
from graphiti_core.driver.operations.entity_node_ops import EntityNodeOperations
|
||||
from graphiti_core.driver.operations.episode_node_ops import EpisodeNodeOperations
|
||||
from graphiti_core.driver.operations.episodic_edge_ops import EpisodicEdgeOperations
|
||||
from graphiti_core.driver.operations.graph_ops import GraphMaintenanceOperations
|
||||
from graphiti_core.driver.operations.has_episode_edge_ops import HasEpisodeEdgeOperations
|
||||
from graphiti_core.driver.operations.next_episode_edge_ops import NextEpisodeEdgeOperations
|
||||
from graphiti_core.driver.operations.saga_node_ops import SagaNodeOperations
|
||||
from graphiti_core.driver.operations.search_ops import SearchOperations
|
||||
from graphiti_core.graph_queries import get_fulltext_indices, get_range_indices, get_vector_indices
|
||||
from graphiti_core.helpers import validate_group_ids
|
||||
from graphiti_core.utils.datetime_utils import convert_datetimes_to_strings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FalkorDriverSession(GraphDriverSession):
|
||||
provider = GraphProvider.FALKORDB
|
||||
|
||||
def __init__(self, graph: FalkorGraph):
|
||||
self.graph = graph
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
# No cleanup needed for Falkor, but method must exist
|
||||
pass
|
||||
|
||||
async def close(self):
|
||||
# No explicit close needed for FalkorDB, but method must exist
|
||||
pass
|
||||
|
||||
async def execute_write(self, func, *args, **kwargs):
|
||||
# Directly await the provided async function with `self` as the transaction/session
|
||||
return await func(self, *args, **kwargs)
|
||||
|
||||
async def run(self, query: str | list, **kwargs: Any) -> Any:
|
||||
# FalkorDB does not support argument for Label Set, so it's converted into an array of queries
|
||||
if isinstance(query, list):
|
||||
for cypher, params in query:
|
||||
params = convert_datetimes_to_strings(params)
|
||||
await self.graph.query(str(cypher), params) # type: ignore[reportUnknownArgumentType]
|
||||
else:
|
||||
params = dict(kwargs)
|
||||
params = convert_datetimes_to_strings(params)
|
||||
await self.graph.query(str(query), params) # type: ignore[reportUnknownArgumentType]
|
||||
# Assuming `graph.query` is async (ideal); otherwise, wrap in executor
|
||||
return None
|
||||
|
||||
|
||||
class FalkorDriver(GraphDriver):
|
||||
provider = GraphProvider.FALKORDB
|
||||
default_group_id: str = '\\_'
|
||||
fulltext_syntax: str = '@' # FalkorDB uses a redisearch-like syntax for fulltext queries
|
||||
aoss_client: None = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str = 'localhost',
|
||||
port: int = 6379,
|
||||
username: str | None = None,
|
||||
password: str | None = None,
|
||||
falkor_db: FalkorDB | None = None,
|
||||
database: str = 'default_db',
|
||||
):
|
||||
"""
|
||||
Initialize the FalkorDB driver.
|
||||
|
||||
FalkorDB is a multi-tenant graph database.
|
||||
To connect, provide the host and port.
|
||||
The default parameters assume a local (on-premises) FalkorDB instance.
|
||||
|
||||
Args:
|
||||
host (str): The host where FalkorDB is running.
|
||||
port (int): The port on which FalkorDB is listening.
|
||||
username (str | None): The username for authentication (if required).
|
||||
password (str | None): The password for authentication (if required).
|
||||
falkor_db (FalkorDB | None): An existing FalkorDB instance to use instead of creating a new one.
|
||||
database (str): The name of the database to connect to. Defaults to 'default_db'.
|
||||
"""
|
||||
super().__init__()
|
||||
self._database = database
|
||||
if falkor_db is not None:
|
||||
# If a FalkorDB instance is provided, use it directly
|
||||
self.client = falkor_db
|
||||
else:
|
||||
self.client = FalkorDB(host=host, port=port, username=username, password=password)
|
||||
|
||||
# Instantiate FalkorDB operations
|
||||
self._entity_node_ops = FalkorEntityNodeOperations()
|
||||
self._episode_node_ops = FalkorEpisodeNodeOperations()
|
||||
self._community_node_ops = FalkorCommunityNodeOperations()
|
||||
self._saga_node_ops = FalkorSagaNodeOperations()
|
||||
self._entity_edge_ops = FalkorEntityEdgeOperations()
|
||||
self._episodic_edge_ops = FalkorEpisodicEdgeOperations()
|
||||
self._community_edge_ops = FalkorCommunityEdgeOperations()
|
||||
self._has_episode_edge_ops = FalkorHasEpisodeEdgeOperations()
|
||||
self._next_episode_edge_ops = FalkorNextEpisodeEdgeOperations()
|
||||
self._search_ops = FalkorSearchOperations()
|
||||
self._graph_ops = FalkorGraphMaintenanceOperations()
|
||||
|
||||
# Schedule the indices and constraints to be built
|
||||
try:
|
||||
# Try to get the current event loop
|
||||
loop = asyncio.get_running_loop()
|
||||
# Schedule the build_indices_and_constraints to run
|
||||
loop.create_task(self.build_indices_and_constraints())
|
||||
except RuntimeError:
|
||||
# No event loop running, this will be handled later
|
||||
pass
|
||||
|
||||
# --- Operations properties ---
|
||||
|
||||
@property
|
||||
def entity_node_ops(self) -> EntityNodeOperations:
|
||||
return self._entity_node_ops
|
||||
|
||||
@property
|
||||
def episode_node_ops(self) -> EpisodeNodeOperations:
|
||||
return self._episode_node_ops
|
||||
|
||||
@property
|
||||
def community_node_ops(self) -> CommunityNodeOperations:
|
||||
return self._community_node_ops
|
||||
|
||||
@property
|
||||
def saga_node_ops(self) -> SagaNodeOperations:
|
||||
return self._saga_node_ops
|
||||
|
||||
@property
|
||||
def entity_edge_ops(self) -> EntityEdgeOperations:
|
||||
return self._entity_edge_ops
|
||||
|
||||
@property
|
||||
def episodic_edge_ops(self) -> EpisodicEdgeOperations:
|
||||
return self._episodic_edge_ops
|
||||
|
||||
@property
|
||||
def community_edge_ops(self) -> CommunityEdgeOperations:
|
||||
return self._community_edge_ops
|
||||
|
||||
@property
|
||||
def has_episode_edge_ops(self) -> HasEpisodeEdgeOperations:
|
||||
return self._has_episode_edge_ops
|
||||
|
||||
@property
|
||||
def next_episode_edge_ops(self) -> NextEpisodeEdgeOperations:
|
||||
return self._next_episode_edge_ops
|
||||
|
||||
@property
|
||||
def search_ops(self) -> SearchOperations:
|
||||
return self._search_ops
|
||||
|
||||
@property
|
||||
def graph_ops(self) -> GraphMaintenanceOperations:
|
||||
return self._graph_ops
|
||||
|
||||
def _get_graph(self, graph_name: str | None) -> FalkorGraph:
|
||||
# FalkorDB requires a non-None database name for multi-tenant graphs; the default is "default_db"
|
||||
if graph_name is None:
|
||||
graph_name = self._database
|
||||
return self.client.select_graph(graph_name)
|
||||
|
||||
async def execute_query(self, cypher_query_, **kwargs: Any):
|
||||
graph = self._get_graph(self._database)
|
||||
|
||||
# Convert datetime objects to ISO strings (FalkorDB does not support datetime objects directly)
|
||||
params = convert_datetimes_to_strings(dict(kwargs))
|
||||
|
||||
try:
|
||||
result = await graph.query(cypher_query_, params) # type: ignore[reportUnknownArgumentType]
|
||||
except Exception as e:
|
||||
if 'already indexed' in str(e):
|
||||
# check if index already exists
|
||||
logger.info(f'Index already exists: {e}')
|
||||
return None
|
||||
logger.error(f'Error executing FalkorDB query: {e}\n{cypher_query_}\n{params}')
|
||||
raise
|
||||
|
||||
# Convert the result header to a list of strings
|
||||
header = [h[1] for h in result.header]
|
||||
|
||||
# Convert FalkorDB's result format (list of lists) to the format expected by Graphiti (list of dicts)
|
||||
records = []
|
||||
for row in result.result_set:
|
||||
record = {}
|
||||
for i, field_name in enumerate(header):
|
||||
if i < len(row):
|
||||
record[field_name] = row[i]
|
||||
else:
|
||||
# If there are more fields in header than values in row, set to None
|
||||
record[field_name] = None
|
||||
records.append(record)
|
||||
|
||||
return records, header, None
|
||||
|
||||
def session(self, database: str | None = None) -> GraphDriverSession:
|
||||
return FalkorDriverSession(self._get_graph(database))
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the driver connection."""
|
||||
if hasattr(self.client, 'aclose'):
|
||||
await self.client.aclose() # type: ignore[reportUnknownMemberType]
|
||||
elif hasattr(self.client.connection, 'aclose'):
|
||||
await self.client.connection.aclose()
|
||||
elif hasattr(self.client.connection, 'close'):
|
||||
await self.client.connection.close()
|
||||
|
||||
async def delete_all_indexes(self) -> None:
|
||||
result = await self.execute_query('CALL db.indexes()')
|
||||
if not result:
|
||||
return
|
||||
|
||||
records, _, _ = result
|
||||
drop_tasks = []
|
||||
|
||||
for record in records:
|
||||
label = record['label']
|
||||
entity_type = record['entitytype']
|
||||
|
||||
for field_name, index_type in record['types'].items():
|
||||
if 'RANGE' in index_type:
|
||||
drop_tasks.append(self.execute_query(f'DROP INDEX ON :{label}({field_name})'))
|
||||
elif 'FULLTEXT' in index_type:
|
||||
if entity_type == 'NODE':
|
||||
drop_tasks.append(
|
||||
self.execute_query(
|
||||
f'DROP FULLTEXT INDEX FOR (n:{label}) ON (n.{field_name})'
|
||||
)
|
||||
)
|
||||
elif entity_type == 'RELATIONSHIP':
|
||||
drop_tasks.append(
|
||||
self.execute_query(
|
||||
f'DROP FULLTEXT INDEX FOR ()-[e:{label}]-() ON (e.{field_name})'
|
||||
)
|
||||
)
|
||||
|
||||
if drop_tasks:
|
||||
await asyncio.gather(*drop_tasks)
|
||||
|
||||
async def build_indices_and_constraints(self, delete_existing=False):
|
||||
if delete_existing:
|
||||
await self.delete_all_indexes()
|
||||
# PATCHED 2026-05-02 (BirdAI vendored patch): add vector indexes alongside
|
||||
# range and fulltext. FalkorDB supports native vector indexes via
|
||||
# db.idx.vector.queryNodes / queryRelationships; without these, similarity
|
||||
# search runs as full-table-scan cosine math in interpreted Cypher.
|
||||
index_queries = (
|
||||
get_range_indices(self.provider)
|
||||
+ get_fulltext_indices(self.provider)
|
||||
+ get_vector_indices(self.provider)
|
||||
)
|
||||
for query in index_queries:
|
||||
await self.execute_query(query)
|
||||
# Invalidate the search_ops vector-index existence cache so subsequent
|
||||
# similarity queries re-probe and discover the indexes we just built.
|
||||
try:
|
||||
from graphiti_core.driver.falkordb.operations.search_ops import (
|
||||
_invalidate_falkordb_vector_index_cache,
|
||||
)
|
||||
_invalidate_falkordb_vector_index_cache()
|
||||
except ImportError:
|
||||
# search_ops module not yet imported (cold start); cache is empty
|
||||
# by default, so no invalidation needed.
|
||||
pass
|
||||
|
||||
def clone(self, database: str) -> 'GraphDriver':
|
||||
"""
|
||||
Returns a shallow copy of this driver with a different default database.
|
||||
Reuses the same connection (e.g. FalkorDB, Neo4j).
|
||||
"""
|
||||
if database == self._database:
|
||||
cloned = self
|
||||
elif database == self.default_group_id:
|
||||
cloned = FalkorDriver(falkor_db=self.client)
|
||||
else:
|
||||
# Create a new instance of FalkorDriver with the same connection but a different database
|
||||
cloned = FalkorDriver(falkor_db=self.client, database=database)
|
||||
|
||||
return cloned
|
||||
|
||||
async def health_check(self) -> None:
|
||||
"""Check FalkorDB connectivity by running a simple query."""
|
||||
try:
|
||||
await self.execute_query('MATCH (n) RETURN 1 LIMIT 1')
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f'FalkorDB health check failed: {e}')
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def convert_datetimes_to_strings(obj):
|
||||
if isinstance(obj, dict):
|
||||
return {k: FalkorDriver.convert_datetimes_to_strings(v) for k, v in obj.items()}
|
||||
elif isinstance(obj, list):
|
||||
return [FalkorDriver.convert_datetimes_to_strings(item) for item in obj]
|
||||
elif isinstance(obj, tuple):
|
||||
return tuple(FalkorDriver.convert_datetimes_to_strings(item) for item in obj)
|
||||
elif isinstance(obj, datetime):
|
||||
return obj.isoformat()
|
||||
else:
|
||||
return obj
|
||||
|
||||
def sanitize(self, query: str) -> str:
|
||||
"""
|
||||
Replace FalkorDB special characters with whitespace.
|
||||
Based on FalkorDB tokenization rules: ,.<>{}[]"':;!@#$%^&*()-+=~
|
||||
"""
|
||||
# FalkorDB separator characters that break text into tokens
|
||||
separator_map = str.maketrans(
|
||||
{
|
||||
',': ' ',
|
||||
'.': ' ',
|
||||
'<': ' ',
|
||||
'>': ' ',
|
||||
'{': ' ',
|
||||
'}': ' ',
|
||||
'[': ' ',
|
||||
']': ' ',
|
||||
'"': ' ',
|
||||
"'": ' ',
|
||||
':': ' ',
|
||||
';': ' ',
|
||||
'!': ' ',
|
||||
'@': ' ',
|
||||
'#': ' ',
|
||||
'$': ' ',
|
||||
'%': ' ',
|
||||
'^': ' ',
|
||||
'&': ' ',
|
||||
'*': ' ',
|
||||
'(': ' ',
|
||||
')': ' ',
|
||||
'-': ' ',
|
||||
'+': ' ',
|
||||
'=': ' ',
|
||||
'~': ' ',
|
||||
'?': ' ',
|
||||
'|': ' ',
|
||||
'/': ' ',
|
||||
'\\': ' ',
|
||||
}
|
||||
)
|
||||
sanitized = query.translate(separator_map)
|
||||
# Clean up multiple spaces
|
||||
sanitized = ' '.join(sanitized.split())
|
||||
return sanitized
|
||||
|
||||
def build_fulltext_query(
|
||||
self, query: str, group_ids: list[str] | None = None, max_query_length: int = 128
|
||||
) -> str:
|
||||
"""
|
||||
Build a fulltext query string for FalkorDB using RedisSearch syntax.
|
||||
FalkorDB uses RedisSearch-like syntax where:
|
||||
- Field queries use @ prefix: @field:value
|
||||
- Multiple values for same field: (@field:value1|value2)
|
||||
- Text search doesn't need @ prefix for content fields
|
||||
- AND is implicit with space: (@group_id:value) (text)
|
||||
- OR uses pipe within parentheses: (@group_id:value1|value2)
|
||||
"""
|
||||
validate_group_ids(group_ids)
|
||||
|
||||
if group_ids is None or len(group_ids) == 0:
|
||||
group_filter = ''
|
||||
else:
|
||||
# Escape group_ids with quotes to prevent RediSearch syntax errors
|
||||
# with reserved words like "main" or special characters like hyphens
|
||||
escaped_group_ids = [f'"{gid}"' for gid in group_ids]
|
||||
group_values = '|'.join(escaped_group_ids)
|
||||
group_filter = f'(@group_id:{group_values})'
|
||||
|
||||
sanitized_query = self.sanitize(query)
|
||||
|
||||
# Remove stopwords and empty tokens from the sanitized query
|
||||
query_words = sanitized_query.split()
|
||||
filtered_words = [word for word in query_words if word and word.lower() not in STOPWORDS]
|
||||
sanitized_query = ' | '.join(filtered_words)
|
||||
|
||||
# If the query is too long return no query
|
||||
if len(sanitized_query.split(' ')) + len(group_ids or '') >= max_query_length:
|
||||
return ''
|
||||
|
||||
full_query = group_filter + ' (' + sanitized_query + ')'
|
||||
|
||||
return full_query
|
||||
@@ -0,0 +1,242 @@
|
||||
"""
|
||||
Database query utilities for different graph database backends.
|
||||
|
||||
This module provides database-agnostic query generation for Neo4j and FalkorDB,
|
||||
supporting index creation, fulltext search, and bulk operations.
|
||||
|
||||
PATCHED for FalkorDB native vector index support (BirdAI vendored patch,
|
||||
2026-05-02). Adds:
|
||||
- get_vector_indices(): CREATE VECTOR INDEX statements for FalkorDB
|
||||
- get_vector_search_query(): Cypher fragment for vector similarity using
|
||||
FalkorDB's db.idx.vector procedures, with fallback to cosine math when
|
||||
the index does not yet exist
|
||||
- VECTOR_INDEX_CANDIDATE_MULTIPLIER: over-fetch factor for vector index
|
||||
queries to handle filter rejections after index lookup
|
||||
|
||||
No changes to Neo4j or Kuzu code paths.
|
||||
"""
|
||||
|
||||
from typing_extensions import LiteralString
|
||||
|
||||
from graphiti_core.driver.driver import GraphProvider
|
||||
|
||||
# Mapping from Neo4j fulltext index names to FalkorDB node labels
|
||||
NEO4J_TO_FALKORDB_MAPPING = {
|
||||
'node_name_and_summary': 'Entity',
|
||||
'community_name': 'Community',
|
||||
'episode_content': 'Episodic',
|
||||
'edge_name_and_fact': 'RELATES_TO',
|
||||
}
|
||||
# Mapping from fulltext index names to Kuzu node labels
|
||||
INDEX_TO_LABEL_KUZU_MAPPING = {
|
||||
'node_name_and_summary': 'Entity',
|
||||
'community_name': 'Community',
|
||||
'episode_content': 'Episodic',
|
||||
'edge_name_and_fact': 'RelatesToNode_',
|
||||
}
|
||||
|
||||
# Vector index over-fetch multiplier. When a vector index search is
|
||||
# combined with WHERE filters (group_id, source_uuid, etc.), some of
|
||||
# the top-k index results may be filtered out. Over-fetching by this
|
||||
# factor preserves recall against the final LIMIT after filtering.
|
||||
# Conservative default; tunable per-deployment by editing this constant
|
||||
# or via environment-variable override at the driver level (future).
|
||||
VECTOR_INDEX_CANDIDATE_MULTIPLIER = 5
|
||||
|
||||
|
||||
def get_range_indices(provider: GraphProvider) -> list[LiteralString]:
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
return [
|
||||
# Entity node
|
||||
'CREATE INDEX FOR (n:Entity) ON (n.uuid, n.group_id, n.name, n.created_at)',
|
||||
# Episodic node
|
||||
'CREATE INDEX FOR (n:Episodic) ON (n.uuid, n.group_id, n.created_at, n.valid_at)',
|
||||
# Community node
|
||||
'CREATE INDEX FOR (n:Community) ON (n.uuid)',
|
||||
# Saga node
|
||||
'CREATE INDEX FOR (n:Saga) ON (n.uuid, n.group_id, n.name)',
|
||||
# RELATES_TO edge
|
||||
'CREATE INDEX FOR ()-[e:RELATES_TO]-() ON (e.uuid, e.group_id, e.name, e.created_at, e.expired_at, e.valid_at, e.invalid_at)',
|
||||
# MENTIONS edge
|
||||
'CREATE INDEX FOR ()-[e:MENTIONS]-() ON (e.uuid, e.group_id)',
|
||||
# HAS_MEMBER edge
|
||||
'CREATE INDEX FOR ()-[e:HAS_MEMBER]-() ON (e.uuid)',
|
||||
# HAS_EPISODE edge
|
||||
'CREATE INDEX FOR ()-[e:HAS_EPISODE]-() ON (e.uuid, e.group_id)',
|
||||
# NEXT_EPISODE edge
|
||||
'CREATE INDEX FOR ()-[e:NEXT_EPISODE]-() ON (e.uuid, e.group_id)',
|
||||
]
|
||||
|
||||
if provider == GraphProvider.KUZU:
|
||||
return []
|
||||
|
||||
return [
|
||||
'CREATE INDEX entity_uuid IF NOT EXISTS FOR (n:Entity) ON (n.uuid)',
|
||||
'CREATE INDEX episode_uuid IF NOT EXISTS FOR (n:Episodic) ON (n.uuid)',
|
||||
'CREATE INDEX community_uuid IF NOT EXISTS FOR (n:Community) ON (n.uuid)',
|
||||
'CREATE INDEX saga_uuid IF NOT EXISTS FOR (n:Saga) ON (n.uuid)',
|
||||
'CREATE INDEX relation_uuid IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.uuid)',
|
||||
'CREATE INDEX mention_uuid IF NOT EXISTS FOR ()-[e:MENTIONS]-() ON (e.uuid)',
|
||||
'CREATE INDEX has_member_uuid IF NOT EXISTS FOR ()-[e:HAS_MEMBER]-() ON (e.uuid)',
|
||||
'CREATE INDEX has_episode_uuid IF NOT EXISTS FOR ()-[e:HAS_EPISODE]-() ON (e.uuid)',
|
||||
'CREATE INDEX next_episode_uuid IF NOT EXISTS FOR ()-[e:NEXT_EPISODE]-() ON (e.uuid)',
|
||||
'CREATE INDEX entity_group_id IF NOT EXISTS FOR (n:Entity) ON (n.group_id)',
|
||||
'CREATE INDEX episode_group_id IF NOT EXISTS FOR (n:Episodic) ON (n.group_id)',
|
||||
'CREATE INDEX community_group_id IF NOT EXISTS FOR (n:Community) ON (n.group_id)',
|
||||
'CREATE INDEX saga_group_id IF NOT EXISTS FOR (n:Saga) ON (n.group_id)',
|
||||
'CREATE INDEX relation_group_id IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.group_id)',
|
||||
'CREATE INDEX mention_group_id IF NOT EXISTS FOR ()-[e:MENTIONS]-() ON (e.group_id)',
|
||||
'CREATE INDEX has_episode_group_id IF NOT EXISTS FOR ()-[e:HAS_EPISODE]-() ON (e.group_id)',
|
||||
'CREATE INDEX next_episode_group_id IF NOT EXISTS FOR ()-[e:NEXT_EPISODE]-() ON (e.group_id)',
|
||||
'CREATE INDEX name_entity_index IF NOT EXISTS FOR (n:Entity) ON (n.name)',
|
||||
'CREATE INDEX saga_name IF NOT EXISTS FOR (n:Saga) ON (n.name)',
|
||||
'CREATE INDEX created_at_entity_index IF NOT EXISTS FOR (n:Entity) ON (n.created_at)',
|
||||
'CREATE INDEX created_at_episodic_index IF NOT EXISTS FOR (n:Episodic) ON (n.created_at)',
|
||||
'CREATE INDEX valid_at_episodic_index IF NOT EXISTS FOR (n:Episodic) ON (n.valid_at)',
|
||||
'CREATE INDEX name_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.name)',
|
||||
'CREATE INDEX created_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.created_at)',
|
||||
'CREATE INDEX expired_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.expired_at)',
|
||||
'CREATE INDEX valid_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.valid_at)',
|
||||
'CREATE INDEX invalid_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.invalid_at)',
|
||||
]
|
||||
|
||||
|
||||
def get_fulltext_indices(provider: GraphProvider) -> list[LiteralString]:
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
from typing import cast
|
||||
|
||||
from graphiti_core.driver.falkordb import STOPWORDS
|
||||
|
||||
# Convert to string representation for embedding in queries
|
||||
stopwords_str = str(STOPWORDS)
|
||||
|
||||
# Use type: ignore to satisfy LiteralString requirement while maintaining single source of truth
|
||||
return cast(
|
||||
list[LiteralString],
|
||||
[
|
||||
f"""CALL db.idx.fulltext.createNodeIndex(
|
||||
{{
|
||||
label: 'Episodic',
|
||||
stopwords: {stopwords_str}
|
||||
}},
|
||||
'content', 'source', 'source_description', 'group_id'
|
||||
)""",
|
||||
f"""CALL db.idx.fulltext.createNodeIndex(
|
||||
{{
|
||||
label: 'Entity',
|
||||
stopwords: {stopwords_str}
|
||||
}},
|
||||
'name', 'summary', 'group_id'
|
||||
)""",
|
||||
f"""CALL db.idx.fulltext.createNodeIndex(
|
||||
{{
|
||||
label: 'Community',
|
||||
stopwords: {stopwords_str}
|
||||
}},
|
||||
'name', 'group_id'
|
||||
)""",
|
||||
"""CREATE FULLTEXT INDEX FOR ()-[e:RELATES_TO]-() ON (e.name, e.fact, e.group_id)""",
|
||||
],
|
||||
)
|
||||
|
||||
if provider == GraphProvider.KUZU:
|
||||
return [
|
||||
"CALL CREATE_FTS_INDEX('Episodic', 'episode_content', ['content', 'source', 'source_description']);",
|
||||
"CALL CREATE_FTS_INDEX('Entity', 'node_name_and_summary', ['name', 'summary']);",
|
||||
"CALL CREATE_FTS_INDEX('Community', 'community_name', ['name']);",
|
||||
"CALL CREATE_FTS_INDEX('RelatesToNode_', 'edge_name_and_fact', ['name', 'fact']);",
|
||||
]
|
||||
|
||||
return [
|
||||
"""CREATE FULLTEXT INDEX episode_content IF NOT EXISTS
|
||||
FOR (e:Episodic) ON EACH [e.content, e.source, e.source_description, e.group_id]""",
|
||||
"""CREATE FULLTEXT INDEX node_name_and_summary IF NOT EXISTS
|
||||
FOR (n:Entity) ON EACH [n.name, n.summary, n.group_id]""",
|
||||
"""CREATE FULLTEXT INDEX community_name IF NOT EXISTS
|
||||
FOR (n:Community) ON EACH [n.name, n.group_id]""",
|
||||
"""CREATE FULLTEXT INDEX edge_name_and_fact IF NOT EXISTS
|
||||
FOR ()-[e:RELATES_TO]-() ON EACH [e.name, e.fact, e.group_id]""",
|
||||
]
|
||||
|
||||
|
||||
def get_vector_indices(provider: GraphProvider, dimension: int = 384) -> list[LiteralString]:
|
||||
"""Return CREATE VECTOR INDEX statements for the given provider.
|
||||
|
||||
For FalkorDB: creates HNSW vector indexes on Entity.name_embedding,
|
||||
RELATES_TO.fact_embedding, and Community.name_embedding. Backed by
|
||||
FalkorDB's native vector index (db.idx.vector.queryNodes /
|
||||
queryRelationships).
|
||||
|
||||
For Neo4j and Kuzu: returns an empty list. Those backends create vector
|
||||
indexes via different mechanisms (Neo4j auto-creates them when needed
|
||||
via its vector.similarity.cosine function; Kuzu uses array_cosine_similarity
|
||||
and does not require pre-built vector indexes for graphiti-core's usage).
|
||||
|
||||
Args:
|
||||
provider: The graph database provider.
|
||||
dimension: Embedding dimension. Defaults to 384 (all-MiniLM-L6-v2).
|
||||
Embedders with different dimensions should pass their own value
|
||||
through driver configuration. graphiti-core's default embedder
|
||||
is 1536 (OpenAI ada-002); BirdAI uses 384 (sentence-transformers).
|
||||
|
||||
Returns:
|
||||
List of CREATE VECTOR INDEX statements. Idempotent at FalkorDB level
|
||||
if the index already exists with matching options.
|
||||
"""
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
from typing import cast
|
||||
return cast(
|
||||
list[LiteralString],
|
||||
[
|
||||
f"CREATE VECTOR INDEX FOR (n:Entity) ON (n.name_embedding) "
|
||||
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
|
||||
f"CREATE VECTOR INDEX FOR ()-[e:RELATES_TO]-() ON (e.fact_embedding) "
|
||||
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
|
||||
f"CREATE VECTOR INDEX FOR (n:Community) ON (n.name_embedding) "
|
||||
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
|
||||
],
|
||||
)
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def get_nodes_query(name: str, query: str, limit: int, provider: GraphProvider) -> str:
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
label = NEO4J_TO_FALKORDB_MAPPING[name]
|
||||
return f"CALL db.idx.fulltext.queryNodes('{label}', {query})"
|
||||
|
||||
if provider == GraphProvider.KUZU:
|
||||
label = INDEX_TO_LABEL_KUZU_MAPPING[name]
|
||||
return f"CALL QUERY_FTS_INDEX('{label}', '{name}', {query}, TOP := $limit)"
|
||||
|
||||
return f'CALL db.index.fulltext.queryNodes("{name}", {query}, {{limit: $limit}})'
|
||||
|
||||
|
||||
def get_vector_cosine_func_query(vec1, vec2, provider: GraphProvider) -> str:
|
||||
"""Return a Cypher fragment for cosine similarity score in [0, 1].
|
||||
|
||||
PRESERVED for backward compatibility and as fallback when vector indexes
|
||||
do not yet exist on the FalkorDB backend. New code paths should prefer
|
||||
get_vector_search_query() which uses the native vector index when
|
||||
available.
|
||||
"""
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
# FalkorDB uses a different syntax for regular cosine similarity and Neo4j uses normalized cosine similarity
|
||||
return f'(2 - vec.cosineDistance({vec1}, vecf32({vec2})))/2'
|
||||
|
||||
if provider == GraphProvider.KUZU:
|
||||
return f'array_cosine_similarity({vec1}, {vec2})'
|
||||
|
||||
return f'vector.similarity.cosine({vec1}, {vec2})'
|
||||
|
||||
|
||||
def get_relationships_query(name: str, limit: int, provider: GraphProvider) -> str:
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
label = NEO4J_TO_FALKORDB_MAPPING[name]
|
||||
return f"CALL db.idx.fulltext.queryRelationships('{label}', $query)"
|
||||
|
||||
if provider == GraphProvider.KUZU:
|
||||
label = INDEX_TO_LABEL_KUZU_MAPPING[name]
|
||||
return f"CALL QUERY_FTS_INDEX('{label}', '{name}', cast($query AS STRING), TOP := $limit)"
|
||||
|
||||
return f'CALL db.index.fulltext.queryRelationships("{name}", $query, {{limit: $limit}})'
|
||||
@@ -0,0 +1,55 @@
|
||||
-- Migration: 20260502-001_async_job_model
|
||||
-- Purpose: Pattern 1 async job model — sidecar processes ingest jobs serially
|
||||
-- via Postgres-backed queue. Worker submits and polls rather than
|
||||
-- blocking on synchronous HTTP response.
|
||||
--
|
||||
-- Architectural rationale: tonight's smoke test (2026-05-02 ~01:40-01:50 UTC)
|
||||
-- diagnosed that bulk ingest against a 4,222-entity graph commits successfully
|
||||
-- but the worker's HTTP read-timeout fires before the response returns. Three
|
||||
-- days of "saga deadlock" failures were false negatives — the work succeeded;
|
||||
-- the worker just stopped listening. Pattern 1 separates submission from
|
||||
-- completion observation so the worker can't false-negative this way.
|
||||
--
|
||||
-- The job model is also the natural data source for Phase A items 6-7
|
||||
-- (metrics tables) — graphiti_jobs records duration, status transitions,
|
||||
-- and per-job summary that those tables will aggregate.
|
||||
--
|
||||
-- Idempotent: safe to re-run.
|
||||
|
||||
-- Job state for sidecar's async ingest queue.
|
||||
-- One row per submitted bulk-or-single ingest. Sidecar reads queued jobs
|
||||
-- on startup to resume after restart. Worker polls status until terminal.
|
||||
CREATE TABLE IF NOT EXISTS graphiti_jobs (
|
||||
job_id UUID PRIMARY KEY,
|
||||
job_type TEXT NOT NULL CHECK (job_type IN ('bulk', 'single')),
|
||||
payload JSONB NOT NULL, -- full submitted request body
|
||||
status TEXT NOT NULL DEFAULT 'queued' -- 'queued'|'running'|'committed'|'failed'
|
||||
CHECK (status IN ('queued', 'running', 'committed', 'failed')),
|
||||
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
started_at TIMESTAMPTZ,
|
||||
finished_at TIMESTAMPTZ,
|
||||
error TEXT, -- non-null when status='failed'
|
||||
summary JSONB, -- {nodes: N, edges: N, episodes: N}
|
||||
submitted_by TEXT -- worker name for traceability
|
||||
);
|
||||
|
||||
-- Index supporting sidecar's "pick next queued job" query
|
||||
CREATE INDEX IF NOT EXISTS idx_graphiti_jobs_queued
|
||||
ON graphiti_jobs (enqueued_at)
|
||||
WHERE status = 'queued';
|
||||
|
||||
-- Index supporting worker's "poll my job by id" query (PK already does this,
|
||||
-- but explicit index aids ANALYZE behavior on small tables)
|
||||
CREATE INDEX IF NOT EXISTS idx_graphiti_jobs_status
|
||||
ON graphiti_jobs (status);
|
||||
|
||||
-- Stage 3 queue gains a reference to the sidecar job processing the row.
|
||||
-- When set, worker polls graphiti_jobs.status rather than blocking on HTTP.
|
||||
-- NULL means: row not yet submitted, or pre-Pattern-1 row.
|
||||
ALTER TABLE stage_3_queue
|
||||
ADD COLUMN IF NOT EXISTS external_job_id UUID;
|
||||
|
||||
-- Index for "find rows that submitted but didn't complete" recovery scans
|
||||
CREATE INDEX IF NOT EXISTS idx_stage_3_queue_external_job
|
||||
ON stage_3_queue (external_job_id)
|
||||
WHERE external_job_id IS NOT NULL AND completed_at IS NULL AND failed_at IS NULL;
|
||||
+419
-69
@@ -1,14 +1,44 @@
|
||||
"""
|
||||
Aaron AI — Graphiti Sidecar Service
|
||||
Wraps graphiti-core in a FastAPI service to avoid asyncio event loop conflicts.
|
||||
Aaron AI — Graphiti Sidecar Service (v2.0 — Pattern 1 async job model)
|
||||
|
||||
Wraps graphiti-core in a FastAPI service. Pattern 1 architecture: ingest
|
||||
submission and completion are decoupled. Submitters POST to /episodes or
|
||||
/episodes/bulk and receive a job_id; an in-process background worker
|
||||
processes jobs serially against the graph; submitters poll GET /jobs/{id}
|
||||
until terminal status.
|
||||
|
||||
Why Pattern 1: tonight's smoke test (2026-05-02) confirmed that bulk
|
||||
ingest against the 4,222-entity graph commits successfully even when the
|
||||
worker's HTTP read-timeout fires. The synchronous interface was producing
|
||||
false-negative failures — work succeeded but the worker stopped listening.
|
||||
Pattern 1 separates submission from completion observation so the worker
|
||||
can't false-negative this way.
|
||||
|
||||
Architectural commitments:
|
||||
- One in-flight job per sidecar (per graph). Concurrent jobs against the
|
||||
same graph would race on graphiti-core's _resolve_nodes_and_edges_bulk
|
||||
(no transaction boundary, no internal coordination). Concurrent
|
||||
multi-tenancy is "run multiple sidecars," not "make one sidecar
|
||||
concurrency-safe across graphs."
|
||||
- Postgres-backed job state. Survives sidecar restart. On startup the
|
||||
sidecar resets any 'running' rows to 'queued' (their previous run died);
|
||||
the background worker picks them up naturally.
|
||||
- Both /episodes and /episodes/bulk are async-shaped for parity. graphiti-
|
||||
core operations underneath (add_episode, add_episode_bulk) are unchanged.
|
||||
- The bulk pathway is preserved — load-bearing for first-run corpus
|
||||
migration. Single-episode is preserved — load-bearing for state-
|
||||
superseding content per the Stage 2/3 routing rule.
|
||||
|
||||
Port 8001 (internal only). No OpenAI dependency.
|
||||
"""
|
||||
|
||||
import os, logging, sys, traceback
|
||||
import os, logging, sys, asyncio, traceback, uuid, json
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel
|
||||
@@ -31,8 +61,18 @@ FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
|
||||
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
|
||||
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
|
||||
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
|
||||
PG_DSN = os.getenv("PG_DSN")
|
||||
SIDECAR_NAME = os.getenv("SIDECAR_NAME", "graphiti-sidecar-1")
|
||||
os.environ["EMBEDDING_DIM"] = "384"
|
||||
|
||||
# Background worker configuration. Polls Postgres for queued jobs every
|
||||
# WORKER_POLL_INTERVAL seconds when idle. Single-job-at-a-time by design;
|
||||
# no concurrency primitive beyond the serial loop. The sleep is brief
|
||||
# enough to feel responsive but long enough to avoid burning CPU on an
|
||||
# empty queue.
|
||||
WORKER_POLL_INTERVAL = 2.0
|
||||
|
||||
|
||||
def get_llm_client():
|
||||
from graphiti_core.llm_client.config import LLMConfig
|
||||
config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)
|
||||
@@ -50,16 +90,286 @@ def get_llm_client():
|
||||
return GroqClient(config)
|
||||
raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")
|
||||
|
||||
graphiti_instance = None
|
||||
|
||||
async def get_graphiti():
|
||||
if graphiti_instance is None:
|
||||
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
||||
return graphiti_instance
|
||||
graphiti_instance = None
|
||||
worker_task = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Postgres job-state helpers. Synchronous psycopg2 calls inside async
|
||||
# functions: each call opens a fresh connection, runs one statement, closes.
|
||||
# Acceptable here because traffic is low (single-digit jobs/min steady state)
|
||||
# and the simplicity is worth more than connection pooling. If this ever
|
||||
# becomes a bottleneck, swap to asyncpg or psycopg3 async.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _pg():
|
||||
return psycopg2.connect(PG_DSN)
|
||||
|
||||
|
||||
def _job_insert(job_id: str, job_type: str, payload: dict) -> None:
|
||||
"""Write a new job row in 'queued' status."""
|
||||
pg = _pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO graphiti_jobs (job_id, job_type, payload, status, submitted_by)
|
||||
VALUES (%s, %s, %s::jsonb, 'queued', %s)
|
||||
""",
|
||||
(job_id, job_type, json.dumps(payload), SIDECAR_NAME),
|
||||
)
|
||||
pg.commit()
|
||||
pg.close()
|
||||
|
||||
|
||||
def _job_get(job_id: str) -> dict | None:
|
||||
"""Read a single job by id. Returns None if not found."""
|
||||
pg = _pg()
|
||||
cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT job_id, job_type, status, enqueued_at, started_at, finished_at,
|
||||
error, summary, submitted_by
|
||||
FROM graphiti_jobs
|
||||
WHERE job_id = %s
|
||||
""",
|
||||
(job_id,),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
pg.close()
|
||||
if row is None:
|
||||
return None
|
||||
# Convert UUID, datetimes for JSON serialization
|
||||
return {
|
||||
"job_id": str(row["job_id"]),
|
||||
"job_type": row["job_type"],
|
||||
"status": row["status"],
|
||||
"enqueued_at": row["enqueued_at"].isoformat() if row["enqueued_at"] else None,
|
||||
"started_at": row["started_at"].isoformat() if row["started_at"] else None,
|
||||
"finished_at": row["finished_at"].isoformat() if row["finished_at"] else None,
|
||||
"error": row["error"],
|
||||
"summary": row["summary"],
|
||||
"submitted_by": row["submitted_by"],
|
||||
}
|
||||
|
||||
|
||||
def _job_claim_next() -> dict | None:
|
||||
"""Atomically claim the oldest queued job for processing.
|
||||
|
||||
Uses SELECT ... FOR UPDATE SKIP LOCKED so multiple sidecar instances
|
||||
(future multi-tenant deployment) don't fight over the same row. For
|
||||
single-sidecar deployments this is just a clean atomic transition.
|
||||
|
||||
Returns the full job row (including payload) or None if queue is empty.
|
||||
"""
|
||||
pg = _pg()
|
||||
cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
cur.execute(
|
||||
"""
|
||||
WITH next_job AS (
|
||||
SELECT job_id
|
||||
FROM graphiti_jobs
|
||||
WHERE status = 'queued'
|
||||
ORDER BY enqueued_at ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
UPDATE graphiti_jobs g
|
||||
SET status = 'running', started_at = NOW()
|
||||
FROM next_job
|
||||
WHERE g.job_id = next_job.job_id
|
||||
RETURNING g.job_id, g.job_type, g.payload
|
||||
"""
|
||||
)
|
||||
row = cur.fetchone()
|
||||
pg.commit()
|
||||
pg.close()
|
||||
if row is None:
|
||||
return None
|
||||
return {
|
||||
"job_id": str(row["job_id"]),
|
||||
"job_type": row["job_type"],
|
||||
"payload": row["payload"], # already a dict via JSONB
|
||||
}
|
||||
|
||||
|
||||
def _job_complete(job_id: str, summary: dict) -> None:
|
||||
pg = _pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE graphiti_jobs
|
||||
SET status = 'committed', finished_at = NOW(), summary = %s::jsonb
|
||||
WHERE job_id = %s
|
||||
""",
|
||||
(json.dumps(summary), job_id),
|
||||
)
|
||||
pg.commit()
|
||||
pg.close()
|
||||
|
||||
|
||||
def _job_fail(job_id: str, error: str) -> None:
|
||||
pg = _pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE graphiti_jobs
|
||||
SET status = 'failed', finished_at = NOW(), error = %s
|
||||
WHERE job_id = %s
|
||||
""",
|
||||
(error[:2000], job_id), # truncate to keep error column reasonable
|
||||
)
|
||||
pg.commit()
|
||||
pg.close()
|
||||
|
||||
|
||||
def _startup_recovery() -> int:
|
||||
"""Reset any 'running' jobs to 'queued' on startup.
|
||||
|
||||
Rationale: if the sidecar died while processing a job, that row is
|
||||
stuck in 'running' with no process advancing it. The right behavior
|
||||
on restart is to retry. graphiti-core's add_episode_bulk and
|
||||
add_episode are idempotent against the graph (dedup handles duplicate
|
||||
submission), so re-running a job is safe — at worst, a second run
|
||||
incurs API spend on resolve calls that no-op against an already-
|
||||
committed entity set.
|
||||
|
||||
Returns the count of recovered jobs.
|
||||
"""
|
||||
pg = _pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE graphiti_jobs
|
||||
SET status = 'queued', started_at = NULL
|
||||
WHERE status = 'running'
|
||||
"""
|
||||
)
|
||||
count = cur.rowcount
|
||||
pg.commit()
|
||||
pg.close()
|
||||
return count
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Background worker — single asyncio task running for the sidecar lifetime.
|
||||
# Processes one job at a time. No concurrency. Restart recovery is handled
|
||||
# by _startup_recovery() before this task starts.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def background_worker():
|
||||
"""Serial job processor. Polls graphiti_jobs, processes one at a time."""
|
||||
log.info("Background worker started")
|
||||
|
||||
from graphiti_core.nodes import EpisodeType
|
||||
from graphiti_core.utils.bulk_utils import RawEpisode
|
||||
|
||||
while True:
|
||||
try:
|
||||
claimed = _job_claim_next()
|
||||
if claimed is None:
|
||||
await asyncio.sleep(WORKER_POLL_INTERVAL)
|
||||
continue
|
||||
|
||||
job_id = claimed["job_id"]
|
||||
job_type = claimed["job_type"]
|
||||
payload = claimed["payload"]
|
||||
|
||||
log.info(f"Processing job {job_id} (type={job_type})")
|
||||
start = datetime.now()
|
||||
|
||||
try:
|
||||
if job_type == "bulk":
|
||||
summary = await _process_bulk_job(payload, EpisodeType, RawEpisode)
|
||||
elif job_type == "single":
|
||||
summary = await _process_single_job(payload, EpisodeType)
|
||||
else:
|
||||
raise ValueError(f"Unknown job_type: {job_type}")
|
||||
|
||||
duration = (datetime.now() - start).total_seconds()
|
||||
summary["duration_seconds"] = duration
|
||||
_job_complete(job_id, summary)
|
||||
log.info(f"Committed job {job_id} in {duration:.1f}s — {summary}")
|
||||
except Exception as e:
|
||||
duration = (datetime.now() - start).total_seconds()
|
||||
err = f"{type(e).__name__}: {e}"
|
||||
log.error(f"Job {job_id} failed after {duration:.1f}s: {err}\n{traceback.format_exc()}")
|
||||
_job_fail(job_id, err)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
log.info("Background worker cancelled")
|
||||
raise
|
||||
except Exception as e:
|
||||
# Defensive: don't let the worker loop die from an unexpected error.
|
||||
# Log it, sleep briefly, continue.
|
||||
log.error(f"Worker loop error: {e}\n{traceback.format_exc()}")
|
||||
await asyncio.sleep(5.0)
|
||||
|
||||
|
||||
async def _process_bulk_job(payload: dict, EpisodeType, RawEpisode) -> dict:
|
||||
"""Run add_episode_bulk for a 'bulk' job. Payload mirrors BulkEpisodeRequest."""
|
||||
raw_episodes = []
|
||||
for ep in payload["episodes"]:
|
||||
ref_time = (
|
||||
datetime.fromisoformat(ep["timestamp"])
|
||||
if ep.get("timestamp") else datetime.now()
|
||||
)
|
||||
raw_episodes.append(RawEpisode(
|
||||
name=ep["name"],
|
||||
content=ep["content"],
|
||||
source_description=ep.get("source_description", ""),
|
||||
source=EpisodeType.text,
|
||||
reference_time=ref_time,
|
||||
))
|
||||
|
||||
kwargs = dict(
|
||||
bulk_episodes=raw_episodes,
|
||||
group_id=payload.get("group_id") or GROUP_ID,
|
||||
saga=payload.get("saga"),
|
||||
)
|
||||
if payload.get("custom_extraction_instructions") is not None:
|
||||
kwargs["custom_extraction_instructions"] = payload["custom_extraction_instructions"]
|
||||
|
||||
result = await graphiti_instance.add_episode_bulk(**kwargs)
|
||||
|
||||
return {
|
||||
"type": "bulk",
|
||||
"episodes": len(result.episodes) if result and result.episodes else len(raw_episodes),
|
||||
"nodes": len(result.nodes) if result and result.nodes else 0,
|
||||
"edges": len(result.edges) if result and result.edges else 0,
|
||||
}
|
||||
|
||||
|
||||
async def _process_single_job(payload: dict, EpisodeType) -> dict:
|
||||
"""Run add_episode for a 'single' job. Payload mirrors EpisodeRequest."""
|
||||
ref_time = (
|
||||
datetime.fromisoformat(payload["timestamp"])
|
||||
if payload.get("timestamp") else datetime.now()
|
||||
)
|
||||
kwargs = dict(
|
||||
name=payload["name"],
|
||||
episode_body=payload["content"],
|
||||
source=EpisodeType.text,
|
||||
reference_time=ref_time,
|
||||
source_description=payload.get("source_description", ""),
|
||||
group_id=payload.get("group_id") or GROUP_ID,
|
||||
custom_extraction_instructions=payload.get("custom_extraction_instructions"),
|
||||
)
|
||||
if payload.get("saga") is not None:
|
||||
kwargs["saga"] = payload["saga"]
|
||||
|
||||
await graphiti_instance.add_episode(**kwargs)
|
||||
return {"type": "single", "episodes": 1}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lifespan & app
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
global graphiti_instance
|
||||
global graphiti_instance, worker_task
|
||||
|
||||
sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
|
||||
log.info("Loading embedding and reranker models...")
|
||||
from st_embedder import SentenceTransformerEmbedder
|
||||
@@ -75,11 +385,51 @@ async def lifespan(app: FastAPI):
|
||||
max_coroutines=2,
|
||||
)
|
||||
await graphiti_instance.build_indices_and_constraints()
|
||||
|
||||
# PATCHED 2026-05-02: bridge the per-driver SearchOperations to the
|
||||
# search_interface attribute that search_utils.py dispatches on.
|
||||
# graphiti-core 0.29.0 builds FalkorSearchOperations as driver._search_ops
|
||||
# but never assigns it to driver.search_interface — naming mismatch
|
||||
# between the two halves of the codebase. Without this, search_utils.py
|
||||
# falls through to interpreted-Cypher cosine math (full-table scan) even
|
||||
# when our patched FalkorSearchOperations exists. Setting search_interface
|
||||
# activates the per-driver vector-index path.
|
||||
if hasattr(graphiti_instance.driver, '_search_ops') and graphiti_instance.driver.search_interface is None:
|
||||
graphiti_instance.driver.search_interface = graphiti_instance.driver._search_ops
|
||||
log.info("Wired driver.search_interface = driver._search_ops (vector index path active)")
|
||||
|
||||
log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
|
||||
|
||||
# Recover any jobs left 'running' from a previous sidecar instance.
|
||||
# They become 'queued' again and the background worker picks them up.
|
||||
recovered = _startup_recovery()
|
||||
if recovered > 0:
|
||||
log.info(f"Startup recovery: reset {recovered} running job(s) to queued")
|
||||
|
||||
# Start the background job worker.
|
||||
worker_task = asyncio.create_task(background_worker())
|
||||
log.info("Sidecar ready — accepting job submissions on :8001")
|
||||
|
||||
yield
|
||||
|
||||
# Shutdown: cancel worker, close graphiti.
|
||||
if worker_task is not None:
|
||||
worker_task.cancel()
|
||||
try:
|
||||
await worker_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
await graphiti_instance.close()
|
||||
|
||||
app = FastAPI(title="Aaron AI Graphiti Sidecar", lifespan=lifespan)
|
||||
|
||||
app = FastAPI(title="Aaron AI Graphiti Sidecar (Pattern 1)", lifespan=lifespan)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Request models — preserved from v1.0 with no payload-shape changes. The
|
||||
# only API change is the response shape: instead of blocking until
|
||||
# graphiti-core returns, submission endpoints return a job_id immediately.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class BulkEpisodeItem(BaseModel):
|
||||
name: str
|
||||
@@ -92,11 +442,6 @@ class BulkEpisodeRequest(BaseModel):
|
||||
episodes: list[BulkEpisodeItem]
|
||||
group_id: str | None = None
|
||||
saga: str | None = None
|
||||
# Batch-level extraction guidance. graphiti-core inserts this into the
|
||||
# entity-extraction and edge-extraction prompts only — NOT into dedup
|
||||
# prompts. Use to bias *what* gets extracted, not *how* dedup runs.
|
||||
# Verified 2026-05-01 by reading extract_nodes.py, extract_edges.py,
|
||||
# dedupe_nodes.py, dedupe_edges.py in graphiti-core.
|
||||
custom_extraction_instructions: str | None = None
|
||||
|
||||
|
||||
@@ -109,72 +454,76 @@ class EpisodeRequest(BaseModel):
|
||||
custom_extraction_instructions: str | None = None
|
||||
saga: str | None = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
return {"ok": True, "provider": LLM_PROVIDER, "group": GROUP_ID}
|
||||
return {
|
||||
"ok": True,
|
||||
"provider": LLM_PROVIDER,
|
||||
"group": GROUP_ID,
|
||||
"sidecar": SIDECAR_NAME,
|
||||
"version": "2.0",
|
||||
}
|
||||
|
||||
@app.post("/episodes")
|
||||
async def add_episode(req: EpisodeRequest):
|
||||
g = await get_graphiti()
|
||||
from graphiti_core.nodes import EpisodeType
|
||||
try:
|
||||
ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now()
|
||||
kwargs = dict(
|
||||
name=req.name,
|
||||
episode_body=req.content,
|
||||
source=EpisodeType.text,
|
||||
reference_time=ref_time,
|
||||
source_description=req.source_description,
|
||||
group_id=req.group_id or GROUP_ID,
|
||||
custom_extraction_instructions=req.custom_extraction_instructions,
|
||||
)
|
||||
# Saga is supported on graphiti-core add_episode but kept optional
|
||||
# so older callers don't need to know about it.
|
||||
if req.saga is not None:
|
||||
kwargs["saga"] = req.saga
|
||||
await g.add_episode(**kwargs)
|
||||
return {"ok": True}
|
||||
except Exception as e:
|
||||
log.error(f"Episode ingestion failed: {e}\n{traceback.format_exc()}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@app.post("/episodes/bulk")
|
||||
async def add_episodes_bulk(req: BulkEpisodeRequest):
|
||||
g = await get_graphiti()
|
||||
from graphiti_core.nodes import EpisodeType
|
||||
from graphiti_core.utils.bulk_utils import RawEpisode
|
||||
raw_episodes = []
|
||||
for ep in req.episodes:
|
||||
ref_time = datetime.fromisoformat(ep.timestamp) if ep.timestamp else datetime.now()
|
||||
raw_episodes.append(RawEpisode(
|
||||
name=ep.name,
|
||||
content=ep.content,
|
||||
source_description=ep.source_description,
|
||||
source=EpisodeType.text,
|
||||
reference_time=ref_time,
|
||||
))
|
||||
async def submit_bulk(req: BulkEpisodeRequest):
|
||||
"""Submit a bulk ingest job. Returns job_id for polling.
|
||||
|
||||
Job is processed serially by the sidecar's background worker; one
|
||||
bulk-or-single job at a time per graph. No HTTP read-timeout
|
||||
blocking. Submitter polls GET /jobs/{job_id} until terminal status.
|
||||
"""
|
||||
if graphiti_instance is None:
|
||||
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
||||
|
||||
job_id = str(uuid.uuid4())
|
||||
payload = req.model_dump()
|
||||
try:
|
||||
kwargs = dict(
|
||||
bulk_episodes=raw_episodes,
|
||||
group_id=req.group_id or GROUP_ID,
|
||||
saga=req.saga or None,
|
||||
)
|
||||
# Pass-through only when set, so callers that don't supply
|
||||
# instructions get graphiti-core's default behavior unchanged.
|
||||
if req.custom_extraction_instructions is not None:
|
||||
kwargs["custom_extraction_instructions"] = req.custom_extraction_instructions
|
||||
result = await g.add_episode_bulk(**kwargs)
|
||||
return {"ok": True, "count": len(raw_episodes)}
|
||||
_job_insert(job_id, "bulk", payload)
|
||||
except Exception as e:
|
||||
log.error(f"Bulk ingestion failed: {e}\n{traceback.format_exc()}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
log.error(f"Failed to enqueue bulk job: {e}\n{traceback.format_exc()}")
|
||||
raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}")
|
||||
|
||||
return {"job_id": job_id, "status": "queued"}
|
||||
|
||||
|
||||
@app.post("/episodes")
|
||||
async def submit_single(req: EpisodeRequest):
|
||||
"""Submit a single-episode ingest job. Returns job_id for polling."""
|
||||
if graphiti_instance is None:
|
||||
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
||||
|
||||
job_id = str(uuid.uuid4())
|
||||
payload = req.model_dump()
|
||||
try:
|
||||
_job_insert(job_id, "single", payload)
|
||||
except Exception as e:
|
||||
log.error(f"Failed to enqueue single job: {e}\n{traceback.format_exc()}")
|
||||
raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}")
|
||||
|
||||
return {"job_id": job_id, "status": "queued"}
|
||||
|
||||
|
||||
@app.get("/jobs/{job_id}")
|
||||
async def get_job(job_id: str):
|
||||
"""Poll a job's status. Returns 404 if job not found."""
|
||||
job = _job_get(job_id)
|
||||
if job is None:
|
||||
raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
|
||||
return job
|
||||
|
||||
|
||||
@app.get("/search")
|
||||
async def search(query: str, limit: int = 8, group_id: str | None = None):
|
||||
g = await get_graphiti()
|
||||
if graphiti_instance is None:
|
||||
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
||||
try:
|
||||
results = await g.search(
|
||||
results = await graphiti_instance.search(
|
||||
query=query,
|
||||
num_results=limit,
|
||||
group_ids=[group_id or GROUP_ID],
|
||||
@@ -195,6 +544,7 @@ async def search(query: str, limit: int = 8, group_id: str | None = None):
|
||||
log.error(f"Search failed: {e}\n{traceback.format_exc()}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info")
|
||||
|
||||
@@ -220,8 +220,15 @@ def enqueue_stage3(pg, source, full_text, orientation, metadata,
|
||||
supersedes_prior_state = EXCLUDED.supersedes_prior_state,
|
||||
state_type_rationale = EXCLUDED.state_type_rationale,
|
||||
enqueued_at = NOW(),
|
||||
-- Reset all run-state fields on re-enqueue. Without this,
|
||||
-- stale started_at from a prior attempt makes the row
|
||||
-- invisible to the Stage 3 worker's claim filter (which
|
||||
-- typically uses started_at IS NULL).
|
||||
started_at = NULL,
|
||||
completed_at = NULL,
|
||||
failed_at = NULL,
|
||||
failure_reason = NULL,
|
||||
external_job_id = NULL,
|
||||
attempts = 0
|
||||
""", (source, full_text, orientation, json.dumps(metadata),
|
||||
state_type, state_type_confidence, supersedes_prior_state,
|
||||
|
||||
+118
-6
@@ -1,6 +1,7 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing
|
||||
+ Encoder Instructions (v1.0)
|
||||
|
||||
Polls stage_3_queue, routes each row to one of two ingest pathways based on
|
||||
state-type classification produced by Stage 2:
|
||||
@@ -12,6 +13,18 @@ state-type classification produced by Stage 2:
|
||||
confidence in {medium, high}. Per-chunk POST to /episodes with shared
|
||||
saga tag, full edge invalidation, per-chunk timeout/retry independence.
|
||||
|
||||
Both pathways pass EXTRACTION_INSTRUCTIONS_V1 to the sidecar via
|
||||
custom_extraction_instructions, which graphiti-core inserts into entity
|
||||
and edge extraction prompts (NOT dedup prompts — that's intentional under
|
||||
the encoder-stays-naive commitment).
|
||||
|
||||
Architectural posture: the encoder is content-naïve. It does not draw on
|
||||
prior knowledge of the user, the substrate, or the cycle's accumulated
|
||||
work. Schema and personality live in the cycle's consolidated substrate,
|
||||
where the dream phase shapes them. The encoder produces source-grounded
|
||||
ground truth for the cycle to work from. See EXTRACTION_INSTRUCTIONS_V1
|
||||
below for the extraction guidance text.
|
||||
|
||||
Routing rationale: the single-episode pathway is the correct API per
|
||||
graphiti-core's docs for content that supersedes prior facts (it does
|
||||
edge invalidation that bulk skips). It costs more per chunk because of
|
||||
@@ -67,7 +80,7 @@ HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat")
|
||||
RETRY_ATTEMPTS = 2
|
||||
POLL_INTERVAL = 5
|
||||
INGEST_TIMEOUT = 600
|
||||
WORKER_VERSION = "2.3"
|
||||
WORKER_VERSION = "2.4"
|
||||
|
||||
# Match Stage 1 chunking parameters
|
||||
CHUNK_SIZE_WORDS = 500
|
||||
@@ -84,6 +97,87 @@ MAX_CHUNKS_PER_SAGA = 10
|
||||
# the expensive pathway.
|
||||
HIGH_TRUST_CONFIDENCE = ("medium", "high")
|
||||
|
||||
# Encoder extraction guidance v1.0 — see module docstring for posture rationale.
|
||||
# Passed to graphiti-core via custom_extraction_instructions on both ingest
|
||||
# pathways. Inserted into entity-extraction and edge-extraction prompts only;
|
||||
# does NOT enter dedup prompts. Encoder-stays-naïve commitment is structural,
|
||||
# not versioned: this text gets refined over time but the encoder does not
|
||||
# acquire substrate context as the cycle matures.
|
||||
EXTRACTION_INSTRUCTIONS_V1 = """\
|
||||
EXTRACTION GUIDANCE — BirdAI cascade
|
||||
|
||||
The encoder's job is faithful capture from this chunk's text. It does
|
||||
not draw on prior knowledge of the user, the substrate, or the cycle's
|
||||
accumulated work. Schema, personality, and inferred context live in
|
||||
the cycle's consolidated substrate, where the dream phase shapes them
|
||||
through prediction-error replay and speculation. The encoder stays
|
||||
content-naïve so the cycle has source-grounded ground truth to work
|
||||
from.
|
||||
|
||||
The orientation produced by an upstream pass describes content shape,
|
||||
not content interpretation. Use it as forward-facing guidance for what
|
||||
to attend to in this document. Do not let it bound or limit what you
|
||||
extract.
|
||||
|
||||
PREDICATE NAMING
|
||||
|
||||
Produce semantic predicates that describe the actual relationship the
|
||||
text states. Use verbs or verb phrases — "wrote", "advised", "founded",
|
||||
"works at", "led to", "contradicts", "is autobiographical to" — not
|
||||
generic placeholders. Reserve generic forms (for example, "relates to"
|
||||
or "mentions") for cases where the text genuinely does not specify a
|
||||
more particular relationship. The verb is the load-bearing part of
|
||||
the fact; preserving it is what makes the relationship queryable later.
|
||||
|
||||
EXTRACTION POSTURE
|
||||
|
||||
Extract from this chunk's text as if each entity is encountered fresh.
|
||||
Do not try to reconcile entities you find here with entities that
|
||||
might already exist elsewhere in the graph. Redundant entity instances
|
||||
are acceptable. Cross-document entity resolution is downstream cycle
|
||||
work, not extraction work.
|
||||
|
||||
When the same entity appears multiple times within this chunk with
|
||||
slightly different spellings — a common artifact of voice transcription —
|
||||
prefer the more frequent or more canonical-looking form. Do not invent
|
||||
canonical forms; choose among the variants the text actually contains.
|
||||
|
||||
EXTRACT FROM THE SOURCE
|
||||
|
||||
Extract relationships the text states or strongly implies through
|
||||
direct linguistic markers ("X led to Y", "X works for Y", "X met Y at
|
||||
Z"). Do not extend extraction to relationships the text neither states
|
||||
nor directly implies. Inferred relationships are produced by the
|
||||
cycle's dream phase as speculative edges with explicit low-confidence
|
||||
tagging, where they can be evaluated and either ratified or pruned by
|
||||
subsequent cycle work. Encoding-time inference, mixed in with source-
|
||||
grounded extraction, would lose the speculation/source distinction the
|
||||
cycle's consolidation work relies on.
|
||||
|
||||
DO NOT PRE-EMPT CYCLE WORK
|
||||
|
||||
Do not omit relationships because they seem redundant with prior
|
||||
extractions or with the existing graph. Cross-document entity
|
||||
resolution and edge consolidation are downstream cycle operations;
|
||||
redundant extraction at this stage is intentional. Extracting the
|
||||
same fact from multiple sources gives the cycle's consolidation work
|
||||
the recurrence signal it relies on.
|
||||
|
||||
EXTRACTION DEPTH
|
||||
|
||||
Use the orientation's frame_relationships and extraction_orientation
|
||||
fields to inform what to attend to. If the orientation describes
|
||||
cross-domain relational content, look for relationships that bridge
|
||||
those domains explicitly, with named predicates for the bridging.
|
||||
If the orientation describes single-domain technical content, look
|
||||
for the structural relationships internal to that domain.
|
||||
|
||||
Extract every entity and every relationship the text states. Do not
|
||||
summarize, do not filter, do not omit content because it seems
|
||||
incidental. The orientation tells you what to look for; the source
|
||||
text tells you what is there.
|
||||
"""
|
||||
|
||||
|
||||
def get_pg():
|
||||
return psycopg2.connect(PG_DSN)
|
||||
@@ -193,6 +287,8 @@ def ingest_bulk(source, full_text, orientation):
|
||||
- Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of
|
||||
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga
|
||||
tag so Graphiti links them as one document unit
|
||||
|
||||
All three sub-paths pass EXTRACTION_INSTRUCTIONS_V1 to the sidecar.
|
||||
"""
|
||||
char_length = len(full_text)
|
||||
|
||||
@@ -204,7 +300,11 @@ def ingest_bulk(source, full_text, orientation):
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}]
|
||||
log.info(f" [bulk] Single episode ({char_length} chars)")
|
||||
return post_bulk({"episodes": episodes, "group_id": "aaron"})
|
||||
return post_bulk({
|
||||
"episodes": episodes,
|
||||
"group_id": "aaron",
|
||||
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||
})
|
||||
|
||||
chunks = chunk_text(full_text)
|
||||
total_chunks = len(chunks)
|
||||
@@ -220,9 +320,12 @@ def ingest_bulk(source, full_text, orientation):
|
||||
for i, chunk in enumerate(chunks)
|
||||
]
|
||||
log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)")
|
||||
return post_bulk(
|
||||
{"episodes": episodes, "group_id": "aaron", "saga": source}
|
||||
)
|
||||
return post_bulk({
|
||||
"episodes": episodes,
|
||||
"group_id": "aaron",
|
||||
"saga": source,
|
||||
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||
})
|
||||
|
||||
# Large document: split into batches sharing the same saga tag
|
||||
batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA
|
||||
@@ -247,7 +350,12 @@ def ingest_bulk(source, full_text, orientation):
|
||||
batch_label = f"batch {batch_idx + 1}/{batch_count} (chunks {start + 1}-{end})"
|
||||
log.info(f" {batch_label} starting")
|
||||
last_result = post_bulk(
|
||||
{"episodes": episodes, "group_id": "aaron", "saga": source},
|
||||
{
|
||||
"episodes": episodes,
|
||||
"group_id": "aaron",
|
||||
"saga": source,
|
||||
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||
},
|
||||
batch_label=batch_label,
|
||||
)
|
||||
log.info(f" {batch_label} committed")
|
||||
@@ -261,6 +369,8 @@ def ingest_single_episode(row_id, source, full_text, orientation):
|
||||
with shared saga tag. Each call independent: own timeout, own retry
|
||||
envelope, own failure semantics.
|
||||
|
||||
Each chunk POST passes EXTRACTION_INSTRUCTIONS_V1 to the sidecar.
|
||||
|
||||
Partial-success behavior: if chunk N of total fails, chunks 1..N-1
|
||||
stay committed (graphiti has already accepted them) and the function
|
||||
raises with detail about which chunk failed and how many succeeded.
|
||||
@@ -281,6 +391,7 @@ def ingest_single_episode(row_id, source, full_text, orientation):
|
||||
"source_description": orientation,
|
||||
"group_id": "aaron",
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||
}
|
||||
log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)")
|
||||
return post_episode(payload, episode_label="single-ep")
|
||||
@@ -302,6 +413,7 @@ def ingest_single_episode(row_id, source, full_text, orientation):
|
||||
"group_id": "aaron",
|
||||
"saga": source,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||
}
|
||||
try:
|
||||
post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}")
|
||||
|
||||
Reference in New Issue
Block a user