Compare commits
9 Commits
main
...
7f07972109
| Author | SHA1 | Date | |
|---|---|---|---|
| 7f07972109 | |||
| f645b74b1c | |||
| c0e6159b5e | |||
| d7b2a850c4 | |||
| a0bf280075 | |||
| 30beeb3a26 | |||
| e7de7fb64b | |||
| 70e87e3ab5 | |||
| 8b0a163670 |
@@ -0,0 +1,4 @@
|
||||
# Local backups created by apply.sh — environment state, not source.
|
||||
# Keeping these out of version control prevents repo bloat and avoids
|
||||
# checking in graphiti-core's Apache-2.0 source under our repo's tree.
|
||||
backups/
|
||||
@@ -0,0 +1,58 @@
|
||||
# graphiti-core Patches — FalkorDB Vector Index Support
|
||||
|
||||
Vendored patches against graphiti-core 0.29.0 adding native FalkorDB
|
||||
vector index support. Three files modified, all under
|
||||
`graphiti_core/driver/falkordb/` and `graphiti_core/graph_queries.py`.
|
||||
No changes to Neo4j or Kuzu code paths.
|
||||
|
||||
## Why this exists
|
||||
|
||||
graphiti-core's FalkorDB driver uses interpreted Cypher cosine math
|
||||
(`vec.cosineDistance(...)`) for similarity search. Each query becomes a
|
||||
full table scan over Entity/RELATES_TO/Community nodes. At ~4,000+
|
||||
entities, single-episode ingest's resolve-against-existing-graph step
|
||||
takes 8+ minutes and bulk ingest hangs FalkorDB. FalkorDB itself
|
||||
supports `db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`
|
||||
procedures backed by HNSW indexes; graphiti-core's driver doesn't use
|
||||
them.
|
||||
|
||||
These patches:
|
||||
|
||||
1. Add `get_vector_indices()` to `graph_queries.py` returning CREATE
|
||||
VECTOR INDEX statements for FalkorDB on Entity.name_embedding,
|
||||
RELATES_TO.fact_embedding, and Community.name_embedding.
|
||||
2. Extend `falkordb_driver.py:build_indices_and_constraints()` to create
|
||||
the vector indexes alongside range and fulltext indexes.
|
||||
3. Rewrite the three vector-similarity call sites in
|
||||
`falkordb/operations/search_ops.py` to use
|
||||
`db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`
|
||||
instead of full-scan cosine math. Over-fetches by a configurable
|
||||
multiplier to handle filter rejections.
|
||||
|
||||
## Files
|
||||
|
||||
| Patched file | Source |
|
||||
|---|---|
|
||||
| `graphiti_core/graph_queries.py` | Adds `get_vector_indices()` |
|
||||
| `graphiti_core/driver/falkordb/falkordb_driver.py` | Extends `build_indices_and_constraints` |
|
||||
| `graphiti_core/driver/falkordb/operations/search_ops.py` | Three query rewrites |
|
||||
|
||||
## How to apply
|
||||
|
||||
`./apply.sh` — backs up the originals into `./backups/<timestamp>/`
|
||||
and copies the patched files over.
|
||||
|
||||
## How to revert
|
||||
|
||||
Move the timestamped backup back over the venv:
|
||||
|
||||
cp backups/<ts>/graph_queries.py /home/aaron/aaronai/venv/lib/python3.12/site-packages/graphiti_core/graph_queries.py
|
||||
# ...etc
|
||||
|
||||
## Upstream candidate
|
||||
|
||||
Documented gap (issue #1263 references it indirectly via vector store
|
||||
overlay RFC). Maintainers' attention is on Milvus/external vector DB
|
||||
overlay; this patch is the FalkorDB-native alternative for users who
|
||||
don't want a separate vector DB. Consider PR after empirical validation
|
||||
in production.
|
||||
Executable
+77
@@ -0,0 +1,77 @@
|
||||
#!/usr/bin/env bash
|
||||
# apply.sh — Apply the BirdAI vendored graphiti-core patches.
|
||||
#
|
||||
# Backs up the original venv files into ./backups/<timestamp>/ before
|
||||
# overwriting. The backup directory layout mirrors the venv layout so a
|
||||
# revert is just a tree copy back.
|
||||
#
|
||||
# Usage: ./apply.sh
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
PATCH_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||
VENV_BASE="/home/aaron/aaronai/venv/lib/python3.12/site-packages"
|
||||
TIMESTAMP="$(date +%Y%m%d-%H%M%S)"
|
||||
BACKUP_DIR="$PATCH_DIR/backups/$TIMESTAMP"
|
||||
|
||||
# Files to patch — paths relative to graphiti_core/.
|
||||
FILES=(
|
||||
"graph_queries.py"
|
||||
"driver/falkordb_driver.py"
|
||||
"driver/falkordb/operations/search_ops.py"
|
||||
)
|
||||
|
||||
echo "graphiti-core vendored patch apply — BirdAI"
|
||||
echo "Patch directory: $PATCH_DIR"
|
||||
echo "Venv target: $VENV_BASE/graphiti_core/"
|
||||
echo "Backup to: $BACKUP_DIR"
|
||||
echo
|
||||
|
||||
# Pre-flight: confirm all source patch files exist.
|
||||
for rel in "${FILES[@]}"; do
|
||||
if [ ! -f "$PATCH_DIR/graphiti_core/$rel" ]; then
|
||||
echo "ERROR: missing patch file: $PATCH_DIR/graphiti_core/$rel" >&2
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
# Pre-flight: confirm all target venv files exist.
|
||||
for rel in "${FILES[@]}"; do
|
||||
if [ ! -f "$VENV_BASE/graphiti_core/$rel" ]; then
|
||||
echo "ERROR: missing venv file: $VENV_BASE/graphiti_core/$rel" >&2
|
||||
echo " graphiti-core may not be installed, or version differs from 0.29.0." >&2
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
# Backup originals.
|
||||
echo "[1/3] Backing up originals..."
|
||||
for rel in "${FILES[@]}"; do
|
||||
backup_path="$BACKUP_DIR/graphiti_core/$rel"
|
||||
mkdir -p "$(dirname "$backup_path")"
|
||||
cp "$VENV_BASE/graphiti_core/$rel" "$backup_path"
|
||||
echo " backed up: $rel"
|
||||
done
|
||||
echo
|
||||
|
||||
# Apply patches by copying.
|
||||
echo "[2/3] Applying patches..."
|
||||
for rel in "${FILES[@]}"; do
|
||||
cp "$PATCH_DIR/graphiti_core/$rel" "$VENV_BASE/graphiti_core/$rel"
|
||||
echo " patched: $rel"
|
||||
done
|
||||
echo
|
||||
|
||||
# Sanity check: confirm patched files have the marker.
|
||||
echo "[3/3] Verifying patched files..."
|
||||
for rel in "${FILES[@]}"; do
|
||||
if grep -q "PATCHED 2026-05-02" "$VENV_BASE/graphiti_core/$rel"; then
|
||||
echo " OK: $rel contains patch marker"
|
||||
else
|
||||
echo " WARNING: $rel missing patch marker (may be expected for graph_queries.py — its docstring uses the marker only in the module header)"
|
||||
fi
|
||||
done
|
||||
echo
|
||||
echo "Done. Backup: $BACKUP_DIR"
|
||||
echo "Restart the sidecar to pick up changes:"
|
||||
echo " sudo systemctl restart aaronai-graphiti.service"
|
||||
@@ -0,0 +1,904 @@
|
||||
"""
|
||||
Copyright 2024, Zep Software, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from graphiti_core.driver.driver import GraphProvider
|
||||
from graphiti_core.driver.falkordb import STOPWORDS
|
||||
from graphiti_core.driver.operations.search_ops import SearchOperations
|
||||
from graphiti_core.driver.query_executor import QueryExecutor
|
||||
from graphiti_core.driver.record_parsers import (
|
||||
community_node_from_record,
|
||||
entity_edge_from_record,
|
||||
entity_node_from_record,
|
||||
episodic_node_from_record,
|
||||
)
|
||||
from graphiti_core.edges import EntityEdge
|
||||
from graphiti_core.graph_queries import (
|
||||
get_nodes_query,
|
||||
get_relationships_query,
|
||||
get_vector_cosine_func_query,
|
||||
)
|
||||
from graphiti_core.models.edges.edge_db_queries import get_entity_edge_return_query
|
||||
from graphiti_core.models.nodes.node_db_queries import (
|
||||
COMMUNITY_NODE_RETURN,
|
||||
EPISODIC_NODE_RETURN,
|
||||
get_entity_node_return_query,
|
||||
)
|
||||
from graphiti_core.nodes import CommunityNode, EntityNode, EpisodicNode
|
||||
from graphiti_core.search.search_filters import (
|
||||
SearchFilters,
|
||||
edge_search_filter_query_constructor,
|
||||
node_search_filter_query_constructor,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MAX_QUERY_LENGTH = 128
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Vector index dispatcher (PATCHED 2026-05-02, BirdAI vendored patch).
|
||||
#
|
||||
# graphiti-core's FalkorDB driver historically composed similarity queries
|
||||
# using `vec.cosineDistance(...)` in interpreted Cypher, which produces a
|
||||
# full-table scan for every search. FalkorDB supports native vector indexes
|
||||
# via `db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`;
|
||||
# this dispatcher uses them when present and falls back to the cosine math
|
||||
# otherwise.
|
||||
#
|
||||
# Index existence is checked once per (label, attribute, entity_type) and
|
||||
# cached at module scope. The cache should be invalidated whenever
|
||||
# `build_indices_and_constraints` runs (since indexes may have been created
|
||||
# or dropped). FalkorDriver.build_indices_and_constraints is patched to
|
||||
# call `_invalidate_falkordb_vector_index_cache()` after building.
|
||||
#
|
||||
# Over-fetch factor (VECTOR_INDEX_CANDIDATE_MULTIPLIER from graph_queries)
|
||||
# preserves recall when WHERE filters reject some of the top-k candidates.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
from graphiti_core.graph_queries import (
|
||||
VECTOR_INDEX_CANDIDATE_MULTIPLIER,
|
||||
get_vector_cosine_func_query,
|
||||
)
|
||||
|
||||
# Cache: key = (label, attribute, entity_type), value = bool
|
||||
# entity_type is 'NODE' or 'RELATIONSHIP'.
|
||||
_FALKORDB_VECTOR_INDEX_CACHE: dict[tuple[str, str, str], bool] = {}
|
||||
|
||||
|
||||
def _invalidate_falkordb_vector_index_cache() -> None:
|
||||
"""Clear the vector-index existence cache. Call after build_indices_and_constraints."""
|
||||
_FALKORDB_VECTOR_INDEX_CACHE.clear()
|
||||
|
||||
|
||||
async def _falkordb_vector_index_exists(
|
||||
executor: QueryExecutor,
|
||||
label: str,
|
||||
attribute: str,
|
||||
entity_type: str,
|
||||
) -> bool:
|
||||
"""Check whether a FalkorDB vector index exists for the given target.
|
||||
|
||||
entity_type is 'NODE' for node-label indexes, 'RELATIONSHIP' for edge-type indexes.
|
||||
Result is cached at module scope; call _invalidate_falkordb_vector_index_cache()
|
||||
after building or dropping indexes.
|
||||
"""
|
||||
key = (label, attribute, entity_type)
|
||||
if key in _FALKORDB_VECTOR_INDEX_CACHE:
|
||||
return _FALKORDB_VECTOR_INDEX_CACHE[key]
|
||||
|
||||
try:
|
||||
records, _, _ = await executor.execute_query(
|
||||
"CALL db.indexes() YIELD label, properties, types, entitytype "
|
||||
"RETURN label, properties, types, entitytype"
|
||||
)
|
||||
except Exception as e:
|
||||
# If we cannot enumerate indexes, fall back to "no index" rather than
|
||||
# propagating the error. The fallback cosine-math path is correct,
|
||||
# just slower.
|
||||
logger.warning(f"FalkorDB vector index probe failed; assuming none exist: {e}")
|
||||
_FALKORDB_VECTOR_INDEX_CACHE[key] = False
|
||||
return False
|
||||
|
||||
found = False
|
||||
for r in records:
|
||||
# Records come back as dict-like rows keyed by column name (not
|
||||
# tuples). Access by string keys matching the YIELD clause above.
|
||||
rec_label = r.get('label') if hasattr(r, 'get') else r['label']
|
||||
rec_props = r.get('properties') if hasattr(r, 'get') else r['properties']
|
||||
rec_types = r.get('types') if hasattr(r, 'get') else r['types']
|
||||
rec_entitytype = r.get('entitytype') if hasattr(r, 'get') else r['entitytype']
|
||||
if rec_props is None:
|
||||
rec_props = []
|
||||
if rec_types is None:
|
||||
rec_types = {}
|
||||
|
||||
if rec_label != label:
|
||||
continue
|
||||
if rec_entitytype is not None and rec_entitytype != entity_type:
|
||||
continue
|
||||
if attribute not in rec_props:
|
||||
continue
|
||||
|
||||
# rec_types is a dict like {attribute: ['VECTOR', ...], ...} or sometimes
|
||||
# a flat list — handle both shapes.
|
||||
if isinstance(rec_types, dict):
|
||||
attr_types = rec_types.get(attribute, [])
|
||||
else:
|
||||
attr_types = rec_types
|
||||
if 'VECTOR' in attr_types:
|
||||
found = True
|
||||
break
|
||||
|
||||
_FALKORDB_VECTOR_INDEX_CACHE[key] = found
|
||||
return found
|
||||
|
||||
|
||||
def _falkordb_vector_node_search_cypher(
|
||||
label: str,
|
||||
embedding_attr: str,
|
||||
search_vector_param: str,
|
||||
use_index: bool,
|
||||
) -> tuple[str, str]:
|
||||
"""Build the cypher prefix and node-binding for a node-vector search.
|
||||
|
||||
Returns (prefix, node_var) where:
|
||||
- prefix is the Cypher fragment that binds the node variable and a
|
||||
`score` variable. With index, it's a CALL ... YIELD; without, it's
|
||||
a MATCH plus WITH cosine math.
|
||||
- node_var is the variable name the caller's downstream Cypher should
|
||||
reference (always 'n' here for parity with the existing code).
|
||||
|
||||
The caller appends WHERE filters and RETURN/ORDER BY/LIMIT as usual.
|
||||
The over-fetch parameter `$candidate_k` must be passed by the caller
|
||||
when use_index is True.
|
||||
"""
|
||||
if use_index:
|
||||
return (
|
||||
f"CALL db.idx.vector.queryNodes("
|
||||
f"'{label}', '{embedding_attr}', $candidate_k, vecf32({search_vector_param})"
|
||||
f") YIELD node, score "
|
||||
f"WITH node AS n, score "
|
||||
), "n"
|
||||
# Fallback: original cosine math path
|
||||
cosine = get_vector_cosine_func_query(
|
||||
f"n.{embedding_attr}", search_vector_param, GraphProvider.FALKORDB
|
||||
)
|
||||
return (
|
||||
f"MATCH (n:{label}) "
|
||||
f"WITH n, {cosine} AS score "
|
||||
), "n"
|
||||
|
||||
|
||||
def _falkordb_vector_edge_search_cypher(
|
||||
relationship_type: str,
|
||||
embedding_attr: str,
|
||||
search_vector_param: str,
|
||||
use_index: bool,
|
||||
) -> tuple[str, str]:
|
||||
"""Build the cypher prefix and edge-binding for an edge-vector search.
|
||||
|
||||
Returns (prefix, edge_var). With the index, the procedure binds the
|
||||
relationship variable; we then MATCH source and target via the existing
|
||||
edge to recover (n)-[e]->(m). Without the index, it's the original
|
||||
MATCH-and-cosine path.
|
||||
|
||||
Variable name is 'e' for parity with existing code; source/target are
|
||||
'n' and 'm' respectively, also for parity.
|
||||
"""
|
||||
if use_index:
|
||||
return (
|
||||
f"CALL db.idx.vector.queryRelationships("
|
||||
f"'{relationship_type}', '{embedding_attr}', $candidate_k, vecf32({search_vector_param})"
|
||||
f") YIELD relationship, score "
|
||||
f"MATCH (n:Entity)-[e:{relationship_type}]->(m:Entity) "
|
||||
f"WHERE e = relationship "
|
||||
f"WITH DISTINCT e, n, m, score "
|
||||
), "e"
|
||||
# Fallback
|
||||
cosine = get_vector_cosine_func_query(
|
||||
f"e.{embedding_attr}", search_vector_param, GraphProvider.FALKORDB
|
||||
)
|
||||
return (
|
||||
f"MATCH (n:Entity)-[e:{relationship_type}]->(m:Entity) "
|
||||
f"WITH DISTINCT e, n, m, {cosine} AS score "
|
||||
), "e"
|
||||
|
||||
|
||||
|
||||
# FalkorDB separator characters that break text into tokens
|
||||
_SEPARATOR_MAP = str.maketrans(
|
||||
{
|
||||
',': ' ',
|
||||
'.': ' ',
|
||||
'<': ' ',
|
||||
'>': ' ',
|
||||
'{': ' ',
|
||||
'}': ' ',
|
||||
'[': ' ',
|
||||
']': ' ',
|
||||
'"': ' ',
|
||||
"'": ' ',
|
||||
':': ' ',
|
||||
';': ' ',
|
||||
'!': ' ',
|
||||
'@': ' ',
|
||||
'#': ' ',
|
||||
'$': ' ',
|
||||
'%': ' ',
|
||||
'^': ' ',
|
||||
'&': ' ',
|
||||
'*': ' ',
|
||||
'(': ' ',
|
||||
')': ' ',
|
||||
'-': ' ',
|
||||
'+': ' ',
|
||||
'=': ' ',
|
||||
'~': ' ',
|
||||
'?': ' ',
|
||||
'|': ' ',
|
||||
'/': ' ',
|
||||
'\\': ' ',
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _sanitize(query: str) -> str:
|
||||
"""Replace FalkorDB special characters with whitespace."""
|
||||
sanitized = query.translate(_SEPARATOR_MAP)
|
||||
return ' '.join(sanitized.split())
|
||||
|
||||
|
||||
def _build_falkor_fulltext_query(
|
||||
query: str,
|
||||
group_ids: list[str] | None = None,
|
||||
max_query_length: int = MAX_QUERY_LENGTH,
|
||||
) -> str:
|
||||
"""Build a fulltext query string for FalkorDB using RedisSearch syntax."""
|
||||
if group_ids is None or len(group_ids) == 0:
|
||||
group_filter = ''
|
||||
else:
|
||||
escaped_group_ids = [f'"{gid}"' for gid in group_ids]
|
||||
group_values = '|'.join(escaped_group_ids)
|
||||
group_filter = f'(@group_id:{group_values})'
|
||||
|
||||
sanitized_query = _sanitize(query)
|
||||
|
||||
# Remove stopwords and empty tokens
|
||||
query_words = sanitized_query.split()
|
||||
filtered_words = [word for word in query_words if word and word.lower() not in STOPWORDS]
|
||||
sanitized_query = ' | '.join(filtered_words)
|
||||
|
||||
if len(sanitized_query.split(' ')) + len(group_ids or '') >= max_query_length:
|
||||
return ''
|
||||
|
||||
full_query = group_filter + ' (' + sanitized_query + ')'
|
||||
return full_query
|
||||
|
||||
|
||||
class FalkorSearchOperations(SearchOperations):
|
||||
# --- Node search ---
|
||||
|
||||
async def node_fulltext_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
query: str,
|
||||
search_filter: SearchFilters,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[EntityNode]:
|
||||
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
|
||||
if fuzzy_query == '':
|
||||
return []
|
||||
|
||||
filter_queries, filter_params = node_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('n.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
|
||||
|
||||
cypher = (
|
||||
get_nodes_query(
|
||||
'node_name_and_summary', '$query', limit=limit, provider=GraphProvider.FALKORDB
|
||||
)
|
||||
+ 'YIELD node AS n, score'
|
||||
+ filter_query
|
||||
+ """
|
||||
WITH n, score
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
RETURN
|
||||
"""
|
||||
+ get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher,
|
||||
query=fuzzy_query,
|
||||
limit=limit,
|
||||
**filter_params,
|
||||
)
|
||||
|
||||
return [entity_node_from_record(r) for r in records]
|
||||
|
||||
async def node_similarity_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
search_vector: list[float],
|
||||
search_filter: SearchFilters,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
min_score: float = 0.6,
|
||||
) -> list[EntityNode]:
|
||||
filter_queries, filter_params = node_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('n.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
|
||||
|
||||
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
|
||||
# index when available; fall back to interpreted-Cypher cosine math
|
||||
# otherwise. The filter clause's position changes between paths
|
||||
# (after MATCH for fallback, after YIELD for index path), but the
|
||||
# filter expressions themselves are identical because they reference
|
||||
# the bound variable `n` either way.
|
||||
use_index = await _falkordb_vector_index_exists(
|
||||
executor, 'Entity', 'name_embedding', 'NODE'
|
||||
)
|
||||
prefix, _ = _falkordb_vector_node_search_cypher(
|
||||
'Entity', 'name_embedding', '$search_vector', use_index
|
||||
)
|
||||
where_clauses = []
|
||||
if filter_query:
|
||||
where_clauses.append(filter_query.replace(' WHERE ', '', 1).strip())
|
||||
where_clauses.append('score > $min_score')
|
||||
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
|
||||
|
||||
cypher = (
|
||||
prefix
|
||||
+ unified_where
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
params = dict(
|
||||
search_vector=search_vector,
|
||||
limit=limit,
|
||||
min_score=min_score,
|
||||
**filter_params,
|
||||
)
|
||||
if use_index:
|
||||
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
|
||||
records, _, _ = await executor.execute_query(cypher, **params)
|
||||
|
||||
return [entity_node_from_record(r) for r in records]
|
||||
|
||||
async def node_bfs_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
origin_uuids: list[str],
|
||||
search_filter: SearchFilters,
|
||||
max_depth: int,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[EntityNode]:
|
||||
if not origin_uuids or max_depth < 1:
|
||||
return []
|
||||
|
||||
filter_queries, filter_params = node_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('n.group_id IN $group_ids')
|
||||
filter_queries.append('origin.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' AND ' + (' AND '.join(filter_queries))
|
||||
|
||||
cypher = (
|
||||
f"""
|
||||
UNWIND $bfs_origin_node_uuids AS origin_uuid
|
||||
MATCH (origin {{uuid: origin_uuid}})-[:RELATES_TO|MENTIONS*1..{max_depth}]->(n:Entity)
|
||||
WHERE n.group_id = origin.group_id
|
||||
"""
|
||||
+ filter_query
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||
+ """
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher,
|
||||
bfs_origin_node_uuids=origin_uuids,
|
||||
limit=limit,
|
||||
**filter_params,
|
||||
)
|
||||
|
||||
return [entity_node_from_record(r) for r in records]
|
||||
|
||||
# --- Edge search ---
|
||||
|
||||
async def edge_fulltext_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
query: str,
|
||||
search_filter: SearchFilters,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[EntityEdge]:
|
||||
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
|
||||
if fuzzy_query == '':
|
||||
return []
|
||||
|
||||
filter_queries, filter_params = edge_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('e.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
|
||||
|
||||
cypher = (
|
||||
get_relationships_query(
|
||||
'edge_name_and_fact', limit=limit, provider=GraphProvider.FALKORDB
|
||||
)
|
||||
+ """
|
||||
YIELD relationship AS rel, score
|
||||
MATCH (n:Entity)-[e:RELATES_TO {uuid: rel.uuid}]->(m:Entity)
|
||||
"""
|
||||
+ filter_query
|
||||
+ """
|
||||
WITH e, score, n, m
|
||||
RETURN
|
||||
"""
|
||||
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher,
|
||||
query=fuzzy_query,
|
||||
limit=limit,
|
||||
**filter_params,
|
||||
)
|
||||
|
||||
return [entity_edge_from_record(r) for r in records]
|
||||
|
||||
async def edge_similarity_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
search_vector: list[float],
|
||||
source_node_uuid: str | None,
|
||||
target_node_uuid: str | None,
|
||||
search_filter: SearchFilters,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
min_score: float = 0.6,
|
||||
) -> list[EntityEdge]:
|
||||
filter_queries, filter_params = edge_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('e.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
if source_node_uuid is not None:
|
||||
filter_params['source_uuid'] = source_node_uuid
|
||||
filter_queries.append('n.uuid = $source_uuid')
|
||||
|
||||
if target_node_uuid is not None:
|
||||
filter_params['target_uuid'] = target_node_uuid
|
||||
filter_queries.append('m.uuid = $target_uuid')
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
|
||||
|
||||
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
|
||||
# index on RELATES_TO.fact_embedding when available. The unindexed
|
||||
# fallback is the same MATCH-and-cosine math that previously hung
|
||||
# for 6+ minutes on a 4,000-entity graph; this is the load-bearing
|
||||
# call site that motivated the patch.
|
||||
use_index = await _falkordb_vector_index_exists(
|
||||
executor, 'RELATES_TO', 'fact_embedding', 'RELATIONSHIP'
|
||||
)
|
||||
prefix, _ = _falkordb_vector_edge_search_cypher(
|
||||
'RELATES_TO', 'fact_embedding', '$search_vector', use_index
|
||||
)
|
||||
where_clauses = []
|
||||
if filter_query:
|
||||
where_clauses.append(filter_query.replace(' WHERE ', '', 1).strip())
|
||||
where_clauses.append('score > $min_score')
|
||||
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
|
||||
|
||||
cypher = (
|
||||
prefix
|
||||
+ unified_where
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
params = dict(
|
||||
search_vector=search_vector,
|
||||
limit=limit,
|
||||
min_score=min_score,
|
||||
**filter_params,
|
||||
)
|
||||
if use_index:
|
||||
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
|
||||
records, _, _ = await executor.execute_query(cypher, **params)
|
||||
|
||||
return [entity_edge_from_record(r) for r in records]
|
||||
|
||||
async def edge_bfs_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
origin_uuids: list[str],
|
||||
max_depth: int,
|
||||
search_filter: SearchFilters,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[EntityEdge]:
|
||||
if not origin_uuids:
|
||||
return []
|
||||
|
||||
filter_queries, filter_params = edge_search_filter_query_constructor(
|
||||
search_filter, GraphProvider.FALKORDB
|
||||
)
|
||||
|
||||
if group_ids is not None:
|
||||
filter_queries.append('e.group_id IN $group_ids')
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
filter_query = ''
|
||||
if filter_queries:
|
||||
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
|
||||
|
||||
cypher = (
|
||||
f"""
|
||||
UNWIND $bfs_origin_node_uuids AS origin_uuid
|
||||
MATCH path = (origin {{uuid: origin_uuid}})-[:RELATES_TO|MENTIONS*1..{max_depth}]->(:Entity)
|
||||
UNWIND relationships(path) AS rel
|
||||
MATCH (n:Entity)-[e:RELATES_TO {{uuid: rel.uuid}}]-(m:Entity)
|
||||
"""
|
||||
+ filter_query
|
||||
+ """
|
||||
RETURN DISTINCT
|
||||
"""
|
||||
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
|
||||
+ """
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher,
|
||||
bfs_origin_node_uuids=origin_uuids,
|
||||
depth=max_depth,
|
||||
limit=limit,
|
||||
**filter_params,
|
||||
)
|
||||
|
||||
return [entity_edge_from_record(r) for r in records]
|
||||
|
||||
# --- Episode search ---
|
||||
|
||||
async def episode_fulltext_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
query: str,
|
||||
search_filter: SearchFilters, # noqa: ARG002
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[EpisodicNode]:
|
||||
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
|
||||
if fuzzy_query == '':
|
||||
return []
|
||||
|
||||
filter_params: dict[str, Any] = {}
|
||||
group_filter_query = ''
|
||||
if group_ids is not None:
|
||||
group_filter_query += '\nAND e.group_id IN $group_ids'
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
cypher = (
|
||||
get_nodes_query(
|
||||
'episode_content', '$query', limit=limit, provider=GraphProvider.FALKORDB
|
||||
)
|
||||
+ """
|
||||
YIELD node AS episode, score
|
||||
MATCH (e:Episodic)
|
||||
WHERE e.uuid = episode.uuid
|
||||
"""
|
||||
+ group_filter_query
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ EPISODIC_NODE_RETURN
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher, query=fuzzy_query, limit=limit, **filter_params
|
||||
)
|
||||
|
||||
return [episodic_node_from_record(r) for r in records]
|
||||
|
||||
# --- Community search ---
|
||||
|
||||
async def community_fulltext_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
query: str,
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
) -> list[CommunityNode]:
|
||||
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
|
||||
if fuzzy_query == '':
|
||||
return []
|
||||
|
||||
filter_params: dict[str, Any] = {}
|
||||
group_filter_query = ''
|
||||
if group_ids is not None:
|
||||
group_filter_query = 'WHERE c.group_id IN $group_ids'
|
||||
filter_params['group_ids'] = group_ids
|
||||
|
||||
cypher = (
|
||||
get_nodes_query(
|
||||
'community_name', '$query', limit=limit, provider=GraphProvider.FALKORDB
|
||||
)
|
||||
+ """
|
||||
YIELD node AS c, score
|
||||
WITH c, score
|
||||
"""
|
||||
+ group_filter_query
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ COMMUNITY_NODE_RETURN
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
|
||||
records, _, _ = await executor.execute_query(
|
||||
cypher, query=fuzzy_query, limit=limit, **filter_params
|
||||
)
|
||||
|
||||
return [community_node_from_record(r) for r in records]
|
||||
|
||||
async def community_similarity_search(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
search_vector: list[float],
|
||||
group_ids: list[str] | None = None,
|
||||
limit: int = 10,
|
||||
min_score: float = 0.6,
|
||||
) -> list[CommunityNode]:
|
||||
query_params: dict[str, Any] = {}
|
||||
|
||||
group_filter_query = ''
|
||||
if group_ids is not None:
|
||||
group_filter_query += ' WHERE c.group_id IN $group_ids'
|
||||
query_params['group_ids'] = group_ids
|
||||
|
||||
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
|
||||
# index on Community.name_embedding when available. Note: the existing
|
||||
# filter is built into `group_filter_query` (already prefixed with
|
||||
# ' WHERE ' if non-empty) and uses variable `c`. The dispatcher binds
|
||||
# the node as `n` for parity with the helper signature, then we
|
||||
# re-bind to `c` via WITH so the rest of the query is unchanged.
|
||||
use_index = await _falkordb_vector_index_exists(
|
||||
executor, 'Community', 'name_embedding', 'NODE'
|
||||
)
|
||||
prefix, _ = _falkordb_vector_node_search_cypher(
|
||||
'Community', 'name_embedding', '$search_vector', use_index
|
||||
)
|
||||
prefix = prefix + ' WITH n AS c, score '
|
||||
where_clauses = []
|
||||
if group_filter_query:
|
||||
where_clauses.append(group_filter_query.replace(' WHERE ', '', 1).strip())
|
||||
where_clauses.append('score > $min_score')
|
||||
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
|
||||
|
||||
cypher = (
|
||||
prefix
|
||||
+ unified_where
|
||||
+ """
|
||||
RETURN
|
||||
"""
|
||||
+ COMMUNITY_NODE_RETURN
|
||||
+ """
|
||||
ORDER BY score DESC
|
||||
LIMIT $limit
|
||||
"""
|
||||
)
|
||||
params = dict(
|
||||
search_vector=search_vector,
|
||||
limit=limit,
|
||||
min_score=min_score,
|
||||
**query_params,
|
||||
)
|
||||
if use_index:
|
||||
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
|
||||
records, _, _ = await executor.execute_query(cypher, **params)
|
||||
|
||||
return [community_node_from_record(r) for r in records]
|
||||
|
||||
# --- Rerankers ---
|
||||
|
||||
async def node_distance_reranker(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
node_uuids: list[str],
|
||||
center_node_uuid: str,
|
||||
min_score: float = 0,
|
||||
) -> list[EntityNode]:
|
||||
filtered_uuids = [u for u in node_uuids if u != center_node_uuid]
|
||||
scores: dict[str, float] = {center_node_uuid: 0.0}
|
||||
|
||||
cypher = """
|
||||
UNWIND $node_uuids AS node_uuid
|
||||
MATCH (center:Entity {uuid: $center_uuid})-[:RELATES_TO]-(n:Entity {uuid: node_uuid})
|
||||
RETURN 1 AS score, node_uuid AS uuid
|
||||
"""
|
||||
|
||||
results, _, _ = await executor.execute_query(
|
||||
cypher,
|
||||
node_uuids=filtered_uuids,
|
||||
center_uuid=center_node_uuid,
|
||||
)
|
||||
|
||||
for result in results:
|
||||
scores[result['uuid']] = result['score']
|
||||
|
||||
for uuid in filtered_uuids:
|
||||
if uuid not in scores:
|
||||
scores[uuid] = float('inf')
|
||||
|
||||
filtered_uuids.sort(key=lambda cur_uuid: scores[cur_uuid])
|
||||
|
||||
if center_node_uuid in node_uuids:
|
||||
scores[center_node_uuid] = 0.1
|
||||
filtered_uuids = [center_node_uuid] + filtered_uuids
|
||||
|
||||
reranked_uuids = [u for u in filtered_uuids if (1 / scores[u]) >= min_score]
|
||||
|
||||
if not reranked_uuids:
|
||||
return []
|
||||
|
||||
get_query = """
|
||||
MATCH (n:Entity)
|
||||
WHERE n.uuid IN $uuids
|
||||
RETURN
|
||||
""" + get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||
|
||||
records, _, _ = await executor.execute_query(get_query, uuids=reranked_uuids)
|
||||
|
||||
node_map = {r['uuid']: entity_node_from_record(r) for r in records}
|
||||
return [node_map[u] for u in reranked_uuids if u in node_map]
|
||||
|
||||
async def episode_mentions_reranker(
|
||||
self,
|
||||
executor: QueryExecutor,
|
||||
node_uuids: list[str],
|
||||
min_score: float = 0,
|
||||
) -> list[EntityNode]:
|
||||
if not node_uuids:
|
||||
return []
|
||||
|
||||
scores: dict[str, float] = {}
|
||||
|
||||
results, _, _ = await executor.execute_query(
|
||||
"""
|
||||
UNWIND $node_uuids AS node_uuid
|
||||
MATCH (episode:Episodic)-[r:MENTIONS]->(n:Entity {uuid: node_uuid})
|
||||
RETURN count(*) AS score, n.uuid AS uuid
|
||||
""",
|
||||
node_uuids=node_uuids,
|
||||
)
|
||||
|
||||
for result in results:
|
||||
scores[result['uuid']] = result['score']
|
||||
|
||||
for uuid in node_uuids:
|
||||
if uuid not in scores:
|
||||
scores[uuid] = float('inf')
|
||||
|
||||
sorted_uuids = list(node_uuids)
|
||||
sorted_uuids.sort(key=lambda cur_uuid: scores[cur_uuid])
|
||||
|
||||
reranked_uuids = [u for u in sorted_uuids if scores[u] >= min_score]
|
||||
|
||||
if not reranked_uuids:
|
||||
return []
|
||||
|
||||
get_query = """
|
||||
MATCH (n:Entity)
|
||||
WHERE n.uuid IN $uuids
|
||||
RETURN
|
||||
""" + get_entity_node_return_query(GraphProvider.FALKORDB)
|
||||
|
||||
records, _, _ = await executor.execute_query(get_query, uuids=reranked_uuids)
|
||||
|
||||
node_map = {r['uuid']: entity_node_from_record(r) for r in records}
|
||||
return [node_map[u] for u in reranked_uuids if u in node_map]
|
||||
|
||||
# --- Filter builders ---
|
||||
|
||||
def build_node_search_filters(self, search_filters: SearchFilters) -> Any:
|
||||
filter_queries, filter_params = node_search_filter_query_constructor(
|
||||
search_filters, GraphProvider.FALKORDB
|
||||
)
|
||||
return {'filter_queries': filter_queries, 'filter_params': filter_params}
|
||||
|
||||
def build_edge_search_filters(self, search_filters: SearchFilters) -> Any:
|
||||
filter_queries, filter_params = edge_search_filter_query_constructor(
|
||||
search_filters, GraphProvider.FALKORDB
|
||||
)
|
||||
return {'filter_queries': filter_queries, 'filter_params': filter_params}
|
||||
|
||||
# --- Fulltext query builder ---
|
||||
|
||||
def build_fulltext_query(
|
||||
self,
|
||||
query: str,
|
||||
group_ids: list[str] | None = None,
|
||||
max_query_length: int = MAX_QUERY_LENGTH,
|
||||
) -> str:
|
||||
return _build_falkor_fulltext_query(query, group_ids, max_query_length)
|
||||
@@ -0,0 +1,444 @@
|
||||
"""
|
||||
Copyright 2024, Zep Software, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from falkordb import Graph as FalkorGraph
|
||||
from falkordb.asyncio import FalkorDB
|
||||
else:
|
||||
try:
|
||||
from falkordb import Graph as FalkorGraph
|
||||
from falkordb.asyncio import FalkorDB
|
||||
except ImportError:
|
||||
# If falkordb is not installed, raise an ImportError
|
||||
raise ImportError(
|
||||
'falkordb is required for FalkorDriver. '
|
||||
'Install it with: pip install graphiti-core[falkordb]'
|
||||
) from None
|
||||
|
||||
from graphiti_core.driver.driver import GraphDriver, GraphDriverSession, GraphProvider
|
||||
from graphiti_core.driver.falkordb import STOPWORDS as STOPWORDS
|
||||
from graphiti_core.driver.falkordb.operations.community_edge_ops import (
|
||||
FalkorCommunityEdgeOperations,
|
||||
)
|
||||
from graphiti_core.driver.falkordb.operations.community_node_ops import (
|
||||
FalkorCommunityNodeOperations,
|
||||
)
|
||||
from graphiti_core.driver.falkordb.operations.entity_edge_ops import FalkorEntityEdgeOperations
|
||||
from graphiti_core.driver.falkordb.operations.entity_node_ops import FalkorEntityNodeOperations
|
||||
from graphiti_core.driver.falkordb.operations.episode_node_ops import FalkorEpisodeNodeOperations
|
||||
from graphiti_core.driver.falkordb.operations.episodic_edge_ops import FalkorEpisodicEdgeOperations
|
||||
from graphiti_core.driver.falkordb.operations.graph_ops import FalkorGraphMaintenanceOperations
|
||||
from graphiti_core.driver.falkordb.operations.has_episode_edge_ops import (
|
||||
FalkorHasEpisodeEdgeOperations,
|
||||
)
|
||||
from graphiti_core.driver.falkordb.operations.next_episode_edge_ops import (
|
||||
FalkorNextEpisodeEdgeOperations,
|
||||
)
|
||||
from graphiti_core.driver.falkordb.operations.saga_node_ops import FalkorSagaNodeOperations
|
||||
from graphiti_core.driver.falkordb.operations.search_ops import FalkorSearchOperations
|
||||
from graphiti_core.driver.operations.community_edge_ops import CommunityEdgeOperations
|
||||
from graphiti_core.driver.operations.community_node_ops import CommunityNodeOperations
|
||||
from graphiti_core.driver.operations.entity_edge_ops import EntityEdgeOperations
|
||||
from graphiti_core.driver.operations.entity_node_ops import EntityNodeOperations
|
||||
from graphiti_core.driver.operations.episode_node_ops import EpisodeNodeOperations
|
||||
from graphiti_core.driver.operations.episodic_edge_ops import EpisodicEdgeOperations
|
||||
from graphiti_core.driver.operations.graph_ops import GraphMaintenanceOperations
|
||||
from graphiti_core.driver.operations.has_episode_edge_ops import HasEpisodeEdgeOperations
|
||||
from graphiti_core.driver.operations.next_episode_edge_ops import NextEpisodeEdgeOperations
|
||||
from graphiti_core.driver.operations.saga_node_ops import SagaNodeOperations
|
||||
from graphiti_core.driver.operations.search_ops import SearchOperations
|
||||
from graphiti_core.graph_queries import get_fulltext_indices, get_range_indices, get_vector_indices
|
||||
from graphiti_core.helpers import validate_group_ids
|
||||
from graphiti_core.utils.datetime_utils import convert_datetimes_to_strings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FalkorDriverSession(GraphDriverSession):
|
||||
provider = GraphProvider.FALKORDB
|
||||
|
||||
def __init__(self, graph: FalkorGraph):
|
||||
self.graph = graph
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
# No cleanup needed for Falkor, but method must exist
|
||||
pass
|
||||
|
||||
async def close(self):
|
||||
# No explicit close needed for FalkorDB, but method must exist
|
||||
pass
|
||||
|
||||
async def execute_write(self, func, *args, **kwargs):
|
||||
# Directly await the provided async function with `self` as the transaction/session
|
||||
return await func(self, *args, **kwargs)
|
||||
|
||||
async def run(self, query: str | list, **kwargs: Any) -> Any:
|
||||
# FalkorDB does not support argument for Label Set, so it's converted into an array of queries
|
||||
if isinstance(query, list):
|
||||
for cypher, params in query:
|
||||
params = convert_datetimes_to_strings(params)
|
||||
await self.graph.query(str(cypher), params) # type: ignore[reportUnknownArgumentType]
|
||||
else:
|
||||
params = dict(kwargs)
|
||||
params = convert_datetimes_to_strings(params)
|
||||
await self.graph.query(str(query), params) # type: ignore[reportUnknownArgumentType]
|
||||
# Assuming `graph.query` is async (ideal); otherwise, wrap in executor
|
||||
return None
|
||||
|
||||
|
||||
class FalkorDriver(GraphDriver):
|
||||
provider = GraphProvider.FALKORDB
|
||||
default_group_id: str = '\\_'
|
||||
fulltext_syntax: str = '@' # FalkorDB uses a redisearch-like syntax for fulltext queries
|
||||
aoss_client: None = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str = 'localhost',
|
||||
port: int = 6379,
|
||||
username: str | None = None,
|
||||
password: str | None = None,
|
||||
falkor_db: FalkorDB | None = None,
|
||||
database: str = 'default_db',
|
||||
):
|
||||
"""
|
||||
Initialize the FalkorDB driver.
|
||||
|
||||
FalkorDB is a multi-tenant graph database.
|
||||
To connect, provide the host and port.
|
||||
The default parameters assume a local (on-premises) FalkorDB instance.
|
||||
|
||||
Args:
|
||||
host (str): The host where FalkorDB is running.
|
||||
port (int): The port on which FalkorDB is listening.
|
||||
username (str | None): The username for authentication (if required).
|
||||
password (str | None): The password for authentication (if required).
|
||||
falkor_db (FalkorDB | None): An existing FalkorDB instance to use instead of creating a new one.
|
||||
database (str): The name of the database to connect to. Defaults to 'default_db'.
|
||||
"""
|
||||
super().__init__()
|
||||
self._database = database
|
||||
if falkor_db is not None:
|
||||
# If a FalkorDB instance is provided, use it directly
|
||||
self.client = falkor_db
|
||||
else:
|
||||
self.client = FalkorDB(host=host, port=port, username=username, password=password)
|
||||
|
||||
# Instantiate FalkorDB operations
|
||||
self._entity_node_ops = FalkorEntityNodeOperations()
|
||||
self._episode_node_ops = FalkorEpisodeNodeOperations()
|
||||
self._community_node_ops = FalkorCommunityNodeOperations()
|
||||
self._saga_node_ops = FalkorSagaNodeOperations()
|
||||
self._entity_edge_ops = FalkorEntityEdgeOperations()
|
||||
self._episodic_edge_ops = FalkorEpisodicEdgeOperations()
|
||||
self._community_edge_ops = FalkorCommunityEdgeOperations()
|
||||
self._has_episode_edge_ops = FalkorHasEpisodeEdgeOperations()
|
||||
self._next_episode_edge_ops = FalkorNextEpisodeEdgeOperations()
|
||||
self._search_ops = FalkorSearchOperations()
|
||||
self._graph_ops = FalkorGraphMaintenanceOperations()
|
||||
|
||||
# Schedule the indices and constraints to be built
|
||||
try:
|
||||
# Try to get the current event loop
|
||||
loop = asyncio.get_running_loop()
|
||||
# Schedule the build_indices_and_constraints to run
|
||||
loop.create_task(self.build_indices_and_constraints())
|
||||
except RuntimeError:
|
||||
# No event loop running, this will be handled later
|
||||
pass
|
||||
|
||||
# --- Operations properties ---
|
||||
|
||||
@property
|
||||
def entity_node_ops(self) -> EntityNodeOperations:
|
||||
return self._entity_node_ops
|
||||
|
||||
@property
|
||||
def episode_node_ops(self) -> EpisodeNodeOperations:
|
||||
return self._episode_node_ops
|
||||
|
||||
@property
|
||||
def community_node_ops(self) -> CommunityNodeOperations:
|
||||
return self._community_node_ops
|
||||
|
||||
@property
|
||||
def saga_node_ops(self) -> SagaNodeOperations:
|
||||
return self._saga_node_ops
|
||||
|
||||
@property
|
||||
def entity_edge_ops(self) -> EntityEdgeOperations:
|
||||
return self._entity_edge_ops
|
||||
|
||||
@property
|
||||
def episodic_edge_ops(self) -> EpisodicEdgeOperations:
|
||||
return self._episodic_edge_ops
|
||||
|
||||
@property
|
||||
def community_edge_ops(self) -> CommunityEdgeOperations:
|
||||
return self._community_edge_ops
|
||||
|
||||
@property
|
||||
def has_episode_edge_ops(self) -> HasEpisodeEdgeOperations:
|
||||
return self._has_episode_edge_ops
|
||||
|
||||
@property
|
||||
def next_episode_edge_ops(self) -> NextEpisodeEdgeOperations:
|
||||
return self._next_episode_edge_ops
|
||||
|
||||
@property
|
||||
def search_ops(self) -> SearchOperations:
|
||||
return self._search_ops
|
||||
|
||||
@property
|
||||
def graph_ops(self) -> GraphMaintenanceOperations:
|
||||
return self._graph_ops
|
||||
|
||||
def _get_graph(self, graph_name: str | None) -> FalkorGraph:
|
||||
# FalkorDB requires a non-None database name for multi-tenant graphs; the default is "default_db"
|
||||
if graph_name is None:
|
||||
graph_name = self._database
|
||||
return self.client.select_graph(graph_name)
|
||||
|
||||
async def execute_query(self, cypher_query_, **kwargs: Any):
|
||||
graph = self._get_graph(self._database)
|
||||
|
||||
# Convert datetime objects to ISO strings (FalkorDB does not support datetime objects directly)
|
||||
params = convert_datetimes_to_strings(dict(kwargs))
|
||||
|
||||
try:
|
||||
result = await graph.query(cypher_query_, params) # type: ignore[reportUnknownArgumentType]
|
||||
except Exception as e:
|
||||
if 'already indexed' in str(e):
|
||||
# check if index already exists
|
||||
logger.info(f'Index already exists: {e}')
|
||||
return None
|
||||
logger.error(f'Error executing FalkorDB query: {e}\n{cypher_query_}\n{params}')
|
||||
raise
|
||||
|
||||
# Convert the result header to a list of strings
|
||||
header = [h[1] for h in result.header]
|
||||
|
||||
# Convert FalkorDB's result format (list of lists) to the format expected by Graphiti (list of dicts)
|
||||
records = []
|
||||
for row in result.result_set:
|
||||
record = {}
|
||||
for i, field_name in enumerate(header):
|
||||
if i < len(row):
|
||||
record[field_name] = row[i]
|
||||
else:
|
||||
# If there are more fields in header than values in row, set to None
|
||||
record[field_name] = None
|
||||
records.append(record)
|
||||
|
||||
return records, header, None
|
||||
|
||||
def session(self, database: str | None = None) -> GraphDriverSession:
|
||||
return FalkorDriverSession(self._get_graph(database))
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the driver connection."""
|
||||
if hasattr(self.client, 'aclose'):
|
||||
await self.client.aclose() # type: ignore[reportUnknownMemberType]
|
||||
elif hasattr(self.client.connection, 'aclose'):
|
||||
await self.client.connection.aclose()
|
||||
elif hasattr(self.client.connection, 'close'):
|
||||
await self.client.connection.close()
|
||||
|
||||
async def delete_all_indexes(self) -> None:
|
||||
result = await self.execute_query('CALL db.indexes()')
|
||||
if not result:
|
||||
return
|
||||
|
||||
records, _, _ = result
|
||||
drop_tasks = []
|
||||
|
||||
for record in records:
|
||||
label = record['label']
|
||||
entity_type = record['entitytype']
|
||||
|
||||
for field_name, index_type in record['types'].items():
|
||||
if 'RANGE' in index_type:
|
||||
drop_tasks.append(self.execute_query(f'DROP INDEX ON :{label}({field_name})'))
|
||||
elif 'FULLTEXT' in index_type:
|
||||
if entity_type == 'NODE':
|
||||
drop_tasks.append(
|
||||
self.execute_query(
|
||||
f'DROP FULLTEXT INDEX FOR (n:{label}) ON (n.{field_name})'
|
||||
)
|
||||
)
|
||||
elif entity_type == 'RELATIONSHIP':
|
||||
drop_tasks.append(
|
||||
self.execute_query(
|
||||
f'DROP FULLTEXT INDEX FOR ()-[e:{label}]-() ON (e.{field_name})'
|
||||
)
|
||||
)
|
||||
|
||||
if drop_tasks:
|
||||
await asyncio.gather(*drop_tasks)
|
||||
|
||||
async def build_indices_and_constraints(self, delete_existing=False):
|
||||
if delete_existing:
|
||||
await self.delete_all_indexes()
|
||||
# PATCHED 2026-05-02 (BirdAI vendored patch): add vector indexes alongside
|
||||
# range and fulltext. FalkorDB supports native vector indexes via
|
||||
# db.idx.vector.queryNodes / queryRelationships; without these, similarity
|
||||
# search runs as full-table-scan cosine math in interpreted Cypher.
|
||||
index_queries = (
|
||||
get_range_indices(self.provider)
|
||||
+ get_fulltext_indices(self.provider)
|
||||
+ get_vector_indices(self.provider)
|
||||
)
|
||||
for query in index_queries:
|
||||
await self.execute_query(query)
|
||||
# Invalidate the search_ops vector-index existence cache so subsequent
|
||||
# similarity queries re-probe and discover the indexes we just built.
|
||||
try:
|
||||
from graphiti_core.driver.falkordb.operations.search_ops import (
|
||||
_invalidate_falkordb_vector_index_cache,
|
||||
)
|
||||
_invalidate_falkordb_vector_index_cache()
|
||||
except ImportError:
|
||||
# search_ops module not yet imported (cold start); cache is empty
|
||||
# by default, so no invalidation needed.
|
||||
pass
|
||||
|
||||
def clone(self, database: str) -> 'GraphDriver':
|
||||
"""
|
||||
Returns a shallow copy of this driver with a different default database.
|
||||
Reuses the same connection (e.g. FalkorDB, Neo4j).
|
||||
"""
|
||||
if database == self._database:
|
||||
cloned = self
|
||||
elif database == self.default_group_id:
|
||||
cloned = FalkorDriver(falkor_db=self.client)
|
||||
else:
|
||||
# Create a new instance of FalkorDriver with the same connection but a different database
|
||||
cloned = FalkorDriver(falkor_db=self.client, database=database)
|
||||
|
||||
return cloned
|
||||
|
||||
async def health_check(self) -> None:
|
||||
"""Check FalkorDB connectivity by running a simple query."""
|
||||
try:
|
||||
await self.execute_query('MATCH (n) RETURN 1 LIMIT 1')
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f'FalkorDB health check failed: {e}')
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def convert_datetimes_to_strings(obj):
|
||||
if isinstance(obj, dict):
|
||||
return {k: FalkorDriver.convert_datetimes_to_strings(v) for k, v in obj.items()}
|
||||
elif isinstance(obj, list):
|
||||
return [FalkorDriver.convert_datetimes_to_strings(item) for item in obj]
|
||||
elif isinstance(obj, tuple):
|
||||
return tuple(FalkorDriver.convert_datetimes_to_strings(item) for item in obj)
|
||||
elif isinstance(obj, datetime):
|
||||
return obj.isoformat()
|
||||
else:
|
||||
return obj
|
||||
|
||||
def sanitize(self, query: str) -> str:
|
||||
"""
|
||||
Replace FalkorDB special characters with whitespace.
|
||||
Based on FalkorDB tokenization rules: ,.<>{}[]"':;!@#$%^&*()-+=~
|
||||
"""
|
||||
# FalkorDB separator characters that break text into tokens
|
||||
separator_map = str.maketrans(
|
||||
{
|
||||
',': ' ',
|
||||
'.': ' ',
|
||||
'<': ' ',
|
||||
'>': ' ',
|
||||
'{': ' ',
|
||||
'}': ' ',
|
||||
'[': ' ',
|
||||
']': ' ',
|
||||
'"': ' ',
|
||||
"'": ' ',
|
||||
':': ' ',
|
||||
';': ' ',
|
||||
'!': ' ',
|
||||
'@': ' ',
|
||||
'#': ' ',
|
||||
'$': ' ',
|
||||
'%': ' ',
|
||||
'^': ' ',
|
||||
'&': ' ',
|
||||
'*': ' ',
|
||||
'(': ' ',
|
||||
')': ' ',
|
||||
'-': ' ',
|
||||
'+': ' ',
|
||||
'=': ' ',
|
||||
'~': ' ',
|
||||
'?': ' ',
|
||||
'|': ' ',
|
||||
'/': ' ',
|
||||
'\\': ' ',
|
||||
}
|
||||
)
|
||||
sanitized = query.translate(separator_map)
|
||||
# Clean up multiple spaces
|
||||
sanitized = ' '.join(sanitized.split())
|
||||
return sanitized
|
||||
|
||||
def build_fulltext_query(
|
||||
self, query: str, group_ids: list[str] | None = None, max_query_length: int = 128
|
||||
) -> str:
|
||||
"""
|
||||
Build a fulltext query string for FalkorDB using RedisSearch syntax.
|
||||
FalkorDB uses RedisSearch-like syntax where:
|
||||
- Field queries use @ prefix: @field:value
|
||||
- Multiple values for same field: (@field:value1|value2)
|
||||
- Text search doesn't need @ prefix for content fields
|
||||
- AND is implicit with space: (@group_id:value) (text)
|
||||
- OR uses pipe within parentheses: (@group_id:value1|value2)
|
||||
"""
|
||||
validate_group_ids(group_ids)
|
||||
|
||||
if group_ids is None or len(group_ids) == 0:
|
||||
group_filter = ''
|
||||
else:
|
||||
# Escape group_ids with quotes to prevent RediSearch syntax errors
|
||||
# with reserved words like "main" or special characters like hyphens
|
||||
escaped_group_ids = [f'"{gid}"' for gid in group_ids]
|
||||
group_values = '|'.join(escaped_group_ids)
|
||||
group_filter = f'(@group_id:{group_values})'
|
||||
|
||||
sanitized_query = self.sanitize(query)
|
||||
|
||||
# Remove stopwords and empty tokens from the sanitized query
|
||||
query_words = sanitized_query.split()
|
||||
filtered_words = [word for word in query_words if word and word.lower() not in STOPWORDS]
|
||||
sanitized_query = ' | '.join(filtered_words)
|
||||
|
||||
# If the query is too long return no query
|
||||
if len(sanitized_query.split(' ')) + len(group_ids or '') >= max_query_length:
|
||||
return ''
|
||||
|
||||
full_query = group_filter + ' (' + sanitized_query + ')'
|
||||
|
||||
return full_query
|
||||
@@ -0,0 +1,242 @@
|
||||
"""
|
||||
Database query utilities for different graph database backends.
|
||||
|
||||
This module provides database-agnostic query generation for Neo4j and FalkorDB,
|
||||
supporting index creation, fulltext search, and bulk operations.
|
||||
|
||||
PATCHED for FalkorDB native vector index support (BirdAI vendored patch,
|
||||
2026-05-02). Adds:
|
||||
- get_vector_indices(): CREATE VECTOR INDEX statements for FalkorDB
|
||||
- get_vector_search_query(): Cypher fragment for vector similarity using
|
||||
FalkorDB's db.idx.vector procedures, with fallback to cosine math when
|
||||
the index does not yet exist
|
||||
- VECTOR_INDEX_CANDIDATE_MULTIPLIER: over-fetch factor for vector index
|
||||
queries to handle filter rejections after index lookup
|
||||
|
||||
No changes to Neo4j or Kuzu code paths.
|
||||
"""
|
||||
|
||||
from typing_extensions import LiteralString
|
||||
|
||||
from graphiti_core.driver.driver import GraphProvider
|
||||
|
||||
# Mapping from Neo4j fulltext index names to FalkorDB node labels
|
||||
NEO4J_TO_FALKORDB_MAPPING = {
|
||||
'node_name_and_summary': 'Entity',
|
||||
'community_name': 'Community',
|
||||
'episode_content': 'Episodic',
|
||||
'edge_name_and_fact': 'RELATES_TO',
|
||||
}
|
||||
# Mapping from fulltext index names to Kuzu node labels
|
||||
INDEX_TO_LABEL_KUZU_MAPPING = {
|
||||
'node_name_and_summary': 'Entity',
|
||||
'community_name': 'Community',
|
||||
'episode_content': 'Episodic',
|
||||
'edge_name_and_fact': 'RelatesToNode_',
|
||||
}
|
||||
|
||||
# Vector index over-fetch multiplier. When a vector index search is
|
||||
# combined with WHERE filters (group_id, source_uuid, etc.), some of
|
||||
# the top-k index results may be filtered out. Over-fetching by this
|
||||
# factor preserves recall against the final LIMIT after filtering.
|
||||
# Conservative default; tunable per-deployment by editing this constant
|
||||
# or via environment-variable override at the driver level (future).
|
||||
VECTOR_INDEX_CANDIDATE_MULTIPLIER = 5
|
||||
|
||||
|
||||
def get_range_indices(provider: GraphProvider) -> list[LiteralString]:
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
return [
|
||||
# Entity node
|
||||
'CREATE INDEX FOR (n:Entity) ON (n.uuid, n.group_id, n.name, n.created_at)',
|
||||
# Episodic node
|
||||
'CREATE INDEX FOR (n:Episodic) ON (n.uuid, n.group_id, n.created_at, n.valid_at)',
|
||||
# Community node
|
||||
'CREATE INDEX FOR (n:Community) ON (n.uuid)',
|
||||
# Saga node
|
||||
'CREATE INDEX FOR (n:Saga) ON (n.uuid, n.group_id, n.name)',
|
||||
# RELATES_TO edge
|
||||
'CREATE INDEX FOR ()-[e:RELATES_TO]-() ON (e.uuid, e.group_id, e.name, e.created_at, e.expired_at, e.valid_at, e.invalid_at)',
|
||||
# MENTIONS edge
|
||||
'CREATE INDEX FOR ()-[e:MENTIONS]-() ON (e.uuid, e.group_id)',
|
||||
# HAS_MEMBER edge
|
||||
'CREATE INDEX FOR ()-[e:HAS_MEMBER]-() ON (e.uuid)',
|
||||
# HAS_EPISODE edge
|
||||
'CREATE INDEX FOR ()-[e:HAS_EPISODE]-() ON (e.uuid, e.group_id)',
|
||||
# NEXT_EPISODE edge
|
||||
'CREATE INDEX FOR ()-[e:NEXT_EPISODE]-() ON (e.uuid, e.group_id)',
|
||||
]
|
||||
|
||||
if provider == GraphProvider.KUZU:
|
||||
return []
|
||||
|
||||
return [
|
||||
'CREATE INDEX entity_uuid IF NOT EXISTS FOR (n:Entity) ON (n.uuid)',
|
||||
'CREATE INDEX episode_uuid IF NOT EXISTS FOR (n:Episodic) ON (n.uuid)',
|
||||
'CREATE INDEX community_uuid IF NOT EXISTS FOR (n:Community) ON (n.uuid)',
|
||||
'CREATE INDEX saga_uuid IF NOT EXISTS FOR (n:Saga) ON (n.uuid)',
|
||||
'CREATE INDEX relation_uuid IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.uuid)',
|
||||
'CREATE INDEX mention_uuid IF NOT EXISTS FOR ()-[e:MENTIONS]-() ON (e.uuid)',
|
||||
'CREATE INDEX has_member_uuid IF NOT EXISTS FOR ()-[e:HAS_MEMBER]-() ON (e.uuid)',
|
||||
'CREATE INDEX has_episode_uuid IF NOT EXISTS FOR ()-[e:HAS_EPISODE]-() ON (e.uuid)',
|
||||
'CREATE INDEX next_episode_uuid IF NOT EXISTS FOR ()-[e:NEXT_EPISODE]-() ON (e.uuid)',
|
||||
'CREATE INDEX entity_group_id IF NOT EXISTS FOR (n:Entity) ON (n.group_id)',
|
||||
'CREATE INDEX episode_group_id IF NOT EXISTS FOR (n:Episodic) ON (n.group_id)',
|
||||
'CREATE INDEX community_group_id IF NOT EXISTS FOR (n:Community) ON (n.group_id)',
|
||||
'CREATE INDEX saga_group_id IF NOT EXISTS FOR (n:Saga) ON (n.group_id)',
|
||||
'CREATE INDEX relation_group_id IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.group_id)',
|
||||
'CREATE INDEX mention_group_id IF NOT EXISTS FOR ()-[e:MENTIONS]-() ON (e.group_id)',
|
||||
'CREATE INDEX has_episode_group_id IF NOT EXISTS FOR ()-[e:HAS_EPISODE]-() ON (e.group_id)',
|
||||
'CREATE INDEX next_episode_group_id IF NOT EXISTS FOR ()-[e:NEXT_EPISODE]-() ON (e.group_id)',
|
||||
'CREATE INDEX name_entity_index IF NOT EXISTS FOR (n:Entity) ON (n.name)',
|
||||
'CREATE INDEX saga_name IF NOT EXISTS FOR (n:Saga) ON (n.name)',
|
||||
'CREATE INDEX created_at_entity_index IF NOT EXISTS FOR (n:Entity) ON (n.created_at)',
|
||||
'CREATE INDEX created_at_episodic_index IF NOT EXISTS FOR (n:Episodic) ON (n.created_at)',
|
||||
'CREATE INDEX valid_at_episodic_index IF NOT EXISTS FOR (n:Episodic) ON (n.valid_at)',
|
||||
'CREATE INDEX name_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.name)',
|
||||
'CREATE INDEX created_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.created_at)',
|
||||
'CREATE INDEX expired_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.expired_at)',
|
||||
'CREATE INDEX valid_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.valid_at)',
|
||||
'CREATE INDEX invalid_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.invalid_at)',
|
||||
]
|
||||
|
||||
|
||||
def get_fulltext_indices(provider: GraphProvider) -> list[LiteralString]:
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
from typing import cast
|
||||
|
||||
from graphiti_core.driver.falkordb import STOPWORDS
|
||||
|
||||
# Convert to string representation for embedding in queries
|
||||
stopwords_str = str(STOPWORDS)
|
||||
|
||||
# Use type: ignore to satisfy LiteralString requirement while maintaining single source of truth
|
||||
return cast(
|
||||
list[LiteralString],
|
||||
[
|
||||
f"""CALL db.idx.fulltext.createNodeIndex(
|
||||
{{
|
||||
label: 'Episodic',
|
||||
stopwords: {stopwords_str}
|
||||
}},
|
||||
'content', 'source', 'source_description', 'group_id'
|
||||
)""",
|
||||
f"""CALL db.idx.fulltext.createNodeIndex(
|
||||
{{
|
||||
label: 'Entity',
|
||||
stopwords: {stopwords_str}
|
||||
}},
|
||||
'name', 'summary', 'group_id'
|
||||
)""",
|
||||
f"""CALL db.idx.fulltext.createNodeIndex(
|
||||
{{
|
||||
label: 'Community',
|
||||
stopwords: {stopwords_str}
|
||||
}},
|
||||
'name', 'group_id'
|
||||
)""",
|
||||
"""CREATE FULLTEXT INDEX FOR ()-[e:RELATES_TO]-() ON (e.name, e.fact, e.group_id)""",
|
||||
],
|
||||
)
|
||||
|
||||
if provider == GraphProvider.KUZU:
|
||||
return [
|
||||
"CALL CREATE_FTS_INDEX('Episodic', 'episode_content', ['content', 'source', 'source_description']);",
|
||||
"CALL CREATE_FTS_INDEX('Entity', 'node_name_and_summary', ['name', 'summary']);",
|
||||
"CALL CREATE_FTS_INDEX('Community', 'community_name', ['name']);",
|
||||
"CALL CREATE_FTS_INDEX('RelatesToNode_', 'edge_name_and_fact', ['name', 'fact']);",
|
||||
]
|
||||
|
||||
return [
|
||||
"""CREATE FULLTEXT INDEX episode_content IF NOT EXISTS
|
||||
FOR (e:Episodic) ON EACH [e.content, e.source, e.source_description, e.group_id]""",
|
||||
"""CREATE FULLTEXT INDEX node_name_and_summary IF NOT EXISTS
|
||||
FOR (n:Entity) ON EACH [n.name, n.summary, n.group_id]""",
|
||||
"""CREATE FULLTEXT INDEX community_name IF NOT EXISTS
|
||||
FOR (n:Community) ON EACH [n.name, n.group_id]""",
|
||||
"""CREATE FULLTEXT INDEX edge_name_and_fact IF NOT EXISTS
|
||||
FOR ()-[e:RELATES_TO]-() ON EACH [e.name, e.fact, e.group_id]""",
|
||||
]
|
||||
|
||||
|
||||
def get_vector_indices(provider: GraphProvider, dimension: int = 384) -> list[LiteralString]:
|
||||
"""Return CREATE VECTOR INDEX statements for the given provider.
|
||||
|
||||
For FalkorDB: creates HNSW vector indexes on Entity.name_embedding,
|
||||
RELATES_TO.fact_embedding, and Community.name_embedding. Backed by
|
||||
FalkorDB's native vector index (db.idx.vector.queryNodes /
|
||||
queryRelationships).
|
||||
|
||||
For Neo4j and Kuzu: returns an empty list. Those backends create vector
|
||||
indexes via different mechanisms (Neo4j auto-creates them when needed
|
||||
via its vector.similarity.cosine function; Kuzu uses array_cosine_similarity
|
||||
and does not require pre-built vector indexes for graphiti-core's usage).
|
||||
|
||||
Args:
|
||||
provider: The graph database provider.
|
||||
dimension: Embedding dimension. Defaults to 384 (all-MiniLM-L6-v2).
|
||||
Embedders with different dimensions should pass their own value
|
||||
through driver configuration. graphiti-core's default embedder
|
||||
is 1536 (OpenAI ada-002); BirdAI uses 384 (sentence-transformers).
|
||||
|
||||
Returns:
|
||||
List of CREATE VECTOR INDEX statements. Idempotent at FalkorDB level
|
||||
if the index already exists with matching options.
|
||||
"""
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
from typing import cast
|
||||
return cast(
|
||||
list[LiteralString],
|
||||
[
|
||||
f"CREATE VECTOR INDEX FOR (n:Entity) ON (n.name_embedding) "
|
||||
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
|
||||
f"CREATE VECTOR INDEX FOR ()-[e:RELATES_TO]-() ON (e.fact_embedding) "
|
||||
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
|
||||
f"CREATE VECTOR INDEX FOR (n:Community) ON (n.name_embedding) "
|
||||
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
|
||||
],
|
||||
)
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def get_nodes_query(name: str, query: str, limit: int, provider: GraphProvider) -> str:
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
label = NEO4J_TO_FALKORDB_MAPPING[name]
|
||||
return f"CALL db.idx.fulltext.queryNodes('{label}', {query})"
|
||||
|
||||
if provider == GraphProvider.KUZU:
|
||||
label = INDEX_TO_LABEL_KUZU_MAPPING[name]
|
||||
return f"CALL QUERY_FTS_INDEX('{label}', '{name}', {query}, TOP := $limit)"
|
||||
|
||||
return f'CALL db.index.fulltext.queryNodes("{name}", {query}, {{limit: $limit}})'
|
||||
|
||||
|
||||
def get_vector_cosine_func_query(vec1, vec2, provider: GraphProvider) -> str:
|
||||
"""Return a Cypher fragment for cosine similarity score in [0, 1].
|
||||
|
||||
PRESERVED for backward compatibility and as fallback when vector indexes
|
||||
do not yet exist on the FalkorDB backend. New code paths should prefer
|
||||
get_vector_search_query() which uses the native vector index when
|
||||
available.
|
||||
"""
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
# FalkorDB uses a different syntax for regular cosine similarity and Neo4j uses normalized cosine similarity
|
||||
return f'(2 - vec.cosineDistance({vec1}, vecf32({vec2})))/2'
|
||||
|
||||
if provider == GraphProvider.KUZU:
|
||||
return f'array_cosine_similarity({vec1}, {vec2})'
|
||||
|
||||
return f'vector.similarity.cosine({vec1}, {vec2})'
|
||||
|
||||
|
||||
def get_relationships_query(name: str, limit: int, provider: GraphProvider) -> str:
|
||||
if provider == GraphProvider.FALKORDB:
|
||||
label = NEO4J_TO_FALKORDB_MAPPING[name]
|
||||
return f"CALL db.idx.fulltext.queryRelationships('{label}', $query)"
|
||||
|
||||
if provider == GraphProvider.KUZU:
|
||||
label = INDEX_TO_LABEL_KUZU_MAPPING[name]
|
||||
return f"CALL QUERY_FTS_INDEX('{label}', '{name}', cast($query AS STRING), TOP := $limit)"
|
||||
|
||||
return f'CALL db.index.fulltext.queryRelationships("{name}", $query, {{limit: $limit}})'
|
||||
@@ -0,0 +1,23 @@
|
||||
-- 20260501-001 — Stage 3 queue routing columns for Phase A bulk-vs-single-episode routing
|
||||
--
|
||||
-- Adds four columns and one index to stage_3_queue, written by Stage 2 v2.2
|
||||
-- and read by Stage 3 v2.3 to choose between bulk and single-episode ingest
|
||||
-- pathways. See architecture doc and Phase A handoff (2026-05-01) for design.
|
||||
--
|
||||
-- Required by:
|
||||
-- scripts/stage2_worker.py >= 2.2
|
||||
-- scripts/stage3_worker.py >= 2.3
|
||||
--
|
||||
-- Idempotent: safe to re-apply against a database where the columns already
|
||||
-- exist (was applied live before this file was created).
|
||||
|
||||
ALTER TABLE stage_3_queue
|
||||
ADD COLUMN IF NOT EXISTS state_type TEXT,
|
||||
ADD COLUMN IF NOT EXISTS state_type_confidence TEXT,
|
||||
ADD COLUMN IF NOT EXISTS supersedes_prior_state BOOLEAN,
|
||||
ADD COLUMN IF NOT EXISTS state_type_rationale TEXT;
|
||||
|
||||
-- Index on the routing signal — Stage 3 reads this on every dequeue,
|
||||
-- and observability queries (item 6: routing_decisions) will filter on it.
|
||||
CREATE INDEX IF NOT EXISTS stage_3_queue_supersedes_idx
|
||||
ON stage_3_queue (supersedes_prior_state);
|
||||
@@ -0,0 +1,55 @@
|
||||
-- Migration: 20260502-001_async_job_model
|
||||
-- Purpose: Pattern 1 async job model — sidecar processes ingest jobs serially
|
||||
-- via Postgres-backed queue. Worker submits and polls rather than
|
||||
-- blocking on synchronous HTTP response.
|
||||
--
|
||||
-- Architectural rationale: tonight's smoke test (2026-05-02 ~01:40-01:50 UTC)
|
||||
-- diagnosed that bulk ingest against a 4,222-entity graph commits successfully
|
||||
-- but the worker's HTTP read-timeout fires before the response returns. Three
|
||||
-- days of "saga deadlock" failures were false negatives — the work succeeded;
|
||||
-- the worker just stopped listening. Pattern 1 separates submission from
|
||||
-- completion observation so the worker can't false-negative this way.
|
||||
--
|
||||
-- The job model is also the natural data source for Phase A items 6-7
|
||||
-- (metrics tables) — graphiti_jobs records duration, status transitions,
|
||||
-- and per-job summary that those tables will aggregate.
|
||||
--
|
||||
-- Idempotent: safe to re-run.
|
||||
|
||||
-- Job state for sidecar's async ingest queue.
|
||||
-- One row per submitted bulk-or-single ingest. Sidecar reads queued jobs
|
||||
-- on startup to resume after restart. Worker polls status until terminal.
|
||||
CREATE TABLE IF NOT EXISTS graphiti_jobs (
|
||||
job_id UUID PRIMARY KEY,
|
||||
job_type TEXT NOT NULL CHECK (job_type IN ('bulk', 'single')),
|
||||
payload JSONB NOT NULL, -- full submitted request body
|
||||
status TEXT NOT NULL DEFAULT 'queued' -- 'queued'|'running'|'committed'|'failed'
|
||||
CHECK (status IN ('queued', 'running', 'committed', 'failed')),
|
||||
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
started_at TIMESTAMPTZ,
|
||||
finished_at TIMESTAMPTZ,
|
||||
error TEXT, -- non-null when status='failed'
|
||||
summary JSONB, -- {nodes: N, edges: N, episodes: N}
|
||||
submitted_by TEXT -- worker name for traceability
|
||||
);
|
||||
|
||||
-- Index supporting sidecar's "pick next queued job" query
|
||||
CREATE INDEX IF NOT EXISTS idx_graphiti_jobs_queued
|
||||
ON graphiti_jobs (enqueued_at)
|
||||
WHERE status = 'queued';
|
||||
|
||||
-- Index supporting worker's "poll my job by id" query (PK already does this,
|
||||
-- but explicit index aids ANALYZE behavior on small tables)
|
||||
CREATE INDEX IF NOT EXISTS idx_graphiti_jobs_status
|
||||
ON graphiti_jobs (status);
|
||||
|
||||
-- Stage 3 queue gains a reference to the sidecar job processing the row.
|
||||
-- When set, worker polls graphiti_jobs.status rather than blocking on HTTP.
|
||||
-- NULL means: row not yet submitted, or pre-Pattern-1 row.
|
||||
ALTER TABLE stage_3_queue
|
||||
ADD COLUMN IF NOT EXISTS external_job_id UUID;
|
||||
|
||||
-- Index for "find rows that submitted but didn't complete" recovery scans
|
||||
CREATE INDEX IF NOT EXISTS idx_stage_3_queue_external_job
|
||||
ON stage_3_queue (external_job_id)
|
||||
WHERE external_job_id IS NOT NULL AND completed_at IS NULL AND failed_at IS NULL;
|
||||
@@ -0,0 +1,37 @@
|
||||
# BirdAI database migrations
|
||||
|
||||
Schema changes applied to the BirdAI Postgres database, in chronological order.
|
||||
Filenames are YYYYMMDD-NNN_short_description.sql where NNN is a sequence number
|
||||
within the day for ordering when multiple migrations land same-day.
|
||||
|
||||
## Conventions
|
||||
|
||||
- Each file is idempotent: uses IF NOT EXISTS / IF EXISTS so it can be
|
||||
re-run safely against a database that already has the change applied. This
|
||||
matters because we don't track which migrations a given DB has applied (no
|
||||
migrations table yet — that's its own future migration).
|
||||
- Each file is a single logical change: one feature, one rollout. Don't pile
|
||||
unrelated DDL into one file.
|
||||
- Each file documents what it's for and which worker version requires it
|
||||
in a header comment, so the relationship between schema and code is legible
|
||||
from either side.
|
||||
- Migrations are forward-only. No down-migrations. If a change is wrong,
|
||||
write a new migration that fixes it.
|
||||
|
||||
## Applying
|
||||
|
||||
Against the live DB:
|
||||
|
||||
psql "$PG_DSN" -f migrations/YYYYMMDD-NNN_name.sql
|
||||
|
||||
Against a fresh DB (disaster recovery, dev clone), apply all files in order:
|
||||
|
||||
for f in migrations/*.sql; do
|
||||
echo "Applying $f"
|
||||
psql "$PG_DSN" -f "$f"
|
||||
done
|
||||
|
||||
## Pending: migrations tracking table
|
||||
|
||||
There is no schema_migrations table yet. Adding one is itself a migration —
|
||||
deferred until a second migration after this one lands and the need is real.
|
||||
+421
-54
@@ -1,14 +1,44 @@
|
||||
"""
|
||||
Aaron AI — Graphiti Sidecar Service
|
||||
Wraps graphiti-core in a FastAPI service to avoid asyncio event loop conflicts.
|
||||
Aaron AI — Graphiti Sidecar Service (v2.0 — Pattern 1 async job model)
|
||||
|
||||
Wraps graphiti-core in a FastAPI service. Pattern 1 architecture: ingest
|
||||
submission and completion are decoupled. Submitters POST to /episodes or
|
||||
/episodes/bulk and receive a job_id; an in-process background worker
|
||||
processes jobs serially against the graph; submitters poll GET /jobs/{id}
|
||||
until terminal status.
|
||||
|
||||
Why Pattern 1: tonight's smoke test (2026-05-02) confirmed that bulk
|
||||
ingest against the 4,222-entity graph commits successfully even when the
|
||||
worker's HTTP read-timeout fires. The synchronous interface was producing
|
||||
false-negative failures — work succeeded but the worker stopped listening.
|
||||
Pattern 1 separates submission from completion observation so the worker
|
||||
can't false-negative this way.
|
||||
|
||||
Architectural commitments:
|
||||
- One in-flight job per sidecar (per graph). Concurrent jobs against the
|
||||
same graph would race on graphiti-core's _resolve_nodes_and_edges_bulk
|
||||
(no transaction boundary, no internal coordination). Concurrent
|
||||
multi-tenancy is "run multiple sidecars," not "make one sidecar
|
||||
concurrency-safe across graphs."
|
||||
- Postgres-backed job state. Survives sidecar restart. On startup the
|
||||
sidecar resets any 'running' rows to 'queued' (their previous run died);
|
||||
the background worker picks them up naturally.
|
||||
- Both /episodes and /episodes/bulk are async-shaped for parity. graphiti-
|
||||
core operations underneath (add_episode, add_episode_bulk) are unchanged.
|
||||
- The bulk pathway is preserved — load-bearing for first-run corpus
|
||||
migration. Single-episode is preserved — load-bearing for state-
|
||||
superseding content per the Stage 2/3 routing rule.
|
||||
|
||||
Port 8001 (internal only). No OpenAI dependency.
|
||||
"""
|
||||
|
||||
import os, logging, sys, traceback
|
||||
import os, logging, sys, asyncio, traceback, uuid, json
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.extras
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from pydantic import BaseModel
|
||||
@@ -31,8 +61,18 @@ FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
|
||||
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
|
||||
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
|
||||
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
|
||||
PG_DSN = os.getenv("PG_DSN")
|
||||
SIDECAR_NAME = os.getenv("SIDECAR_NAME", "graphiti-sidecar-1")
|
||||
os.environ["EMBEDDING_DIM"] = "384"
|
||||
|
||||
# Background worker configuration. Polls Postgres for queued jobs every
|
||||
# WORKER_POLL_INTERVAL seconds when idle. Single-job-at-a-time by design;
|
||||
# no concurrency primitive beyond the serial loop. The sleep is brief
|
||||
# enough to feel responsive but long enough to avoid burning CPU on an
|
||||
# empty queue.
|
||||
WORKER_POLL_INTERVAL = 2.0
|
||||
|
||||
|
||||
def get_llm_client():
|
||||
from graphiti_core.llm_client.config import LLMConfig
|
||||
config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)
|
||||
@@ -50,16 +90,286 @@ def get_llm_client():
|
||||
return GroqClient(config)
|
||||
raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")
|
||||
|
||||
graphiti_instance = None
|
||||
|
||||
async def get_graphiti():
|
||||
if graphiti_instance is None:
|
||||
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
||||
return graphiti_instance
|
||||
graphiti_instance = None
|
||||
worker_task = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Postgres job-state helpers. Synchronous psycopg2 calls inside async
|
||||
# functions: each call opens a fresh connection, runs one statement, closes.
|
||||
# Acceptable here because traffic is low (single-digit jobs/min steady state)
|
||||
# and the simplicity is worth more than connection pooling. If this ever
|
||||
# becomes a bottleneck, swap to asyncpg or psycopg3 async.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _pg():
|
||||
return psycopg2.connect(PG_DSN)
|
||||
|
||||
|
||||
def _job_insert(job_id: str, job_type: str, payload: dict) -> None:
|
||||
"""Write a new job row in 'queued' status."""
|
||||
pg = _pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO graphiti_jobs (job_id, job_type, payload, status, submitted_by)
|
||||
VALUES (%s, %s, %s::jsonb, 'queued', %s)
|
||||
""",
|
||||
(job_id, job_type, json.dumps(payload), SIDECAR_NAME),
|
||||
)
|
||||
pg.commit()
|
||||
pg.close()
|
||||
|
||||
|
||||
def _job_get(job_id: str) -> dict | None:
|
||||
"""Read a single job by id. Returns None if not found."""
|
||||
pg = _pg()
|
||||
cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT job_id, job_type, status, enqueued_at, started_at, finished_at,
|
||||
error, summary, submitted_by
|
||||
FROM graphiti_jobs
|
||||
WHERE job_id = %s
|
||||
""",
|
||||
(job_id,),
|
||||
)
|
||||
row = cur.fetchone()
|
||||
pg.close()
|
||||
if row is None:
|
||||
return None
|
||||
# Convert UUID, datetimes for JSON serialization
|
||||
return {
|
||||
"job_id": str(row["job_id"]),
|
||||
"job_type": row["job_type"],
|
||||
"status": row["status"],
|
||||
"enqueued_at": row["enqueued_at"].isoformat() if row["enqueued_at"] else None,
|
||||
"started_at": row["started_at"].isoformat() if row["started_at"] else None,
|
||||
"finished_at": row["finished_at"].isoformat() if row["finished_at"] else None,
|
||||
"error": row["error"],
|
||||
"summary": row["summary"],
|
||||
"submitted_by": row["submitted_by"],
|
||||
}
|
||||
|
||||
|
||||
def _job_claim_next() -> dict | None:
|
||||
"""Atomically claim the oldest queued job for processing.
|
||||
|
||||
Uses SELECT ... FOR UPDATE SKIP LOCKED so multiple sidecar instances
|
||||
(future multi-tenant deployment) don't fight over the same row. For
|
||||
single-sidecar deployments this is just a clean atomic transition.
|
||||
|
||||
Returns the full job row (including payload) or None if queue is empty.
|
||||
"""
|
||||
pg = _pg()
|
||||
cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
|
||||
cur.execute(
|
||||
"""
|
||||
WITH next_job AS (
|
||||
SELECT job_id
|
||||
FROM graphiti_jobs
|
||||
WHERE status = 'queued'
|
||||
ORDER BY enqueued_at ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
UPDATE graphiti_jobs g
|
||||
SET status = 'running', started_at = NOW()
|
||||
FROM next_job
|
||||
WHERE g.job_id = next_job.job_id
|
||||
RETURNING g.job_id, g.job_type, g.payload
|
||||
"""
|
||||
)
|
||||
row = cur.fetchone()
|
||||
pg.commit()
|
||||
pg.close()
|
||||
if row is None:
|
||||
return None
|
||||
return {
|
||||
"job_id": str(row["job_id"]),
|
||||
"job_type": row["job_type"],
|
||||
"payload": row["payload"], # already a dict via JSONB
|
||||
}
|
||||
|
||||
|
||||
def _job_complete(job_id: str, summary: dict) -> None:
|
||||
pg = _pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE graphiti_jobs
|
||||
SET status = 'committed', finished_at = NOW(), summary = %s::jsonb
|
||||
WHERE job_id = %s
|
||||
""",
|
||||
(json.dumps(summary), job_id),
|
||||
)
|
||||
pg.commit()
|
||||
pg.close()
|
||||
|
||||
|
||||
def _job_fail(job_id: str, error: str) -> None:
|
||||
pg = _pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE graphiti_jobs
|
||||
SET status = 'failed', finished_at = NOW(), error = %s
|
||||
WHERE job_id = %s
|
||||
""",
|
||||
(error[:2000], job_id), # truncate to keep error column reasonable
|
||||
)
|
||||
pg.commit()
|
||||
pg.close()
|
||||
|
||||
|
||||
def _startup_recovery() -> int:
|
||||
"""Reset any 'running' jobs to 'queued' on startup.
|
||||
|
||||
Rationale: if the sidecar died while processing a job, that row is
|
||||
stuck in 'running' with no process advancing it. The right behavior
|
||||
on restart is to retry. graphiti-core's add_episode_bulk and
|
||||
add_episode are idempotent against the graph (dedup handles duplicate
|
||||
submission), so re-running a job is safe — at worst, a second run
|
||||
incurs API spend on resolve calls that no-op against an already-
|
||||
committed entity set.
|
||||
|
||||
Returns the count of recovered jobs.
|
||||
"""
|
||||
pg = _pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
UPDATE graphiti_jobs
|
||||
SET status = 'queued', started_at = NULL
|
||||
WHERE status = 'running'
|
||||
"""
|
||||
)
|
||||
count = cur.rowcount
|
||||
pg.commit()
|
||||
pg.close()
|
||||
return count
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Background worker — single asyncio task running for the sidecar lifetime.
|
||||
# Processes one job at a time. No concurrency. Restart recovery is handled
|
||||
# by _startup_recovery() before this task starts.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def background_worker():
|
||||
"""Serial job processor. Polls graphiti_jobs, processes one at a time."""
|
||||
log.info("Background worker started")
|
||||
|
||||
from graphiti_core.nodes import EpisodeType
|
||||
from graphiti_core.utils.bulk_utils import RawEpisode
|
||||
|
||||
while True:
|
||||
try:
|
||||
claimed = _job_claim_next()
|
||||
if claimed is None:
|
||||
await asyncio.sleep(WORKER_POLL_INTERVAL)
|
||||
continue
|
||||
|
||||
job_id = claimed["job_id"]
|
||||
job_type = claimed["job_type"]
|
||||
payload = claimed["payload"]
|
||||
|
||||
log.info(f"Processing job {job_id} (type={job_type})")
|
||||
start = datetime.now()
|
||||
|
||||
try:
|
||||
if job_type == "bulk":
|
||||
summary = await _process_bulk_job(payload, EpisodeType, RawEpisode)
|
||||
elif job_type == "single":
|
||||
summary = await _process_single_job(payload, EpisodeType)
|
||||
else:
|
||||
raise ValueError(f"Unknown job_type: {job_type}")
|
||||
|
||||
duration = (datetime.now() - start).total_seconds()
|
||||
summary["duration_seconds"] = duration
|
||||
_job_complete(job_id, summary)
|
||||
log.info(f"Committed job {job_id} in {duration:.1f}s — {summary}")
|
||||
except Exception as e:
|
||||
duration = (datetime.now() - start).total_seconds()
|
||||
err = f"{type(e).__name__}: {e}"
|
||||
log.error(f"Job {job_id} failed after {duration:.1f}s: {err}\n{traceback.format_exc()}")
|
||||
_job_fail(job_id, err)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
log.info("Background worker cancelled")
|
||||
raise
|
||||
except Exception as e:
|
||||
# Defensive: don't let the worker loop die from an unexpected error.
|
||||
# Log it, sleep briefly, continue.
|
||||
log.error(f"Worker loop error: {e}\n{traceback.format_exc()}")
|
||||
await asyncio.sleep(5.0)
|
||||
|
||||
|
||||
async def _process_bulk_job(payload: dict, EpisodeType, RawEpisode) -> dict:
|
||||
"""Run add_episode_bulk for a 'bulk' job. Payload mirrors BulkEpisodeRequest."""
|
||||
raw_episodes = []
|
||||
for ep in payload["episodes"]:
|
||||
ref_time = (
|
||||
datetime.fromisoformat(ep["timestamp"])
|
||||
if ep.get("timestamp") else datetime.now()
|
||||
)
|
||||
raw_episodes.append(RawEpisode(
|
||||
name=ep["name"],
|
||||
content=ep["content"],
|
||||
source_description=ep.get("source_description", ""),
|
||||
source=EpisodeType.text,
|
||||
reference_time=ref_time,
|
||||
))
|
||||
|
||||
kwargs = dict(
|
||||
bulk_episodes=raw_episodes,
|
||||
group_id=payload.get("group_id") or GROUP_ID,
|
||||
saga=payload.get("saga"),
|
||||
)
|
||||
if payload.get("custom_extraction_instructions") is not None:
|
||||
kwargs["custom_extraction_instructions"] = payload["custom_extraction_instructions"]
|
||||
|
||||
result = await graphiti_instance.add_episode_bulk(**kwargs)
|
||||
|
||||
return {
|
||||
"type": "bulk",
|
||||
"episodes": len(result.episodes) if result and result.episodes else len(raw_episodes),
|
||||
"nodes": len(result.nodes) if result and result.nodes else 0,
|
||||
"edges": len(result.edges) if result and result.edges else 0,
|
||||
}
|
||||
|
||||
|
||||
async def _process_single_job(payload: dict, EpisodeType) -> dict:
|
||||
"""Run add_episode for a 'single' job. Payload mirrors EpisodeRequest."""
|
||||
ref_time = (
|
||||
datetime.fromisoformat(payload["timestamp"])
|
||||
if payload.get("timestamp") else datetime.now()
|
||||
)
|
||||
kwargs = dict(
|
||||
name=payload["name"],
|
||||
episode_body=payload["content"],
|
||||
source=EpisodeType.text,
|
||||
reference_time=ref_time,
|
||||
source_description=payload.get("source_description", ""),
|
||||
group_id=payload.get("group_id") or GROUP_ID,
|
||||
custom_extraction_instructions=payload.get("custom_extraction_instructions"),
|
||||
)
|
||||
if payload.get("saga") is not None:
|
||||
kwargs["saga"] = payload["saga"]
|
||||
|
||||
await graphiti_instance.add_episode(**kwargs)
|
||||
return {"type": "single", "episodes": 1}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lifespan & app
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
global graphiti_instance
|
||||
global graphiti_instance, worker_task
|
||||
|
||||
sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
|
||||
log.info("Loading embedding and reranker models...")
|
||||
from st_embedder import SentenceTransformerEmbedder
|
||||
@@ -75,11 +385,51 @@ async def lifespan(app: FastAPI):
|
||||
max_coroutines=2,
|
||||
)
|
||||
await graphiti_instance.build_indices_and_constraints()
|
||||
|
||||
# PATCHED 2026-05-02: bridge the per-driver SearchOperations to the
|
||||
# search_interface attribute that search_utils.py dispatches on.
|
||||
# graphiti-core 0.29.0 builds FalkorSearchOperations as driver._search_ops
|
||||
# but never assigns it to driver.search_interface — naming mismatch
|
||||
# between the two halves of the codebase. Without this, search_utils.py
|
||||
# falls through to interpreted-Cypher cosine math (full-table scan) even
|
||||
# when our patched FalkorSearchOperations exists. Setting search_interface
|
||||
# activates the per-driver vector-index path.
|
||||
if hasattr(graphiti_instance.driver, '_search_ops') and graphiti_instance.driver.search_interface is None:
|
||||
graphiti_instance.driver.search_interface = graphiti_instance.driver._search_ops
|
||||
log.info("Wired driver.search_interface = driver._search_ops (vector index path active)")
|
||||
|
||||
log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
|
||||
|
||||
# Recover any jobs left 'running' from a previous sidecar instance.
|
||||
# They become 'queued' again and the background worker picks them up.
|
||||
recovered = _startup_recovery()
|
||||
if recovered > 0:
|
||||
log.info(f"Startup recovery: reset {recovered} running job(s) to queued")
|
||||
|
||||
# Start the background job worker.
|
||||
worker_task = asyncio.create_task(background_worker())
|
||||
log.info("Sidecar ready — accepting job submissions on :8001")
|
||||
|
||||
yield
|
||||
|
||||
# Shutdown: cancel worker, close graphiti.
|
||||
if worker_task is not None:
|
||||
worker_task.cancel()
|
||||
try:
|
||||
await worker_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
await graphiti_instance.close()
|
||||
|
||||
app = FastAPI(title="Aaron AI Graphiti Sidecar", lifespan=lifespan)
|
||||
|
||||
app = FastAPI(title="Aaron AI Graphiti Sidecar (Pattern 1)", lifespan=lifespan)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Request models — preserved from v1.0 with no payload-shape changes. The
|
||||
# only API change is the response shape: instead of blocking until
|
||||
# graphiti-core returns, submission endpoints return a job_id immediately.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class BulkEpisodeItem(BaseModel):
|
||||
name: str
|
||||
@@ -92,6 +442,7 @@ class BulkEpisodeRequest(BaseModel):
|
||||
episodes: list[BulkEpisodeItem]
|
||||
group_id: str | None = None
|
||||
saga: str | None = None
|
||||
custom_extraction_instructions: str | None = None
|
||||
|
||||
|
||||
class EpisodeRequest(BaseModel):
|
||||
@@ -101,63 +452,78 @@ class EpisodeRequest(BaseModel):
|
||||
timestamp: str | None = None
|
||||
group_id: str | None = None
|
||||
custom_extraction_instructions: str | None = None
|
||||
saga: str | None = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Endpoints
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
return {"ok": True, "provider": LLM_PROVIDER, "group": GROUP_ID}
|
||||
return {
|
||||
"ok": True,
|
||||
"provider": LLM_PROVIDER,
|
||||
"group": GROUP_ID,
|
||||
"sidecar": SIDECAR_NAME,
|
||||
"version": "2.0",
|
||||
}
|
||||
|
||||
@app.post("/episodes")
|
||||
async def add_episode(req: EpisodeRequest):
|
||||
g = await get_graphiti()
|
||||
from graphiti_core.nodes import EpisodeType
|
||||
try:
|
||||
ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now()
|
||||
await g.add_episode(
|
||||
name=req.name,
|
||||
episode_body=req.content,
|
||||
source=EpisodeType.text,
|
||||
reference_time=ref_time,
|
||||
source_description=req.source_description,
|
||||
group_id=req.group_id or GROUP_ID,
|
||||
custom_extraction_instructions=req.custom_extraction_instructions,
|
||||
)
|
||||
return {"ok": True}
|
||||
except Exception as e:
|
||||
log.error(f"Episode ingestion failed: {e}\n{traceback.format_exc()}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@app.post("/episodes/bulk")
|
||||
async def add_episodes_bulk(req: BulkEpisodeRequest):
|
||||
g = await get_graphiti()
|
||||
from graphiti_core.nodes import EpisodeType
|
||||
from graphiti_core.utils.bulk_utils import RawEpisode
|
||||
raw_episodes = []
|
||||
for ep in req.episodes:
|
||||
ref_time = datetime.fromisoformat(ep.timestamp) if ep.timestamp else datetime.now()
|
||||
raw_episodes.append(RawEpisode(
|
||||
name=ep.name,
|
||||
content=ep.content,
|
||||
source_description=ep.source_description,
|
||||
source=EpisodeType.text,
|
||||
reference_time=ref_time,
|
||||
))
|
||||
async def submit_bulk(req: BulkEpisodeRequest):
|
||||
"""Submit a bulk ingest job. Returns job_id for polling.
|
||||
|
||||
Job is processed serially by the sidecar's background worker; one
|
||||
bulk-or-single job at a time per graph. No HTTP read-timeout
|
||||
blocking. Submitter polls GET /jobs/{job_id} until terminal status.
|
||||
"""
|
||||
if graphiti_instance is None:
|
||||
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
||||
|
||||
job_id = str(uuid.uuid4())
|
||||
payload = req.model_dump()
|
||||
try:
|
||||
result = await g.add_episode_bulk(
|
||||
bulk_episodes=raw_episodes,
|
||||
group_id=req.group_id or GROUP_ID,
|
||||
saga=req.saga or None,
|
||||
)
|
||||
return {"ok": True, "count": len(raw_episodes)}
|
||||
_job_insert(job_id, "bulk", payload)
|
||||
except Exception as e:
|
||||
log.error(f"Bulk ingestion failed: {e}\n{traceback.format_exc()}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
log.error(f"Failed to enqueue bulk job: {e}\n{traceback.format_exc()}")
|
||||
raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}")
|
||||
|
||||
return {"job_id": job_id, "status": "queued"}
|
||||
|
||||
|
||||
@app.post("/episodes")
|
||||
async def submit_single(req: EpisodeRequest):
|
||||
"""Submit a single-episode ingest job. Returns job_id for polling."""
|
||||
if graphiti_instance is None:
|
||||
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
||||
|
||||
job_id = str(uuid.uuid4())
|
||||
payload = req.model_dump()
|
||||
try:
|
||||
_job_insert(job_id, "single", payload)
|
||||
except Exception as e:
|
||||
log.error(f"Failed to enqueue single job: {e}\n{traceback.format_exc()}")
|
||||
raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}")
|
||||
|
||||
return {"job_id": job_id, "status": "queued"}
|
||||
|
||||
|
||||
@app.get("/jobs/{job_id}")
|
||||
async def get_job(job_id: str):
|
||||
"""Poll a job's status. Returns 404 if job not found."""
|
||||
job = _job_get(job_id)
|
||||
if job is None:
|
||||
raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
|
||||
return job
|
||||
|
||||
|
||||
@app.get("/search")
|
||||
async def search(query: str, limit: int = 8, group_id: str | None = None):
|
||||
g = await get_graphiti()
|
||||
if graphiti_instance is None:
|
||||
raise HTTPException(status_code=503, detail="Graphiti not initialized")
|
||||
try:
|
||||
results = await g.search(
|
||||
results = await graphiti_instance.search(
|
||||
query=query,
|
||||
num_results=limit,
|
||||
group_ids=[group_id or GROUP_ID],
|
||||
@@ -178,6 +544,7 @@ async def search(query: str, limit: int = 8, group_id: str | None = None):
|
||||
log.error(f"Search failed: {e}\n{traceback.format_exc()}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info")
|
||||
|
||||
+144
-22
@@ -1,12 +1,19 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Stage 2 Worker — Taxonomy-Free Mistral Orientation
|
||||
Polls stage_2_queue, runs Mistral taxonomy-free pass, enqueues Stage 3.
|
||||
Runs as systemd service: aaronai-stage2.service
|
||||
Stage 2 Worker — Taxonomy-Free Mistral Orientation + State-Type Classification
|
||||
|
||||
Polls stage_2_queue, runs Mistral pass that produces:
|
||||
(a) orientation context (active frames, frame relationships, extraction focus)
|
||||
(b) state-type classification for Stage 3 routing (current/reference/historical,
|
||||
supersedes_prior_state boolean, confidence, rationale)
|
||||
|
||||
Enqueues Stage 3 with both concerns as explicit columns.
|
||||
|
||||
Routing:
|
||||
- char_length < 2000 → skip Stage 3, mark complete (sparse content, cascade no benefit)
|
||||
- char_length >= 2000 → enqueue Stage 3 with orientation metadata
|
||||
- char_length >= 2000 → enqueue Stage 3 with orientation + routing metadata
|
||||
|
||||
Runs as systemd service: aaronai-stage2.service
|
||||
"""
|
||||
|
||||
import os, json, time, subprocess, logging, requests
|
||||
@@ -33,22 +40,68 @@ CHAR_LENGTH_THRESHOLD = 2000
|
||||
REQUEST_TIMEOUT = 300
|
||||
RETRY_ATTEMPTS = 2
|
||||
POLL_INTERVAL = 5
|
||||
WORKER_VERSION = "2.1"
|
||||
WORKER_VERSION = "2.2"
|
||||
|
||||
# Valid values for state-type fields. Mistral output validated against these;
|
||||
# anything outside falls through to safe-cheap defaults (bulk routing).
|
||||
VALID_STATE_TYPES = ("current", "reference", "historical")
|
||||
VALID_CONFIDENCE = ("low", "medium", "high")
|
||||
|
||||
# Safe-cheap defaults applied when Mistral output is missing or malformed.
|
||||
# All route to bulk pathway (no temporal invalidation cost) per Phase A
|
||||
# routing rule: route to single-episode only on supersedes_prior_state=true
|
||||
# AND confidence in {medium, high}.
|
||||
DEFAULT_STATE_TYPE = "reference"
|
||||
DEFAULT_CONFIDENCE = "low"
|
||||
DEFAULT_SUPERSEDES = False
|
||||
DEFAULT_RATIONALE = "mistral output missing or malformed; default applied"
|
||||
|
||||
TAXFREE_PROMPT = (
|
||||
"You are a metadata extraction system. Given a document, describe its content "
|
||||
"shape for use as orientation context in a knowledge graph extraction pass.\n\n"
|
||||
"Do not summarize content. Do not extract entities. Do not assign a single category label.\n\n"
|
||||
"Instead, describe:\n"
|
||||
"- What domains or frames are active in this content (there may be several simultaneously)\n"
|
||||
"- How those frames relate to each other in this specific document\n"
|
||||
"- What kind of relational content a knowledge graph extractor should look for\n\n"
|
||||
"Output JSON only. No prose, no explanation, no markdown.\n\n"
|
||||
"Schema:\n"
|
||||
"You are a metadata extraction system. Given a document, produce a JSON object "
|
||||
"describing two distinct concerns about the document. Output JSON only — no prose, "
|
||||
"no explanation, no markdown.\n\n"
|
||||
|
||||
"CONCERN 1 — ORIENTATION CONTEXT (for downstream knowledge-graph extraction):\n"
|
||||
"Describe the content shape. Do not summarize content. Do not extract entities. "
|
||||
"Do not assign a single category label.\n"
|
||||
" - active_frames: which domains or frames are active in this content (there may "
|
||||
"be several simultaneously)\n"
|
||||
" - frame_relationships: how those frames relate in this specific document, "
|
||||
"one sentence\n"
|
||||
" - extraction_orientation: what kind of relational content a knowledge-graph "
|
||||
"extractor should look for, one sentence\n"
|
||||
" - one_sentence_summary: a single-sentence content summary\n\n"
|
||||
|
||||
"CONCERN 2 — STATE-TYPE CLASSIFICATION (for ingest routing):\n"
|
||||
"Classify the document's relationship to time and prior facts. This is independent "
|
||||
"of orientation: a document can be in a 'reference frame' (orientation) while "
|
||||
"describing 'current state' (state-type), or vice versa. Judge the document's "
|
||||
"ROLE, not its topic.\n"
|
||||
" - state_type: one of\n"
|
||||
" 'current' — describes the author's present state, recent decisions, or "
|
||||
"ongoing situations as of the document's date\n"
|
||||
" 'reference' — timeless or slow-changing material: external books, "
|
||||
"documentation, technical reference, conceptual writing\n"
|
||||
" 'historical' — describes past events, prior states, or archived material "
|
||||
"the author is recording but not living in\n"
|
||||
" - state_type_confidence: 'low' | 'medium' | 'high' — how confident you are in "
|
||||
"the classification. Use 'low' when genuinely uncertain.\n"
|
||||
" - supersedes_prior_state: true if this document describes facts that should "
|
||||
"REPLACE previously-recorded facts about the same subjects (e.g. a journal entry "
|
||||
"saying 'I no longer work at X', a status update, a corrected belief). false "
|
||||
"otherwise. Default to false when uncertain.\n"
|
||||
" - state_type_rationale: one sentence explaining the classification\n\n"
|
||||
|
||||
"Output schema (flat, all eight fields at the top level):\n"
|
||||
'{"active_frames": ["<frame 1>", "<frame 2>"], '
|
||||
'"frame_relationships": "<one sentence>", '
|
||||
'"extraction_orientation": "<one sentence>", '
|
||||
'"one_sentence_summary": "<one sentence>"}\n\n'
|
||||
'"one_sentence_summary": "<one sentence>", '
|
||||
'"state_type": "current|reference|historical", '
|
||||
'"state_type_confidence": "low|medium|high", '
|
||||
'"supersedes_prior_state": true|false, '
|
||||
'"state_type_rationale": "<one sentence>"}\n\n'
|
||||
|
||||
"Document:\n"
|
||||
)
|
||||
|
||||
@@ -100,6 +153,38 @@ def run_mistral(doc_text):
|
||||
return {"error": "parse_failed", "raw": raw[:200]}
|
||||
|
||||
|
||||
def normalize_state_fields(meta):
|
||||
"""Validate and normalize the four state-type fields from Mistral output.
|
||||
Anything missing or malformed falls through to safe-cheap defaults that
|
||||
route to bulk pathway (no temporal invalidation work)."""
|
||||
|
||||
raw_state_type = meta.get("state_type")
|
||||
if isinstance(raw_state_type, str) and raw_state_type.lower() in VALID_STATE_TYPES:
|
||||
state_type = raw_state_type.lower()
|
||||
else:
|
||||
state_type = DEFAULT_STATE_TYPE
|
||||
|
||||
raw_conf = meta.get("state_type_confidence")
|
||||
if isinstance(raw_conf, str) and raw_conf.lower() in VALID_CONFIDENCE:
|
||||
confidence = raw_conf.lower()
|
||||
else:
|
||||
confidence = DEFAULT_CONFIDENCE
|
||||
|
||||
raw_supersedes = meta.get("supersedes_prior_state")
|
||||
if isinstance(raw_supersedes, bool):
|
||||
supersedes = raw_supersedes
|
||||
else:
|
||||
supersedes = DEFAULT_SUPERSEDES
|
||||
|
||||
raw_rationale = meta.get("state_type_rationale")
|
||||
if isinstance(raw_rationale, str) and raw_rationale.strip():
|
||||
rationale = raw_rationale.strip()[:1000]
|
||||
else:
|
||||
rationale = DEFAULT_RATIONALE
|
||||
|
||||
return state_type, confidence, supersedes, rationale
|
||||
|
||||
|
||||
def build_orientation(meta):
|
||||
frames = ", ".join(meta.get("active_frames", []))
|
||||
rel = meta.get("frame_relationships", "")
|
||||
@@ -108,20 +193,46 @@ def build_orientation(meta):
|
||||
return f"Active frames: {frames}. Frame relationships: {rel} Extraction focus: {orient} Summary: {summary}"
|
||||
|
||||
|
||||
def enqueue_stage3(pg, source, full_text, orientation, metadata):
|
||||
def enqueue_stage3(pg, source, full_text, orientation, metadata,
|
||||
state_type, state_type_confidence, supersedes_prior_state,
|
||||
state_type_rationale):
|
||||
"""Write Stage 3 queue row with orientation + explicit routing columns.
|
||||
|
||||
Routing columns (state_type, state_type_confidence, supersedes_prior_state,
|
||||
state_type_rationale) are first-class queue properties for Phase A.
|
||||
Stage 3 reads them on every dequeue to choose bulk vs single-episode pathway.
|
||||
The full Mistral metadata blob is also retained in stage2_metadata JSON for
|
||||
debugging and future cycle work."""
|
||||
cur = pg.cursor()
|
||||
cur.execute("""
|
||||
INSERT INTO stage_3_queue (source, full_text, orientation, stage2_metadata)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
INSERT INTO stage_3_queue (
|
||||
source, full_text, orientation, stage2_metadata,
|
||||
state_type, state_type_confidence, supersedes_prior_state,
|
||||
state_type_rationale
|
||||
)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (source) DO UPDATE SET
|
||||
full_text = EXCLUDED.full_text,
|
||||
orientation = EXCLUDED.orientation,
|
||||
stage2_metadata = EXCLUDED.stage2_metadata,
|
||||
state_type = EXCLUDED.state_type,
|
||||
state_type_confidence = EXCLUDED.state_type_confidence,
|
||||
supersedes_prior_state = EXCLUDED.supersedes_prior_state,
|
||||
state_type_rationale = EXCLUDED.state_type_rationale,
|
||||
enqueued_at = NOW(),
|
||||
-- Reset all run-state fields on re-enqueue. Without this,
|
||||
-- stale started_at from a prior attempt makes the row
|
||||
-- invisible to the Stage 3 worker's claim filter (which
|
||||
-- typically uses started_at IS NULL).
|
||||
started_at = NULL,
|
||||
completed_at = NULL,
|
||||
failed_at = NULL,
|
||||
failure_reason = NULL,
|
||||
external_job_id = NULL,
|
||||
attempts = 0
|
||||
""", (source, full_text, orientation, json.dumps(metadata)))
|
||||
""", (source, full_text, orientation, json.dumps(metadata),
|
||||
state_type, state_type_confidence, supersedes_prior_state,
|
||||
state_type_rationale))
|
||||
pg.commit()
|
||||
|
||||
|
||||
@@ -144,7 +255,7 @@ def process_one(row):
|
||||
return True
|
||||
|
||||
# Run Mistral
|
||||
log.info(f" Running Mistral taxonomy-free pass...")
|
||||
log.info(f" Running Mistral taxonomy-free + state-type pass...")
|
||||
try:
|
||||
meta = run_mistral(full_text)
|
||||
except requests.exceptions.Timeout:
|
||||
@@ -177,14 +288,25 @@ def process_one(row):
|
||||
frames = meta.get("active_frames", [])
|
||||
log.info(f" Frames: {frames}")
|
||||
|
||||
# Normalize state-type fields with safe-cheap defaults on malformed output.
|
||||
# Note: Mistral may return valid orientation but malformed state-type;
|
||||
# we accept the orientation and default the routing rather than fail
|
||||
# the whole row, since defaults route to bulk (cheap, safe).
|
||||
state_type, confidence, supersedes, rationale = normalize_state_fields(meta)
|
||||
log.info(
|
||||
f" State-type: {state_type} (conf={confidence}, "
|
||||
f"supersedes={supersedes})"
|
||||
)
|
||||
|
||||
orientation = build_orientation(meta)
|
||||
meta["_model"] = "mistral:latest"
|
||||
meta["_worker_version"] = WORKER_VERSION
|
||||
meta["_generated_at"] = datetime.now().isoformat()
|
||||
meta["char_length"] = char_length
|
||||
|
||||
# Enqueue Stage 3
|
||||
enqueue_stage3(pg, source, full_text, orientation, meta)
|
||||
# Enqueue Stage 3 with explicit routing columns
|
||||
enqueue_stage3(pg, source, full_text, orientation, meta,
|
||||
state_type, confidence, supersedes, rationale)
|
||||
cur.execute("UPDATE stage_2_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
|
||||
pg.commit()
|
||||
pg.close()
|
||||
|
||||
+319
-34
@@ -1,22 +1,58 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Stage 3 Worker — Graphiti Ingest with Taxonomy-Free Orientation
|
||||
Polls stage_3_queue, chunks documents, ingests as episodic saga to Graphiti.
|
||||
Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing
|
||||
+ Encoder Instructions (v1.0)
|
||||
|
||||
Chunking rationale: Large documents sent as single episodes cause FalkorDB
|
||||
write lock contention during entity deduplication. Chunking at ~500 words
|
||||
(matching Stage 1) produces smaller deduplication passes that don't block.
|
||||
Each document's chunks are linked via Graphiti's saga mechanism, preserving
|
||||
document structure in the graph.
|
||||
Polls stage_3_queue, routes each row to one of two ingest pathways based on
|
||||
state-type classification produced by Stage 2:
|
||||
|
||||
Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed sagas of
|
||||
17 and 19 chunks deadlock the sidecar's Python-side coordination. Documents
|
||||
producing more than MAX_CHUNKS_PER_SAGA chunks are split into multiple bulk
|
||||
commits, each tagged with the same saga value so Graphiti still links them.
|
||||
- BULK pathway (existing): supersedes_prior_state=false OR confidence=low
|
||||
OR routing fields missing. Fast, no temporal invalidation.
|
||||
|
||||
Wedge detection: 2026-05-01 incident also surfaced the asymmetry with Stage 2 —
|
||||
Stage 3 had no recovery path when the sidecar deadlocked. Now mirrors Stage 2's
|
||||
consecutive_failures pattern with sidecar restart on threshold.
|
||||
- SINGLE-EPISODE pathway (new): supersedes_prior_state=true AND
|
||||
confidence in {medium, high}. Per-chunk POST to /episodes with shared
|
||||
saga tag, full edge invalidation, per-chunk timeout/retry independence.
|
||||
|
||||
Both pathways pass EXTRACTION_INSTRUCTIONS_V1 to the sidecar via
|
||||
custom_extraction_instructions, which graphiti-core inserts into entity
|
||||
and edge extraction prompts (NOT dedup prompts — that's intentional under
|
||||
the encoder-stays-naive commitment).
|
||||
|
||||
Architectural posture: the encoder is content-naïve. It does not draw on
|
||||
prior knowledge of the user, the substrate, or the cycle's accumulated
|
||||
work. Schema and personality live in the cycle's consolidated substrate,
|
||||
where the dream phase shapes them. The encoder produces source-grounded
|
||||
ground truth for the cycle to work from. See EXTRACTION_INSTRUCTIONS_V1
|
||||
below for the extraction guidance text.
|
||||
|
||||
Routing rationale: the single-episode pathway is the correct API per
|
||||
graphiti-core's docs for content that supersedes prior facts (it does
|
||||
edge invalidation that bulk skips). It costs more per chunk because of
|
||||
the resolve_edge LLM call; the routing rule keeps that cost bounded to
|
||||
content that actually needs it.
|
||||
|
||||
Chunking rationale (preserved from prior versions): Large documents sent
|
||||
as single episodes cause FalkorDB write lock contention during entity
|
||||
deduplication. Chunking at ~500 words (matching Stage 1) produces smaller
|
||||
deduplication passes that don't block. Each document's chunks are linked
|
||||
via Graphiti's saga mechanism, preserving document structure in the graph.
|
||||
|
||||
Per-chunk heartbeat: single-episode pathway updates stage_3_queue.started_at
|
||||
after each successful chunk POST so a long-running document doesn't cross
|
||||
the 10-minute stale threshold mid-process and get re-dequeued by another
|
||||
worker (or the same worker on next loop iteration). started_at thus means
|
||||
"last activity timestamp" rather than "began at" — semantics that match
|
||||
the dequeue query's intent (catch dead workers, not slow ones).
|
||||
|
||||
Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed bulk
|
||||
sagas of 17 and 19 chunks deadlock the sidecar's Python-side coordination.
|
||||
Documents producing more than MAX_CHUNKS_PER_SAGA chunks on the bulk
|
||||
pathway are split into multiple bulk commits, each tagged with the same
|
||||
saga value so Graphiti still links them. The single-episode pathway
|
||||
doesn't need this split since each chunk is its own POST.
|
||||
|
||||
Wedge detection: mirrors Stage 2's consecutive_failures pattern with
|
||||
sidecar restart on threshold.
|
||||
|
||||
Runs as systemd service: aaronai-stage3.service
|
||||
"""
|
||||
@@ -44,17 +80,104 @@ HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat")
|
||||
RETRY_ATTEMPTS = 2
|
||||
POLL_INTERVAL = 5
|
||||
INGEST_TIMEOUT = 600
|
||||
WORKER_VERSION = "2.2"
|
||||
WORKER_VERSION = "2.4"
|
||||
|
||||
# Match Stage 1 chunking parameters
|
||||
CHUNK_SIZE_WORDS = 500
|
||||
CHUNK_OVERLAP_WORDS = 50
|
||||
# Documents under this threshold ingested as single episode (no chunking overhead)
|
||||
SINGLE_EPISODE_THRESHOLD = 1500
|
||||
# Sagas larger than this many chunks split into multiple commits
|
||||
# Bulk-pathway sagas larger than this many chunks split into multiple commits
|
||||
# (2026-05-01 incident: 17 and 19 chunk sagas deadlocked sidecar)
|
||||
MAX_CHUNKS_PER_SAGA = 10
|
||||
|
||||
# Routing rule: single-episode pathway requires both signals positive.
|
||||
# Anything else (false, NULL, low confidence) routes to bulk — the
|
||||
# safer-cheaper default. Mistral parse drift can't accidentally trigger
|
||||
# the expensive pathway.
|
||||
HIGH_TRUST_CONFIDENCE = ("medium", "high")
|
||||
|
||||
# Encoder extraction guidance v1.0 — see module docstring for posture rationale.
|
||||
# Passed to graphiti-core via custom_extraction_instructions on both ingest
|
||||
# pathways. Inserted into entity-extraction and edge-extraction prompts only;
|
||||
# does NOT enter dedup prompts. Encoder-stays-naïve commitment is structural,
|
||||
# not versioned: this text gets refined over time but the encoder does not
|
||||
# acquire substrate context as the cycle matures.
|
||||
EXTRACTION_INSTRUCTIONS_V1 = """\
|
||||
EXTRACTION GUIDANCE — BirdAI cascade
|
||||
|
||||
The encoder's job is faithful capture from this chunk's text. It does
|
||||
not draw on prior knowledge of the user, the substrate, or the cycle's
|
||||
accumulated work. Schema, personality, and inferred context live in
|
||||
the cycle's consolidated substrate, where the dream phase shapes them
|
||||
through prediction-error replay and speculation. The encoder stays
|
||||
content-naïve so the cycle has source-grounded ground truth to work
|
||||
from.
|
||||
|
||||
The orientation produced by an upstream pass describes content shape,
|
||||
not content interpretation. Use it as forward-facing guidance for what
|
||||
to attend to in this document. Do not let it bound or limit what you
|
||||
extract.
|
||||
|
||||
PREDICATE NAMING
|
||||
|
||||
Produce semantic predicates that describe the actual relationship the
|
||||
text states. Use verbs or verb phrases — "wrote", "advised", "founded",
|
||||
"works at", "led to", "contradicts", "is autobiographical to" — not
|
||||
generic placeholders. Reserve generic forms (for example, "relates to"
|
||||
or "mentions") for cases where the text genuinely does not specify a
|
||||
more particular relationship. The verb is the load-bearing part of
|
||||
the fact; preserving it is what makes the relationship queryable later.
|
||||
|
||||
EXTRACTION POSTURE
|
||||
|
||||
Extract from this chunk's text as if each entity is encountered fresh.
|
||||
Do not try to reconcile entities you find here with entities that
|
||||
might already exist elsewhere in the graph. Redundant entity instances
|
||||
are acceptable. Cross-document entity resolution is downstream cycle
|
||||
work, not extraction work.
|
||||
|
||||
When the same entity appears multiple times within this chunk with
|
||||
slightly different spellings — a common artifact of voice transcription —
|
||||
prefer the more frequent or more canonical-looking form. Do not invent
|
||||
canonical forms; choose among the variants the text actually contains.
|
||||
|
||||
EXTRACT FROM THE SOURCE
|
||||
|
||||
Extract relationships the text states or strongly implies through
|
||||
direct linguistic markers ("X led to Y", "X works for Y", "X met Y at
|
||||
Z"). Do not extend extraction to relationships the text neither states
|
||||
nor directly implies. Inferred relationships are produced by the
|
||||
cycle's dream phase as speculative edges with explicit low-confidence
|
||||
tagging, where they can be evaluated and either ratified or pruned by
|
||||
subsequent cycle work. Encoding-time inference, mixed in with source-
|
||||
grounded extraction, would lose the speculation/source distinction the
|
||||
cycle's consolidation work relies on.
|
||||
|
||||
DO NOT PRE-EMPT CYCLE WORK
|
||||
|
||||
Do not omit relationships because they seem redundant with prior
|
||||
extractions or with the existing graph. Cross-document entity
|
||||
resolution and edge consolidation are downstream cycle operations;
|
||||
redundant extraction at this stage is intentional. Extracting the
|
||||
same fact from multiple sources gives the cycle's consolidation work
|
||||
the recurrence signal it relies on.
|
||||
|
||||
EXTRACTION DEPTH
|
||||
|
||||
Use the orientation's frame_relationships and extraction_orientation
|
||||
fields to inform what to attend to. If the orientation describes
|
||||
cross-domain relational content, look for relationships that bridge
|
||||
those domains explicitly, with named predicates for the bridging.
|
||||
If the orientation describes single-domain technical content, look
|
||||
for the structural relationships internal to that domain.
|
||||
|
||||
Extract every entity and every relationship the text states. Do not
|
||||
summarize, do not filter, do not omit content because it seems
|
||||
incidental. The orientation tells you what to look for; the source
|
||||
text tells you what is there.
|
||||
"""
|
||||
|
||||
|
||||
def get_pg():
|
||||
return psycopg2.connect(PG_DSN)
|
||||
@@ -109,6 +232,22 @@ def chunk_text(text, chunk_size=CHUNK_SIZE_WORDS, overlap=CHUNK_OVERLAP_WORDS):
|
||||
return chunks
|
||||
|
||||
|
||||
def heartbeat_row(row_id):
|
||||
"""Refresh stage_3_queue.started_at to NOW() so a long-running single-episode
|
||||
ingest doesn't cross the 10-minute stale threshold mid-process. Called
|
||||
after each successful chunk POST. Best-effort: failures are logged but
|
||||
don't fail the chunk — the worst case is a stale-threshold re-dequeue,
|
||||
which graphiti's dedup will handle as a no-op."""
|
||||
try:
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute("UPDATE stage_3_queue SET started_at = NOW() WHERE id = %s", (row_id,))
|
||||
pg.commit()
|
||||
pg.close()
|
||||
except Exception as e:
|
||||
log.warning(f" Heartbeat update failed (continuing): {e}")
|
||||
|
||||
|
||||
def post_bulk(payload, batch_label=""):
|
||||
"""Single POST to /episodes/bulk with consistent error handling."""
|
||||
resp = requests.post(
|
||||
@@ -122,16 +261,34 @@ def post_bulk(payload, batch_label=""):
|
||||
return resp.json()
|
||||
|
||||
|
||||
def ingest_to_graphiti(source, full_text, orientation):
|
||||
"""
|
||||
Ingest document to Graphiti as chunked episodes linked by saga.
|
||||
def post_episode(payload, episode_label=""):
|
||||
"""Single POST to /episodes (singular) with consistent error handling.
|
||||
Used by the single-episode pathway, one call per chunk."""
|
||||
resp = requests.post(
|
||||
f"{GRAPHITI_URL}/episodes",
|
||||
json=payload,
|
||||
timeout=INGEST_TIMEOUT
|
||||
)
|
||||
if not resp.ok:
|
||||
prefix = f"{episode_label} " if episode_label else ""
|
||||
raise RuntimeError(f"{prefix}Sidecar {resp.status_code}: {resp.text[:500]}")
|
||||
return resp.json()
|
||||
|
||||
|
||||
def ingest_bulk(source, full_text, orientation):
|
||||
"""
|
||||
Bulk-pathway ingest: documents that don't supersede prior state.
|
||||
Skips edge invalidation. Cheap. Three sub-paths by document size:
|
||||
|
||||
Three paths:
|
||||
- Short documents (<SINGLE_EPISODE_THRESHOLD): single episode, no saga
|
||||
[note: 'single episode' here means one bulk call with one item, NOT
|
||||
the single-episode-pathway; naming overlap is unfortunate but local]
|
||||
- Medium documents (chunks <= MAX_CHUNKS_PER_SAGA): one bulk commit, saga-linked
|
||||
- Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of
|
||||
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga tag
|
||||
so Graphiti links them as one document unit
|
||||
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga
|
||||
tag so Graphiti links them as one document unit
|
||||
|
||||
All three sub-paths pass EXTRACTION_INSTRUCTIONS_V1 to the sidecar.
|
||||
"""
|
||||
char_length = len(full_text)
|
||||
|
||||
@@ -142,8 +299,12 @@ def ingest_to_graphiti(source, full_text, orientation):
|
||||
"source_description": orientation,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}]
|
||||
log.info(f" Single episode ({char_length} chars)")
|
||||
return post_bulk({"episodes": episodes, "group_id": "aaron"})
|
||||
log.info(f" [bulk] Single episode ({char_length} chars)")
|
||||
return post_bulk({
|
||||
"episodes": episodes,
|
||||
"group_id": "aaron",
|
||||
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||
})
|
||||
|
||||
chunks = chunk_text(full_text)
|
||||
total_chunks = len(chunks)
|
||||
@@ -158,15 +319,18 @@ def ingest_to_graphiti(source, full_text, orientation):
|
||||
}
|
||||
for i, chunk in enumerate(chunks)
|
||||
]
|
||||
log.info(f" Chunked into {total_chunks} episodes ({char_length} chars)")
|
||||
return post_bulk(
|
||||
{"episodes": episodes, "group_id": "aaron", "saga": source}
|
||||
)
|
||||
log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)")
|
||||
return post_bulk({
|
||||
"episodes": episodes,
|
||||
"group_id": "aaron",
|
||||
"saga": source,
|
||||
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||
})
|
||||
|
||||
# Large document: split into batches sharing the same saga tag
|
||||
batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA
|
||||
log.info(
|
||||
f" Chunked into {total_chunks} episodes ({char_length} chars); "
|
||||
f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars); "
|
||||
f"splitting into {batch_count} batches of up to {MAX_CHUNKS_PER_SAGA}"
|
||||
)
|
||||
last_result = None
|
||||
@@ -186,16 +350,126 @@ def ingest_to_graphiti(source, full_text, orientation):
|
||||
batch_label = f"batch {batch_idx + 1}/{batch_count} (chunks {start + 1}-{end})"
|
||||
log.info(f" {batch_label} starting")
|
||||
last_result = post_bulk(
|
||||
{"episodes": episodes, "group_id": "aaron", "saga": source},
|
||||
{
|
||||
"episodes": episodes,
|
||||
"group_id": "aaron",
|
||||
"saga": source,
|
||||
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||
},
|
||||
batch_label=batch_label,
|
||||
)
|
||||
log.info(f" {batch_label} committed")
|
||||
return last_result
|
||||
|
||||
|
||||
def ingest_single_episode(row_id, source, full_text, orientation):
|
||||
"""
|
||||
Single-episode pathway: documents that supersede prior state with
|
||||
medium-or-high confidence. Each chunk is its own POST to /episodes
|
||||
with shared saga tag. Each call independent: own timeout, own retry
|
||||
envelope, own failure semantics.
|
||||
|
||||
Each chunk POST passes EXTRACTION_INSTRUCTIONS_V1 to the sidecar.
|
||||
|
||||
Partial-success behavior: if chunk N of total fails, chunks 1..N-1
|
||||
stay committed (graphiti has already accepted them) and the function
|
||||
raises with detail about which chunk failed and how many succeeded.
|
||||
The caller marks the row failed_at with that detail; the operator
|
||||
decides whether to re-enqueue. Re-ingestion will re-POST chunks 1..N-1
|
||||
against the graph; graphiti's dedup will handle them as no-ops.
|
||||
|
||||
Heartbeats stage_3_queue.started_at after each successful chunk so the
|
||||
row doesn't cross the 10-minute stale threshold while actively progressing.
|
||||
"""
|
||||
char_length = len(full_text)
|
||||
|
||||
# Short documents: one POST, no chunking, no saga
|
||||
if char_length < SINGLE_EPISODE_THRESHOLD:
|
||||
payload = {
|
||||
"name": source,
|
||||
"content": full_text,
|
||||
"source_description": orientation,
|
||||
"group_id": "aaron",
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||
}
|
||||
log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)")
|
||||
return post_episode(payload, episode_label="single-ep")
|
||||
|
||||
chunks = chunk_text(full_text)
|
||||
total_chunks = len(chunks)
|
||||
log.info(
|
||||
f" [single-ep] Chunked into {total_chunks} episodes ({char_length} chars); "
|
||||
f"per-chunk POSTs with shared saga"
|
||||
)
|
||||
|
||||
succeeded = 0
|
||||
for i, chunk in enumerate(chunks):
|
||||
chunk_num = i + 1
|
||||
payload = {
|
||||
"name": f"{source} [{chunk_num}/{total_chunks}]",
|
||||
"content": chunk,
|
||||
"source_description": orientation,
|
||||
"group_id": "aaron",
|
||||
"saga": source,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
|
||||
}
|
||||
try:
|
||||
post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}")
|
||||
succeeded += 1
|
||||
log.info(f" chunk {chunk_num}/{total_chunks} committed")
|
||||
heartbeat_row(row_id)
|
||||
except Exception as e:
|
||||
# Annotate the exception with partial-success detail so the
|
||||
# caller can write a clean failure_reason. Re-raise to abort
|
||||
# the document; previously-committed chunks stay in the graph.
|
||||
raise RuntimeError(
|
||||
f"single_episode_partial: chunk {chunk_num}/{total_chunks} failed "
|
||||
f"(succeeded: {succeeded}); error: {str(e)[:300]}"
|
||||
) from e
|
||||
|
||||
log.info(f" [single-ep] All {total_chunks} chunks committed")
|
||||
return {"ok": True, "chunks_committed": total_chunks}
|
||||
|
||||
|
||||
def should_route_single_episode(supersedes_prior_state, state_type_confidence):
|
||||
"""Routing decision for Phase A.
|
||||
|
||||
Single-episode pathway requires BOTH:
|
||||
- supersedes_prior_state is true (Mistral judged it temporally superseding)
|
||||
- confidence is medium or high (Mistral was confident enough to trust)
|
||||
|
||||
Anything else routes to bulk: false supersedes, NULL fields (legacy rows
|
||||
pre-dating Stage 2 v2.2), low confidence even on supersedes=true. This
|
||||
is the safer-cheaper default — bulk skips temporal invalidation, which
|
||||
is the right behavior when we're not confident the content needs it.
|
||||
"""
|
||||
if not supersedes_prior_state:
|
||||
return False
|
||||
if state_type_confidence not in HIGH_TRUST_CONFIDENCE:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def process_one(row):
|
||||
row_id, source, full_text, orientation = row
|
||||
log.info(f"Ingesting to Graphiti: {source}")
|
||||
(row_id, source, full_text, orientation,
|
||||
state_type, state_type_confidence, supersedes_prior_state,
|
||||
state_type_rationale) = row
|
||||
|
||||
# Route decision
|
||||
use_single_episode = should_route_single_episode(
|
||||
supersedes_prior_state, state_type_confidence
|
||||
)
|
||||
pathway = "single-episode" if use_single_episode else "bulk"
|
||||
|
||||
log.info(
|
||||
f"Ingesting to Graphiti: {source} "
|
||||
f"[pathway={pathway}, state_type={state_type}, "
|
||||
f"conf={state_type_confidence}, supersedes={supersedes_prior_state}]"
|
||||
)
|
||||
if state_type_rationale:
|
||||
log.info(f" rationale: {state_type_rationale[:200]}")
|
||||
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
@@ -204,9 +478,16 @@ def process_one(row):
|
||||
(row_id,)
|
||||
)
|
||||
pg.commit()
|
||||
pg.close()
|
||||
|
||||
try:
|
||||
result = ingest_to_graphiti(source, full_text, orientation)
|
||||
if use_single_episode:
|
||||
result = ingest_single_episode(row_id, source, full_text, orientation)
|
||||
else:
|
||||
result = ingest_bulk(source, full_text, orientation)
|
||||
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute("UPDATE stage_3_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
|
||||
pg.commit()
|
||||
pg.close()
|
||||
@@ -214,6 +495,8 @@ def process_one(row):
|
||||
return True
|
||||
except Exception as e:
|
||||
log.error(f" Graphiti ingest failed for {source}: {e}")
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute("""
|
||||
UPDATE stage_3_queue
|
||||
SET failed_at = NOW(), failure_reason = %s
|
||||
@@ -235,7 +518,9 @@ def run():
|
||||
pg = get_pg()
|
||||
cur = pg.cursor()
|
||||
cur.execute("""
|
||||
SELECT id, source, full_text, orientation
|
||||
SELECT id, source, full_text, orientation,
|
||||
state_type, state_type_confidence, supersedes_prior_state,
|
||||
state_type_rationale
|
||||
FROM stage_3_queue
|
||||
WHERE completed_at IS NULL
|
||||
AND failed_at IS NULL
|
||||
|
||||
Reference in New Issue
Block a user