Compare commits
5 Commits
| SHA1 |
|---|
| 7f07972109 |
| f645b74b1c |
| c0e6159b5e |
| d7b2a850c4 |
| a0bf280075 |
@@ -0,0 +1,4 @@
# Local backups created by apply.sh (environment state, not source).
# Keeping these out of version control prevents repo bloat and avoids
# checking in graphiti-core's Apache-2.0 source under our repo's tree.
backups/
@@ -0,0 +1,58 @@
# graphiti-core Patches: FalkorDB Vector Index Support

Vendored patches against graphiti-core 0.29.0 that add native FalkorDB
vector index support. Three files are modified: `graphiti_core/graph_queries.py`
and two FalkorDB driver files under `graphiti_core/driver/`.
No changes to Neo4j or Kuzu code paths.

## Why this exists

graphiti-core's FalkorDB driver computes similarity with interpreted Cypher
cosine math (`vec.cosineDistance(...)`). Each query becomes a full scan over
Entity and Community nodes and RELATES_TO edges. At ~4,000+ entities, the
resolve-against-existing-graph step of a single-episode ingest takes 8+ minutes
and bulk ingest hangs FalkorDB. FalkorDB itself supports the
`db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships` procedures,
backed by HNSW indexes; graphiti-core's driver doesn't use them.
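
To make the cost concrete: without an index, each similarity query has to score every stored embedding. The sketch below reproduces that shape in plain Python. The names `cosine_similarity` and `full_scan_top_k` and the toy vectors are illustrative assumptions, not graphiti-core or FalkorDB code.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length, non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def full_scan_top_k(
    query: list[float], stored: dict[str, list[float]], k: int
) -> list[tuple[str, float]]:
    """Score every stored vector against the query, then keep the best k.

    The work grows linearly with len(stored) on every call, which is the
    behaviour described above for the unindexed cosine path.
    """
    scored = [(name, cosine_similarity(query, vec)) for name, vec in stored.items()]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


# Hypothetical toy data, for illustration only.
embeddings = {
    "alice": [1.0, 0.0],
    "bob": [0.0, 1.0],
    "carol": [1.0, 1.0],
}
top = full_scan_top_k([1.0, 0.1], embeddings, k=2)
```

An HNSW index avoids the per-query scan by visiting only a small neighbourhood of its graph, at the cost of returning approximate rather than exact nearest neighbours.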

These patches:

1. Add `get_vector_indices()` to `graph_queries.py`, returning CREATE
   VECTOR INDEX statements for FalkorDB on Entity.name_embedding,
   RELATES_TO.fact_embedding, and Community.name_embedding.
2. Extend `build_indices_and_constraints()` in `falkordb_driver.py` to create
   the vector indexes alongside the range and fulltext indexes.
3. Rewrite the three vector-similarity call sites in
   `falkordb/operations/search_ops.py` to use
   `db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`
   when a matching vector index exists, falling back to the full-scan
   cosine math otherwise. The index path over-fetches by a configurable
   multiplier (`VECTOR_INDEX_CANDIDATE_MULTIPLIER`) so that candidates
   rejected by WHERE filters don't starve the final top-k.
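
A rough sketch of what items 1 and 3 might look like, not the patched code itself. `vector_index_statements`, `over_fetch_k`, `filter_and_truncate`, the default dimension of 1024, and the multiplier of 4 are all assumptions made for illustration; the `CREATE VECTOR INDEX ... OPTIONS {dimension, similarityFunction}` form is taken from FalkorDB's vector index documentation and should be checked against the FalkorDB version in use.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def vector_index_statements(dimension: int = 1024) -> list[str]:
    """Build CREATE VECTOR INDEX statements for the three embedding attributes.

    Assumption: `dimension` must match the embedding model in use; 1024 is
    only a placeholder default.
    """
    options = f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}"
    return [
        f"CREATE VECTOR INDEX FOR (n:Entity) ON (n.name_embedding) {options}",
        f"CREATE VECTOR INDEX FOR (n:Community) ON (n.name_embedding) {options}",
        f"CREATE VECTOR INDEX FOR ()-[e:RELATES_TO]->() ON (e.fact_embedding) {options}",
    ]


def over_fetch_k(limit: int, multiplier: int = 4) -> int:
    """Number of candidates to request from the index before filtering."""
    return limit * multiplier


def filter_and_truncate(
    candidates: list[T], predicate: Callable[[T], bool], limit: int
) -> list[T]:
    """Drop candidates the filter rejects, then keep at most `limit` of them.

    Candidates are assumed to arrive already ordered best-first.
    """
    return [c for c in candidates if predicate(c)][:limit]
```

Over-fetching trades a larger index lookup for a better chance that `limit` results survive the filters; if the filters are very selective, even the multiplied candidate set can come back short.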

## Files

| Patched file | Change |
|---|---|
| `graphiti_core/graph_queries.py` | Adds `get_vector_indices()` |
| `graphiti_core/driver/falkordb_driver.py` | Extends `build_indices_and_constraints` |
| `graphiti_core/driver/falkordb/operations/search_ops.py` | Three query rewrites |

## How to apply

Run `./apply.sh`. It backs up the originals into `./backups/<timestamp>/`,
copies the patched files over them, and checks each patched file for the
patch marker. Restart the sidecar afterwards to pick up the changes.

## How to revert

Copy the timestamped backup back over the venv:

    cp backups/<ts>/graphiti_core/graph_queries.py /home/aaron/aaronai/venv/lib/python3.12/site-packages/graphiti_core/graph_queries.py
    # ...etc
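
Because `apply.sh` writes each backup under `backups/<timestamp>/graphiti_core/` with the same relative layout as the venv, the whole revert can be done as one tree copy. A minimal Python sketch, assuming that layout; `restore_backup` and the paths in the usage comment are hypothetical, not part of this patch set.

```python
import shutil
from pathlib import Path


def restore_backup(backup_root: Path, site_packages: Path) -> list[Path]:
    """Copy every file under backup_root to the same relative path under
    site_packages, and return the restored destination paths."""
    restored = []
    for src in sorted(backup_root.rglob("*")):
        if not src.is_file():
            continue
        dest = site_packages / src.relative_to(backup_root)
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dest)  # copy2 also preserves file metadata
        restored.append(dest)
    return restored


# Example (hypothetical paths):
# restore_backup(Path("backups/<ts>"), Path("/path/to/venv/lib/python3.12/site-packages"))
```

As with applying the patch, the service has to be restarted before the restored files take effect.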

## Upstream candidate

This is a documented gap: issue #1263 references it indirectly via the
vector store overlay RFC. The maintainers' attention is on a Milvus/external
vector DB overlay; this patch is the FalkorDB-native alternative for users who
don't want a separate vector DB. Consider opening a PR after empirical
validation in production.
Executable
+77
@@ -0,0 +1,77 @@
#!/usr/bin/env bash
# apply.sh - Apply the BirdAI vendored graphiti-core patches.
#
# Backs up the original venv files into ./backups/<timestamp>/ before
# overwriting. The backup directory layout mirrors the venv layout so a
# revert is just a tree copy back.
#
# Usage: ./apply.sh

set -euo pipefail

PATCH_DIR="$(cd "$(dirname "$0")" && pwd)"
VENV_BASE="/home/aaron/aaronai/venv/lib/python3.12/site-packages"
TIMESTAMP="$(date +%Y%m%d-%H%M%S)"
BACKUP_DIR="$PATCH_DIR/backups/$TIMESTAMP"

# Files to patch, as paths relative to graphiti_core/.
FILES=(
    "graph_queries.py"
    "driver/falkordb_driver.py"
    "driver/falkordb/operations/search_ops.py"
)

echo "graphiti-core vendored patch apply - BirdAI"
echo "Patch directory: $PATCH_DIR"
echo "Venv target: $VENV_BASE/graphiti_core/"
echo "Backup to: $BACKUP_DIR"
echo

# Pre-flight: confirm all source patch files exist.
for rel in "${FILES[@]}"; do
    if [ ! -f "$PATCH_DIR/graphiti_core/$rel" ]; then
        echo "ERROR: missing patch file: $PATCH_DIR/graphiti_core/$rel" >&2
        exit 1
    fi
done

# Pre-flight: confirm all target venv files exist.
for rel in "${FILES[@]}"; do
    if [ ! -f "$VENV_BASE/graphiti_core/$rel" ]; then
        echo "ERROR: missing venv file: $VENV_BASE/graphiti_core/$rel" >&2
        echo " graphiti-core may not be installed, or version differs from 0.29.0." >&2
        exit 1
    fi
done

# Backup originals.
echo "[1/3] Backing up originals..."
for rel in "${FILES[@]}"; do
    backup_path="$BACKUP_DIR/graphiti_core/$rel"
    mkdir -p "$(dirname "$backup_path")"
    cp "$VENV_BASE/graphiti_core/$rel" "$backup_path"
    echo " backed up: $rel"
done
echo

# Apply patches by copying.
echo "[2/3] Applying patches..."
for rel in "${FILES[@]}"; do
    cp "$PATCH_DIR/graphiti_core/$rel" "$VENV_BASE/graphiti_core/$rel"
    echo " patched: $rel"
done
echo

# Sanity check: confirm patched files have the marker.
echo "[3/3] Verifying patched files..."
for rel in "${FILES[@]}"; do
    if grep -q "PATCHED 2026-05-02" "$VENV_BASE/graphiti_core/$rel"; then
        echo " OK: $rel contains patch marker"
    else
        echo " WARNING: $rel missing patch marker (may be expected for graph_queries.py; its docstring uses the marker only in the module header)"
    fi
done
echo
echo "Done. Backup: $BACKUP_DIR"
echo "Restart the sidecar to pick up changes:"
echo " sudo systemctl restart aaronai-graphiti.service"
@@ -0,0 +1,904 @@
"""
Copyright 2024, Zep Software, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import logging
from typing import Any

from graphiti_core.driver.driver import GraphProvider
from graphiti_core.driver.falkordb import STOPWORDS
from graphiti_core.driver.operations.search_ops import SearchOperations
from graphiti_core.driver.query_executor import QueryExecutor
from graphiti_core.driver.record_parsers import (
    community_node_from_record,
    entity_edge_from_record,
    entity_node_from_record,
    episodic_node_from_record,
)
from graphiti_core.edges import EntityEdge
from graphiti_core.graph_queries import (
    get_nodes_query,
    get_relationships_query,
    get_vector_cosine_func_query,
)
from graphiti_core.models.edges.edge_db_queries import get_entity_edge_return_query
from graphiti_core.models.nodes.node_db_queries import (
    COMMUNITY_NODE_RETURN,
    EPISODIC_NODE_RETURN,
    get_entity_node_return_query,
)
from graphiti_core.nodes import CommunityNode, EntityNode, EpisodicNode
from graphiti_core.search.search_filters import (
    SearchFilters,
    edge_search_filter_query_constructor,
    node_search_filter_query_constructor,
)

logger = logging.getLogger(__name__)

MAX_QUERY_LENGTH = 128

# ---------------------------------------------------------------------------
# Vector index dispatcher (PATCHED 2026-05-02, BirdAI vendored patch).
#
# graphiti-core's FalkorDB driver historically composed similarity queries
# using `vec.cosineDistance(...)` in interpreted Cypher, which produces a
# full-table scan for every search. FalkorDB supports native vector indexes
# via `db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`;
# this dispatcher uses them when present and falls back to the cosine math
# otherwise.
#
# Index existence is checked once per (label, attribute, entity_type) and
# cached at module scope. The cache should be invalidated whenever
# `build_indices_and_constraints` runs (since indexes may have been created
# or dropped). FalkorDriver.build_indices_and_constraints is patched to
# call `_invalidate_falkordb_vector_index_cache()` after building.
#
# Over-fetch factor (VECTOR_INDEX_CANDIDATE_MULTIPLIER from graph_queries)
# preserves recall when WHERE filters reject some of the top-k candidates.
# ---------------------------------------------------------------------------

# get_vector_cosine_func_query is already imported at the top of the module,
# so only the new constant is imported here.
from graphiti_core.graph_queries import VECTOR_INDEX_CANDIDATE_MULTIPLIER

# Cache: key = (label, attribute, entity_type), value = bool
# entity_type is 'NODE' or 'RELATIONSHIP'.
_FALKORDB_VECTOR_INDEX_CACHE: dict[tuple[str, str, str], bool] = {}


def _invalidate_falkordb_vector_index_cache() -> None:
    """Clear the vector-index existence cache. Call after build_indices_and_constraints."""
    _FALKORDB_VECTOR_INDEX_CACHE.clear()


async def _falkordb_vector_index_exists(
    executor: QueryExecutor,
    label: str,
    attribute: str,
    entity_type: str,
) -> bool:
    """Check whether a FalkorDB vector index exists for the given target.

    entity_type is 'NODE' for node-label indexes, 'RELATIONSHIP' for edge-type indexes.
    Result is cached at module scope; call _invalidate_falkordb_vector_index_cache()
    after building or dropping indexes.
    """
    key = (label, attribute, entity_type)
    if key in _FALKORDB_VECTOR_INDEX_CACHE:
        return _FALKORDB_VECTOR_INDEX_CACHE[key]

    try:
        records, _, _ = await executor.execute_query(
            "CALL db.indexes() YIELD label, properties, types, entitytype "
            "RETURN label, properties, types, entitytype"
        )
    except Exception as e:
        # If we cannot enumerate indexes, fall back to "no index" rather than
        # propagating the error. The fallback cosine-math path is correct,
        # just slower.
        logger.warning(f"FalkorDB vector index probe failed; assuming none exist: {e}")
        _FALKORDB_VECTOR_INDEX_CACHE[key] = False
        return False

    found = False
    for r in records:
        # Records come back as dict-like rows keyed by column name (not
        # tuples). Access by string keys matching the YIELD clause above.
        rec_label = r.get('label') if hasattr(r, 'get') else r['label']
        rec_props = r.get('properties') if hasattr(r, 'get') else r['properties']
        rec_types = r.get('types') if hasattr(r, 'get') else r['types']
        rec_entitytype = r.get('entitytype') if hasattr(r, 'get') else r['entitytype']
        if rec_props is None:
            rec_props = []
        if rec_types is None:
            rec_types = {}

        if rec_label != label:
            continue
        if rec_entitytype is not None and rec_entitytype != entity_type:
            continue
        if attribute not in rec_props:
            continue

        # rec_types is a dict like {attribute: ['VECTOR', ...], ...} or sometimes
        # a flat list; handle both shapes.
        if isinstance(rec_types, dict):
            attr_types = rec_types.get(attribute, [])
        else:
            attr_types = rec_types
        if 'VECTOR' in attr_types:
            found = True
            break

    _FALKORDB_VECTOR_INDEX_CACHE[key] = found
    return found


def _falkordb_vector_node_search_cypher(
    label: str,
    embedding_attr: str,
    search_vector_param: str,
    use_index: bool,
) -> tuple[str, str]:
    """Build the cypher prefix and node-binding for a node-vector search.

    Returns (prefix, node_var) where:
    - prefix is the Cypher fragment that binds the node variable and a
      `score` variable. With index, it's a CALL ... YIELD; without, it's
      a MATCH plus WITH cosine math.
    - node_var is the variable name the caller's downstream Cypher should
      reference (always 'n' here for parity with the existing code).

    The caller appends WHERE filters and RETURN/ORDER BY/LIMIT as usual.
    The over-fetch parameter `$candidate_k` must be passed by the caller
    when use_index is True.
    """
    if use_index:
        return (
            f"CALL db.idx.vector.queryNodes("
            f"'{label}', '{embedding_attr}', $candidate_k, vecf32({search_vector_param})"
            f") YIELD node, score "
            f"WITH node AS n, score "
        ), "n"
    # Fallback: original cosine math path
    cosine = get_vector_cosine_func_query(
        f"n.{embedding_attr}", search_vector_param, GraphProvider.FALKORDB
    )
    return (
        f"MATCH (n:{label}) "
        f"WITH n, {cosine} AS score "
    ), "n"


def _falkordb_vector_edge_search_cypher(
    relationship_type: str,
    embedding_attr: str,
    search_vector_param: str,
    use_index: bool,
) -> tuple[str, str]:
    """Build the cypher prefix and edge-binding for an edge-vector search.

    Returns (prefix, edge_var). With the index, the procedure binds the
    relationship variable; we then MATCH source and target via the existing
    edge to recover (n)-[e]->(m). Without the index, it's the original
    MATCH-and-cosine path.

    Variable name is 'e' for parity with existing code; source/target are
    'n' and 'm' respectively, also for parity.
    """
    if use_index:
        return (
            f"CALL db.idx.vector.queryRelationships("
            f"'{relationship_type}', '{embedding_attr}', $candidate_k, vecf32({search_vector_param})"
            f") YIELD relationship, score "
            f"MATCH (n:Entity)-[e:{relationship_type}]->(m:Entity) "
            f"WHERE e = relationship "
            f"WITH DISTINCT e, n, m, score "
        ), "e"
    # Fallback
    cosine = get_vector_cosine_func_query(
        f"e.{embedding_attr}", search_vector_param, GraphProvider.FALKORDB
    )
    return (
        f"MATCH (n:Entity)-[e:{relationship_type}]->(m:Entity) "
        f"WITH DISTINCT e, n, m, {cosine} AS score "
    ), "e"


# FalkorDB separator characters that break text into tokens
_SEPARATOR_MAP = str.maketrans(
    {
        ',': ' ',
        '.': ' ',
        '<': ' ',
        '>': ' ',
        '{': ' ',
        '}': ' ',
        '[': ' ',
        ']': ' ',
        '"': ' ',
        "'": ' ',
        ':': ' ',
        ';': ' ',
        '!': ' ',
        '@': ' ',
        '#': ' ',
        '$': ' ',
        '%': ' ',
        '^': ' ',
        '&': ' ',
        '*': ' ',
        '(': ' ',
        ')': ' ',
        '-': ' ',
        '+': ' ',
        '=': ' ',
        '~': ' ',
        '?': ' ',
        '|': ' ',
        '/': ' ',
        '\\': ' ',
    }
)


def _sanitize(query: str) -> str:
    """Replace FalkorDB special characters with whitespace."""
    sanitized = query.translate(_SEPARATOR_MAP)
    return ' '.join(sanitized.split())


def _build_falkor_fulltext_query(
    query: str,
    group_ids: list[str] | None = None,
    max_query_length: int = MAX_QUERY_LENGTH,
) -> str:
    """Build a fulltext query string for FalkorDB using RedisSearch syntax."""
    if group_ids is None or len(group_ids) == 0:
        group_filter = ''
    else:
        escaped_group_ids = [f'"{gid}"' for gid in group_ids]
        group_values = '|'.join(escaped_group_ids)
        group_filter = f'(@group_id:{group_values})'

    sanitized_query = _sanitize(query)

    # Remove stopwords and empty tokens
    query_words = sanitized_query.split()
    filtered_words = [word for word in query_words if word and word.lower() not in STOPWORDS]
    sanitized_query = ' | '.join(filtered_words)

    if len(sanitized_query.split(' ')) + len(group_ids or '') >= max_query_length:
        return ''

    full_query = group_filter + ' (' + sanitized_query + ')'
    return full_query


class FalkorSearchOperations(SearchOperations):
    # --- Node search ---

    async def node_fulltext_search(
        self,
        executor: QueryExecutor,
        query: str,
        search_filter: SearchFilters,
        group_ids: list[str] | None = None,
        limit: int = 10,
    ) -> list[EntityNode]:
        fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
        if fuzzy_query == '':
            return []

        filter_queries, filter_params = node_search_filter_query_constructor(
            search_filter, GraphProvider.FALKORDB
        )

        if group_ids is not None:
            filter_queries.append('n.group_id IN $group_ids')
            filter_params['group_ids'] = group_ids

        filter_query = ''
        if filter_queries:
            filter_query = ' WHERE ' + (' AND '.join(filter_queries))

        cypher = (
            get_nodes_query(
                'node_name_and_summary', '$query', limit=limit, provider=GraphProvider.FALKORDB
            )
            + 'YIELD node AS n, score'
            + filter_query
            + """
            WITH n, score
            ORDER BY score DESC
            LIMIT $limit
            RETURN
            """
            + get_entity_node_return_query(GraphProvider.FALKORDB)
        )

        records, _, _ = await executor.execute_query(
            cypher,
            query=fuzzy_query,
            limit=limit,
            **filter_params,
        )

        return [entity_node_from_record(r) for r in records]

    async def node_similarity_search(
        self,
        executor: QueryExecutor,
        search_vector: list[float],
        search_filter: SearchFilters,
        group_ids: list[str] | None = None,
        limit: int = 10,
        min_score: float = 0.6,
    ) -> list[EntityNode]:
        filter_queries, filter_params = node_search_filter_query_constructor(
            search_filter, GraphProvider.FALKORDB
        )

        if group_ids is not None:
            filter_queries.append('n.group_id IN $group_ids')
            filter_params['group_ids'] = group_ids

        filter_query = ''
        if filter_queries:
            filter_query = ' WHERE ' + (' AND '.join(filter_queries))

        # PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
        # index when available; fall back to interpreted-Cypher cosine math
        # otherwise. The filter clause's position changes between paths
        # (after MATCH for fallback, after YIELD for index path), but the
        # filter expressions themselves are identical because they reference
        # the bound variable `n` either way.
        use_index = await _falkordb_vector_index_exists(
            executor, 'Entity', 'name_embedding', 'NODE'
        )
        prefix, _ = _falkordb_vector_node_search_cypher(
            'Entity', 'name_embedding', '$search_vector', use_index
        )
        where_clauses = []
        if filter_query:
            where_clauses.append(filter_query.replace(' WHERE ', '', 1).strip())
        where_clauses.append('score > $min_score')
        unified_where = ' WHERE ' + ' AND '.join(where_clauses)

        cypher = (
            prefix
            + unified_where
            + """
            RETURN
            """
            + get_entity_node_return_query(GraphProvider.FALKORDB)
            + """
            ORDER BY score DESC
            LIMIT $limit
            """
        )
        params = dict(
            search_vector=search_vector,
            limit=limit,
            min_score=min_score,
            **filter_params,
        )
        if use_index:
            params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
        records, _, _ = await executor.execute_query(cypher, **params)

        return [entity_node_from_record(r) for r in records]

    async def node_bfs_search(
        self,
        executor: QueryExecutor,
        origin_uuids: list[str],
        search_filter: SearchFilters,
        max_depth: int,
        group_ids: list[str] | None = None,
        limit: int = 10,
    ) -> list[EntityNode]:
        if not origin_uuids or max_depth < 1:
            return []

        filter_queries, filter_params = node_search_filter_query_constructor(
            search_filter, GraphProvider.FALKORDB
        )

        if group_ids is not None:
            filter_queries.append('n.group_id IN $group_ids')
            filter_queries.append('origin.group_id IN $group_ids')
            filter_params['group_ids'] = group_ids

        filter_query = ''
        if filter_queries:
            filter_query = ' AND ' + (' AND '.join(filter_queries))

        cypher = (
            f"""
            UNWIND $bfs_origin_node_uuids AS origin_uuid
            MATCH (origin {{uuid: origin_uuid}})-[:RELATES_TO|MENTIONS*1..{max_depth}]->(n:Entity)
            WHERE n.group_id = origin.group_id
            """
            + filter_query
            + """
            RETURN
            """
            + get_entity_node_return_query(GraphProvider.FALKORDB)
            + """
            LIMIT $limit
            """
        )

        records, _, _ = await executor.execute_query(
            cypher,
            bfs_origin_node_uuids=origin_uuids,
            limit=limit,
            **filter_params,
        )

        return [entity_node_from_record(r) for r in records]

    # --- Edge search ---

    async def edge_fulltext_search(
        self,
        executor: QueryExecutor,
        query: str,
        search_filter: SearchFilters,
        group_ids: list[str] | None = None,
        limit: int = 10,
    ) -> list[EntityEdge]:
        fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
        if fuzzy_query == '':
            return []

        filter_queries, filter_params = edge_search_filter_query_constructor(
            search_filter, GraphProvider.FALKORDB
        )

        if group_ids is not None:
            filter_queries.append('e.group_id IN $group_ids')
            filter_params['group_ids'] = group_ids

        filter_query = ''
        if filter_queries:
            filter_query = ' WHERE ' + (' AND '.join(filter_queries))

        cypher = (
            get_relationships_query(
                'edge_name_and_fact', limit=limit, provider=GraphProvider.FALKORDB
            )
            + """
            YIELD relationship AS rel, score
            MATCH (n:Entity)-[e:RELATES_TO {uuid: rel.uuid}]->(m:Entity)
            """
            + filter_query
            + """
            WITH e, score, n, m
            RETURN
            """
            + get_entity_edge_return_query(GraphProvider.FALKORDB)
            + """
            ORDER BY score DESC
            LIMIT $limit
            """
        )

        records, _, _ = await executor.execute_query(
            cypher,
            query=fuzzy_query,
            limit=limit,
            **filter_params,
        )

        return [entity_edge_from_record(r) for r in records]

    async def edge_similarity_search(
        self,
        executor: QueryExecutor,
        search_vector: list[float],
        source_node_uuid: str | None,
        target_node_uuid: str | None,
        search_filter: SearchFilters,
        group_ids: list[str] | None = None,
        limit: int = 10,
        min_score: float = 0.6,
    ) -> list[EntityEdge]:
        filter_queries, filter_params = edge_search_filter_query_constructor(
            search_filter, GraphProvider.FALKORDB
        )

        if group_ids is not None:
            filter_queries.append('e.group_id IN $group_ids')
            filter_params['group_ids'] = group_ids

        if source_node_uuid is not None:
            filter_params['source_uuid'] = source_node_uuid
            filter_queries.append('n.uuid = $source_uuid')

        if target_node_uuid is not None:
            filter_params['target_uuid'] = target_node_uuid
            filter_queries.append('m.uuid = $target_uuid')

        filter_query = ''
        if filter_queries:
            filter_query = ' WHERE ' + (' AND '.join(filter_queries))

        # PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
        # index on RELATES_TO.fact_embedding when available. The unindexed
        # fallback is the same MATCH-and-cosine math that previously hung
        # for 6+ minutes on a 4,000-entity graph; this is the load-bearing
        # call site that motivated the patch.
        use_index = await _falkordb_vector_index_exists(
            executor, 'RELATES_TO', 'fact_embedding', 'RELATIONSHIP'
        )
        prefix, _ = _falkordb_vector_edge_search_cypher(
            'RELATES_TO', 'fact_embedding', '$search_vector', use_index
        )
        where_clauses = []
        if filter_query:
            where_clauses.append(filter_query.replace(' WHERE ', '', 1).strip())
        where_clauses.append('score > $min_score')
        unified_where = ' WHERE ' + ' AND '.join(where_clauses)

        cypher = (
            prefix
            + unified_where
            + """
            RETURN
            """
            + get_entity_edge_return_query(GraphProvider.FALKORDB)
            + """
            ORDER BY score DESC
            LIMIT $limit
            """
        )
        params = dict(
            search_vector=search_vector,
            limit=limit,
            min_score=min_score,
            **filter_params,
        )
        if use_index:
            params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
        records, _, _ = await executor.execute_query(cypher, **params)

        return [entity_edge_from_record(r) for r in records]

    async def edge_bfs_search(
        self,
        executor: QueryExecutor,
        origin_uuids: list[str],
        max_depth: int,
        search_filter: SearchFilters,
        group_ids: list[str] | None = None,
        limit: int = 10,
    ) -> list[EntityEdge]:
        if not origin_uuids:
            return []

        filter_queries, filter_params = edge_search_filter_query_constructor(
            search_filter, GraphProvider.FALKORDB
        )

        if group_ids is not None:
            filter_queries.append('e.group_id IN $group_ids')
            filter_params['group_ids'] = group_ids

        filter_query = ''
        if filter_queries:
            filter_query = ' WHERE ' + (' AND '.join(filter_queries))

        cypher = (
            f"""
            UNWIND $bfs_origin_node_uuids AS origin_uuid
            MATCH path = (origin {{uuid: origin_uuid}})-[:RELATES_TO|MENTIONS*1..{max_depth}]->(:Entity)
            UNWIND relationships(path) AS rel
            MATCH (n:Entity)-[e:RELATES_TO {{uuid: rel.uuid}}]-(m:Entity)
            """
            + filter_query
            + """
            RETURN DISTINCT
            """
            + get_entity_edge_return_query(GraphProvider.FALKORDB)
            + """
            LIMIT $limit
            """
        )

        records, _, _ = await executor.execute_query(
            cypher,
            bfs_origin_node_uuids=origin_uuids,
            depth=max_depth,
            limit=limit,
            **filter_params,
        )

        return [entity_edge_from_record(r) for r in records]

    # --- Episode search ---

    async def episode_fulltext_search(
        self,
        executor: QueryExecutor,
        query: str,
        search_filter: SearchFilters,  # noqa: ARG002
        group_ids: list[str] | None = None,
        limit: int = 10,
    ) -> list[EpisodicNode]:
        fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
        if fuzzy_query == '':
            return []

        filter_params: dict[str, Any] = {}
        group_filter_query = ''
        if group_ids is not None:
            group_filter_query += '\nAND e.group_id IN $group_ids'
            filter_params['group_ids'] = group_ids

        cypher = (
            get_nodes_query(
                'episode_content', '$query', limit=limit, provider=GraphProvider.FALKORDB
            )
            + """
            YIELD node AS episode, score
            MATCH (e:Episodic)
            WHERE e.uuid = episode.uuid
            """
            + group_filter_query
            + """
            RETURN
            """
            + EPISODIC_NODE_RETURN
            + """
            ORDER BY score DESC
            LIMIT $limit
            """
        )

        records, _, _ = await executor.execute_query(
            cypher, query=fuzzy_query, limit=limit, **filter_params
        )

        return [episodic_node_from_record(r) for r in records]

# --- Community search ---
|
||||||
|
|
||||||
|
async def community_fulltext_search(
|
||||||
|
self,
|
||||||
|
executor: QueryExecutor,
|
||||||
|
query: str,
|
||||||
|
group_ids: list[str] | None = None,
|
||||||
|
limit: int = 10,
|
||||||
|
) -> list[CommunityNode]:
|
||||||
|
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
|
||||||
|
if fuzzy_query == '':
|
||||||
|
return []
|
||||||
|
|
||||||
|
filter_params: dict[str, Any] = {}
|
||||||
|
group_filter_query = ''
|
||||||
|
if group_ids is not None:
|
||||||
|
group_filter_query = 'WHERE c.group_id IN $group_ids'
|
||||||
|
filter_params['group_ids'] = group_ids
|
||||||
|
|
||||||
|
cypher = (
|
||||||
|
get_nodes_query(
|
||||||
|
'community_name', '$query', limit=limit, provider=GraphProvider.FALKORDB
|
||||||
|
)
|
||||||
|
+ """
|
||||||
|
YIELD node AS c, score
|
||||||
|
WITH c, score
|
||||||
|
"""
|
||||||
|
+ group_filter_query
|
||||||
|
+ """
|
||||||
|
RETURN
|
||||||
|
"""
|
||||||
|
+ COMMUNITY_NODE_RETURN
|
||||||
|
+ """
|
||||||
|
ORDER BY score DESC
|
||||||
|
LIMIT $limit
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
records, _, _ = await executor.execute_query(
|
||||||
|
cypher, query=fuzzy_query, limit=limit, **filter_params
|
||||||
|
)
|
||||||
|
|
||||||
|
return [community_node_from_record(r) for r in records]
|
||||||
|
|
||||||
|
async def community_similarity_search(
|
||||||
|
self,
|
||||||
|
executor: QueryExecutor,
|
||||||
|
search_vector: list[float],
|
||||||
|
group_ids: list[str] | None = None,
|
||||||
|
limit: int = 10,
|
||||||
|
min_score: float = 0.6,
|
||||||
|
) -> list[CommunityNode]:
|
||||||
|
query_params: dict[str, Any] = {}
|
||||||
|
|
||||||
|
group_filter_query = ''
|
||||||
|
if group_ids is not None:
|
||||||
|
group_filter_query += ' WHERE c.group_id IN $group_ids'
|
||||||
|
query_params['group_ids'] = group_ids
|
||||||
|
|
||||||
|
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
|
||||||
|
# index on Community.name_embedding when available. Note: the existing
|
||||||
|
# filter is built into `group_filter_query` (already prefixed with
|
||||||
|
# ' WHERE ' if non-empty) and uses variable `c`. The dispatcher binds
|
||||||
|
# the node as `n` for parity with the helper signature, then we
|
||||||
|
# re-bind to `c` via WITH so the rest of the query is unchanged.
|
||||||
|
use_index = await _falkordb_vector_index_exists(
|
||||||
|
executor, 'Community', 'name_embedding', 'NODE'
|
||||||
|
)
|
||||||
|
prefix, _ = _falkordb_vector_node_search_cypher(
|
||||||
|
'Community', 'name_embedding', '$search_vector', use_index
|
||||||
|
)
|
||||||
|
prefix = prefix + ' WITH n AS c, score '
|
||||||
|
where_clauses = []
|
||||||
|
if group_filter_query:
|
||||||
|
where_clauses.append(group_filter_query.replace(' WHERE ', '', 1).strip())
|
||||||
|
where_clauses.append('score > $min_score')
|
||||||
|
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
|
||||||
|
|
||||||
|
cypher = (
|
||||||
|
prefix
|
||||||
|
+ unified_where
|
||||||
|
+ """
|
||||||
|
RETURN
|
||||||
|
"""
|
||||||
|
+ COMMUNITY_NODE_RETURN
|
||||||
|
+ """
|
||||||
|
ORDER BY score DESC
|
||||||
|
LIMIT $limit
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
params = dict(
|
||||||
|
search_vector=search_vector,
|
||||||
|
limit=limit,
|
||||||
|
min_score=min_score,
|
||||||
|
**query_params,
|
||||||
|
)
|
||||||
|
if use_index:
|
||||||
|
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
|
||||||
|
records, _, _ = await executor.execute_query(cypher, **params)
|
||||||
|
|
||||||
|
return [community_node_from_record(r) for r in records]
|
||||||
|
|
||||||
|
# --- Rerankers ---
|
||||||
|
|
||||||
|
async def node_distance_reranker(
|
||||||
|
self,
|
||||||
|
executor: QueryExecutor,
|
||||||
|
node_uuids: list[str],
|
||||||
|
center_node_uuid: str,
|
||||||
|
min_score: float = 0,
|
||||||
|
) -> list[EntityNode]:
|
||||||
|
filtered_uuids = [u for u in node_uuids if u != center_node_uuid]
|
||||||
|
scores: dict[str, float] = {center_node_uuid: 0.0}
|
||||||
|
|
||||||
|
cypher = """
|
||||||
|
UNWIND $node_uuids AS node_uuid
|
||||||
|
MATCH (center:Entity {uuid: $center_uuid})-[:RELATES_TO]-(n:Entity {uuid: node_uuid})
|
||||||
|
RETURN 1 AS score, node_uuid AS uuid
|
||||||
|
"""
|
||||||
|
|
||||||
|
results, _, _ = await executor.execute_query(
|
||||||
|
cypher,
|
||||||
|
node_uuids=filtered_uuids,
|
||||||
|
center_uuid=center_node_uuid,
|
||||||
|
)
|
||||||
|
|
||||||
|
for result in results:
|
||||||
|
scores[result['uuid']] = result['score']
|
||||||
|
|
||||||
|
for uuid in filtered_uuids:
|
||||||
|
if uuid not in scores:
|
||||||
|
scores[uuid] = float('inf')
|
||||||
|
|
||||||
|
filtered_uuids.sort(key=lambda cur_uuid: scores[cur_uuid])
|
||||||
|
|
||||||
|
if center_node_uuid in node_uuids:
|
||||||
|
scores[center_node_uuid] = 0.1
|
||||||
|
filtered_uuids = [center_node_uuid] + filtered_uuids
|
||||||
|
|
||||||
|
reranked_uuids = [u for u in filtered_uuids if (1 / scores[u]) >= min_score]
|
||||||
|
|
||||||
|
if not reranked_uuids:
|
||||||
|
return []
|
||||||
|
|
||||||
|
get_query = """
|
||||||
|
MATCH (n:Entity)
|
||||||
|
WHERE n.uuid IN $uuids
|
||||||
|
RETURN
|
||||||
|
""" + get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||||
|
|
||||||
|
records, _, _ = await executor.execute_query(get_query, uuids=reranked_uuids)
|
||||||
|
|
||||||
|
node_map = {r['uuid']: entity_node_from_record(r) for r in records}
|
||||||
|
return [node_map[u] for u in reranked_uuids if u in node_map]
|
||||||
|
|
||||||
|
async def episode_mentions_reranker(
|
||||||
|
self,
|
||||||
|
executor: QueryExecutor,
|
||||||
|
node_uuids: list[str],
|
||||||
|
min_score: float = 0,
|
||||||
|
) -> list[EntityNode]:
|
||||||
|
if not node_uuids:
|
||||||
|
return []
|
||||||
|
|
||||||
|
scores: dict[str, float] = {}
|
||||||
|
|
||||||
|
results, _, _ = await executor.execute_query(
|
||||||
|
"""
|
||||||
|
UNWIND $node_uuids AS node_uuid
|
||||||
|
MATCH (episode:Episodic)-[r:MENTIONS]->(n:Entity {uuid: node_uuid})
|
||||||
|
RETURN count(*) AS score, n.uuid AS uuid
|
||||||
|
""",
|
||||||
|
node_uuids=node_uuids,
|
||||||
|
)
|
||||||
|
|
||||||
|
for result in results:
|
||||||
|
scores[result['uuid']] = result['score']
|
||||||
|
|
||||||
|
for uuid in node_uuids:
|
||||||
|
if uuid not in scores:
|
||||||
|
scores[uuid] = float('inf')
|
||||||
|
|
||||||
|
sorted_uuids = list(node_uuids)
|
||||||
|
sorted_uuids.sort(key=lambda cur_uuid: scores[cur_uuid])
|
||||||
|
|
||||||
|
reranked_uuids = [u for u in sorted_uuids if scores[u] >= min_score]
|
||||||
|
|
||||||
|
if not reranked_uuids:
|
||||||
|
return []
|
||||||
|
|
||||||
|
get_query = """
|
||||||
|
MATCH (n:Entity)
|
||||||
|
WHERE n.uuid IN $uuids
|
||||||
|
RETURN
|
||||||
|
""" + get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||||
|
|
||||||
|
records, _, _ = await executor.execute_query(get_query, uuids=reranked_uuids)
|
||||||
|
|
||||||
|
node_map = {r['uuid']: entity_node_from_record(r) for r in records}
|
||||||
|
return [node_map[u] for u in reranked_uuids if u in node_map]
|
||||||
|
|
||||||
|
# --- Filter builders ---
|
||||||
|
|
||||||
|
def build_node_search_filters(self, search_filters: SearchFilters) -> Any:
|
||||||
|
filter_queries, filter_params = node_search_filter_query_constructor(
|
||||||
|
search_filters, GraphProvider.FALKORDB
|
||||||
|
)
|
||||||
|
return {'filter_queries': filter_queries, 'filter_params': filter_params}
|
||||||
|
|
||||||
|
def build_edge_search_filters(self, search_filters: SearchFilters) -> Any:
|
||||||
|
filter_queries, filter_params = edge_search_filter_query_constructor(
|
||||||
|
search_filters, GraphProvider.FALKORDB
|
||||||
|
)
|
||||||
|
return {'filter_queries': filter_queries, 'filter_params': filter_params}
|
||||||
|
|
||||||
|
# --- Fulltext query builder ---
|
||||||
|
|
||||||
|
def build_fulltext_query(
|
||||||
|
self,
|
||||||
|
query: str,
|
||||||
|
group_ids: list[str] | None = None,
|
||||||
|
max_query_length: int = MAX_QUERY_LENGTH,
|
||||||
|
) -> str:
|
||||||
|
return _build_falkor_fulltext_query(query, group_ids, max_query_length)
|
||||||
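The patched `community_similarity_search` above merges an optional group filter with the score threshold into one `WHERE` clause, and adds an over-fetch parameter only when the vector index is in use. The sketch below isolates those two steps as plain functions. The names `build_similarity_where` and `build_similarity_params` are hypothetical and are not part of graphiti-core; the multiplier value is copied from this patch's `graph_queries.py`.

```python
from typing import Any

# Value copied from graph_queries.py in this patch.
VECTOR_INDEX_CANDIDATE_MULTIPLIER = 5


def build_similarity_where(group_filter_query: str) -> str:
    """Merge an optional ' WHERE ...' group filter with the score threshold.

    Hypothetical helper; mirrors the inline logic in the patched method.
    """
    where_clauses: list[str] = []
    if group_filter_query:
        # Strip only the first ' WHERE ' so the condition can be AND-joined.
        where_clauses.append(group_filter_query.replace(' WHERE ', '', 1).strip())
    where_clauses.append('score > $min_score')
    return ' WHERE ' + ' AND '.join(where_clauses)


def build_similarity_params(
    search_vector: list[float],
    limit: int,
    min_score: float,
    use_index: bool,
    **extra: Any,
) -> dict[str, Any]:
    """Assemble query parameters; over-fetch only on the indexed path."""
    params: dict[str, Any] = dict(
        search_vector=search_vector, limit=limit, min_score=min_score, **extra
    )
    if use_index:
        # Ask the index for more candidates than LIMIT so that rows rejected
        # by the WHERE filters do not starve the final result set.
        params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
    return params
```

With `limit=10` and the index available, the index is asked for 50 candidates, and the final `LIMIT $limit` still caps the result at 10 rows.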
@@ -0,0 +1,444 @@
|
|||||||
|
"""
|
||||||
|
Copyright 2024, Zep Software, Inc.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from falkordb import Graph as FalkorGraph
|
||||||
|
from falkordb.asyncio import FalkorDB
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
from falkordb import Graph as FalkorGraph
|
||||||
|
from falkordb.asyncio import FalkorDB
|
||||||
|
except ImportError:
|
||||||
|
# If falkordb is not installed, raise an ImportError
|
||||||
|
raise ImportError(
|
||||||
|
'falkordb is required for FalkorDriver. '
|
||||||
|
'Install it with: pip install graphiti-core[falkordb]'
|
||||||
|
) from None
|
||||||
|
|
||||||
|
from graphiti_core.driver.driver import GraphDriver, GraphDriverSession, GraphProvider
|
||||||
|
from graphiti_core.driver.falkordb import STOPWORDS as STOPWORDS
|
||||||
|
from graphiti_core.driver.falkordb.operations.community_edge_ops import (
|
||||||
|
FalkorCommunityEdgeOperations,
|
||||||
|
)
|
||||||
|
from graphiti_core.driver.falkordb.operations.community_node_ops import (
|
||||||
|
FalkorCommunityNodeOperations,
|
||||||
|
)
|
||||||
|
from graphiti_core.driver.falkordb.operations.entity_edge_ops import FalkorEntityEdgeOperations
|
||||||
|
from graphiti_core.driver.falkordb.operations.entity_node_ops import FalkorEntityNodeOperations
|
||||||
|
from graphiti_core.driver.falkordb.operations.episode_node_ops import FalkorEpisodeNodeOperations
|
||||||
|
from graphiti_core.driver.falkordb.operations.episodic_edge_ops import FalkorEpisodicEdgeOperations
|
||||||
|
from graphiti_core.driver.falkordb.operations.graph_ops import FalkorGraphMaintenanceOperations
|
||||||
|
from graphiti_core.driver.falkordb.operations.has_episode_edge_ops import (
|
||||||
|
FalkorHasEpisodeEdgeOperations,
|
||||||
|
)
|
||||||
|
from graphiti_core.driver.falkordb.operations.next_episode_edge_ops import (
|
||||||
|
FalkorNextEpisodeEdgeOperations,
|
||||||
|
)
|
||||||
|
from graphiti_core.driver.falkordb.operations.saga_node_ops import FalkorSagaNodeOperations
|
||||||
|
from graphiti_core.driver.falkordb.operations.search_ops import FalkorSearchOperations
|
||||||
|
from graphiti_core.driver.operations.community_edge_ops import CommunityEdgeOperations
|
||||||
|
from graphiti_core.driver.operations.community_node_ops import CommunityNodeOperations
|
||||||
|
from graphiti_core.driver.operations.entity_edge_ops import EntityEdgeOperations
|
||||||
|
from graphiti_core.driver.operations.entity_node_ops import EntityNodeOperations
|
||||||
|
from graphiti_core.driver.operations.episode_node_ops import EpisodeNodeOperations
|
||||||
|
from graphiti_core.driver.operations.episodic_edge_ops import EpisodicEdgeOperations
|
||||||
|
from graphiti_core.driver.operations.graph_ops import GraphMaintenanceOperations
|
||||||
|
from graphiti_core.driver.operations.has_episode_edge_ops import HasEpisodeEdgeOperations
|
||||||
|
from graphiti_core.driver.operations.next_episode_edge_ops import NextEpisodeEdgeOperations
|
||||||
|
from graphiti_core.driver.operations.saga_node_ops import SagaNodeOperations
|
||||||
|
from graphiti_core.driver.operations.search_ops import SearchOperations
|
||||||
|
from graphiti_core.graph_queries import get_fulltext_indices, get_range_indices, get_vector_indices
|
||||||
|
from graphiti_core.helpers import validate_group_ids
|
||||||
|
from graphiti_core.utils.datetime_utils import convert_datetimes_to_strings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class FalkorDriverSession(GraphDriverSession):
|
||||||
|
provider = GraphProvider.FALKORDB
|
||||||
|
|
||||||
|
def __init__(self, graph: FalkorGraph):
|
||||||
|
self.graph = graph
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, exc_type, exc, tb):
|
||||||
|
# No cleanup needed for Falkor, but method must exist
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def close(self):
|
||||||
|
# No explicit close needed for FalkorDB, but method must exist
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def execute_write(self, func, *args, **kwargs):
|
||||||
|
# Directly await the provided async function with `self` as the transaction/session
|
||||||
|
return await func(self, *args, **kwargs)
|
||||||
|
|
||||||
|
async def run(self, query: str | list, **kwargs: Any) -> Any:
|
||||||
|
# FalkorDB cannot take a label set as a query parameter, so such queries arrive as a list of (cypher, params) pairs
|
||||||
|
if isinstance(query, list):
|
||||||
|
for cypher, params in query:
|
||||||
|
params = convert_datetimes_to_strings(params)
|
||||||
|
await self.graph.query(str(cypher), params) # type: ignore[reportUnknownArgumentType]
|
||||||
|
else:
|
||||||
|
params = dict(kwargs)
|
||||||
|
params = convert_datetimes_to_strings(params)
|
||||||
|
await self.graph.query(str(query), params) # type: ignore[reportUnknownArgumentType]
|
||||||
|
# Assuming `graph.query` is async (ideal); otherwise, wrap in executor
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class FalkorDriver(GraphDriver):
|
||||||
|
provider = GraphProvider.FALKORDB
|
||||||
|
default_group_id: str = '\\_'
|
||||||
|
fulltext_syntax: str = '@' # FalkorDB uses a redisearch-like syntax for fulltext queries
|
||||||
|
aoss_client: None = None
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: str = 'localhost',
|
||||||
|
port: int = 6379,
|
||||||
|
username: str | None = None,
|
||||||
|
password: str | None = None,
|
||||||
|
falkor_db: FalkorDB | None = None,
|
||||||
|
database: str = 'default_db',
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Initialize the FalkorDB driver.
|
||||||
|
|
||||||
|
FalkorDB is a multi-tenant graph database.
|
||||||
|
To connect, provide the host and port.
|
||||||
|
The default parameters assume a local (on-premises) FalkorDB instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
host (str): The host where FalkorDB is running.
|
||||||
|
port (int): The port on which FalkorDB is listening.
|
||||||
|
username (str | None): The username for authentication (if required).
|
||||||
|
password (str | None): The password for authentication (if required).
|
||||||
|
falkor_db (FalkorDB | None): An existing FalkorDB instance to use instead of creating a new one.
|
||||||
|
database (str): The name of the database to connect to. Defaults to 'default_db'.
|
||||||
|
"""
|
||||||
|
super().__init__()
|
||||||
|
self._database = database
|
||||||
|
if falkor_db is not None:
|
||||||
|
# If a FalkorDB instance is provided, use it directly
|
||||||
|
self.client = falkor_db
|
||||||
|
else:
|
||||||
|
self.client = FalkorDB(host=host, port=port, username=username, password=password)
|
||||||
|
|
||||||
|
# Instantiate FalkorDB operations
|
||||||
|
self._entity_node_ops = FalkorEntityNodeOperations()
|
||||||
|
self._episode_node_ops = FalkorEpisodeNodeOperations()
|
||||||
|
self._community_node_ops = FalkorCommunityNodeOperations()
|
||||||
|
self._saga_node_ops = FalkorSagaNodeOperations()
|
||||||
|
self._entity_edge_ops = FalkorEntityEdgeOperations()
|
||||||
|
self._episodic_edge_ops = FalkorEpisodicEdgeOperations()
|
||||||
|
self._community_edge_ops = FalkorCommunityEdgeOperations()
|
||||||
|
self._has_episode_edge_ops = FalkorHasEpisodeEdgeOperations()
|
||||||
|
self._next_episode_edge_ops = FalkorNextEpisodeEdgeOperations()
|
||||||
|
self._search_ops = FalkorSearchOperations()
|
||||||
|
self._graph_ops = FalkorGraphMaintenanceOperations()
|
||||||
|
|
||||||
|
# Schedule the indices and constraints to be built
|
||||||
|
try:
|
||||||
|
# Try to get the current event loop
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
# Schedule the build_indices_and_constraints to run
|
||||||
|
loop.create_task(self.build_indices_and_constraints())
|
||||||
|
except RuntimeError:
|
||||||
|
# No event loop running, this will be handled later
|
||||||
|
pass
|
||||||
|
|
||||||
|
# --- Operations properties ---
|
||||||
|
|
||||||
|
@property
|
||||||
|
def entity_node_ops(self) -> EntityNodeOperations:
|
||||||
|
return self._entity_node_ops
|
||||||
|
|
||||||
|
@property
|
||||||
|
def episode_node_ops(self) -> EpisodeNodeOperations:
|
||||||
|
return self._episode_node_ops
|
||||||
|
|
||||||
|
@property
|
||||||
|
def community_node_ops(self) -> CommunityNodeOperations:
|
||||||
|
return self._community_node_ops
|
||||||
|
|
||||||
|
@property
|
||||||
|
def saga_node_ops(self) -> SagaNodeOperations:
|
||||||
|
return self._saga_node_ops
|
||||||
|
|
||||||
|
@property
|
||||||
|
def entity_edge_ops(self) -> EntityEdgeOperations:
|
||||||
|
return self._entity_edge_ops
|
||||||
|
|
||||||
|
@property
|
||||||
|
def episodic_edge_ops(self) -> EpisodicEdgeOperations:
|
||||||
|
return self._episodic_edge_ops
|
||||||
|
|
||||||
|
@property
|
||||||
|
def community_edge_ops(self) -> CommunityEdgeOperations:
|
||||||
|
return self._community_edge_ops
|
||||||
|
|
||||||
|
@property
|
||||||
|
def has_episode_edge_ops(self) -> HasEpisodeEdgeOperations:
|
||||||
|
return self._has_episode_edge_ops
|
||||||
|
|
||||||
|
@property
|
||||||
|
def next_episode_edge_ops(self) -> NextEpisodeEdgeOperations:
|
||||||
|
return self._next_episode_edge_ops
|
||||||
|
|
||||||
|
@property
|
||||||
|
def search_ops(self) -> SearchOperations:
|
||||||
|
return self._search_ops
|
||||||
|
|
||||||
|
@property
|
||||||
|
def graph_ops(self) -> GraphMaintenanceOperations:
|
||||||
|
return self._graph_ops
|
||||||
|
|
||||||
|
def _get_graph(self, graph_name: str | None) -> FalkorGraph:
|
||||||
|
# FalkorDB requires a non-None database name for multi-tenant graphs; the default is "default_db"
|
||||||
|
if graph_name is None:
|
||||||
|
graph_name = self._database
|
||||||
|
return self.client.select_graph(graph_name)
|
||||||
|
|
||||||
|
async def execute_query(self, cypher_query_, **kwargs: Any):
|
||||||
|
graph = self._get_graph(self._database)
|
||||||
|
|
||||||
|
# Convert datetime objects to ISO strings (FalkorDB does not support datetime objects directly)
|
||||||
|
params = convert_datetimes_to_strings(dict(kwargs))
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = await graph.query(cypher_query_, params) # type: ignore[reportUnknownArgumentType]
|
||||||
|
except Exception as e:
|
||||||
|
if 'already indexed' in str(e):
|
||||||
|
# The index already exists, so treat the CREATE as a no-op
|
||||||
|
logger.info(f'Index already exists: {e}')
|
||||||
|
return None
|
||||||
|
logger.error(f'Error executing FalkorDB query: {e}\n{cypher_query_}\n{params}')
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Convert the result header to a list of strings
|
||||||
|
header = [h[1] for h in result.header]
|
||||||
|
|
||||||
|
# Convert FalkorDB's result format (list of lists) to the format expected by Graphiti (list of dicts)
|
||||||
|
records = []
|
||||||
|
for row in result.result_set:
|
||||||
|
record = {}
|
||||||
|
for i, field_name in enumerate(header):
|
||||||
|
if i < len(row):
|
||||||
|
record[field_name] = row[i]
|
||||||
|
else:
|
||||||
|
# If there are more fields in header than values in row, set to None
|
||||||
|
record[field_name] = None
|
||||||
|
records.append(record)
|
||||||
|
|
||||||
|
return records, header, None
|
||||||
|
|
||||||
|
def session(self, database: str | None = None) -> GraphDriverSession:
|
||||||
|
return FalkorDriverSession(self._get_graph(database))
|
||||||
|
|
||||||
|
async def close(self) -> None:
|
||||||
|
"""Close the driver connection."""
|
||||||
|
if hasattr(self.client, 'aclose'):
|
||||||
|
await self.client.aclose() # type: ignore[reportUnknownMemberType]
|
||||||
|
elif hasattr(self.client.connection, 'aclose'):
|
||||||
|
await self.client.connection.aclose()
|
||||||
|
elif hasattr(self.client.connection, 'close'):
|
||||||
|
await self.client.connection.close()
|
||||||
|
|
||||||
|
async def delete_all_indexes(self) -> None:
|
||||||
|
result = await self.execute_query('CALL db.indexes()')
|
||||||
|
if not result:
|
||||||
|
return
|
||||||
|
|
||||||
|
records, _, _ = result
|
||||||
|
drop_tasks = []
|
||||||
|
|
||||||
|
for record in records:
|
||||||
|
label = record['label']
|
||||||
|
entity_type = record['entitytype']
|
||||||
|
|
||||||
|
for field_name, index_type in record['types'].items():
|
||||||
|
if 'RANGE' in index_type:
|
||||||
|
drop_tasks.append(self.execute_query(f'DROP INDEX ON :{label}({field_name})'))
|
||||||
|
elif 'FULLTEXT' in index_type:
|
||||||
|
if entity_type == 'NODE':
|
||||||
|
drop_tasks.append(
|
||||||
|
self.execute_query(
|
||||||
|
f'DROP FULLTEXT INDEX FOR (n:{label}) ON (n.{field_name})'
|
||||||
|
)
|
||||||
|
)
|
||||||
|
elif entity_type == 'RELATIONSHIP':
|
||||||
|
drop_tasks.append(
|
||||||
|
self.execute_query(
|
||||||
|
f'DROP FULLTEXT INDEX FOR ()-[e:{label}]-() ON (e.{field_name})'
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if drop_tasks:
|
||||||
|
await asyncio.gather(*drop_tasks)
|
||||||
|
|
||||||
|
async def build_indices_and_constraints(self, delete_existing=False):
|
||||||
|
if delete_existing:
|
||||||
|
await self.delete_all_indexes()
|
||||||
|
# PATCHED 2026-05-02 (BirdAI vendored patch): add vector indexes alongside
|
||||||
|
# range and fulltext. FalkorDB supports native vector indexes via
|
||||||
|
# db.idx.vector.queryNodes / queryRelationships; without these, similarity
|
||||||
|
# search runs as full-table-scan cosine math in interpreted Cypher.
|
||||||
|
index_queries = (
|
||||||
|
get_range_indices(self.provider)
|
||||||
|
+ get_fulltext_indices(self.provider)
|
||||||
|
+ get_vector_indices(self.provider)
|
||||||
|
)
|
||||||
|
for query in index_queries:
|
||||||
|
await self.execute_query(query)
|
||||||
|
# Invalidate the search_ops vector-index existence cache so subsequent
|
||||||
|
# similarity queries re-probe and discover the indexes we just built.
|
||||||
|
try:
|
||||||
|
from graphiti_core.driver.falkordb.operations.search_ops import (
|
||||||
|
_invalidate_falkordb_vector_index_cache,
|
||||||
|
)
|
||||||
|
_invalidate_falkordb_vector_index_cache()
|
||||||
|
except ImportError:
|
||||||
|
# search_ops does not export the cache helper (for example, an unpatched
|
||||||
|
# search_ops.py), so there is no vector-index cache to invalidate.
|
||||||
|
pass
|
||||||
|
|
||||||
|
def clone(self, database: str) -> 'GraphDriver':
|
||||||
|
"""
|
||||||
|
Returns a shallow copy of this driver with a different default database.
|
||||||
|
Reuses the same connection (e.g. FalkorDB, Neo4j).
|
||||||
|
"""
|
||||||
|
if database == self._database:
|
||||||
|
cloned = self
|
||||||
|
elif database == self.default_group_id:
|
||||||
|
cloned = FalkorDriver(falkor_db=self.client)
|
||||||
|
else:
|
||||||
|
# Create a new instance of FalkorDriver with the same connection but a different database
|
||||||
|
cloned = FalkorDriver(falkor_db=self.client, database=database)
|
||||||
|
|
||||||
|
return cloned
|
||||||
|
|
||||||
|
async def health_check(self) -> None:
|
||||||
|
"""Check FalkorDB connectivity by running a simple query."""
|
||||||
|
try:
|
||||||
|
await self.execute_query('MATCH (n) RETURN 1 LIMIT 1')
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f'FalkorDB health check failed: {e}')
|
||||||
|
raise
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def convert_datetimes_to_strings(obj):
|
||||||
|
if isinstance(obj, dict):
|
||||||
|
return {k: FalkorDriver.convert_datetimes_to_strings(v) for k, v in obj.items()}
|
||||||
|
elif isinstance(obj, list):
|
||||||
|
return [FalkorDriver.convert_datetimes_to_strings(item) for item in obj]
|
||||||
|
elif isinstance(obj, tuple):
|
||||||
|
return tuple(FalkorDriver.convert_datetimes_to_strings(item) for item in obj)
|
||||||
|
elif isinstance(obj, datetime.datetime):
|
||||||
|
return obj.isoformat()
|
||||||
|
else:
|
||||||
|
return obj
|
||||||
|
|
||||||
|
def sanitize(self, query: str) -> str:
|
||||||
|
"""
|
||||||
|
Replace FalkorDB special characters with whitespace.
|
||||||
|
Based on FalkorDB tokenization rules: ,.<>{}[]"':;!@#$%^&*()-+=~
|
||||||
|
"""
|
||||||
|
# FalkorDB separator characters that break text into tokens
|
||||||
|
separator_map = str.maketrans(
|
||||||
|
{
|
||||||
|
',': ' ',
|
||||||
|
'.': ' ',
|
||||||
|
'<': ' ',
|
||||||
|
'>': ' ',
|
||||||
|
'{': ' ',
|
||||||
|
'}': ' ',
|
||||||
|
'[': ' ',
|
||||||
|
']': ' ',
|
||||||
|
'"': ' ',
|
||||||
|
"'": ' ',
|
||||||
|
':': ' ',
|
||||||
|
';': ' ',
|
||||||
|
'!': ' ',
|
||||||
|
'@': ' ',
|
||||||
|
'#': ' ',
|
||||||
|
'$': ' ',
|
||||||
|
'%': ' ',
|
||||||
|
'^': ' ',
|
||||||
|
'&': ' ',
|
||||||
|
'*': ' ',
|
||||||
|
'(': ' ',
|
||||||
|
')': ' ',
|
||||||
|
'-': ' ',
|
||||||
|
'+': ' ',
|
||||||
|
'=': ' ',
|
||||||
|
'~': ' ',
|
||||||
|
'?': ' ',
|
||||||
|
'|': ' ',
|
||||||
|
'/': ' ',
|
||||||
|
'\\': ' ',
|
||||||
|
}
|
||||||
|
)
|
||||||
|
sanitized = query.translate(separator_map)
|
||||||
|
# Clean up multiple spaces
|
||||||
|
sanitized = ' '.join(sanitized.split())
|
||||||
|
return sanitized
|
||||||
|
|
||||||
|
def build_fulltext_query(
|
||||||
|
self, query: str, group_ids: list[str] | None = None, max_query_length: int = 128
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Build a fulltext query string for FalkorDB using RediSearch syntax.
|
||||||
|
FalkorDB uses RediSearch-like syntax where:
|
||||||
|
- Field queries use @ prefix: @field:value
|
||||||
|
- Multiple values for same field: (@field:value1|value2)
|
||||||
|
- Text search doesn't need @ prefix for content fields
|
||||||
|
- AND is implicit with space: (@group_id:value) (text)
|
||||||
|
- OR uses pipe within parentheses: (@group_id:value1|value2)
|
||||||
|
"""
|
||||||
|
validate_group_ids(group_ids)
|
||||||
|
|
||||||
|
if group_ids is None or len(group_ids) == 0:
|
||||||
|
group_filter = ''
|
||||||
|
else:
|
||||||
|
# Escape group_ids with quotes to prevent RediSearch syntax errors
|
||||||
|
# with reserved words like "main" or special characters like hyphens
|
||||||
|
escaped_group_ids = [f'"{gid}"' for gid in group_ids]
|
||||||
|
group_values = '|'.join(escaped_group_ids)
|
||||||
|
group_filter = f'(@group_id:{group_values})'
|
||||||
|
|
||||||
|
sanitized_query = self.sanitize(query)
|
||||||
|
|
||||||
|
# Remove stopwords and empty tokens from the sanitized query
|
||||||
|
query_words = sanitized_query.split()
|
||||||
|
filtered_words = [word for word in query_words if word and word.lower() not in STOPWORDS]
|
||||||
|
sanitized_query = ' | '.join(filtered_words)
|
||||||
|
|
||||||
|
# If the query is too long, return an empty query
|
||||||
|
if len(sanitized_query.split(' ')) + len(group_ids or '') >= max_query_length:
|
||||||
|
return ''
|
||||||
|
|
||||||
|
full_query = group_filter + ' (' + sanitized_query + ')'
|
||||||
|
|
||||||
|
return full_query
|
||||||
@@ -0,0 +1,242 @@
|
|||||||
|
"""
|
||||||
|
Database query utilities for different graph database backends.
|
||||||
|
|
||||||
|
This module provides database-agnostic query generation for Neo4j and FalkorDB,
|
||||||
|
supporting index creation, fulltext search, and bulk operations.
|
||||||
|
|
||||||
|
PATCHED for FalkorDB native vector index support (BirdAI vendored patch,
|
||||||
|
2026-05-02). Adds:
|
||||||
|
- get_vector_indices(): CREATE VECTOR INDEX statements for FalkorDB
|
||||||
|
- get_vector_search_query(): Cypher fragment for vector similarity using
|
||||||
|
FalkorDB's db.idx.vector procedures, with fallback to cosine math when
|
||||||
|
the index does not yet exist
|
||||||
|
- VECTOR_INDEX_CANDIDATE_MULTIPLIER: over-fetch factor for vector index
|
||||||
|
queries to handle filter rejections after index lookup
|
||||||
|
|
||||||
|
No changes to Neo4j or Kuzu code paths.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing_extensions import LiteralString
|
||||||
|
|
||||||
|
from graphiti_core.driver.driver import GraphProvider
|
||||||
|
|
||||||
|
# Mapping from Neo4j fulltext index names to FalkorDB node labels
|
||||||
|
NEO4J_TO_FALKORDB_MAPPING = {
|
||||||
|
'node_name_and_summary': 'Entity',
|
||||||
|
'community_name': 'Community',
|
||||||
|
'episode_content': 'Episodic',
|
||||||
|
'edge_name_and_fact': 'RELATES_TO',
|
||||||
|
}
|
||||||
|
# Mapping from fulltext index names to Kuzu node labels
|
||||||
|
INDEX_TO_LABEL_KUZU_MAPPING = {
|
||||||
|
'node_name_and_summary': 'Entity',
|
||||||
|
'community_name': 'Community',
|
||||||
|
'episode_content': 'Episodic',
|
||||||
|
'edge_name_and_fact': 'RelatesToNode_',
|
||||||
|
}
|
||||||
|
|
||||||
|
# Vector index over-fetch multiplier. When a vector index search is
|
||||||
|
# combined with WHERE filters (group_id, source_uuid, etc.), some of
|
||||||
|
# the top-k index results may be filtered out. Over-fetching by this
|
||||||
|
# factor preserves recall against the final LIMIT after filtering.
|
||||||
|
# Conservative default; tunable per-deployment by editing this constant
|
||||||
|
# or via environment-variable override at the driver level (future).
|
||||||
|
VECTOR_INDEX_CANDIDATE_MULTIPLIER = 5
|
||||||
|
|
||||||
|
|
||||||
|
def get_range_indices(provider: GraphProvider) -> list[LiteralString]:
|
||||||
|
if provider == GraphProvider.FALKORDB:
|
||||||
|
return [
|
||||||
|
# Entity node
|
||||||
|
'CREATE INDEX FOR (n:Entity) ON (n.uuid, n.group_id, n.name, n.created_at)',
|
||||||
|
# Episodic node
|
||||||
|
'CREATE INDEX FOR (n:Episodic) ON (n.uuid, n.group_id, n.created_at, n.valid_at)',
|
||||||
|
# Community node
|
||||||
|
'CREATE INDEX FOR (n:Community) ON (n.uuid)',
|
||||||
|
# Saga node
|
||||||
|
'CREATE INDEX FOR (n:Saga) ON (n.uuid, n.group_id, n.name)',
|
||||||
|
# RELATES_TO edge
|
||||||
|
'CREATE INDEX FOR ()-[e:RELATES_TO]-() ON (e.uuid, e.group_id, e.name, e.created_at, e.expired_at, e.valid_at, e.invalid_at)',
|
||||||
|
# MENTIONS edge
|
||||||
|
'CREATE INDEX FOR ()-[e:MENTIONS]-() ON (e.uuid, e.group_id)',
|
||||||
|
# HAS_MEMBER edge
|
||||||
|
'CREATE INDEX FOR ()-[e:HAS_MEMBER]-() ON (e.uuid)',
|
||||||
|
# HAS_EPISODE edge
|
||||||
|
'CREATE INDEX FOR ()-[e:HAS_EPISODE]-() ON (e.uuid, e.group_id)',
|
||||||
|
# NEXT_EPISODE edge
|
||||||
|
'CREATE INDEX FOR ()-[e:NEXT_EPISODE]-() ON (e.uuid, e.group_id)',
|
||||||
|
]
|
||||||
|
|
||||||
|
if provider == GraphProvider.KUZU:
|
||||||
|
return []
|
||||||
|
|
||||||
|
return [
|
||||||
|
'CREATE INDEX entity_uuid IF NOT EXISTS FOR (n:Entity) ON (n.uuid)',
|
||||||
|
'CREATE INDEX episode_uuid IF NOT EXISTS FOR (n:Episodic) ON (n.uuid)',
|
||||||
|
'CREATE INDEX community_uuid IF NOT EXISTS FOR (n:Community) ON (n.uuid)',
|
||||||
|
'CREATE INDEX saga_uuid IF NOT EXISTS FOR (n:Saga) ON (n.uuid)',
|
||||||
|
'CREATE INDEX relation_uuid IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.uuid)',
|
||||||
|
'CREATE INDEX mention_uuid IF NOT EXISTS FOR ()-[e:MENTIONS]-() ON (e.uuid)',
|
||||||
|
'CREATE INDEX has_member_uuid IF NOT EXISTS FOR ()-[e:HAS_MEMBER]-() ON (e.uuid)',
|
||||||
|
'CREATE INDEX has_episode_uuid IF NOT EXISTS FOR ()-[e:HAS_EPISODE]-() ON (e.uuid)',
|
||||||
|
'CREATE INDEX next_episode_uuid IF NOT EXISTS FOR ()-[e:NEXT_EPISODE]-() ON (e.uuid)',
|
||||||
|
'CREATE INDEX entity_group_id IF NOT EXISTS FOR (n:Entity) ON (n.group_id)',
|
||||||
|
'CREATE INDEX episode_group_id IF NOT EXISTS FOR (n:Episodic) ON (n.group_id)',
|
||||||
|
'CREATE INDEX community_group_id IF NOT EXISTS FOR (n:Community) ON (n.group_id)',
|
||||||
|
'CREATE INDEX saga_group_id IF NOT EXISTS FOR (n:Saga) ON (n.group_id)',
|
||||||
|
'CREATE INDEX relation_group_id IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.group_id)',
|
||||||
|
'CREATE INDEX mention_group_id IF NOT EXISTS FOR ()-[e:MENTIONS]-() ON (e.group_id)',
|
||||||
|
'CREATE INDEX has_episode_group_id IF NOT EXISTS FOR ()-[e:HAS_EPISODE]-() ON (e.group_id)',
|
||||||
|
'CREATE INDEX next_episode_group_id IF NOT EXISTS FOR ()-[e:NEXT_EPISODE]-() ON (e.group_id)',
|
||||||
|
'CREATE INDEX name_entity_index IF NOT EXISTS FOR (n:Entity) ON (n.name)',
|
||||||
|
'CREATE INDEX saga_name IF NOT EXISTS FOR (n:Saga) ON (n.name)',
|
||||||
|
'CREATE INDEX created_at_entity_index IF NOT EXISTS FOR (n:Entity) ON (n.created_at)',
|
||||||
|
'CREATE INDEX created_at_episodic_index IF NOT EXISTS FOR (n:Episodic) ON (n.created_at)',
|
||||||
|
'CREATE INDEX valid_at_episodic_index IF NOT EXISTS FOR (n:Episodic) ON (n.valid_at)',
|
||||||
|
'CREATE INDEX name_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.name)',
|
||||||
|
'CREATE INDEX created_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.created_at)',
|
||||||
|
'CREATE INDEX expired_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.expired_at)',
|
||||||
|
'CREATE INDEX valid_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.valid_at)',
|
||||||
|
'CREATE INDEX invalid_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.invalid_at)',
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def get_fulltext_indices(provider: GraphProvider) -> list[LiteralString]:
|
||||||
|
if provider == GraphProvider.FALKORDB:
|
||||||
|
from typing import cast
|
||||||
|
|
||||||
|
from graphiti_core.driver.falkordb import STOPWORDS
|
||||||
|
|
||||||
|
# Convert to string representation for embedding in queries
|
||||||
|
stopwords_str = str(STOPWORDS)
|
||||||
|
|
||||||
|
# Use cast() to satisfy the LiteralString requirement while keeping STOPWORDS the single source of truth
|
||||||
|
return cast(
|
||||||
|
list[LiteralString],
|
||||||
|
[
|
||||||
|
f"""CALL db.idx.fulltext.createNodeIndex(
|
||||||
|
{{
|
||||||
|
label: 'Episodic',
|
||||||
|
stopwords: {stopwords_str}
|
||||||
|
}},
|
||||||
|
'content', 'source', 'source_description', 'group_id'
|
||||||
|
)""",
|
||||||
|
f"""CALL db.idx.fulltext.createNodeIndex(
|
||||||
|
{{
|
||||||
|
label: 'Entity',
|
||||||
|
stopwords: {stopwords_str}
|
||||||
|
}},
|
||||||
|
'name', 'summary', 'group_id'
|
||||||
|
)""",
|
||||||
|
f"""CALL db.idx.fulltext.createNodeIndex(
|
||||||
|
{{
|
||||||
|
label: 'Community',
|
||||||
|
stopwords: {stopwords_str}
|
||||||
|
}},
|
||||||
|
'name', 'group_id'
|
||||||
|
)""",
|
||||||
|
"""CREATE FULLTEXT INDEX FOR ()-[e:RELATES_TO]-() ON (e.name, e.fact, e.group_id)""",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
if provider == GraphProvider.KUZU:
|
||||||
|
return [
|
||||||
|
"CALL CREATE_FTS_INDEX('Episodic', 'episode_content', ['content', 'source', 'source_description']);",
|
||||||
|
"CALL CREATE_FTS_INDEX('Entity', 'node_name_and_summary', ['name', 'summary']);",
|
||||||
|
"CALL CREATE_FTS_INDEX('Community', 'community_name', ['name']);",
|
||||||
|
"CALL CREATE_FTS_INDEX('RelatesToNode_', 'edge_name_and_fact', ['name', 'fact']);",
|
||||||
|
]
|
||||||
|
|
||||||
|
return [
|
||||||
|
"""CREATE FULLTEXT INDEX episode_content IF NOT EXISTS
|
||||||
|
FOR (e:Episodic) ON EACH [e.content, e.source, e.source_description, e.group_id]""",
|
||||||
|
"""CREATE FULLTEXT INDEX node_name_and_summary IF NOT EXISTS
|
||||||
|
FOR (n:Entity) ON EACH [n.name, n.summary, n.group_id]""",
|
||||||
|
"""CREATE FULLTEXT INDEX community_name IF NOT EXISTS
|
||||||
|
FOR (n:Community) ON EACH [n.name, n.group_id]""",
|
||||||
|
"""CREATE FULLTEXT INDEX edge_name_and_fact IF NOT EXISTS
|
||||||
|
FOR ()-[e:RELATES_TO]-() ON EACH [e.name, e.fact, e.group_id]""",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def get_vector_indices(provider: GraphProvider, dimension: int = 384) -> list[LiteralString]:
|
||||||
|
"""Return CREATE VECTOR INDEX statements for the given provider.
|
||||||
|
|
||||||
|
For FalkorDB: creates HNSW vector indexes on Entity.name_embedding,
|
||||||
|
RELATES_TO.fact_embedding, and Community.name_embedding. Backed by
|
||||||
|
FalkorDB's native vector index (db.idx.vector.queryNodes /
|
||||||
|
queryRelationships).
|
||||||
|
|
||||||
|
For Neo4j and Kuzu: returns an empty list. This module's queries for those
backends compute similarity with scalar functions (vector.similarity.cosine
on Neo4j, array_cosine_similarity on Kuzu), so no vector index is created
for them here.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
provider: The graph database provider.
|
||||||
|
dimension: Embedding dimension. Defaults to 384 (all-MiniLM-L6-v2).
|
||||||
|
Embedders with different dimensions should pass their own value
|
||||||
|
through driver configuration. graphiti-core's default embedder
|
||||||
|
is 1536 (OpenAI ada-002); BirdAI uses 384 (sentence-transformers).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of CREATE VECTOR INDEX statements. Idempotent at FalkorDB level
|
||||||
|
if the index already exists with matching options.
|
||||||
|
"""
|
||||||
|
if provider == GraphProvider.FALKORDB:
|
||||||
|
from typing import cast
|
||||||
|
return cast(
|
||||||
|
list[LiteralString],
|
||||||
|
[
|
||||||
|
f"CREATE VECTOR INDEX FOR (n:Entity) ON (n.name_embedding) "
|
||||||
|
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
|
||||||
|
f"CREATE VECTOR INDEX FOR ()-[e:RELATES_TO]-() ON (e.fact_embedding) "
|
||||||
|
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
|
||||||
|
f"CREATE VECTOR INDEX FOR (n:Community) ON (n.name_embedding) "
|
||||||
|
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def get_nodes_query(name: str, query: str, limit: int, provider: GraphProvider) -> str:
|
||||||
|
if provider == GraphProvider.FALKORDB:
|
||||||
|
label = NEO4J_TO_FALKORDB_MAPPING[name]
|
||||||
|
return f"CALL db.idx.fulltext.queryNodes('{label}', {query})"
|
||||||
|
|
||||||
|
if provider == GraphProvider.KUZU:
|
||||||
|
label = INDEX_TO_LABEL_KUZU_MAPPING[name]
|
||||||
|
return f"CALL QUERY_FTS_INDEX('{label}', '{name}', {query}, TOP := $limit)"
|
||||||
|
|
||||||
|
return f'CALL db.index.fulltext.queryNodes("{name}", {query}, {{limit: $limit}})'
|
||||||
|
|
||||||
|
|
||||||
|
def get_vector_cosine_func_query(vec1, vec2, provider: GraphProvider) -> str:
|
||||||
|
"""Return a Cypher fragment for cosine similarity score in [0, 1].
|
||||||
|
|
||||||
|
PRESERVED for backward compatibility and as fallback when vector indexes
|
||||||
|
do not yet exist on the FalkorDB backend. New code paths should prefer
|
||||||
|
get_vector_search_query() which uses the native vector index when
|
||||||
|
available.
|
||||||
|
"""
|
||||||
|
if provider == GraphProvider.FALKORDB:
|
||||||
|
# FalkorDB exposes a cosine distance rather than a similarity; rescale it to [0, 1] to match Neo4j's normalized cosine similarity
|
||||||
|
return f'(2 - vec.cosineDistance({vec1}, vecf32({vec2})))/2'
|
||||||
|
|
||||||
|
if provider == GraphProvider.KUZU:
|
||||||
|
return f'array_cosine_similarity({vec1}, {vec2})'
|
||||||
|
|
||||||
|
return f'vector.similarity.cosine({vec1}, {vec2})'
|
||||||
|
|
||||||
|
|
||||||
|
def get_relationships_query(name: str, limit: int, provider: GraphProvider) -> str:
|
||||||
|
if provider == GraphProvider.FALKORDB:
|
||||||
|
label = NEO4J_TO_FALKORDB_MAPPING[name]
|
||||||
|
return f"CALL db.idx.fulltext.queryRelationships('{label}', $query)"
|
||||||
|
|
||||||
|
if provider == GraphProvider.KUZU:
|
||||||
|
label = INDEX_TO_LABEL_KUZU_MAPPING[name]
|
||||||
|
return f"CALL QUERY_FTS_INDEX('{label}', '{name}', cast($query AS STRING), TOP := $limit)"
|
||||||
|
|
||||||
|
return f'CALL db.index.fulltext.queryRelationships("{name}", $query, {{limit: $limit}})'
|
||||||
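The FalkorDB branch of `get_vector_cosine_func_query` returns `(2 - vec.cosineDistance(...))/2`. A minimal sketch of why that lands in [0, 1], assuming `vec.cosineDistance` means `1 - cosine_similarity` (an assumption about FalkorDB's definition, not something this diff states). The function names are illustrative.

```python
import math


def cosine_similarity(a, b):
    """Plain cosine similarity in [-1, 1]."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def falkordb_style_score(a, b):
    """Mirror of the (2 - distance) / 2 expression used for FalkorDB."""
    distance = 1.0 - cosine_similarity(a, b)  # assumed meaning of vec.cosineDistance
    return (2.0 - distance) / 2.0  # equals (1 + cosine_similarity) / 2
```

Under that assumption, identical vectors score 1, orthogonal vectors 0.5, and opposite vectors 0, so thresholds such as a minimum score keep the same meaning across backends that report a normalized similarity.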
@@ -0,0 +1,55 @@
|
|||||||
|
-- Migration: 20260502-001_async_job_model
|
||||||
|
-- Purpose: Pattern 1 async job model — sidecar processes ingest jobs serially
|
||||||
|
-- via Postgres-backed queue. Worker submits and polls rather than
|
||||||
|
-- blocking on synchronous HTTP response.
|
||||||
|
--
|
||||||
|
-- Architectural rationale: tonight's smoke test (2026-05-02 ~01:40-01:50 UTC)
|
||||||
|
-- diagnosed that bulk ingest against a 4,222-entity graph commits successfully
|
||||||
|
-- but the worker's HTTP read-timeout fires before the response returns. Three
|
||||||
|
-- days of "saga deadlock" failures were false negatives — the work succeeded;
|
||||||
|
-- the worker just stopped listening. Pattern 1 separates submission from
|
||||||
|
-- completion observation so the worker can't false-negative this way.
|
||||||
|
--
|
||||||
|
-- The job model is also the natural data source for Phase A items 6-7
|
||||||
|
-- (metrics tables) — graphiti_jobs records duration, status transitions,
|
||||||
|
-- and per-job summary that those tables will aggregate.
|
||||||
|
--
|
||||||
|
-- Idempotent: safe to re-run.
|
||||||
|
|
||||||
|
-- Job state for sidecar's async ingest queue.
|
||||||
|
-- One row per submitted bulk-or-single ingest. Sidecar reads queued jobs
|
||||||
|
-- on startup to resume after restart. Worker polls status until terminal.
|
||||||
|
CREATE TABLE IF NOT EXISTS graphiti_jobs (
|
||||||
|
job_id UUID PRIMARY KEY,
|
||||||
|
job_type TEXT NOT NULL CHECK (job_type IN ('bulk', 'single')),
|
||||||
|
payload JSONB NOT NULL, -- full submitted request body
|
||||||
|
status TEXT NOT NULL DEFAULT 'queued' -- 'queued'|'running'|'committed'|'failed'
|
||||||
|
CHECK (status IN ('queued', 'running', 'committed', 'failed')),
|
||||||
|
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
started_at TIMESTAMPTZ,
|
||||||
|
finished_at TIMESTAMPTZ,
|
||||||
|
error TEXT, -- non-null when status='failed'
|
||||||
|
summary JSONB, -- {nodes: N, edges: N, episodes: N}
|
||||||
|
submitted_by TEXT -- worker name for traceability
|
||||||
|
);
|
||||||
|
|
||||||
|
-- Index supporting sidecar's "pick next queued job" query
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_graphiti_jobs_queued
|
||||||
|
ON graphiti_jobs (enqueued_at)
|
||||||
|
WHERE status = 'queued';
|
||||||
|
|
||||||
|
-- Index supporting status-filtered scans, e.g. startup recovery of rows left
-- in 'running'. Polling a job by id is already served by the primary key.
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_graphiti_jobs_status
|
||||||
|
ON graphiti_jobs (status);
|
||||||
|
|
||||||
|
-- Stage 3 queue gains a reference to the sidecar job processing the row.
|
||||||
|
-- When set, worker polls graphiti_jobs.status rather than blocking on HTTP.
|
||||||
|
-- NULL means: row not yet submitted, or pre-Pattern-1 row.
|
||||||
|
ALTER TABLE stage_3_queue
|
||||||
|
ADD COLUMN IF NOT EXISTS external_job_id UUID;
|
||||||
|
|
||||||
|
-- Index for "find rows that submitted but didn't complete" recovery scans
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_stage_3_queue_external_job
|
||||||
|
ON stage_3_queue (external_job_id)
|
||||||
|
WHERE external_job_id IS NOT NULL AND completed_at IS NULL AND failed_at IS NULL;
|
||||||
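The migration describes a worker that submits a job and then polls its status until it is terminal. A minimal sketch of that polling loop, assuming the caller supplies a `fetch_status` callable (hypothetical; in practice it might read `graphiti_jobs.status` or call the sidecar's job endpoint). The terminal statuses come from the table's CHECK constraint; the function name, defaults, and injectable `sleep`/`clock` parameters are illustrative.

```python
import time

TERMINAL_STATUSES = {"committed", "failed"}  # from the graphiti_jobs CHECK constraint


def poll_until_terminal(fetch_status, interval=2.0, timeout=600.0,
                        sleep=time.sleep, clock=time.monotonic):
    """Call fetch_status() until it returns a terminal status or time runs out."""
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            # Giving up here only stops observing; the job row is untouched.
            raise TimeoutError(f"job still {status!r} after {timeout}s")
        sleep(interval)
```

A timeout in this loop does not mark the job as failed. The job keeps its row in `graphiti_jobs`, so a later poll or a recovery scan over `external_job_id` can still observe the real outcome.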
+419
-69
@@ -1,14 +1,44 @@
|
|||||||
"""
|
"""
|
||||||
Aaron AI — Graphiti Sidecar Service
|
Aaron AI — Graphiti Sidecar Service (v2.0 — Pattern 1 async job model)
|
||||||
Wraps graphiti-core in a FastAPI service to avoid asyncio event loop conflicts.
|
|
||||||
|
Wraps graphiti-core in a FastAPI service. Pattern 1 architecture: ingest
|
||||||
|
submission and completion are decoupled. Submitters POST to /episodes or
|
||||||
|
/episodes/bulk and receive a job_id; an in-process background worker
|
||||||
|
processes jobs serially against the graph; submitters poll GET /jobs/{id}
|
||||||
|
until terminal status.
|
||||||
|
|
||||||
|
Why Pattern 1: tonight's smoke test (2026-05-02) confirmed that bulk
|
||||||
|
ingest against the 4,222-entity graph commits successfully even when the
|
||||||
|
worker's HTTP read-timeout fires. The synchronous interface was producing
|
||||||
|
false-negative failures — work succeeded but the worker stopped listening.
|
||||||
|
Pattern 1 separates submission from completion observation so the worker
|
||||||
|
can't false-negative this way.
|
||||||
|
|
||||||
|
Architectural commitments:
|
||||||
|
- One in-flight job per sidecar (per graph). Concurrent jobs against the
|
||||||
|
same graph would race on graphiti-core's _resolve_nodes_and_edges_bulk
|
||||||
|
(no transaction boundary, no internal coordination). Concurrent
|
||||||
|
multi-tenancy is "run multiple sidecars," not "make one sidecar
|
||||||
|
concurrency-safe across graphs."
|
||||||
|
- Postgres-backed job state. Survives sidecar restart. On startup the
|
||||||
|
sidecar resets any 'running' rows to 'queued' (their previous run died);
|
||||||
|
the background worker picks them up naturally.
|
||||||
|
- Both /episodes and /episodes/bulk are async-shaped for parity. graphiti-
|
||||||
|
core operations underneath (add_episode, add_episode_bulk) are unchanged.
|
||||||
|
- The bulk pathway is preserved — load-bearing for first-run corpus
|
||||||
|
migration. Single-episode is preserved — load-bearing for state-
|
||||||
|
superseding content per the Stage 2/3 routing rule.
|
||||||
|
|
||||||
Port 8001 (internal only). No OpenAI dependency.
|
Port 8001 (internal only). No OpenAI dependency.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import os, logging, sys, traceback
|
import os, logging, sys, asyncio, traceback, uuid, json
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
import psycopg2.extras
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from fastapi import FastAPI, HTTPException
|
from fastapi import FastAPI, HTTPException
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
@@ -31,8 +61,18 @@ FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
|
|||||||
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
|
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
|
||||||
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
|
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
|
||||||
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
|
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
|
||||||
|
PG_DSN = os.getenv("PG_DSN")
|
||||||
|
SIDECAR_NAME = os.getenv("SIDECAR_NAME", "graphiti-sidecar-1")
|
||||||
os.environ["EMBEDDING_DIM"] = "384"
|
os.environ["EMBEDDING_DIM"] = "384"
|
||||||
|
|
||||||
|
# Background worker configuration. Polls Postgres for queued jobs every
|
||||||
|
# WORKER_POLL_INTERVAL seconds when idle. Single-job-at-a-time by design;
|
||||||
|
# no concurrency primitive beyond the serial loop. The sleep is brief
|
||||||
|
# enough to feel responsive but long enough to avoid burning CPU on an
|
||||||
|
# empty queue.
|
||||||
|
WORKER_POLL_INTERVAL = 2.0
|
||||||
|
|
||||||
|
|
||||||
def get_llm_client():
|
def get_llm_client():
|
||||||
from graphiti_core.llm_client.config import LLMConfig
|
from graphiti_core.llm_client.config import LLMConfig
|
||||||
config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)
|
config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)
|
||||||
@@ -50,16 +90,286 @@ def get_llm_client():
|
|||||||
return GroqClient(config)
|
return GroqClient(config)
|
||||||
raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")
|
raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")
|
||||||
|
|
||||||
graphiti_instance = None
|
|
||||||
|
|
||||||
async def get_graphiti():
|
graphiti_instance = None
|
||||||
if graphiti_instance is None:
|
worker_task = None
|
||||||
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
|
||||||
return graphiti_instance
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Postgres job-state helpers. Synchronous psycopg2 calls inside async
|
||||||
|
# functions: each call opens a fresh connection, runs one statement, closes.
|
||||||
|
# Acceptable here because traffic is low (single-digit jobs/min steady state)
|
||||||
|
# and the simplicity is worth more than connection pooling. If this ever
|
||||||
|
# becomes a bottleneck, swap to asyncpg or psycopg3 async.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _pg():
|
||||||
|
return psycopg2.connect(PG_DSN)
|
||||||
|
|
||||||
|
|
||||||
|
def _job_insert(job_id: str, job_type: str, payload: dict) -> None:
|
||||||
|
"""Write a new job row in 'queued' status."""
|
||||||
|
pg = _pg()
|
||||||
|
cur = pg.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
INSERT INTO graphiti_jobs (job_id, job_type, payload, status, submitted_by)
|
||||||
|
VALUES (%s, %s, %s::jsonb, 'queued', %s)
|
||||||
|
""",
|
||||||
|
(job_id, job_type, json.dumps(payload), SIDECAR_NAME),
|
||||||
|
)
|
||||||
|
pg.commit()
|
||||||
|
pg.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _job_get(job_id: str) -> dict | None:
|
||||||
|
"""Read a single job by id. Returns None if not found."""
|
||||||
|
pg = _pg()
|
||||||
|
cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
SELECT job_id, job_type, status, enqueued_at, started_at, finished_at,
|
||||||
|
error, summary, submitted_by
|
||||||
|
FROM graphiti_jobs
|
||||||
|
WHERE job_id = %s
|
||||||
|
""",
|
||||||
|
(job_id,),
|
||||||
|
)
|
||||||
|
row = cur.fetchone()
|
||||||
|
pg.close()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
# Convert UUID, datetimes for JSON serialization
|
||||||
|
return {
|
||||||
|
"job_id": str(row["job_id"]),
|
||||||
|
"job_type": row["job_type"],
|
||||||
|
"status": row["status"],
|
||||||
|
"enqueued_at": row["enqueued_at"].isoformat() if row["enqueued_at"] else None,
|
||||||
|
"started_at": row["started_at"].isoformat() if row["started_at"] else None,
|
||||||
|
"finished_at": row["finished_at"].isoformat() if row["finished_at"] else None,
|
||||||
|
"error": row["error"],
|
||||||
|
"summary": row["summary"],
|
||||||
|
"submitted_by": row["submitted_by"],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _job_claim_next() -> dict | None:
|
||||||
|
"""Atomically claim the oldest queued job for processing.
|
||||||
|
|
||||||
|
Uses SELECT ... FOR UPDATE SKIP LOCKED so multiple sidecar instances
|
||||||
|
(future multi-tenant deployment) don't fight over the same row. For
|
||||||
|
single-sidecar deployments this is just a clean atomic transition.
|
||||||
|
|
||||||
|
Returns the full job row (including payload) or None if queue is empty.
|
||||||
|
"""
|
||||||
|
pg = _pg()
|
||||||
|
cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
WITH next_job AS (
|
||||||
|
SELECT job_id
|
||||||
|
FROM graphiti_jobs
|
||||||
|
WHERE status = 'queued'
|
||||||
|
ORDER BY enqueued_at ASC
|
||||||
|
LIMIT 1
|
||||||
|
FOR UPDATE SKIP LOCKED
|
||||||
|
)
|
||||||
|
UPDATE graphiti_jobs g
|
||||||
|
SET status = 'running', started_at = NOW()
|
||||||
|
FROM next_job
|
||||||
|
WHERE g.job_id = next_job.job_id
|
||||||
|
RETURNING g.job_id, g.job_type, g.payload
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
row = cur.fetchone()
|
||||||
|
pg.commit()
|
||||||
|
pg.close()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
return {
|
||||||
|
"job_id": str(row["job_id"]),
|
||||||
|
"job_type": row["job_type"],
|
||||||
|
"payload": row["payload"], # already a dict via JSONB
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _job_complete(job_id: str, summary: dict) -> None:
|
||||||
|
pg = _pg()
|
||||||
|
cur = pg.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
UPDATE graphiti_jobs
|
||||||
|
SET status = 'committed', finished_at = NOW(), summary = %s::jsonb
|
||||||
|
WHERE job_id = %s
|
||||||
|
""",
|
||||||
|
(json.dumps(summary), job_id),
|
||||||
|
)
|
||||||
|
pg.commit()
|
||||||
|
pg.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _job_fail(job_id: str, error: str) -> None:
|
||||||
|
pg = _pg()
|
||||||
|
cur = pg.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
UPDATE graphiti_jobs
|
||||||
|
SET status = 'failed', finished_at = NOW(), error = %s
|
||||||
|
WHERE job_id = %s
|
||||||
|
""",
|
||||||
|
(error[:2000], job_id), # truncate to keep error column reasonable
|
||||||
|
)
|
||||||
|
pg.commit()
|
||||||
|
pg.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _startup_recovery() -> int:
|
||||||
|
"""Reset any 'running' jobs to 'queued' on startup.
|
||||||
|
|
||||||
|
Rationale: if the sidecar died while processing a job, that row is
|
||||||
|
stuck in 'running' with no process advancing it. The right behavior
|
||||||
|
on restart is to retry. graphiti-core's add_episode_bulk and
|
||||||
|
add_episode are idempotent against the graph (dedup handles duplicate
|
||||||
|
submission), so re-running a job is safe — at worst, a second run
|
||||||
|
incurs API spend on resolve calls that no-op against an already-
|
||||||
|
committed entity set.
|
||||||
|
|
||||||
|
Returns the count of recovered jobs.
|
||||||
|
"""
|
||||||
|
pg = _pg()
|
||||||
|
cur = pg.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
UPDATE graphiti_jobs
|
||||||
|
SET status = 'queued', started_at = NULL
|
||||||
|
WHERE status = 'running'
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
count = cur.rowcount
|
||||||
|
pg.commit()
|
||||||
|
pg.close()
|
||||||
|
return count
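The helpers above are synchronous, so while one of them waits on Postgres the event loop cannot serve other requests. If that ever matters, one option short of switching drivers is to run the blocking call in a worker thread. A minimal sketch, assuming Python 3.9+ for `asyncio.to_thread`; `blocking_claim` is a hypothetical stand-in for a helper such as `_job_claim_next()`, and the ticker exists only to show the loop stays responsive.

```python
import asyncio
import time


def blocking_claim():
    """Stand-in for a synchronous helper such as _job_claim_next()."""
    time.sleep(0.05)  # simulates the Postgres round trip
    return {"job_id": "demo", "job_type": "single", "payload": {}}


async def claim_without_blocking():
    # asyncio.to_thread runs the callable in a worker thread and awaits its result.
    return await asyncio.to_thread(blocking_claim)


async def demo():
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1  # only advances if the event loop is free

    task = asyncio.create_task(ticker())
    claimed = await claim_without_blocking()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return claimed, ticks
```

Running `asyncio.run(demo())` returns the claimed job together with a tick count above zero, which indicates other coroutines kept running during the simulated database call.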
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Background worker — single asyncio task running for the sidecar lifetime.
|
||||||
|
# Processes one job at a time. No concurrency. Restart recovery is handled
|
||||||
|
# by _startup_recovery() before this task starts.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def background_worker():
|
||||||
|
"""Serial job processor. Polls graphiti_jobs, processes one at a time."""
|
||||||
|
log.info("Background worker started")
|
||||||
|
|
||||||
|
from graphiti_core.nodes import EpisodeType
|
||||||
|
from graphiti_core.utils.bulk_utils import RawEpisode
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
claimed = _job_claim_next()
|
||||||
|
if claimed is None:
|
||||||
|
await asyncio.sleep(WORKER_POLL_INTERVAL)
|
||||||
|
continue
|
||||||
|
|
||||||
|
job_id = claimed["job_id"]
|
||||||
|
job_type = claimed["job_type"]
|
||||||
|
payload = claimed["payload"]
|
||||||
|
|
||||||
|
log.info(f"Processing job {job_id} (type={job_type})")
|
||||||
|
start = datetime.now()
|
||||||
|
|
||||||
|
try:
|
||||||
|
if job_type == "bulk":
|
||||||
|
summary = await _process_bulk_job(payload, EpisodeType, RawEpisode)
|
||||||
|
elif job_type == "single":
|
||||||
|
summary = await _process_single_job(payload, EpisodeType)
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Unknown job_type: {job_type}")
|
||||||
|
|
||||||
|
duration = (datetime.now() - start).total_seconds()
|
||||||
|
summary["duration_seconds"] = duration
|
||||||
|
_job_complete(job_id, summary)
|
||||||
|
log.info(f"Committed job {job_id} in {duration:.1f}s — {summary}")
|
||||||
|
except Exception as e:
|
||||||
|
duration = (datetime.now() - start).total_seconds()
|
||||||
|
err = f"{type(e).__name__}: {e}"
|
||||||
|
log.error(f"Job {job_id} failed after {duration:.1f}s: {err}\n{traceback.format_exc()}")
|
||||||
|
_job_fail(job_id, err)
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
log.info("Background worker cancelled")
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
# Defensive: don't let the worker loop die from an unexpected error.
|
||||||
|
# Log it, sleep briefly, continue.
|
||||||
|
log.error(f"Worker loop error: {e}\n{traceback.format_exc()}")
|
||||||
|
await asyncio.sleep(5.0)
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_bulk_job(payload: dict, EpisodeType, RawEpisode) -> dict:
|
||||||
|
"""Run add_episode_bulk for a 'bulk' job. Payload mirrors BulkEpisodeRequest."""
|
||||||
|
raw_episodes = []
|
||||||
|
for ep in payload["episodes"]:
|
||||||
|
ref_time = (
|
||||||
|
datetime.fromisoformat(ep["timestamp"])
|
||||||
|
if ep.get("timestamp") else datetime.now()
|
||||||
|
)
|
||||||
|
raw_episodes.append(RawEpisode(
|
||||||
|
name=ep["name"],
|
||||||
|
content=ep["content"],
|
||||||
|
source_description=ep.get("source_description", ""),
|
||||||
|
source=EpisodeType.text,
|
||||||
|
reference_time=ref_time,
|
||||||
|
))
|
||||||
|
|
||||||
|
kwargs = dict(
|
||||||
|
bulk_episodes=raw_episodes,
|
||||||
|
group_id=payload.get("group_id") or GROUP_ID,
|
||||||
|
saga=payload.get("saga"),
|
||||||
|
)
|
||||||
|
if payload.get("custom_extraction_instructions") is not None:
|
||||||
|
kwargs["custom_extraction_instructions"] = payload["custom_extraction_instructions"]
|
||||||
|
|
||||||
|
result = await graphiti_instance.add_episode_bulk(**kwargs)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"type": "bulk",
|
||||||
|
"episodes": len(result.episodes) if result and result.episodes else len(raw_episodes),
|
||||||
|
"nodes": len(result.nodes) if result and result.nodes else 0,
|
||||||
|
"edges": len(result.edges) if result and result.edges else 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def _process_single_job(payload: dict, EpisodeType) -> dict:
|
||||||
|
"""Run add_episode for a 'single' job. Payload mirrors EpisodeRequest."""
|
||||||
|
ref_time = (
|
||||||
|
datetime.fromisoformat(payload["timestamp"])
|
||||||
|
if payload.get("timestamp") else datetime.now()
|
||||||
|
)
|
||||||
|
kwargs = dict(
|
||||||
|
name=payload["name"],
|
||||||
|
episode_body=payload["content"],
|
||||||
|
source=EpisodeType.text,
|
||||||
|
reference_time=ref_time,
|
||||||
|
source_description=payload.get("source_description", ""),
|
||||||
|
group_id=payload.get("group_id") or GROUP_ID,
|
||||||
|
custom_extraction_instructions=payload.get("custom_extraction_instructions"),
|
||||||
|
)
|
||||||
|
if payload.get("saga") is not None:
|
||||||
|
kwargs["saga"] = payload["saga"]
|
||||||
|
|
||||||
|
await graphiti_instance.add_episode(**kwargs)
|
||||||
|
return {"type": "single", "episodes": 1}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Lifespan & app
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
global graphiti_instance
|
global graphiti_instance, worker_task
|
||||||
|
|
||||||
sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
|
sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
|
||||||
log.info("Loading embedding and reranker models...")
|
log.info("Loading embedding and reranker models...")
|
||||||
from st_embedder import SentenceTransformerEmbedder
|
from st_embedder import SentenceTransformerEmbedder
|
||||||
@@ -75,11 +385,51 @@ async def lifespan(app: FastAPI):
|
|||||||
max_coroutines=2,
|
max_coroutines=2,
|
||||||
)
|
)
|
||||||
await graphiti_instance.build_indices_and_constraints()
|
await graphiti_instance.build_indices_and_constraints()
|
||||||
|
|
||||||
|
# PATCHED 2026-05-02: bridge the per-driver SearchOperations to the
|
||||||
|
# search_interface attribute that search_utils.py dispatches on.
|
||||||
|
# graphiti-core 0.29.0 builds FalkorSearchOperations as driver._search_ops
|
||||||
|
# but never assigns it to driver.search_interface — naming mismatch
|
||||||
|
# between the two halves of the codebase. Without this, search_utils.py
|
||||||
|
# falls through to interpreted-Cypher cosine math (full-table scan) even
|
||||||
|
# when our patched FalkorSearchOperations exists. Setting search_interface
|
||||||
|
# activates the per-driver vector-index path.
|
||||||
|
if hasattr(graphiti_instance.driver, '_search_ops') and getattr(graphiti_instance.driver, 'search_interface', None) is None:
|
||||||
|
graphiti_instance.driver.search_interface = graphiti_instance.driver._search_ops
|
||||||
|
log.info("Wired driver.search_interface = driver._search_ops (vector index path active)")
|
||||||
|
|
||||||
log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
|
log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
|
||||||
|
|
||||||
|
# Recover any jobs left 'running' from a previous sidecar instance.
|
||||||
|
# They become 'queued' again and the background worker picks them up.
|
||||||
|
recovered = _startup_recovery()
|
||||||
|
if recovered > 0:
|
||||||
|
log.info(f"Startup recovery: reset {recovered} running job(s) to queued")
|
||||||
|
|
||||||
|
# Start the background job worker.
|
||||||
|
worker_task = asyncio.create_task(background_worker())
|
||||||
|
log.info("Sidecar ready — accepting job submissions on :8001")
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
|
# Shutdown: cancel worker, close graphiti.
|
||||||
|
if worker_task is not None:
|
||||||
|
worker_task.cancel()
|
||||||
|
try:
|
||||||
|
await worker_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
await graphiti_instance.close()
|
await graphiti_instance.close()
|
||||||
|
|
||||||
app = FastAPI(title="Aaron AI Graphiti Sidecar", lifespan=lifespan)
|
|
||||||
|
app = FastAPI(title="Aaron AI Graphiti Sidecar (Pattern 1)", lifespan=lifespan)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Request models — preserved from v1.0 with no payload-shape changes. The
|
||||||
|
# only API change is the response shape: instead of blocking until
|
||||||
|
# graphiti-core returns, submission endpoints return a job_id immediately.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
class BulkEpisodeItem(BaseModel):
|
class BulkEpisodeItem(BaseModel):
|
||||||
name: str
|
name: str
|
||||||
@@ -92,11 +442,6 @@ class BulkEpisodeRequest(BaseModel):
|
|||||||
episodes: list[BulkEpisodeItem]
|
episodes: list[BulkEpisodeItem]
|
||||||
group_id: str | None = None
|
group_id: str | None = None
|
||||||
saga: str | None = None
|
saga: str | None = None
|
||||||
# Batch-level extraction guidance. graphiti-core inserts this into the
|
|
||||||
# entity-extraction and edge-extraction prompts only — NOT into dedup
|
|
||||||
# prompts. Use to bias *what* gets extracted, not *how* dedup runs.
|
|
||||||
# Verified 2026-05-01 by reading extract_nodes.py, extract_edges.py,
|
|
||||||
# dedupe_nodes.py, dedupe_edges.py in graphiti-core.
|
|
||||||
custom_extraction_instructions: str | None = None
|
custom_extraction_instructions: str | None = None
|
||||||
|
|
||||||
|
|
||||||
@@ -109,72 +454,76 @@ class EpisodeRequest(BaseModel):
|
|||||||
custom_extraction_instructions: str | None = None
|
custom_extraction_instructions: str | None = None
|
||||||
saga: str | None = None
|
saga: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Endpoints
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
async def health():
|
async def health():
|
||||||
return {"ok": True, "provider": LLM_PROVIDER, "group": GROUP_ID}
|
return {
|
||||||
|
"ok": True,
|
||||||
|
"provider": LLM_PROVIDER,
|
||||||
|
"group": GROUP_ID,
|
||||||
|
"sidecar": SIDECAR_NAME,
|
||||||
|
"version": "2.0",
|
||||||
|
}
|
||||||
|
|
||||||
@app.post("/episodes")
|
|
||||||
async def add_episode(req: EpisodeRequest):
|
|
||||||
g = await get_graphiti()
|
|
||||||
from graphiti_core.nodes import EpisodeType
|
|
||||||
try:
|
|
||||||
ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now()
|
|
||||||
kwargs = dict(
|
|
||||||
name=req.name,
|
|
||||||
episode_body=req.content,
|
|
||||||
source=EpisodeType.text,
|
|
||||||
reference_time=ref_time,
|
|
||||||
source_description=req.source_description,
|
|
||||||
group_id=req.group_id or GROUP_ID,
|
|
||||||
custom_extraction_instructions=req.custom_extraction_instructions,
|
|
||||||
)
|
|
||||||
# Saga is supported on graphiti-core add_episode but kept optional
|
|
||||||
# so older callers don't need to know about it.
|
|
||||||
if req.saga is not None:
|
|
||||||
kwargs["saga"] = req.saga
|
|
||||||
await g.add_episode(**kwargs)
|
|
||||||
return {"ok": True}
|
|
||||||
except Exception as e:
|
|
||||||
log.error(f"Episode ingestion failed: {e}\n{traceback.format_exc()}")
|
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
|
||||||
|
|
||||||
@app.post("/episodes/bulk")
|
@app.post("/episodes/bulk")
|
||||||
async def add_episodes_bulk(req: BulkEpisodeRequest):
|
async def submit_bulk(req: BulkEpisodeRequest):
|
||||||
g = await get_graphiti()
|
"""Submit a bulk ingest job. Returns job_id for polling.
|
||||||
from graphiti_core.nodes import EpisodeType
|
|
||||||
from graphiti_core.utils.bulk_utils import RawEpisode
|
Job is processed serially by the sidecar's background worker; one
|
||||||
raw_episodes = []
|
bulk-or-single job at a time per graph. No HTTP read-timeout
|
||||||
for ep in req.episodes:
|
blocking. Submitter polls GET /jobs/{job_id} until terminal status.
|
||||||
ref_time = datetime.fromisoformat(ep.timestamp) if ep.timestamp else datetime.now()
|
"""
|
||||||
raw_episodes.append(RawEpisode(
|
if graphiti_instance is None:
|
||||||
name=ep.name,
|
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
||||||
content=ep.content,
|
|
||||||
source_description=ep.source_description,
|
job_id = str(uuid.uuid4())
|
||||||
source=EpisodeType.text,
|
payload = req.model_dump()
|
||||||
reference_time=ref_time,
|
|
||||||
))
|
|
||||||
try:
|
try:
|
||||||
kwargs = dict(
|
_job_insert(job_id, "bulk", payload)
|
||||||
bulk_episodes=raw_episodes,
|
|
||||||
group_id=req.group_id or GROUP_ID,
|
|
||||||
saga=req.saga or None,
|
|
||||||
)
|
|
||||||
# Pass-through only when set, so callers that don't supply
|
|
||||||
# instructions get graphiti-core's default behavior unchanged.
|
|
||||||
if req.custom_extraction_instructions is not None:
|
|
||||||
kwargs["custom_extraction_instructions"] = req.custom_extraction_instructions
|
|
||||||
result = await g.add_episode_bulk(**kwargs)
|
|
||||||
return {"ok": True, "count": len(raw_episodes)}
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error(f"Bulk ingestion failed: {e}\n{traceback.format_exc()}")
|
log.error(f"Failed to enqueue bulk job: {e}\n{traceback.format_exc()}")
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}")
|
||||||
|
|
||||||
+    return {"job_id": job_id, "status": "queued"}
+
+
+@app.post("/episodes")
+async def submit_single(req: EpisodeRequest):
+    """Submit a single-episode ingest job. Returns job_id for polling."""
+    if graphiti_instance is None:
+        raise HTTPException(status_code=503, detail="Graphiti not initialized")
+
+    job_id = str(uuid.uuid4())
+    payload = req.model_dump()
+    try:
+        _job_insert(job_id, "single", payload)
+    except Exception as e:
+        log.error(f"Failed to enqueue single job: {e}\n{traceback.format_exc()}")
+        raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}")
+
+    return {"job_id": job_id, "status": "queued"}
+
+
+@app.get("/jobs/{job_id}")
+async def get_job(job_id: str):
+    """Poll a job's status. Returns 404 if job not found."""
+    job = _job_get(job_id)
+    if job is None:
+        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
+    return job
|
|
||||||
|
|
||||||
@app.get("/search")
|
@app.get("/search")
|
||||||
async def search(query: str, limit: int = 8, group_id: str | None = None):
|
async def search(query: str, limit: int = 8, group_id: str | None = None):
|
||||||
g = await get_graphiti()
|
if graphiti_instance is None:
|
||||||
|
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
||||||
try:
|
try:
|
||||||
results = await g.search(
|
results = await graphiti_instance.search(
|
||||||
query=query,
|
query=query,
|
||||||
num_results=limit,
|
num_results=limit,
|
||||||
group_ids=[group_id or GROUP_ID],
|
group_ids=[group_id or GROUP_ID],
|
||||||
@@ -195,6 +544,7 @@ async def search(query: str, limit: int = 8, group_id: str | None = None):
|
|||||||
log.error(f"Search failed: {e}\n{traceback.format_exc()}")
|
log.error(f"Search failed: {e}\n{traceback.format_exc()}")
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
raise HTTPException(status_code=500, detail=str(e))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import uvicorn
|
import uvicorn
|
||||||
uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info")
|
uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info")
|
||||||
|
|||||||
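A minimal sketch of how a submitter might consume the job API introduced above: POST returns a `job_id`, then the caller polls `GET /jobs/{job_id}` until a terminal status. The diff only shows the initial `"queued"` status, so the status names `"running"`, `"succeeded"`, and `"failed"` are assumptions, as are the helper name `poll_until_terminal` and its parameters. The HTTP call is injected as a plain callable so the sketch runs without a live sidecar.

```python
import time
from typing import Callable

# Assumption: only "queued" appears in the diff. These terminal status
# names are hypothetical placeholders for whatever the worker records.
TERMINAL_STATUSES = frozenset({"succeeded", "failed"})


def poll_until_terminal(
    fetch_job: Callable[[str], dict],
    job_id: str,
    interval_s: float = 2.0,
    max_polls: int = 300,
) -> dict:
    """Call fetch_job(job_id) until the job reports a terminal status.

    fetch_job stands in for an HTTP GET on /jobs/{job_id}.
    """
    for _ in range(max_polls):
        job = fetch_job(job_id)
        if job.get("status") in TERMINAL_STATUSES:
            return job
        time.sleep(interval_s)
    raise TimeoutError(f"job {job_id} not terminal after {max_polls} polls")


# Fake fetcher that simulates queued -> running -> succeeded.
_states = iter(["queued", "running", "succeeded"])


def fake_fetch(job_id: str) -> dict:
    return {"job_id": job_id, "status": next(_states)}


result = poll_until_terminal(fake_fetch, "demo-job", interval_s=0.0)
```

Bounding the loop with `max_polls` keeps a stuck job from blocking the submitter forever, which is the same failure mode the queued design removes from the HTTP layer.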
@@ -220,8 +220,15 @@ def enqueue_stage3(pg, source, full_text, orientation, metadata,
|
|||||||
supersedes_prior_state = EXCLUDED.supersedes_prior_state,
|
supersedes_prior_state = EXCLUDED.supersedes_prior_state,
|
||||||
state_type_rationale = EXCLUDED.state_type_rationale,
|
state_type_rationale = EXCLUDED.state_type_rationale,
|
||||||
enqueued_at = NOW(),
|
enqueued_at = NOW(),
|
||||||
|
-- Reset all run-state fields on re-enqueue. Without this,
|
||||||
|
-- stale started_at from a prior attempt makes the row
|
||||||
|
-- invisible to the Stage 3 worker's claim filter (which
|
||||||
|
-- typically uses started_at IS NULL).
|
||||||
|
started_at = NULL,
|
||||||
completed_at = NULL,
|
completed_at = NULL,
|
||||||
failed_at = NULL,
|
failed_at = NULL,
|
||||||
|
failure_reason = NULL,
|
||||||
|
external_job_id = NULL,
|
||||||
attempts = 0
|
attempts = 0
|
||||||
""", (source, full_text, orientation, json.dumps(metadata),
|
""", (source, full_text, orientation, json.dumps(metadata),
|
||||||
state_type, state_type_confidence, supersedes_prior_state,
|
state_type, state_type_confidence, supersedes_prior_state,
|
||||||
|
|||||||
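The effect of resetting run-state on re-enqueue can be reproduced in isolation. The sketch below uses SQLite (3.24 or newer, for upsert support) instead of PostgreSQL, a reduced column set, and `source` as the conflict key; all three are assumptions made for illustration, since the real table definition and conflict target are not visible in this hunk.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE stage_3_queue (
        source TEXT PRIMARY KEY,   -- assumed conflict key
        full_text TEXT,
        started_at TEXT,
        failed_at TEXT,
        failure_reason TEXT,
        attempts INTEGER DEFAULT 0
    )
    """
)

# Upsert that clears every run-state field when the row already exists.
UPSERT = """
    INSERT INTO stage_3_queue (source, full_text)
    VALUES (?, ?)
    ON CONFLICT (source) DO UPDATE SET
        full_text = excluded.full_text,
        started_at = NULL,
        failed_at = NULL,
        failure_reason = NULL,
        attempts = 0
"""

# Claim filter of the kind the comment describes.
CLAIMABLE = "SELECT source FROM stage_3_queue WHERE started_at IS NULL"

conn.execute(UPSERT, ("doc-1", "first version"))

# Simulate a prior attempt that started and then failed.
conn.execute(
    "UPDATE stage_3_queue SET started_at = ?, failed_at = ?, "
    "failure_reason = ?, attempts = 2 WHERE source = ?",
    ("2026-01-01T00:00:00", "2026-01-01T00:10:00", "timeout", "doc-1"),
)
claimable_before = conn.execute(CLAIMABLE).fetchall()

# Re-enqueue: the row becomes visible to the claim filter again.
conn.execute(UPSERT, ("doc-1", "second version"))
claimable_after = conn.execute(CLAIMABLE).fetchall()
row = conn.execute(
    "SELECT full_text, attempts, failure_reason FROM stage_3_queue"
).fetchone()
```

Before the second upsert the row is excluded by `started_at IS NULL`; afterwards it is claimable again with a clean attempt counter.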
+118
-6
@@ -1,6 +1,7 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing
|
Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing
|
||||||
|
+ Encoder Instructions (v1.0)
|
||||||
|
|
||||||
Polls stage_3_queue, routes each row to one of two ingest pathways based on
|
Polls stage_3_queue, routes each row to one of two ingest pathways based on
|
||||||
state-type classification produced by Stage 2:
|
state-type classification produced by Stage 2:
|
||||||
@@ -12,6 +13,18 @@ state-type classification produced by Stage 2:
|
|||||||
confidence in {medium, high}. Per-chunk POST to /episodes with shared
|
confidence in {medium, high}. Per-chunk POST to /episodes with shared
|
||||||
saga tag, full edge invalidation, per-chunk timeout/retry independence.
|
saga tag, full edge invalidation, per-chunk timeout/retry independence.
|
||||||
|
|
||||||
+Both pathways pass EXTRACTION_INSTRUCTIONS_V1 to the sidecar via
+custom_extraction_instructions, which graphiti-core inserts into entity
+and edge extraction prompts (NOT dedup prompts — that's intentional under
+the encoder-stays-naive commitment).
+
+Architectural posture: the encoder is content-naïve. It does not draw on
+prior knowledge of the user, the substrate, or the cycle's accumulated
+work. Schema and personality live in the cycle's consolidated substrate,
+where the dream phase shapes them. The encoder produces source-grounded
+ground truth for the cycle to work from. See EXTRACTION_INSTRUCTIONS_V1
+below for the extraction guidance text.
+
Routing rationale: the single-episode pathway is the correct API per
|
Routing rationale: the single-episode pathway is the correct API per
|
||||||
graphiti-core's docs for content that supersedes prior facts (it does
|
graphiti-core's docs for content that supersedes prior facts (it does
|
||||||
edge invalidation that bulk skips). It costs more per chunk because of
|
edge invalidation that bulk skips). It costs more per chunk because of
|
||||||
@@ -67,7 +80,7 @@ HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat")
|
|||||||
RETRY_ATTEMPTS = 2
|
RETRY_ATTEMPTS = 2
|
||||||
POLL_INTERVAL = 5
|
POLL_INTERVAL = 5
|
||||||
INGEST_TIMEOUT = 600
|
INGEST_TIMEOUT = 600
|
||||||
WORKER_VERSION = "2.3"
|
WORKER_VERSION = "2.4"
|
||||||
|
|
||||||
# Match Stage 1 chunking parameters
|
# Match Stage 1 chunking parameters
|
||||||
CHUNK_SIZE_WORDS = 500
|
CHUNK_SIZE_WORDS = 500
|
||||||
@@ -84,6 +97,87 @@ MAX_CHUNKS_PER_SAGA = 10
|
|||||||
# the expensive pathway.
|
# the expensive pathway.
|
||||||
HIGH_TRUST_CONFIDENCE = ("medium", "high")
|
HIGH_TRUST_CONFIDENCE = ("medium", "high")
|
||||||
|
|
||||||
+# Encoder extraction guidance v1.0 — see module docstring for posture rationale.
+# Passed to graphiti-core via custom_extraction_instructions on both ingest
+# pathways. Inserted into entity-extraction and edge-extraction prompts only;
+# does NOT enter dedup prompts. Encoder-stays-naïve commitment is structural,
+# not versioned: this text gets refined over time but the encoder does not
+# acquire substrate context as the cycle matures.
+EXTRACTION_INSTRUCTIONS_V1 = """\
+EXTRACTION GUIDANCE — BirdAI cascade
+
+The encoder's job is faithful capture from this chunk's text. It does
+not draw on prior knowledge of the user, the substrate, or the cycle's
+accumulated work. Schema, personality, and inferred context live in
+the cycle's consolidated substrate, where the dream phase shapes them
+through prediction-error replay and speculation. The encoder stays
+content-naïve so the cycle has source-grounded ground truth to work
+from.
+
+The orientation produced by an upstream pass describes content shape,
+not content interpretation. Use it as forward-facing guidance for what
+to attend to in this document. Do not let it bound or limit what you
+extract.
+
+PREDICATE NAMING
+
+Produce semantic predicates that describe the actual relationship the
+text states. Use verbs or verb phrases — "wrote", "advised", "founded",
+"works at", "led to", "contradicts", "is autobiographical to" — not
+generic placeholders. Reserve generic forms (for example, "relates to"
+or "mentions") for cases where the text genuinely does not specify a
+more particular relationship. The verb is the load-bearing part of
+the fact; preserving it is what makes the relationship queryable later.
+
+EXTRACTION POSTURE
+
+Extract from this chunk's text as if each entity is encountered fresh.
+Do not try to reconcile entities you find here with entities that
+might already exist elsewhere in the graph. Redundant entity instances
+are acceptable. Cross-document entity resolution is downstream cycle
+work, not extraction work.
+
+When the same entity appears multiple times within this chunk with
+slightly different spellings — a common artifact of voice transcription —
+prefer the more frequent or more canonical-looking form. Do not invent
+canonical forms; choose among the variants the text actually contains.
+
+EXTRACT FROM THE SOURCE
+
+Extract relationships the text states or strongly implies through
+direct linguistic markers ("X led to Y", "X works for Y", "X met Y at
+Z"). Do not extend extraction to relationships the text neither states
+nor directly implies. Inferred relationships are produced by the
+cycle's dream phase as speculative edges with explicit low-confidence
+tagging, where they can be evaluated and either ratified or pruned by
+subsequent cycle work. Encoding-time inference, mixed in with source-
+grounded extraction, would lose the speculation/source distinction the
+cycle's consolidation work relies on.
+
+DO NOT PRE-EMPT CYCLE WORK
+
+Do not omit relationships because they seem redundant with prior
+extractions or with the existing graph. Cross-document entity
+resolution and edge consolidation are downstream cycle operations;
+redundant extraction at this stage is intentional. Extracting the
+same fact from multiple sources gives the cycle's consolidation work
+the recurrence signal it relies on.
+
+EXTRACTION DEPTH
+
+Use the orientation's frame_relationships and extraction_orientation
+fields to inform what to attend to. If the orientation describes
+cross-domain relational content, look for relationships that bridge
+those domains explicitly, with named predicates for the bridging.
+If the orientation describes single-domain technical content, look
+for the structural relationships internal to that domain.
+
+Extract every entity and every relationship the text states. Do not
+summarize, do not filter, do not omit content because it seems
+incidental. The orientation tells you what to look for; the source
+text tells you what is there.
+"""
|
|
||||||
|
|
||||||
def get_pg():
|
def get_pg():
|
||||||
return psycopg2.connect(PG_DSN)
|
return psycopg2.connect(PG_DSN)
|
||||||
@@ -193,6 +287,8 @@ def ingest_bulk(source, full_text, orientation):
|
|||||||
- Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of
|
- Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of
|
||||||
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga
|
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga
|
||||||
tag so Graphiti links them as one document unit
|
tag so Graphiti links them as one document unit
|
||||||
|
|
||||||
|
All three sub-paths pass EXTRACTION_INSTRUCTIONS_V1 to the sidecar.
|
||||||
"""
|
"""
|
||||||
char_length = len(full_text)
|
char_length = len(full_text)
|
||||||
|
|
||||||
@@ -204,7 +300,11 @@ def ingest_bulk(source, full_text, orientation):
|
|||||||
"timestamp": datetime.now().isoformat(),
|
"timestamp": datetime.now().isoformat(),
|
||||||
}]
|
}]
|
||||||
log.info(f" [bulk] Single episode ({char_length} chars)")
|
log.info(f" [bulk] Single episode ({char_length} chars)")
|
||||||
return post_bulk({"episodes": episodes, "group_id": "aaron"})
|
return post_bulk({
|
||||||
|
"episodes": episodes,
|
||||||
|
"group_id": "aaron",
|
||||||
|
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||||
|
})
|
||||||
|
|
||||||
chunks = chunk_text(full_text)
|
chunks = chunk_text(full_text)
|
||||||
total_chunks = len(chunks)
|
total_chunks = len(chunks)
|
||||||
@@ -220,9 +320,12 @@ def ingest_bulk(source, full_text, orientation):
|
|||||||
for i, chunk in enumerate(chunks)
|
for i, chunk in enumerate(chunks)
|
||||||
]
|
]
|
||||||
log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)")
|
log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)")
|
||||||
return post_bulk(
|
return post_bulk({
|
||||||
{"episodes": episodes, "group_id": "aaron", "saga": source}
|
"episodes": episodes,
|
||||||
)
|
"group_id": "aaron",
|
||||||
|
"saga": source,
|
||||||
|
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||||
|
})
|
||||||
|
|
||||||
# Large document: split into batches sharing the same saga tag
|
# Large document: split into batches sharing the same saga tag
|
||||||
batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA
|
batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA
|
||||||
@@ -247,7 +350,12 @@ def ingest_bulk(source, full_text, orientation):
|
|||||||
batch_label = f"batch {batch_idx + 1}/{batch_count} (chunks {start + 1}-{end})"
|
batch_label = f"batch {batch_idx + 1}/{batch_count} (chunks {start + 1}-{end})"
|
||||||
log.info(f" {batch_label} starting")
|
log.info(f" {batch_label} starting")
|
||||||
last_result = post_bulk(
|
last_result = post_bulk(
|
||||||
{"episodes": episodes, "group_id": "aaron", "saga": source},
|
{
|
||||||
|
"episodes": episodes,
|
||||||
|
"group_id": "aaron",
|
||||||
|
"saga": source,
|
||||||
|
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||||
|
},
|
||||||
batch_label=batch_label,
|
batch_label=batch_label,
|
||||||
)
|
)
|
||||||
log.info(f" {batch_label} committed")
|
log.info(f" {batch_label} committed")
|
||||||
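The large-document path above relies on ceiling division to size its batches. A minimal sketch of that arithmetic follows, reusing the `batch_count` expression and the `batch_label` format from the hunk; the helper name `split_into_batches` and the way `start` and `end` are derived are assumptions, since the loop body that computes them is not part of this diff.

```python
MAX_CHUNKS_PER_SAGA = 10  # value shown in the hunk header context above


def split_into_batches(chunks, batch_size=MAX_CHUNKS_PER_SAGA):
    """Return (label, batch) pairs covering every chunk exactly once."""
    total_chunks = len(chunks)
    # Ceiling division: a trailing partial batch still counts as a batch.
    batch_count = (total_chunks + batch_size - 1) // batch_size
    batches = []
    for batch_idx in range(batch_count):
        start = batch_idx * batch_size                # assumed derivation
        end = min(start + batch_size, total_chunks)   # assumed derivation
        label = f"batch {batch_idx + 1}/{batch_count} (chunks {start + 1}-{end})"
        batches.append((label, chunks[start:end]))
    return batches


# 23 chunks split into batches of 10, 10, and 3.
batches = split_into_batches([f"chunk-{i}" for i in range(23)])
labels = [label for label, _ in batches]
sizes = [len(batch) for _, batch in batches]
```

Adding `batch_size - 1` before the integer division rounds up without floating-point math, so 23 chunks yield three batches, not two.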
@@ -261,6 +369,8 @@ def ingest_single_episode(row_id, source, full_text, orientation):
|
|||||||
with shared saga tag. Each call independent: own timeout, own retry
|
with shared saga tag. Each call independent: own timeout, own retry
|
||||||
envelope, own failure semantics.
|
envelope, own failure semantics.
|
||||||
|
|
||||||
|
Each chunk POST passes EXTRACTION_INSTRUCTIONS_V1 to the sidecar.
|
||||||
|
|
||||||
Partial-success behavior: if chunk N of total fails, chunks 1..N-1
|
Partial-success behavior: if chunk N of total fails, chunks 1..N-1
|
||||||
stay committed (graphiti has already accepted them) and the function
|
stay committed (graphiti has already accepted them) and the function
|
||||||
raises with detail about which chunk failed and how many succeeded.
|
raises with detail about which chunk failed and how many succeeded.
|
||||||
@@ -281,6 +391,7 @@ def ingest_single_episode(row_id, source, full_text, orientation):
|
|||||||
"source_description": orientation,
|
"source_description": orientation,
|
||||||
"group_id": "aaron",
|
"group_id": "aaron",
|
||||||
"timestamp": datetime.now().isoformat(),
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||||
}
|
}
|
||||||
log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)")
|
log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)")
|
||||||
return post_episode(payload, episode_label="single-ep")
|
return post_episode(payload, episode_label="single-ep")
|
||||||
@@ -302,6 +413,7 @@ def ingest_single_episode(row_id, source, full_text, orientation):
|
|||||||
"group_id": "aaron",
|
"group_id": "aaron",
|
||||||
"saga": source,
|
"saga": source,
|
||||||
"timestamp": datetime.now().isoformat(),
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}")
|
post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}")
|
||||||
|
|||||||
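The partial-success behavior described in the `ingest_single_episode` docstring (chunks 1..N-1 stay committed, the function raises with detail about chunk N) can be sketched as follows. The exception class `PartialIngestError`, the helper `ingest_chunks`, and the injected `post_one` callable are hypothetical stand-ins; the actual worker calls `post_episode` and its error type is not shown in this diff.

```python
class PartialIngestError(RuntimeError):
    """Hypothetical error carrying how far a multi-chunk ingest got."""

    def __init__(self, failed_chunk: int, total: int, cause: Exception):
        self.failed_chunk = failed_chunk
        self.succeeded = failed_chunk - 1
        super().__init__(
            f"chunk {failed_chunk}/{total} failed after "
            f"{self.succeeded} committed: {cause}"
        )


def ingest_chunks(chunks, post_one):
    """Post chunks in order; stop at the first failure without rollback."""
    total = len(chunks)
    for chunk_num, chunk in enumerate(chunks, start=1):
        try:
            post_one(chunk, f"chunk {chunk_num}/{total}")
        except Exception as exc:
            # Earlier chunks were already accepted, so they stay committed.
            raise PartialIngestError(chunk_num, total, exc) from exc
    return total


# Fake poster that records commits and fails on the third chunk.
committed = []


def fake_post(chunk, label):
    if chunk == "c3":
        raise ConnectionError("sidecar timeout")
    committed.append(label)


try:
    ingest_chunks(["c1", "c2", "c3", "c4"], fake_post)
    error = None
except PartialIngestError as exc:
    error = exc
```

Recording the count of committed chunks in the exception gives the caller what it needs to decide whether to retry from the failed chunk or re-enqueue the whole row.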