Compare commits

...

5 Commits

Author SHA1 Message Date
aaron 7f07972109 stage2_worker: ON CONFLICT clause resets all run-state fields on re-enqueue
Bug: when a row in stage_3_queue gets re-enqueued (same source ingested
again after Stage 2 re-runs), the ON CONFLICT (source) DO UPDATE clause
updated content fields and reset enqueued_at, completed_at, failed_at,
attempts — but did not reset started_at, failure_reason, or
external_job_id.

Stale started_at from a prior attempt makes the row invisible to the
Stage 3 worker's claim filter (which uses started_at IS NULL). The row
sits queued forever; Stage 3 never picks it up; the source effectively
fails silently after a re-trigger.

Discovered tonight while testing the bulk pathway after the substrate
fix: a journal entry that had been ingested earlier (and manually marked
completed during recovery from a worker timeout) showed enqueued_at
from the new touch but started_at from the original 01:40 attempt. Fix
extends the upsert clause to NULL all run-state fields so re-enqueue
behaves as 'fresh attempt.'

After fix, re-triggered journal entry routed cleanly through Stage 2 →
Stage 3 → bulk pathway → sidecar bulk job → 60ms commit (worst-case
dedup against already-known content).
2026-05-02 05:20:14 +00:00
aaron f645b74b1c graphiti_service: v2.0 — Pattern 1 async job model + search_interface bridge
Major rewrite of the Graphiti sidecar. Two architectural changes:

PATTERN 1 ASYNC JOB MODEL

Submission and completion are decoupled. POST /episodes and
POST /episodes/bulk return job_id immediately; the actual graphiti-core
work happens in a background asyncio task. Submitters poll
GET /jobs/{job_id} until terminal status (committed | failed).

Why: tonight's smoke test confirmed that bulk ingest against the
4,222-entity graph was committing successfully even when the worker's
HTTP read-timeout fired. The synchronous interface was producing
false-negative failures — work succeeded but the worker stopped
listening at the 10-minute read-timeout. Three days of 'saga deadlock'
failures reframe as scaling pathology of unindexed similarity search,
not substrate deadlocks. Pattern 1 separates submission from completion
observation so the worker can't false-negative this way.

Architectural commitments:

- One in-flight job per sidecar (per graph). Concurrent jobs against
  the same graph would race on graphiti-core's bulk-resolve path (no
  transaction boundary). Concurrent multi-tenancy is 'run multiple
  sidecars,' not 'make one sidecar concurrency-safe across graphs.'

- Postgres-backed job state. Survives sidecar restart. On startup the
  sidecar resets any 'running' rows to 'queued' (their previous run
  died); the background worker picks them up naturally.

- Both endpoints async-shaped for parity. Bulk pathway preserved —
  load-bearing for first-run corpus migration. Single-episode
  preserved — load-bearing for state-superseding content per the
  Stage 2/3 routing rule. graphiti-core's add_episode and
  add_episode_bulk are unchanged underneath; the async wrapper sits
  between the HTTP layer and the library call.

- Polling cadence: 2s flat at the worker, FOR UPDATE SKIP LOCKED so
  the design is safe for future multi-sidecar deployment without
  changes.

Postgres helpers (_pg, _job_insert, _job_get, _job_claim_next,
_job_complete, _job_fail, _startup_recovery) replace the synchronous
graphiti.add_episode call with persistent job state. Background worker
loop catches everything, logs everything, never dies from an unexpected
error.

SEARCH_INTERFACE BRIDGE

graphiti-core 0.29.0 builds FalkorSearchOperations as
driver._search_ops in FalkorDriver.__init__ but never assigns it to
driver.search_interface. search_utils.py:edge_similarity_search and
node_similarity_search check 'if driver.search_interface:' and
delegate when present, falling through to interpreted-Cypher cosine
math when not. The naming mismatch between the two halves of
graphiti-core means the per-driver implementation never gets used.

Bridge after Graphiti instance construction:
  driver.search_interface = driver._search_ops

This activates the per-driver path which (with our vendored patches)
uses db.idx.vector.queryNodes for FalkorDB's native vector index.
Empirical result: single-episode add_episode against a 4,277-entity
graph went from indefinite hang to 8.2 seconds.

The bridge is also a candidate for an upstream PR — pick one name and
stick to it across the codebase. Tonight it's local.
2026-05-02 05:19:46 +00:00
aaron c0e6159b5e graphiti_patches: vendored FalkorDB vector index support for graphiti-core 0.29.0
Adds native FalkorDB vector index support to graphiti-core's FalkorDB
driver. Three patched files (graph_queries.py, falkordb_driver.py,
falkordb/operations/search_ops.py) plus apply.sh that backs up venv
files and copies patches over.

Why this exists: graphiti-core 0.29.0 builds similarity queries using
interpreted Cypher cosine math (vec.cosineDistance) which produces a
full-table scan over Entity/RELATES_TO/Community nodes for every search.
At ~4,000+ entities, single-episode add_episode took 8+ minutes for the
resolve-against-existing-graph step and bulk ingest hung indefinitely.
FalkorDB itself supports db.idx.vector.queryNodes and queryRelationships
procedures backed by HNSW indexes; the driver just doesn't use them.

Patches:

1. graph_queries.py — adds get_vector_indices() returning CREATE VECTOR
   INDEX statements for FalkorDB (Entity.name_embedding,
   RELATES_TO.fact_embedding, Community.name_embedding). HNSW with
   cosine similarity. Adds VECTOR_INDEX_CANDIDATE_MULTIPLIER for
   over-fetch when WHERE filters reject some top-k results. Original
   get_vector_cosine_func_query preserved for fallback.

2. falkordb_driver.py — extends build_indices_and_constraints() to call
   get_vector_indices() alongside range and fulltext. Adds cache
   invalidation hook so the search_ops dispatcher re-probes for indexes
   after they're built.

3. falkordb/operations/search_ops.py — adds vector-index dispatcher
   helpers (_falkordb_vector_index_exists with module-level cache,
   _falkordb_vector_node_search_cypher, _falkordb_vector_edge_search_cypher).
   Rewrites the three vector-similarity call sites (Entity.name_embedding,
   RELATES_TO.fact_embedding, Community.name_embedding) to use
   db.idx.vector.queryNodes / queryRelationships when available, fall
   back to interpreted-Cypher cosine math when not. Index existence
   probed once per (label, attribute, entity_type) and cached.

Empirical result: single-episode add_episode against a 4,277-entity
graph went from indefinite hang to 8.2 seconds. Bulk re-ingest of
already-known content (worst case for entity dedup) committed in 60ms.

Activation requires bridging driver._search_ops to driver.search_interface
in the sidecar (see graphiti_service.py). graphiti-core declares
search_interface as the dispatcher attribute but never assigns the
per-driver implementation to it — naming mismatch in their internal
refactor. The bridge is one line in our sidecar's lifespan.

Upstream candidate: this is a known gap (referenced indirectly in
upstream issue #1263 RFC for external vector store overlay). Maintainers'
attention is on Milvus/Qdrant/Pinecone overlay; this is the FalkorDB-
native alternative for users who don't want to run a separate vector DB.
PR after empirical validation in production. Apache-2.0 graphiti-core
source is NOT vendored — backups/ is gitignored to keep the upstream
source out of this repo.
2026-05-02 05:19:01 +00:00
aaron d7b2a850c4 stage3_worker: v2.4 — encoder extraction instructions v1.0
Adds EXTRACTION_INSTRUCTIONS_V1 constant passed to the sidecar via
custom_extraction_instructions on both bulk and single-episode pathways.
graphiti-core inserts the text into entity and edge extraction prompts
only; it does NOT enter dedup prompts (that's the encoder-stays-naive
commitment).

Architectural posture: the encoder is content-naive. It does not draw on
prior knowledge of the user, the substrate, or the cycle's accumulated
work. Schema and personality live in the cycle's consolidated substrate
where the dream phase shapes them. The encoder produces source-grounded
ground truth for the cycle to work from.

Empirical validation in tonight's smoke test: 30+ verb-shaped predicates
from 3 chunks of real content, including IS_AUTOBIOGRAPHICAL_TO,
INFORMED_DESIGN_OF, EVALUATED_DOMAIN_PURITY, DISCONFIRMED_HYPOTHESIS_ABOUT.
Compare to default extraction's 4 predicate types across 22,289 edges.
RELATES_TO appears once as appropriate fallback rather than collapsing
everything generic.

Bumps WORKER_VERSION to 2.4.
2026-05-02 05:15:17 +00:00
aaron a0bf280075 Add Pattern 1 async job model migration
Adds graphiti_jobs table for sidecar's async ingest queue and
external_job_id column on stage_3_queue for worker's polling reference.

Tonight's smoke test diagnosed that bulk ingest against the 4,222-entity
graph commits successfully but the worker's 600s HTTP read-timeout fires
before the sidecar's response returns. Three days of 'saga deadlock'
failures were false negatives — the work succeeded; the worker just
stopped listening. Pattern 1 separates submission from completion
observation so the worker can't false-negative this way.

Migration only — sidecar and worker code changes follow in subsequent
commits.
2026-05-02 02:22:30 +00:00
10 changed files with 2328 additions and 75 deletions
+4
View File
@@ -0,0 +1,4 @@
# Local backups created by apply.sh — environment state, not source.
# Keeping these out of version control prevents repo bloat and avoids
# checking in graphiti-core's Apache-2.0 source under our repo's tree.
backups/
+58
View File
@@ -0,0 +1,58 @@
# graphiti-core Patches — FalkorDB Vector Index Support
Vendored patches against graphiti-core 0.29.0 adding native FalkorDB
vector index support. Three files modified, all under
`graphiti_core/driver/falkordb/` and `graphiti_core/graph_queries.py`.
No changes to Neo4j or Kuzu code paths.
## Why this exists
graphiti-core's FalkorDB driver uses interpreted Cypher cosine math
(`vec.cosineDistance(...)`) for similarity search. Each query becomes a
full table scan over Entity/RELATES_TO/Community nodes. At ~4,000+
entities, single-episode ingest's resolve-against-existing-graph step
takes 8+ minutes and bulk ingest hangs FalkorDB. FalkorDB itself
supports `db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`
procedures backed by HNSW indexes; graphiti-core's driver doesn't use
them.
These patches:
1. Add `get_vector_indices()` to `graph_queries.py` returning CREATE
VECTOR INDEX statements for FalkorDB on Entity.name_embedding,
RELATES_TO.fact_embedding, and Community.name_embedding.
2. Extend `falkordb_driver.py:build_indices_and_constraints()` to create
the vector indexes alongside range and fulltext indexes.
3. Rewrite the three vector-similarity call sites in
`falkordb/operations/search_ops.py` to use
`db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`
instead of full-scan cosine math. Over-fetches by a configurable
multiplier to handle filter rejections.
## Files
| Patched file | Source |
|---|---|
| `graphiti_core/graph_queries.py` | Adds `get_vector_indices()` |
| `graphiti_core/driver/falkordb/falkordb_driver.py` | Extends `build_indices_and_constraints` |
| `graphiti_core/driver/falkordb/operations/search_ops.py` | Three query rewrites |
## How to apply
`./apply.sh` — backs up the originals into `./backups/<timestamp>/`
and copies the patched files over.
## How to revert
Move the timestamped backup back over the venv:
cp backups/<ts>/graph_queries.py /home/aaron/aaronai/venv/lib/python3.12/site-packages/graphiti_core/graph_queries.py
# ...etc
## Upstream candidate
Documented gap (issue #1263 references it indirectly via vector store
overlay RFC). Maintainers' attention is on Milvus/external vector DB
overlay; this patch is the FalkorDB-native alternative for users who
don't want a separate vector DB. Consider PR after empirical validation
in production.
+77
View File
@@ -0,0 +1,77 @@
#!/usr/bin/env bash
# apply.sh — Apply the BirdAI vendored graphiti-core patches.
#
# Backs up the original venv files into ./backups/<timestamp>/ before
# overwriting. The backup directory layout mirrors the venv layout so a
# revert is just a tree copy back.
#
# Usage: ./apply.sh
set -euo pipefail
PATCH_DIR="$(cd "$(dirname "$0")" && pwd)"
VENV_BASE="/home/aaron/aaronai/venv/lib/python3.12/site-packages"
TIMESTAMP="$(date +%Y%m%d-%H%M%S)"
BACKUP_DIR="$PATCH_DIR/backups/$TIMESTAMP"
# Files to patch — paths relative to graphiti_core/.
FILES=(
"graph_queries.py"
"driver/falkordb_driver.py"
"driver/falkordb/operations/search_ops.py"
)
echo "graphiti-core vendored patch apply — BirdAI"
echo "Patch directory: $PATCH_DIR"
echo "Venv target: $VENV_BASE/graphiti_core/"
echo "Backup to: $BACKUP_DIR"
echo
# Pre-flight: confirm all source patch files exist.
for rel in "${FILES[@]}"; do
if [ ! -f "$PATCH_DIR/graphiti_core/$rel" ]; then
echo "ERROR: missing patch file: $PATCH_DIR/graphiti_core/$rel" >&2
exit 1
fi
done
# Pre-flight: confirm all target venv files exist.
for rel in "${FILES[@]}"; do
if [ ! -f "$VENV_BASE/graphiti_core/$rel" ]; then
echo "ERROR: missing venv file: $VENV_BASE/graphiti_core/$rel" >&2
echo " graphiti-core may not be installed, or version differs from 0.29.0." >&2
exit 1
fi
done
# Backup originals.
echo "[1/3] Backing up originals..."
for rel in "${FILES[@]}"; do
backup_path="$BACKUP_DIR/graphiti_core/$rel"
mkdir -p "$(dirname "$backup_path")"
cp "$VENV_BASE/graphiti_core/$rel" "$backup_path"
echo " backed up: $rel"
done
echo
# Apply patches by copying.
echo "[2/3] Applying patches..."
for rel in "${FILES[@]}"; do
cp "$PATCH_DIR/graphiti_core/$rel" "$VENV_BASE/graphiti_core/$rel"
echo " patched: $rel"
done
echo
# Sanity check: confirm patched files have the marker.
echo "[3/3] Verifying patched files..."
for rel in "${FILES[@]}"; do
if grep -q "PATCHED 2026-05-02" "$VENV_BASE/graphiti_core/$rel"; then
echo " OK: $rel contains patch marker"
else
echo " WARNING: $rel missing patch marker (may be expected for graph_queries.py — its docstring uses the marker only in the module header)"
fi
done
echo
echo "Done. Backup: $BACKUP_DIR"
echo "Restart the sidecar to pick up changes:"
echo " sudo systemctl restart aaronai-graphiti.service"
@@ -0,0 +1,904 @@
"""
Copyright 2024, Zep Software, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
from typing import Any
from graphiti_core.driver.driver import GraphProvider
from graphiti_core.driver.falkordb import STOPWORDS
from graphiti_core.driver.operations.search_ops import SearchOperations
from graphiti_core.driver.query_executor import QueryExecutor
from graphiti_core.driver.record_parsers import (
community_node_from_record,
entity_edge_from_record,
entity_node_from_record,
episodic_node_from_record,
)
from graphiti_core.edges import EntityEdge
from graphiti_core.graph_queries import (
get_nodes_query,
get_relationships_query,
get_vector_cosine_func_query,
)
from graphiti_core.models.edges.edge_db_queries import get_entity_edge_return_query
from graphiti_core.models.nodes.node_db_queries import (
COMMUNITY_NODE_RETURN,
EPISODIC_NODE_RETURN,
get_entity_node_return_query,
)
from graphiti_core.nodes import CommunityNode, EntityNode, EpisodicNode
from graphiti_core.search.search_filters import (
SearchFilters,
edge_search_filter_query_constructor,
node_search_filter_query_constructor,
)
logger = logging.getLogger(__name__)
MAX_QUERY_LENGTH = 128
# ---------------------------------------------------------------------------
# Vector index dispatcher (PATCHED 2026-05-02, BirdAI vendored patch).
#
# graphiti-core's FalkorDB driver historically composed similarity queries
# using `vec.cosineDistance(...)` in interpreted Cypher, which produces a
# full-table scan for every search. FalkorDB supports native vector indexes
# via `db.idx.vector.queryNodes` and `db.idx.vector.queryRelationships`;
# this dispatcher uses them when present and falls back to the cosine math
# otherwise.
#
# Index existence is checked once per (label, attribute, entity_type) and
# cached at module scope. The cache should be invalidated whenever
# `build_indices_and_constraints` runs (since indexes may have been created
# or dropped). FalkorDriver.build_indices_and_constraints is patched to
# call `_invalidate_falkordb_vector_index_cache()` after building.
#
# Over-fetch factor (VECTOR_INDEX_CANDIDATE_MULTIPLIER from graph_queries)
# preserves recall when WHERE filters reject some of the top-k candidates.
# ---------------------------------------------------------------------------
from graphiti_core.graph_queries import (
VECTOR_INDEX_CANDIDATE_MULTIPLIER,
get_vector_cosine_func_query,
)
# Cache: key = (label, attribute, entity_type), value = bool
# entity_type is 'NODE' or 'RELATIONSHIP'.
_FALKORDB_VECTOR_INDEX_CACHE: dict[tuple[str, str, str], bool] = {}
def _invalidate_falkordb_vector_index_cache() -> None:
"""Clear the vector-index existence cache. Call after build_indices_and_constraints."""
_FALKORDB_VECTOR_INDEX_CACHE.clear()
async def _falkordb_vector_index_exists(
executor: QueryExecutor,
label: str,
attribute: str,
entity_type: str,
) -> bool:
"""Check whether a FalkorDB vector index exists for the given target.
entity_type is 'NODE' for node-label indexes, 'RELATIONSHIP' for edge-type indexes.
Result is cached at module scope; call _invalidate_falkordb_vector_index_cache()
after building or dropping indexes.
"""
key = (label, attribute, entity_type)
if key in _FALKORDB_VECTOR_INDEX_CACHE:
return _FALKORDB_VECTOR_INDEX_CACHE[key]
try:
records, _, _ = await executor.execute_query(
"CALL db.indexes() YIELD label, properties, types, entitytype "
"RETURN label, properties, types, entitytype"
)
except Exception as e:
# If we cannot enumerate indexes, fall back to "no index" rather than
# propagating the error. The fallback cosine-math path is correct,
# just slower.
logger.warning(f"FalkorDB vector index probe failed; assuming none exist: {e}")
_FALKORDB_VECTOR_INDEX_CACHE[key] = False
return False
found = False
for r in records:
# Records come back as dict-like rows keyed by column name (not
# tuples). Access by string keys matching the YIELD clause above.
rec_label = r.get('label') if hasattr(r, 'get') else r['label']
rec_props = r.get('properties') if hasattr(r, 'get') else r['properties']
rec_types = r.get('types') if hasattr(r, 'get') else r['types']
rec_entitytype = r.get('entitytype') if hasattr(r, 'get') else r['entitytype']
if rec_props is None:
rec_props = []
if rec_types is None:
rec_types = {}
if rec_label != label:
continue
if rec_entitytype is not None and rec_entitytype != entity_type:
continue
if attribute not in rec_props:
continue
# rec_types is a dict like {attribute: ['VECTOR', ...], ...} or sometimes
# a flat list — handle both shapes.
if isinstance(rec_types, dict):
attr_types = rec_types.get(attribute, [])
else:
attr_types = rec_types
if 'VECTOR' in attr_types:
found = True
break
_FALKORDB_VECTOR_INDEX_CACHE[key] = found
return found
def _falkordb_vector_node_search_cypher(
label: str,
embedding_attr: str,
search_vector_param: str,
use_index: bool,
) -> tuple[str, str]:
"""Build the cypher prefix and node-binding for a node-vector search.
Returns (prefix, node_var) where:
- prefix is the Cypher fragment that binds the node variable and a
`score` variable. With index, it's a CALL ... YIELD; without, it's
a MATCH plus WITH cosine math.
- node_var is the variable name the caller's downstream Cypher should
reference (always 'n' here for parity with the existing code).
The caller appends WHERE filters and RETURN/ORDER BY/LIMIT as usual.
The over-fetch parameter `$candidate_k` must be passed by the caller
when use_index is True.
"""
if use_index:
return (
f"CALL db.idx.vector.queryNodes("
f"'{label}', '{embedding_attr}', $candidate_k, vecf32({search_vector_param})"
f") YIELD node, score "
f"WITH node AS n, score "
), "n"
# Fallback: original cosine math path
cosine = get_vector_cosine_func_query(
f"n.{embedding_attr}", search_vector_param, GraphProvider.FALKORDB
)
return (
f"MATCH (n:{label}) "
f"WITH n, {cosine} AS score "
), "n"
def _falkordb_vector_edge_search_cypher(
relationship_type: str,
embedding_attr: str,
search_vector_param: str,
use_index: bool,
) -> tuple[str, str]:
"""Build the cypher prefix and edge-binding for an edge-vector search.
Returns (prefix, edge_var). With the index, the procedure binds the
relationship variable; we then MATCH source and target via the existing
edge to recover (n)-[e]->(m). Without the index, it's the original
MATCH-and-cosine path.
Variable name is 'e' for parity with existing code; source/target are
'n' and 'm' respectively, also for parity.
"""
if use_index:
return (
f"CALL db.idx.vector.queryRelationships("
f"'{relationship_type}', '{embedding_attr}', $candidate_k, vecf32({search_vector_param})"
f") YIELD relationship, score "
f"MATCH (n:Entity)-[e:{relationship_type}]->(m:Entity) "
f"WHERE e = relationship "
f"WITH DISTINCT e, n, m, score "
), "e"
# Fallback
cosine = get_vector_cosine_func_query(
f"e.{embedding_attr}", search_vector_param, GraphProvider.FALKORDB
)
return (
f"MATCH (n:Entity)-[e:{relationship_type}]->(m:Entity) "
f"WITH DISTINCT e, n, m, {cosine} AS score "
), "e"
# FalkorDB separator characters that break text into tokens
_SEPARATOR_MAP = str.maketrans(
{
',': ' ',
'.': ' ',
'<': ' ',
'>': ' ',
'{': ' ',
'}': ' ',
'[': ' ',
']': ' ',
'"': ' ',
"'": ' ',
':': ' ',
';': ' ',
'!': ' ',
'@': ' ',
'#': ' ',
'$': ' ',
'%': ' ',
'^': ' ',
'&': ' ',
'*': ' ',
'(': ' ',
')': ' ',
'-': ' ',
'+': ' ',
'=': ' ',
'~': ' ',
'?': ' ',
'|': ' ',
'/': ' ',
'\\': ' ',
}
)
def _sanitize(query: str) -> str:
"""Replace FalkorDB special characters with whitespace."""
sanitized = query.translate(_SEPARATOR_MAP)
return ' '.join(sanitized.split())
def _build_falkor_fulltext_query(
query: str,
group_ids: list[str] | None = None,
max_query_length: int = MAX_QUERY_LENGTH,
) -> str:
"""Build a fulltext query string for FalkorDB using RedisSearch syntax."""
if group_ids is None or len(group_ids) == 0:
group_filter = ''
else:
escaped_group_ids = [f'"{gid}"' for gid in group_ids]
group_values = '|'.join(escaped_group_ids)
group_filter = f'(@group_id:{group_values})'
sanitized_query = _sanitize(query)
# Remove stopwords and empty tokens
query_words = sanitized_query.split()
filtered_words = [word for word in query_words if word and word.lower() not in STOPWORDS]
sanitized_query = ' | '.join(filtered_words)
if len(sanitized_query.split(' ')) + len(group_ids or '') >= max_query_length:
return ''
full_query = group_filter + ' (' + sanitized_query + ')'
return full_query
class FalkorSearchOperations(SearchOperations):
# --- Node search ---
async def node_fulltext_search(
self,
executor: QueryExecutor,
query: str,
search_filter: SearchFilters,
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[EntityNode]:
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
if fuzzy_query == '':
return []
filter_queries, filter_params = node_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('n.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
filter_query = ''
if filter_queries:
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
cypher = (
get_nodes_query(
'node_name_and_summary', '$query', limit=limit, provider=GraphProvider.FALKORDB
)
+ 'YIELD node AS n, score'
+ filter_query
+ """
WITH n, score
ORDER BY score DESC
LIMIT $limit
RETURN
"""
+ get_entity_node_return_query(GraphProvider.FALKORDB)
)
records, _, _ = await executor.execute_query(
cypher,
query=fuzzy_query,
limit=limit,
**filter_params,
)
return [entity_node_from_record(r) for r in records]
async def node_similarity_search(
self,
executor: QueryExecutor,
search_vector: list[float],
search_filter: SearchFilters,
group_ids: list[str] | None = None,
limit: int = 10,
min_score: float = 0.6,
) -> list[EntityNode]:
filter_queries, filter_params = node_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('n.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
filter_query = ''
if filter_queries:
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
# index when available; fall back to interpreted-Cypher cosine math
# otherwise. The filter clause's position changes between paths
# (after MATCH for fallback, after YIELD for index path), but the
# filter expressions themselves are identical because they reference
# the bound variable `n` either way.
use_index = await _falkordb_vector_index_exists(
executor, 'Entity', 'name_embedding', 'NODE'
)
prefix, _ = _falkordb_vector_node_search_cypher(
'Entity', 'name_embedding', '$search_vector', use_index
)
where_clauses = []
if filter_query:
where_clauses.append(filter_query.replace(' WHERE ', '', 1).strip())
where_clauses.append('score > $min_score')
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
cypher = (
prefix
+ unified_where
+ """
RETURN
"""
+ get_entity_node_return_query(GraphProvider.FALKORDB)
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
params = dict(
search_vector=search_vector,
limit=limit,
min_score=min_score,
**filter_params,
)
if use_index:
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
records, _, _ = await executor.execute_query(cypher, **params)
return [entity_node_from_record(r) for r in records]
async def node_bfs_search(
self,
executor: QueryExecutor,
origin_uuids: list[str],
search_filter: SearchFilters,
max_depth: int,
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[EntityNode]:
if not origin_uuids or max_depth < 1:
return []
filter_queries, filter_params = node_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('n.group_id IN $group_ids')
filter_queries.append('origin.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
filter_query = ''
if filter_queries:
filter_query = ' AND ' + (' AND '.join(filter_queries))
cypher = (
f"""
UNWIND $bfs_origin_node_uuids AS origin_uuid
MATCH (origin {{uuid: origin_uuid}})-[:RELATES_TO|MENTIONS*1..{max_depth}]->(n:Entity)
WHERE n.group_id = origin.group_id
"""
+ filter_query
+ """
RETURN
"""
+ get_entity_node_return_query(GraphProvider.FALKORDB)
+ """
LIMIT $limit
"""
)
records, _, _ = await executor.execute_query(
cypher,
bfs_origin_node_uuids=origin_uuids,
limit=limit,
**filter_params,
)
return [entity_node_from_record(r) for r in records]
# --- Edge search ---
async def edge_fulltext_search(
self,
executor: QueryExecutor,
query: str,
search_filter: SearchFilters,
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[EntityEdge]:
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
if fuzzy_query == '':
return []
filter_queries, filter_params = edge_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('e.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
filter_query = ''
if filter_queries:
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
cypher = (
get_relationships_query(
'edge_name_and_fact', limit=limit, provider=GraphProvider.FALKORDB
)
+ """
YIELD relationship AS rel, score
MATCH (n:Entity)-[e:RELATES_TO {uuid: rel.uuid}]->(m:Entity)
"""
+ filter_query
+ """
WITH e, score, n, m
RETURN
"""
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
records, _, _ = await executor.execute_query(
cypher,
query=fuzzy_query,
limit=limit,
**filter_params,
)
return [entity_edge_from_record(r) for r in records]
async def edge_similarity_search(
self,
executor: QueryExecutor,
search_vector: list[float],
source_node_uuid: str | None,
target_node_uuid: str | None,
search_filter: SearchFilters,
group_ids: list[str] | None = None,
limit: int = 10,
min_score: float = 0.6,
) -> list[EntityEdge]:
filter_queries, filter_params = edge_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('e.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
if source_node_uuid is not None:
filter_params['source_uuid'] = source_node_uuid
filter_queries.append('n.uuid = $source_uuid')
if target_node_uuid is not None:
filter_params['target_uuid'] = target_node_uuid
filter_queries.append('m.uuid = $target_uuid')
filter_query = ''
if filter_queries:
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
# index on RELATES_TO.fact_embedding when available. The unindexed
# fallback is the same MATCH-and-cosine math that previously hung
# for 6+ minutes on a 4,000-entity graph; this is the load-bearing
# call site that motivated the patch.
use_index = await _falkordb_vector_index_exists(
executor, 'RELATES_TO', 'fact_embedding', 'RELATIONSHIP'
)
prefix, _ = _falkordb_vector_edge_search_cypher(
'RELATES_TO', 'fact_embedding', '$search_vector', use_index
)
where_clauses = []
if filter_query:
where_clauses.append(filter_query.replace(' WHERE ', '', 1).strip())
where_clauses.append('score > $min_score')
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
cypher = (
prefix
+ unified_where
+ """
RETURN
"""
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
params = dict(
search_vector=search_vector,
limit=limit,
min_score=min_score,
**filter_params,
)
if use_index:
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
records, _, _ = await executor.execute_query(cypher, **params)
return [entity_edge_from_record(r) for r in records]
async def edge_bfs_search(
self,
executor: QueryExecutor,
origin_uuids: list[str],
max_depth: int,
search_filter: SearchFilters,
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[EntityEdge]:
if not origin_uuids:
return []
filter_queries, filter_params = edge_search_filter_query_constructor(
search_filter, GraphProvider.FALKORDB
)
if group_ids is not None:
filter_queries.append('e.group_id IN $group_ids')
filter_params['group_ids'] = group_ids
filter_query = ''
if filter_queries:
filter_query = ' WHERE ' + (' AND '.join(filter_queries))
cypher = (
f"""
UNWIND $bfs_origin_node_uuids AS origin_uuid
MATCH path = (origin {{uuid: origin_uuid}})-[:RELATES_TO|MENTIONS*1..{max_depth}]->(:Entity)
UNWIND relationships(path) AS rel
MATCH (n:Entity)-[e:RELATES_TO {{uuid: rel.uuid}}]-(m:Entity)
"""
+ filter_query
+ """
RETURN DISTINCT
"""
+ get_entity_edge_return_query(GraphProvider.FALKORDB)
+ """
LIMIT $limit
"""
)
records, _, _ = await executor.execute_query(
cypher,
bfs_origin_node_uuids=origin_uuids,
depth=max_depth,
limit=limit,
**filter_params,
)
return [entity_edge_from_record(r) for r in records]
# --- Episode search ---
async def episode_fulltext_search(
self,
executor: QueryExecutor,
query: str,
search_filter: SearchFilters, # noqa: ARG002
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[EpisodicNode]:
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
if fuzzy_query == '':
return []
filter_params: dict[str, Any] = {}
group_filter_query = ''
if group_ids is not None:
group_filter_query += '\nAND e.group_id IN $group_ids'
filter_params['group_ids'] = group_ids
cypher = (
get_nodes_query(
'episode_content', '$query', limit=limit, provider=GraphProvider.FALKORDB
)
+ """
YIELD node AS episode, score
MATCH (e:Episodic)
WHERE e.uuid = episode.uuid
"""
+ group_filter_query
+ """
RETURN
"""
+ EPISODIC_NODE_RETURN
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
records, _, _ = await executor.execute_query(
cypher, query=fuzzy_query, limit=limit, **filter_params
)
return [episodic_node_from_record(r) for r in records]
# --- Community search ---
async def community_fulltext_search(
self,
executor: QueryExecutor,
query: str,
group_ids: list[str] | None = None,
limit: int = 10,
) -> list[CommunityNode]:
fuzzy_query = _build_falkor_fulltext_query(query, group_ids)
if fuzzy_query == '':
return []
filter_params: dict[str, Any] = {}
group_filter_query = ''
if group_ids is not None:
group_filter_query = 'WHERE c.group_id IN $group_ids'
filter_params['group_ids'] = group_ids
cypher = (
get_nodes_query(
'community_name', '$query', limit=limit, provider=GraphProvider.FALKORDB
)
+ """
YIELD node AS c, score
WITH c, score
"""
+ group_filter_query
+ """
RETURN
"""
+ COMMUNITY_NODE_RETURN
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
records, _, _ = await executor.execute_query(
cypher, query=fuzzy_query, limit=limit, **filter_params
)
return [community_node_from_record(r) for r in records]
async def community_similarity_search(
self,
executor: QueryExecutor,
search_vector: list[float],
group_ids: list[str] | None = None,
limit: int = 10,
min_score: float = 0.6,
) -> list[CommunityNode]:
query_params: dict[str, Any] = {}
group_filter_query = ''
if group_ids is not None:
group_filter_query += ' WHERE c.group_id IN $group_ids'
query_params['group_ids'] = group_ids
# PATCHED 2026-05-02 (BirdAI vendored patch): use FalkorDB native vector
# index on Community.name_embedding when available. Note: the existing
# filter is built into `group_filter_query` (already prefixed with
# ' WHERE ' if non-empty) and uses variable `c`. The dispatcher binds
# the node as `n` for parity with the helper signature, then we
# re-bind to `c` via WITH so the rest of the query is unchanged.
use_index = await _falkordb_vector_index_exists(
executor, 'Community', 'name_embedding', 'NODE'
)
prefix, _ = _falkordb_vector_node_search_cypher(
'Community', 'name_embedding', '$search_vector', use_index
)
prefix = prefix + ' WITH n AS c, score '
where_clauses = []
if group_filter_query:
where_clauses.append(group_filter_query.replace(' WHERE ', '', 1).strip())
where_clauses.append('score > $min_score')
unified_where = ' WHERE ' + ' AND '.join(where_clauses)
cypher = (
prefix
+ unified_where
+ """
RETURN
"""
+ COMMUNITY_NODE_RETURN
+ """
ORDER BY score DESC
LIMIT $limit
"""
)
params = dict(
search_vector=search_vector,
limit=limit,
min_score=min_score,
**query_params,
)
if use_index:
params['candidate_k'] = limit * VECTOR_INDEX_CANDIDATE_MULTIPLIER
records, _, _ = await executor.execute_query(cypher, **params)
return [community_node_from_record(r) for r in records]
# --- Rerankers ---
async def node_distance_reranker(
self,
executor: QueryExecutor,
node_uuids: list[str],
center_node_uuid: str,
min_score: float = 0,
) -> list[EntityNode]:
filtered_uuids = [u for u in node_uuids if u != center_node_uuid]
scores: dict[str, float] = {center_node_uuid: 0.0}
cypher = """
UNWIND $node_uuids AS node_uuid
MATCH (center:Entity {uuid: $center_uuid})-[:RELATES_TO]-(n:Entity {uuid: node_uuid})
RETURN 1 AS score, node_uuid AS uuid
"""
results, _, _ = await executor.execute_query(
cypher,
node_uuids=filtered_uuids,
center_uuid=center_node_uuid,
)
for result in results:
scores[result['uuid']] = result['score']
for uuid in filtered_uuids:
if uuid not in scores:
scores[uuid] = float('inf')
filtered_uuids.sort(key=lambda cur_uuid: scores[cur_uuid])
if center_node_uuid in node_uuids:
scores[center_node_uuid] = 0.1
filtered_uuids = [center_node_uuid] + filtered_uuids
reranked_uuids = [u for u in filtered_uuids if (1 / scores[u]) >= min_score]
if not reranked_uuids:
return []
get_query = """
MATCH (n:Entity)
WHERE n.uuid IN $uuids
RETURN
""" + get_entity_node_return_query(GraphProvider.FALKORDB)
records, _, _ = await executor.execute_query(get_query, uuids=reranked_uuids)
node_map = {r['uuid']: entity_node_from_record(r) for r in records}
return [node_map[u] for u in reranked_uuids if u in node_map]
async def episode_mentions_reranker(
self,
executor: QueryExecutor,
node_uuids: list[str],
min_score: float = 0,
) -> list[EntityNode]:
if not node_uuids:
return []
scores: dict[str, float] = {}
results, _, _ = await executor.execute_query(
"""
UNWIND $node_uuids AS node_uuid
MATCH (episode:Episodic)-[r:MENTIONS]->(n:Entity {uuid: node_uuid})
RETURN count(*) AS score, n.uuid AS uuid
""",
node_uuids=node_uuids,
)
for result in results:
scores[result['uuid']] = result['score']
for uuid in node_uuids:
if uuid not in scores:
scores[uuid] = float('inf')
sorted_uuids = list(node_uuids)
sorted_uuids.sort(key=lambda cur_uuid: scores[cur_uuid])
reranked_uuids = [u for u in sorted_uuids if scores[u] >= min_score]
if not reranked_uuids:
return []
get_query = """
MATCH (n:Entity)
WHERE n.uuid IN $uuids
RETURN
""" + get_entity_node_return_query(GraphProvider.FALKORDB)
records, _, _ = await executor.execute_query(get_query, uuids=reranked_uuids)
node_map = {r['uuid']: entity_node_from_record(r) for r in records}
return [node_map[u] for u in reranked_uuids if u in node_map]
# --- Filter builders ---
def build_node_search_filters(self, search_filters: SearchFilters) -> Any:
filter_queries, filter_params = node_search_filter_query_constructor(
search_filters, GraphProvider.FALKORDB
)
return {'filter_queries': filter_queries, 'filter_params': filter_params}
def build_edge_search_filters(self, search_filters: SearchFilters) -> Any:
filter_queries, filter_params = edge_search_filter_query_constructor(
search_filters, GraphProvider.FALKORDB
)
return {'filter_queries': filter_queries, 'filter_params': filter_params}
# --- Fulltext query builder ---
def build_fulltext_query(
self,
query: str,
group_ids: list[str] | None = None,
max_query_length: int = MAX_QUERY_LENGTH,
) -> str:
return _build_falkor_fulltext_query(query, group_ids, max_query_length)
@@ -0,0 +1,444 @@
"""
Copyright 2024, Zep Software, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import asyncio
import datetime
import logging
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from falkordb import Graph as FalkorGraph
from falkordb.asyncio import FalkorDB
else:
try:
from falkordb import Graph as FalkorGraph
from falkordb.asyncio import FalkorDB
except ImportError:
# If falkordb is not installed, raise an ImportError
raise ImportError(
'falkordb is required for FalkorDriver. '
'Install it with: pip install graphiti-core[falkordb]'
) from None
from graphiti_core.driver.driver import GraphDriver, GraphDriverSession, GraphProvider
from graphiti_core.driver.falkordb import STOPWORDS as STOPWORDS
from graphiti_core.driver.falkordb.operations.community_edge_ops import (
FalkorCommunityEdgeOperations,
)
from graphiti_core.driver.falkordb.operations.community_node_ops import (
FalkorCommunityNodeOperations,
)
from graphiti_core.driver.falkordb.operations.entity_edge_ops import FalkorEntityEdgeOperations
from graphiti_core.driver.falkordb.operations.entity_node_ops import FalkorEntityNodeOperations
from graphiti_core.driver.falkordb.operations.episode_node_ops import FalkorEpisodeNodeOperations
from graphiti_core.driver.falkordb.operations.episodic_edge_ops import FalkorEpisodicEdgeOperations
from graphiti_core.driver.falkordb.operations.graph_ops import FalkorGraphMaintenanceOperations
from graphiti_core.driver.falkordb.operations.has_episode_edge_ops import (
FalkorHasEpisodeEdgeOperations,
)
from graphiti_core.driver.falkordb.operations.next_episode_edge_ops import (
FalkorNextEpisodeEdgeOperations,
)
from graphiti_core.driver.falkordb.operations.saga_node_ops import FalkorSagaNodeOperations
from graphiti_core.driver.falkordb.operations.search_ops import FalkorSearchOperations
from graphiti_core.driver.operations.community_edge_ops import CommunityEdgeOperations
from graphiti_core.driver.operations.community_node_ops import CommunityNodeOperations
from graphiti_core.driver.operations.entity_edge_ops import EntityEdgeOperations
from graphiti_core.driver.operations.entity_node_ops import EntityNodeOperations
from graphiti_core.driver.operations.episode_node_ops import EpisodeNodeOperations
from graphiti_core.driver.operations.episodic_edge_ops import EpisodicEdgeOperations
from graphiti_core.driver.operations.graph_ops import GraphMaintenanceOperations
from graphiti_core.driver.operations.has_episode_edge_ops import HasEpisodeEdgeOperations
from graphiti_core.driver.operations.next_episode_edge_ops import NextEpisodeEdgeOperations
from graphiti_core.driver.operations.saga_node_ops import SagaNodeOperations
from graphiti_core.driver.operations.search_ops import SearchOperations
from graphiti_core.graph_queries import get_fulltext_indices, get_range_indices, get_vector_indices
from graphiti_core.helpers import validate_group_ids
from graphiti_core.utils.datetime_utils import convert_datetimes_to_strings
logger = logging.getLogger(__name__)
class FalkorDriverSession(GraphDriverSession):
provider = GraphProvider.FALKORDB
def __init__(self, graph: FalkorGraph):
self.graph = graph
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
# No cleanup needed for Falkor, but method must exist
pass
async def close(self):
# No explicit close needed for FalkorDB, but method must exist
pass
async def execute_write(self, func, *args, **kwargs):
# Directly await the provided async function with `self` as the transaction/session
return await func(self, *args, **kwargs)
async def run(self, query: str | list, **kwargs: Any) -> Any:
# FalkorDB does not support argument for Label Set, so it's converted into an array of queries
if isinstance(query, list):
for cypher, params in query:
params = convert_datetimes_to_strings(params)
await self.graph.query(str(cypher), params) # type: ignore[reportUnknownArgumentType]
else:
params = dict(kwargs)
params = convert_datetimes_to_strings(params)
await self.graph.query(str(query), params) # type: ignore[reportUnknownArgumentType]
# Assuming `graph.query` is async (ideal); otherwise, wrap in executor
return None
class FalkorDriver(GraphDriver):
provider = GraphProvider.FALKORDB
default_group_id: str = '\\_'
fulltext_syntax: str = '@' # FalkorDB uses a redisearch-like syntax for fulltext queries
aoss_client: None = None
def __init__(
self,
host: str = 'localhost',
port: int = 6379,
username: str | None = None,
password: str | None = None,
falkor_db: FalkorDB | None = None,
database: str = 'default_db',
):
"""
Initialize the FalkorDB driver.
FalkorDB is a multi-tenant graph database.
To connect, provide the host and port.
The default parameters assume a local (on-premises) FalkorDB instance.
Args:
host (str): The host where FalkorDB is running.
port (int): The port on which FalkorDB is listening.
username (str | None): The username for authentication (if required).
password (str | None): The password for authentication (if required).
falkor_db (FalkorDB | None): An existing FalkorDB instance to use instead of creating a new one.
database (str): The name of the database to connect to. Defaults to 'default_db'.
"""
super().__init__()
self._database = database
if falkor_db is not None:
# If a FalkorDB instance is provided, use it directly
self.client = falkor_db
else:
self.client = FalkorDB(host=host, port=port, username=username, password=password)
# Instantiate FalkorDB operations
self._entity_node_ops = FalkorEntityNodeOperations()
self._episode_node_ops = FalkorEpisodeNodeOperations()
self._community_node_ops = FalkorCommunityNodeOperations()
self._saga_node_ops = FalkorSagaNodeOperations()
self._entity_edge_ops = FalkorEntityEdgeOperations()
self._episodic_edge_ops = FalkorEpisodicEdgeOperations()
self._community_edge_ops = FalkorCommunityEdgeOperations()
self._has_episode_edge_ops = FalkorHasEpisodeEdgeOperations()
self._next_episode_edge_ops = FalkorNextEpisodeEdgeOperations()
self._search_ops = FalkorSearchOperations()
self._graph_ops = FalkorGraphMaintenanceOperations()
# Schedule the indices and constraints to be built
try:
# Try to get the current event loop
loop = asyncio.get_running_loop()
# Schedule the build_indices_and_constraints to run
loop.create_task(self.build_indices_and_constraints())
except RuntimeError:
# No event loop running, this will be handled later
pass
# --- Operations properties ---
@property
def entity_node_ops(self) -> EntityNodeOperations:
return self._entity_node_ops
@property
def episode_node_ops(self) -> EpisodeNodeOperations:
return self._episode_node_ops
@property
def community_node_ops(self) -> CommunityNodeOperations:
return self._community_node_ops
@property
def saga_node_ops(self) -> SagaNodeOperations:
return self._saga_node_ops
@property
def entity_edge_ops(self) -> EntityEdgeOperations:
return self._entity_edge_ops
@property
def episodic_edge_ops(self) -> EpisodicEdgeOperations:
return self._episodic_edge_ops
@property
def community_edge_ops(self) -> CommunityEdgeOperations:
return self._community_edge_ops
@property
def has_episode_edge_ops(self) -> HasEpisodeEdgeOperations:
return self._has_episode_edge_ops
@property
def next_episode_edge_ops(self) -> NextEpisodeEdgeOperations:
return self._next_episode_edge_ops
@property
def search_ops(self) -> SearchOperations:
return self._search_ops
@property
def graph_ops(self) -> GraphMaintenanceOperations:
return self._graph_ops
def _get_graph(self, graph_name: str | None) -> FalkorGraph:
# FalkorDB requires a non-None database name for multi-tenant graphs; the default is "default_db"
if graph_name is None:
graph_name = self._database
return self.client.select_graph(graph_name)
async def execute_query(self, cypher_query_, **kwargs: Any):
graph = self._get_graph(self._database)
# Convert datetime objects to ISO strings (FalkorDB does not support datetime objects directly)
params = convert_datetimes_to_strings(dict(kwargs))
try:
result = await graph.query(cypher_query_, params) # type: ignore[reportUnknownArgumentType]
except Exception as e:
if 'already indexed' in str(e):
# check if index already exists
logger.info(f'Index already exists: {e}')
return None
logger.error(f'Error executing FalkorDB query: {e}\n{cypher_query_}\n{params}')
raise
# Convert the result header to a list of strings
header = [h[1] for h in result.header]
# Convert FalkorDB's result format (list of lists) to the format expected by Graphiti (list of dicts)
records = []
for row in result.result_set:
record = {}
for i, field_name in enumerate(header):
if i < len(row):
record[field_name] = row[i]
else:
# If there are more fields in header than values in row, set to None
record[field_name] = None
records.append(record)
return records, header, None
def session(self, database: str | None = None) -> GraphDriverSession:
return FalkorDriverSession(self._get_graph(database))
async def close(self) -> None:
"""Close the driver connection."""
if hasattr(self.client, 'aclose'):
await self.client.aclose() # type: ignore[reportUnknownMemberType]
elif hasattr(self.client.connection, 'aclose'):
await self.client.connection.aclose()
elif hasattr(self.client.connection, 'close'):
await self.client.connection.close()
async def delete_all_indexes(self) -> None:
result = await self.execute_query('CALL db.indexes()')
if not result:
return
records, _, _ = result
drop_tasks = []
for record in records:
label = record['label']
entity_type = record['entitytype']
for field_name, index_type in record['types'].items():
if 'RANGE' in index_type:
drop_tasks.append(self.execute_query(f'DROP INDEX ON :{label}({field_name})'))
elif 'FULLTEXT' in index_type:
if entity_type == 'NODE':
drop_tasks.append(
self.execute_query(
f'DROP FULLTEXT INDEX FOR (n:{label}) ON (n.{field_name})'
)
)
elif entity_type == 'RELATIONSHIP':
drop_tasks.append(
self.execute_query(
f'DROP FULLTEXT INDEX FOR ()-[e:{label}]-() ON (e.{field_name})'
)
)
if drop_tasks:
await asyncio.gather(*drop_tasks)
async def build_indices_and_constraints(self, delete_existing=False):
if delete_existing:
await self.delete_all_indexes()
# PATCHED 2026-05-02 (BirdAI vendored patch): add vector indexes alongside
# range and fulltext. FalkorDB supports native vector indexes via
# db.idx.vector.queryNodes / queryRelationships; without these, similarity
# search runs as full-table-scan cosine math in interpreted Cypher.
index_queries = (
get_range_indices(self.provider)
+ get_fulltext_indices(self.provider)
+ get_vector_indices(self.provider)
)
for query in index_queries:
await self.execute_query(query)
# Invalidate the search_ops vector-index existence cache so subsequent
# similarity queries re-probe and discover the indexes we just built.
try:
from graphiti_core.driver.falkordb.operations.search_ops import (
_invalidate_falkordb_vector_index_cache,
)
_invalidate_falkordb_vector_index_cache()
except ImportError:
# search_ops module not yet imported (cold start); cache is empty
# by default, so no invalidation needed.
pass
def clone(self, database: str) -> 'GraphDriver':
"""
Returns a shallow copy of this driver with a different default database.
Reuses the same connection (e.g. FalkorDB, Neo4j).
"""
if database == self._database:
cloned = self
elif database == self.default_group_id:
cloned = FalkorDriver(falkor_db=self.client)
else:
# Create a new instance of FalkorDriver with the same connection but a different database
cloned = FalkorDriver(falkor_db=self.client, database=database)
return cloned
async def health_check(self) -> None:
"""Check FalkorDB connectivity by running a simple query."""
try:
await self.execute_query('MATCH (n) RETURN 1 LIMIT 1')
return None
except Exception as e:
print(f'FalkorDB health check failed: {e}')
raise
@staticmethod
def convert_datetimes_to_strings(obj):
if isinstance(obj, dict):
return {k: FalkorDriver.convert_datetimes_to_strings(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [FalkorDriver.convert_datetimes_to_strings(item) for item in obj]
elif isinstance(obj, tuple):
return tuple(FalkorDriver.convert_datetimes_to_strings(item) for item in obj)
elif isinstance(obj, datetime):
return obj.isoformat()
else:
return obj
def sanitize(self, query: str) -> str:
"""
Replace FalkorDB special characters with whitespace.
Based on FalkorDB tokenization rules: ,.<>{}[]"':;!@#$%^&*()-+=~
"""
# FalkorDB separator characters that break text into tokens
separator_map = str.maketrans(
{
',': ' ',
'.': ' ',
'<': ' ',
'>': ' ',
'{': ' ',
'}': ' ',
'[': ' ',
']': ' ',
'"': ' ',
"'": ' ',
':': ' ',
';': ' ',
'!': ' ',
'@': ' ',
'#': ' ',
'$': ' ',
'%': ' ',
'^': ' ',
'&': ' ',
'*': ' ',
'(': ' ',
')': ' ',
'-': ' ',
'+': ' ',
'=': ' ',
'~': ' ',
'?': ' ',
'|': ' ',
'/': ' ',
'\\': ' ',
}
)
sanitized = query.translate(separator_map)
# Clean up multiple spaces
sanitized = ' '.join(sanitized.split())
return sanitized
def build_fulltext_query(
self, query: str, group_ids: list[str] | None = None, max_query_length: int = 128
) -> str:
"""
Build a fulltext query string for FalkorDB using RedisSearch syntax.
FalkorDB uses RedisSearch-like syntax where:
- Field queries use @ prefix: @field:value
- Multiple values for same field: (@field:value1|value2)
- Text search doesn't need @ prefix for content fields
- AND is implicit with space: (@group_id:value) (text)
- OR uses pipe within parentheses: (@group_id:value1|value2)
"""
validate_group_ids(group_ids)
if group_ids is None or len(group_ids) == 0:
group_filter = ''
else:
# Escape group_ids with quotes to prevent RediSearch syntax errors
# with reserved words like "main" or special characters like hyphens
escaped_group_ids = [f'"{gid}"' for gid in group_ids]
group_values = '|'.join(escaped_group_ids)
group_filter = f'(@group_id:{group_values})'
sanitized_query = self.sanitize(query)
# Remove stopwords and empty tokens from the sanitized query
query_words = sanitized_query.split()
filtered_words = [word for word in query_words if word and word.lower() not in STOPWORDS]
sanitized_query = ' | '.join(filtered_words)
# If the query is too long return no query
if len(sanitized_query.split(' ')) + len(group_ids or '') >= max_query_length:
return ''
full_query = group_filter + ' (' + sanitized_query + ')'
return full_query
@@ -0,0 +1,242 @@
"""
Database query utilities for different graph database backends.
This module provides database-agnostic query generation for Neo4j and FalkorDB,
supporting index creation, fulltext search, and bulk operations.
PATCHED for FalkorDB native vector index support (BirdAI vendored patch,
2026-05-02). Adds:
- get_vector_indices(): CREATE VECTOR INDEX statements for FalkorDB
- get_vector_search_query(): Cypher fragment for vector similarity using
FalkorDB's db.idx.vector procedures, with fallback to cosine math when
the index does not yet exist
- VECTOR_INDEX_CANDIDATE_MULTIPLIER: over-fetch factor for vector index
queries to handle filter rejections after index lookup
No changes to Neo4j or Kuzu code paths.
"""
from typing_extensions import LiteralString
from graphiti_core.driver.driver import GraphProvider
# Mapping from Neo4j fulltext index names to FalkorDB node labels
NEO4J_TO_FALKORDB_MAPPING = {
'node_name_and_summary': 'Entity',
'community_name': 'Community',
'episode_content': 'Episodic',
'edge_name_and_fact': 'RELATES_TO',
}
# Mapping from fulltext index names to Kuzu node labels
INDEX_TO_LABEL_KUZU_MAPPING = {
'node_name_and_summary': 'Entity',
'community_name': 'Community',
'episode_content': 'Episodic',
'edge_name_and_fact': 'RelatesToNode_',
}
# Vector index over-fetch multiplier. When a vector index search is
# combined with WHERE filters (group_id, source_uuid, etc.), some of
# the top-k index results may be filtered out. Over-fetching by this
# factor preserves recall against the final LIMIT after filtering.
# Conservative default; tunable per-deployment by editing this constant
# or via environment-variable override at the driver level (future).
VECTOR_INDEX_CANDIDATE_MULTIPLIER = 5
def get_range_indices(provider: GraphProvider) -> list[LiteralString]:
if provider == GraphProvider.FALKORDB:
return [
# Entity node
'CREATE INDEX FOR (n:Entity) ON (n.uuid, n.group_id, n.name, n.created_at)',
# Episodic node
'CREATE INDEX FOR (n:Episodic) ON (n.uuid, n.group_id, n.created_at, n.valid_at)',
# Community node
'CREATE INDEX FOR (n:Community) ON (n.uuid)',
# Saga node
'CREATE INDEX FOR (n:Saga) ON (n.uuid, n.group_id, n.name)',
# RELATES_TO edge
'CREATE INDEX FOR ()-[e:RELATES_TO]-() ON (e.uuid, e.group_id, e.name, e.created_at, e.expired_at, e.valid_at, e.invalid_at)',
# MENTIONS edge
'CREATE INDEX FOR ()-[e:MENTIONS]-() ON (e.uuid, e.group_id)',
# HAS_MEMBER edge
'CREATE INDEX FOR ()-[e:HAS_MEMBER]-() ON (e.uuid)',
# HAS_EPISODE edge
'CREATE INDEX FOR ()-[e:HAS_EPISODE]-() ON (e.uuid, e.group_id)',
# NEXT_EPISODE edge
'CREATE INDEX FOR ()-[e:NEXT_EPISODE]-() ON (e.uuid, e.group_id)',
]
if provider == GraphProvider.KUZU:
return []
return [
'CREATE INDEX entity_uuid IF NOT EXISTS FOR (n:Entity) ON (n.uuid)',
'CREATE INDEX episode_uuid IF NOT EXISTS FOR (n:Episodic) ON (n.uuid)',
'CREATE INDEX community_uuid IF NOT EXISTS FOR (n:Community) ON (n.uuid)',
'CREATE INDEX saga_uuid IF NOT EXISTS FOR (n:Saga) ON (n.uuid)',
'CREATE INDEX relation_uuid IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.uuid)',
'CREATE INDEX mention_uuid IF NOT EXISTS FOR ()-[e:MENTIONS]-() ON (e.uuid)',
'CREATE INDEX has_member_uuid IF NOT EXISTS FOR ()-[e:HAS_MEMBER]-() ON (e.uuid)',
'CREATE INDEX has_episode_uuid IF NOT EXISTS FOR ()-[e:HAS_EPISODE]-() ON (e.uuid)',
'CREATE INDEX next_episode_uuid IF NOT EXISTS FOR ()-[e:NEXT_EPISODE]-() ON (e.uuid)',
'CREATE INDEX entity_group_id IF NOT EXISTS FOR (n:Entity) ON (n.group_id)',
'CREATE INDEX episode_group_id IF NOT EXISTS FOR (n:Episodic) ON (n.group_id)',
'CREATE INDEX community_group_id IF NOT EXISTS FOR (n:Community) ON (n.group_id)',
'CREATE INDEX saga_group_id IF NOT EXISTS FOR (n:Saga) ON (n.group_id)',
'CREATE INDEX relation_group_id IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.group_id)',
'CREATE INDEX mention_group_id IF NOT EXISTS FOR ()-[e:MENTIONS]-() ON (e.group_id)',
'CREATE INDEX has_episode_group_id IF NOT EXISTS FOR ()-[e:HAS_EPISODE]-() ON (e.group_id)',
'CREATE INDEX next_episode_group_id IF NOT EXISTS FOR ()-[e:NEXT_EPISODE]-() ON (e.group_id)',
'CREATE INDEX name_entity_index IF NOT EXISTS FOR (n:Entity) ON (n.name)',
'CREATE INDEX saga_name IF NOT EXISTS FOR (n:Saga) ON (n.name)',
'CREATE INDEX created_at_entity_index IF NOT EXISTS FOR (n:Entity) ON (n.created_at)',
'CREATE INDEX created_at_episodic_index IF NOT EXISTS FOR (n:Episodic) ON (n.created_at)',
'CREATE INDEX valid_at_episodic_index IF NOT EXISTS FOR (n:Episodic) ON (n.valid_at)',
'CREATE INDEX name_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.name)',
'CREATE INDEX created_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.created_at)',
'CREATE INDEX expired_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.expired_at)',
'CREATE INDEX valid_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.valid_at)',
'CREATE INDEX invalid_at_edge_index IF NOT EXISTS FOR ()-[e:RELATES_TO]-() ON (e.invalid_at)',
]
def get_fulltext_indices(provider: GraphProvider) -> list[LiteralString]:
if provider == GraphProvider.FALKORDB:
from typing import cast
from graphiti_core.driver.falkordb import STOPWORDS
# Convert to string representation for embedding in queries
stopwords_str = str(STOPWORDS)
# Use type: ignore to satisfy LiteralString requirement while maintaining single source of truth
return cast(
list[LiteralString],
[
f"""CALL db.idx.fulltext.createNodeIndex(
{{
label: 'Episodic',
stopwords: {stopwords_str}
}},
'content', 'source', 'source_description', 'group_id'
)""",
f"""CALL db.idx.fulltext.createNodeIndex(
{{
label: 'Entity',
stopwords: {stopwords_str}
}},
'name', 'summary', 'group_id'
)""",
f"""CALL db.idx.fulltext.createNodeIndex(
{{
label: 'Community',
stopwords: {stopwords_str}
}},
'name', 'group_id'
)""",
"""CREATE FULLTEXT INDEX FOR ()-[e:RELATES_TO]-() ON (e.name, e.fact, e.group_id)""",
],
)
if provider == GraphProvider.KUZU:
return [
"CALL CREATE_FTS_INDEX('Episodic', 'episode_content', ['content', 'source', 'source_description']);",
"CALL CREATE_FTS_INDEX('Entity', 'node_name_and_summary', ['name', 'summary']);",
"CALL CREATE_FTS_INDEX('Community', 'community_name', ['name']);",
"CALL CREATE_FTS_INDEX('RelatesToNode_', 'edge_name_and_fact', ['name', 'fact']);",
]
return [
"""CREATE FULLTEXT INDEX episode_content IF NOT EXISTS
FOR (e:Episodic) ON EACH [e.content, e.source, e.source_description, e.group_id]""",
"""CREATE FULLTEXT INDEX node_name_and_summary IF NOT EXISTS
FOR (n:Entity) ON EACH [n.name, n.summary, n.group_id]""",
"""CREATE FULLTEXT INDEX community_name IF NOT EXISTS
FOR (n:Community) ON EACH [n.name, n.group_id]""",
"""CREATE FULLTEXT INDEX edge_name_and_fact IF NOT EXISTS
FOR ()-[e:RELATES_TO]-() ON EACH [e.name, e.fact, e.group_id]""",
]
def get_vector_indices(provider: GraphProvider, dimension: int = 384) -> list[LiteralString]:
"""Return CREATE VECTOR INDEX statements for the given provider.
For FalkorDB: creates HNSW vector indexes on Entity.name_embedding,
RELATES_TO.fact_embedding, and Community.name_embedding. Backed by
FalkorDB's native vector index (db.idx.vector.queryNodes /
queryRelationships).
For Neo4j and Kuzu: returns an empty list. Those backends create vector
indexes via different mechanisms (Neo4j auto-creates them when needed
via its vector.similarity.cosine function; Kuzu uses array_cosine_similarity
and does not require pre-built vector indexes for graphiti-core's usage).
Args:
provider: The graph database provider.
dimension: Embedding dimension. Defaults to 384 (all-MiniLM-L6-v2).
Embedders with different dimensions should pass their own value
through driver configuration. graphiti-core's default embedder
is 1536 (OpenAI ada-002); BirdAI uses 384 (sentence-transformers).
Returns:
List of CREATE VECTOR INDEX statements. Idempotent at FalkorDB level
if the index already exists with matching options.
"""
if provider == GraphProvider.FALKORDB:
from typing import cast
return cast(
list[LiteralString],
[
f"CREATE VECTOR INDEX FOR (n:Entity) ON (n.name_embedding) "
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
f"CREATE VECTOR INDEX FOR ()-[e:RELATES_TO]-() ON (e.fact_embedding) "
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
f"CREATE VECTOR INDEX FOR (n:Community) ON (n.name_embedding) "
f"OPTIONS {{dimension: {dimension}, similarityFunction: 'cosine'}}",
],
)
return []
def get_nodes_query(name: str, query: str, limit: int, provider: GraphProvider) -> str:
if provider == GraphProvider.FALKORDB:
label = NEO4J_TO_FALKORDB_MAPPING[name]
return f"CALL db.idx.fulltext.queryNodes('{label}', {query})"
if provider == GraphProvider.KUZU:
label = INDEX_TO_LABEL_KUZU_MAPPING[name]
return f"CALL QUERY_FTS_INDEX('{label}', '{name}', {query}, TOP := $limit)"
return f'CALL db.index.fulltext.queryNodes("{name}", {query}, {{limit: $limit}})'
def get_vector_cosine_func_query(vec1, vec2, provider: GraphProvider) -> str:
"""Return a Cypher fragment for cosine similarity score in [0, 1].
PRESERVED for backward compatibility and as fallback when vector indexes
do not yet exist on the FalkorDB backend. New code paths should prefer
get_vector_search_query() which uses the native vector index when
available.
"""
if provider == GraphProvider.FALKORDB:
# FalkorDB uses a different syntax for regular cosine similarity and Neo4j uses normalized cosine similarity
return f'(2 - vec.cosineDistance({vec1}, vecf32({vec2})))/2'
if provider == GraphProvider.KUZU:
return f'array_cosine_similarity({vec1}, {vec2})'
return f'vector.similarity.cosine({vec1}, {vec2})'
def get_relationships_query(name: str, limit: int, provider: GraphProvider) -> str:
if provider == GraphProvider.FALKORDB:
label = NEO4J_TO_FALKORDB_MAPPING[name]
return f"CALL db.idx.fulltext.queryRelationships('{label}', $query)"
if provider == GraphProvider.KUZU:
label = INDEX_TO_LABEL_KUZU_MAPPING[name]
return f"CALL QUERY_FTS_INDEX('{label}', '{name}', cast($query AS STRING), TOP := $limit)"
return f'CALL db.index.fulltext.queryRelationships("{name}", $query, {{limit: $limit}})'
@@ -0,0 +1,55 @@
-- Migration: 20260502-001_async_job_model
-- Purpose: Pattern 1 async job model — sidecar processes ingest jobs serially
-- via Postgres-backed queue. Worker submits and polls rather than
-- blocking on synchronous HTTP response.
--
-- Architectural rationale: tonight's smoke test (2026-05-02 ~01:40-01:50 UTC)
-- diagnosed that bulk ingest against a 4,222-entity graph commits successfully
-- but the worker's HTTP read-timeout fires before the response returns. Three
-- days of "saga deadlock" failures were false negatives — the work succeeded;
-- the worker just stopped listening. Pattern 1 separates submission from
-- completion observation so the worker can't false-negative this way.
--
-- The job model is also the natural data source for Phase A items 6-7
-- (metrics tables) — graphiti_jobs records duration, status transitions,
-- and per-job summary that those tables will aggregate.
--
-- Idempotent: safe to re-run.
-- Job state for sidecar's async ingest queue.
-- One row per submitted bulk-or-single ingest. Sidecar reads queued jobs
-- on startup to resume after restart. Worker polls status until terminal.
CREATE TABLE IF NOT EXISTS graphiti_jobs (
job_id UUID PRIMARY KEY,
job_type TEXT NOT NULL CHECK (job_type IN ('bulk', 'single')),
payload JSONB NOT NULL, -- full submitted request body
status TEXT NOT NULL DEFAULT 'queued' -- 'queued'|'running'|'committed'|'failed'
CHECK (status IN ('queued', 'running', 'committed', 'failed')),
enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
error TEXT, -- non-null when status='failed'
summary JSONB, -- {nodes: N, edges: N, episodes: N}
submitted_by TEXT -- worker name for traceability
);
-- Index supporting sidecar's "pick next queued job" query
CREATE INDEX IF NOT EXISTS idx_graphiti_jobs_queued
ON graphiti_jobs (enqueued_at)
WHERE status = 'queued';
-- Index supporting worker's "poll my job by id" query (PK already does this,
-- but explicit index aids ANALYZE behavior on small tables)
CREATE INDEX IF NOT EXISTS idx_graphiti_jobs_status
ON graphiti_jobs (status);
-- Stage 3 queue gains a reference to the sidecar job processing the row.
-- When set, worker polls graphiti_jobs.status rather than blocking on HTTP.
-- NULL means: row not yet submitted, or pre-Pattern-1 row.
ALTER TABLE stage_3_queue
ADD COLUMN IF NOT EXISTS external_job_id UUID;
-- Index for "find rows that submitted but didn't complete" recovery scans
CREATE INDEX IF NOT EXISTS idx_stage_3_queue_external_job
ON stage_3_queue (external_job_id)
WHERE external_job_id IS NOT NULL AND completed_at IS NULL AND failed_at IS NULL;
+419 -69
View File
@@ -1,14 +1,44 @@
""" """
Aaron AI — Graphiti Sidecar Service Aaron AI — Graphiti Sidecar Service (v2.0 — Pattern 1 async job model)
Wraps graphiti-core in a FastAPI service to avoid asyncio event loop conflicts.
Wraps graphiti-core in a FastAPI service. Pattern 1 architecture: ingest
submission and completion are decoupled. Submitters POST to /episodes or
/episodes/bulk and receive a job_id; an in-process background worker
processes jobs serially against the graph; submitters poll GET /jobs/{id}
until terminal status.
Why Pattern 1: tonight's smoke test (2026-05-02) confirmed that bulk
ingest against the 4,222-entity graph commits successfully even when the
worker's HTTP read-timeout fires. The synchronous interface was producing
false-negative failures — work succeeded but the worker stopped listening.
Pattern 1 separates submission from completion observation so the worker
can't false-negative this way.
Architectural commitments:
- One in-flight job per sidecar (per graph). Concurrent jobs against the
same graph would race on graphiti-core's _resolve_nodes_and_edges_bulk
(no transaction boundary, no internal coordination). Concurrent
multi-tenancy is "run multiple sidecars," not "make one sidecar
concurrency-safe across graphs."
- Postgres-backed job state. Survives sidecar restart. On startup the
sidecar resets any 'running' rows to 'queued' (their previous run died);
the background worker picks them up naturally.
- Both /episodes and /episodes/bulk are async-shaped for parity. graphiti-
core operations underneath (add_episode, add_episode_bulk) are unchanged.
- The bulk pathway is preserved — load-bearing for first-run corpus
migration. Single-episode is preserved — load-bearing for state-
superseding content per the Stage 2/3 routing rule.
Port 8001 (internal only). No OpenAI dependency. Port 8001 (internal only). No OpenAI dependency.
""" """
import os, logging, sys, traceback import os, logging, sys, asyncio, traceback, uuid, json
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
import psycopg2
import psycopg2.extras
from dotenv import load_dotenv from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
@@ -31,8 +61,18 @@ FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic") LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6") LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY") LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
PG_DSN = os.getenv("PG_DSN")
SIDECAR_NAME = os.getenv("SIDECAR_NAME", "graphiti-sidecar-1")
os.environ["EMBEDDING_DIM"] = "384" os.environ["EMBEDDING_DIM"] = "384"
# Background worker configuration. Polls Postgres for queued jobs every
# WORKER_POLL_INTERVAL seconds when idle. Single-job-at-a-time by design;
# no concurrency primitive beyond the serial loop. The sleep is brief
# enough to feel responsive but long enough to avoid burning CPU on an
# empty queue.
WORKER_POLL_INTERVAL = 2.0
def get_llm_client(): def get_llm_client():
from graphiti_core.llm_client.config import LLMConfig from graphiti_core.llm_client.config import LLMConfig
config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL) config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)
@@ -50,16 +90,286 @@ def get_llm_client():
return GroqClient(config) return GroqClient(config)
raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}") raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")
graphiti_instance = None
async def get_graphiti(): graphiti_instance = None
if graphiti_instance is None: worker_task = None
raise HTTPException(status_code=503, detail="Graphiti not initialized")
return graphiti_instance
# ---------------------------------------------------------------------------
# Postgres job-state helpers. Synchronous psycopg2 calls inside async
# functions: each call opens a fresh connection, runs one statement, closes.
# Acceptable here because traffic is low (single-digit jobs/min steady state)
# and the simplicity is worth more than connection pooling. If this ever
# becomes a bottleneck, swap to asyncpg or psycopg3 async.
# ---------------------------------------------------------------------------
def _pg():
return psycopg2.connect(PG_DSN)
def _job_insert(job_id: str, job_type: str, payload: dict) -> None:
"""Write a new job row in 'queued' status."""
pg = _pg()
cur = pg.cursor()
cur.execute(
"""
INSERT INTO graphiti_jobs (job_id, job_type, payload, status, submitted_by)
VALUES (%s, %s, %s::jsonb, 'queued', %s)
""",
(job_id, job_type, json.dumps(payload), SIDECAR_NAME),
)
pg.commit()
pg.close()
def _job_get(job_id: str) -> dict | None:
"""Read a single job by id. Returns None if not found."""
pg = _pg()
cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute(
"""
SELECT job_id, job_type, status, enqueued_at, started_at, finished_at,
error, summary, submitted_by
FROM graphiti_jobs
WHERE job_id = %s
""",
(job_id,),
)
row = cur.fetchone()
pg.close()
if row is None:
return None
# Convert UUID, datetimes for JSON serialization
return {
"job_id": str(row["job_id"]),
"job_type": row["job_type"],
"status": row["status"],
"enqueued_at": row["enqueued_at"].isoformat() if row["enqueued_at"] else None,
"started_at": row["started_at"].isoformat() if row["started_at"] else None,
"finished_at": row["finished_at"].isoformat() if row["finished_at"] else None,
"error": row["error"],
"summary": row["summary"],
"submitted_by": row["submitted_by"],
}
def _job_claim_next() -> dict | None:
"""Atomically claim the oldest queued job for processing.
Uses SELECT ... FOR UPDATE SKIP LOCKED so multiple sidecar instances
(future multi-tenant deployment) don't fight over the same row. For
single-sidecar deployments this is just a clean atomic transition.
Returns the full job row (including payload) or None if queue is empty.
"""
pg = _pg()
cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
cur.execute(
"""
WITH next_job AS (
SELECT job_id
FROM graphiti_jobs
WHERE status = 'queued'
ORDER BY enqueued_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
UPDATE graphiti_jobs g
SET status = 'running', started_at = NOW()
FROM next_job
WHERE g.job_id = next_job.job_id
RETURNING g.job_id, g.job_type, g.payload
"""
)
row = cur.fetchone()
pg.commit()
pg.close()
if row is None:
return None
return {
"job_id": str(row["job_id"]),
"job_type": row["job_type"],
"payload": row["payload"], # already a dict via JSONB
}
def _job_complete(job_id: str, summary: dict) -> None:
pg = _pg()
cur = pg.cursor()
cur.execute(
"""
UPDATE graphiti_jobs
SET status = 'committed', finished_at = NOW(), summary = %s::jsonb
WHERE job_id = %s
""",
(json.dumps(summary), job_id),
)
pg.commit()
pg.close()
def _job_fail(job_id: str, error: str) -> None:
pg = _pg()
cur = pg.cursor()
cur.execute(
"""
UPDATE graphiti_jobs
SET status = 'failed', finished_at = NOW(), error = %s
WHERE job_id = %s
""",
(error[:2000], job_id), # truncate to keep error column reasonable
)
pg.commit()
pg.close()
def _startup_recovery() -> int:
"""Reset any 'running' jobs to 'queued' on startup.
Rationale: if the sidecar died while processing a job, that row is
stuck in 'running' with no process advancing it. The right behavior
on restart is to retry. graphiti-core's add_episode_bulk and
add_episode are idempotent against the graph (dedup handles duplicate
submission), so re-running a job is safe — at worst, a second run
incurs API spend on resolve calls that no-op against an already-
committed entity set.
Returns the count of recovered jobs.
"""
pg = _pg()
cur = pg.cursor()
cur.execute(
"""
UPDATE graphiti_jobs
SET status = 'queued', started_at = NULL
WHERE status = 'running'
"""
)
count = cur.rowcount
pg.commit()
pg.close()
return count
# ---------------------------------------------------------------------------
# Background worker — single asyncio task running for the sidecar lifetime.
# Processes one job at a time. No concurrency. Restart recovery is handled
# by _startup_recovery() before this task starts.
# ---------------------------------------------------------------------------
async def background_worker():
"""Serial job processor. Polls graphiti_jobs, processes one at a time."""
log.info("Background worker started")
from graphiti_core.nodes import EpisodeType
from graphiti_core.utils.bulk_utils import RawEpisode
while True:
try:
claimed = _job_claim_next()
if claimed is None:
await asyncio.sleep(WORKER_POLL_INTERVAL)
continue
job_id = claimed["job_id"]
job_type = claimed["job_type"]
payload = claimed["payload"]
log.info(f"Processing job {job_id} (type={job_type})")
start = datetime.now()
try:
if job_type == "bulk":
summary = await _process_bulk_job(payload, EpisodeType, RawEpisode)
elif job_type == "single":
summary = await _process_single_job(payload, EpisodeType)
else:
raise ValueError(f"Unknown job_type: {job_type}")
duration = (datetime.now() - start).total_seconds()
summary["duration_seconds"] = duration
_job_complete(job_id, summary)
log.info(f"Committed job {job_id} in {duration:.1f}s — {summary}")
except Exception as e:
duration = (datetime.now() - start).total_seconds()
err = f"{type(e).__name__}: {e}"
log.error(f"Job {job_id} failed after {duration:.1f}s: {err}\n{traceback.format_exc()}")
_job_fail(job_id, err)
except asyncio.CancelledError:
log.info("Background worker cancelled")
raise
except Exception as e:
# Defensive: don't let the worker loop die from an unexpected error.
# Log it, sleep briefly, continue.
log.error(f"Worker loop error: {e}\n{traceback.format_exc()}")
await asyncio.sleep(5.0)
async def _process_bulk_job(payload: dict, EpisodeType, RawEpisode) -> dict:
"""Run add_episode_bulk for a 'bulk' job. Payload mirrors BulkEpisodeRequest."""
raw_episodes = []
for ep in payload["episodes"]:
ref_time = (
datetime.fromisoformat(ep["timestamp"])
if ep.get("timestamp") else datetime.now()
)
raw_episodes.append(RawEpisode(
name=ep["name"],
content=ep["content"],
source_description=ep.get("source_description", ""),
source=EpisodeType.text,
reference_time=ref_time,
))
kwargs = dict(
bulk_episodes=raw_episodes,
group_id=payload.get("group_id") or GROUP_ID,
saga=payload.get("saga"),
)
if payload.get("custom_extraction_instructions") is not None:
kwargs["custom_extraction_instructions"] = payload["custom_extraction_instructions"]
result = await graphiti_instance.add_episode_bulk(**kwargs)
return {
"type": "bulk",
"episodes": len(result.episodes) if result and result.episodes else len(raw_episodes),
"nodes": len(result.nodes) if result and result.nodes else 0,
"edges": len(result.edges) if result and result.edges else 0,
}
async def _process_single_job(payload: dict, EpisodeType) -> dict:
"""Run add_episode for a 'single' job. Payload mirrors EpisodeRequest."""
ref_time = (
datetime.fromisoformat(payload["timestamp"])
if payload.get("timestamp") else datetime.now()
)
kwargs = dict(
name=payload["name"],
episode_body=payload["content"],
source=EpisodeType.text,
reference_time=ref_time,
source_description=payload.get("source_description", ""),
group_id=payload.get("group_id") or GROUP_ID,
custom_extraction_instructions=payload.get("custom_extraction_instructions"),
)
if payload.get("saga") is not None:
kwargs["saga"] = payload["saga"]
await graphiti_instance.add_episode(**kwargs)
return {"type": "single", "episodes": 1}
# ---------------------------------------------------------------------------
# Lifespan & app
# ---------------------------------------------------------------------------
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
global graphiti_instance global graphiti_instance, worker_task
sys.path.insert(0, str(Path.home() / "aaronai" / "scripts")) sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
log.info("Loading embedding and reranker models...") log.info("Loading embedding and reranker models...")
from st_embedder import SentenceTransformerEmbedder from st_embedder import SentenceTransformerEmbedder
@@ -75,11 +385,51 @@ async def lifespan(app: FastAPI):
max_coroutines=2, max_coroutines=2,
) )
await graphiti_instance.build_indices_and_constraints() await graphiti_instance.build_indices_and_constraints()
# PATCHED 2026-05-02: bridge the per-driver SearchOperations to the
# search_interface attribute that search_utils.py dispatches on.
# graphiti-core 0.29.0 builds FalkorSearchOperations as driver._search_ops
# but never assigns it to driver.search_interface — naming mismatch
# between the two halves of the codebase. Without this, search_utils.py
# falls through to interpreted-Cypher cosine math (full-table scan) even
# when our patched FalkorSearchOperations exists. Setting search_interface
# activates the per-driver vector-index path.
if hasattr(graphiti_instance.driver, '_search_ops') and graphiti_instance.driver.search_interface is None:
graphiti_instance.driver.search_interface = graphiti_instance.driver._search_ops
log.info("Wired driver.search_interface = driver._search_ops (vector index path active)")
log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}") log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
# Recover any jobs left 'running' from a previous sidecar instance.
# They become 'queued' again and the background worker picks them up.
recovered = _startup_recovery()
if recovered > 0:
log.info(f"Startup recovery: reset {recovered} running job(s) to queued")
# Start the background job worker.
worker_task = asyncio.create_task(background_worker())
log.info("Sidecar ready — accepting job submissions on :8001")
yield yield
# Shutdown: cancel worker, close graphiti.
if worker_task is not None:
worker_task.cancel()
try:
await worker_task
except asyncio.CancelledError:
pass
await graphiti_instance.close() await graphiti_instance.close()
app = FastAPI(title="Aaron AI Graphiti Sidecar", lifespan=lifespan)
app = FastAPI(title="Aaron AI Graphiti Sidecar (Pattern 1)", lifespan=lifespan)
# ---------------------------------------------------------------------------
# Request models — preserved from v1.0 with no payload-shape changes. The
# only API change is the response shape: instead of blocking until
# graphiti-core returns, submission endpoints return a job_id immediately.
# ---------------------------------------------------------------------------
class BulkEpisodeItem(BaseModel): class BulkEpisodeItem(BaseModel):
name: str name: str
@@ -92,11 +442,6 @@ class BulkEpisodeRequest(BaseModel):
episodes: list[BulkEpisodeItem] episodes: list[BulkEpisodeItem]
group_id: str | None = None group_id: str | None = None
saga: str | None = None saga: str | None = None
# Batch-level extraction guidance. graphiti-core inserts this into the
# entity-extraction and edge-extraction prompts only — NOT into dedup
# prompts. Use to bias *what* gets extracted, not *how* dedup runs.
# Verified 2026-05-01 by reading extract_nodes.py, extract_edges.py,
# dedupe_nodes.py, dedupe_edges.py in graphiti-core.
custom_extraction_instructions: str | None = None custom_extraction_instructions: str | None = None
@@ -109,72 +454,76 @@ class EpisodeRequest(BaseModel):
custom_extraction_instructions: str | None = None custom_extraction_instructions: str | None = None
saga: str | None = None saga: str | None = None
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/health") @app.get("/health")
async def health(): async def health():
return {"ok": True, "provider": LLM_PROVIDER, "group": GROUP_ID} return {
"ok": True,
"provider": LLM_PROVIDER,
"group": GROUP_ID,
"sidecar": SIDECAR_NAME,
"version": "2.0",
}
@app.post("/episodes")
async def add_episode(req: EpisodeRequest):
g = await get_graphiti()
from graphiti_core.nodes import EpisodeType
try:
ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now()
kwargs = dict(
name=req.name,
episode_body=req.content,
source=EpisodeType.text,
reference_time=ref_time,
source_description=req.source_description,
group_id=req.group_id or GROUP_ID,
custom_extraction_instructions=req.custom_extraction_instructions,
)
# Saga is supported on graphiti-core add_episode but kept optional
# so older callers don't need to know about it.
if req.saga is not None:
kwargs["saga"] = req.saga
await g.add_episode(**kwargs)
return {"ok": True}
except Exception as e:
log.error(f"Episode ingestion failed: {e}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/episodes/bulk") @app.post("/episodes/bulk")
async def add_episodes_bulk(req: BulkEpisodeRequest): async def submit_bulk(req: BulkEpisodeRequest):
g = await get_graphiti() """Submit a bulk ingest job. Returns job_id for polling.
from graphiti_core.nodes import EpisodeType
from graphiti_core.utils.bulk_utils import RawEpisode Job is processed serially by the sidecar's background worker; one
raw_episodes = [] bulk-or-single job at a time per graph. No HTTP read-timeout
for ep in req.episodes: blocking. Submitter polls GET /jobs/{job_id} until terminal status.
ref_time = datetime.fromisoformat(ep.timestamp) if ep.timestamp else datetime.now() """
raw_episodes.append(RawEpisode( if graphiti_instance is None:
name=ep.name, raise HTTPException(status_code=503, detail="Graphiti not initialized")
content=ep.content,
source_description=ep.source_description, job_id = str(uuid.uuid4())
source=EpisodeType.text, payload = req.model_dump()
reference_time=ref_time,
))
try: try:
kwargs = dict( _job_insert(job_id, "bulk", payload)
bulk_episodes=raw_episodes,
group_id=req.group_id or GROUP_ID,
saga=req.saga or None,
)
# Pass-through only when set, so callers that don't supply
# instructions get graphiti-core's default behavior unchanged.
if req.custom_extraction_instructions is not None:
kwargs["custom_extraction_instructions"] = req.custom_extraction_instructions
result = await g.add_episode_bulk(**kwargs)
return {"ok": True, "count": len(raw_episodes)}
except Exception as e: except Exception as e:
log.error(f"Bulk ingestion failed: {e}\n{traceback.format_exc()}") log.error(f"Failed to enqueue bulk job: {e}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}")
return {"job_id": job_id, "status": "queued"}
@app.post("/episodes")
async def submit_single(req: EpisodeRequest):
"""Submit a single-episode ingest job. Returns job_id for polling."""
if graphiti_instance is None:
raise HTTPException(status_code=503, detail="Graphiti not initialized")
job_id = str(uuid.uuid4())
payload = req.model_dump()
try:
_job_insert(job_id, "single", payload)
except Exception as e:
log.error(f"Failed to enqueue single job: {e}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}")
return {"job_id": job_id, "status": "queued"}
@app.get("/jobs/{job_id}")
async def get_job(job_id: str):
"""Poll a job's status. Returns 404 if job not found."""
job = _job_get(job_id)
if job is None:
raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
return job
@app.get("/search") @app.get("/search")
async def search(query: str, limit: int = 8, group_id: str | None = None): async def search(query: str, limit: int = 8, group_id: str | None = None):
g = await get_graphiti() if graphiti_instance is None:
raise HTTPException(status_code=503, detail="Graphiti not initialized")
try: try:
results = await g.search( results = await graphiti_instance.search(
query=query, query=query,
num_results=limit, num_results=limit,
group_ids=[group_id or GROUP_ID], group_ids=[group_id or GROUP_ID],
@@ -195,6 +544,7 @@ async def search(query: str, limit: int = 8, group_id: str | None = None):
log.error(f"Search failed: {e}\n{traceback.format_exc()}") log.error(f"Search failed: {e}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info") uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info")
+7
View File
@@ -220,8 +220,15 @@ def enqueue_stage3(pg, source, full_text, orientation, metadata,
supersedes_prior_state = EXCLUDED.supersedes_prior_state, supersedes_prior_state = EXCLUDED.supersedes_prior_state,
state_type_rationale = EXCLUDED.state_type_rationale, state_type_rationale = EXCLUDED.state_type_rationale,
enqueued_at = NOW(), enqueued_at = NOW(),
-- Reset all run-state fields on re-enqueue. Without this,
-- stale started_at from a prior attempt makes the row
-- invisible to the Stage 3 worker's claim filter (which
-- typically uses started_at IS NULL).
started_at = NULL,
completed_at = NULL, completed_at = NULL,
failed_at = NULL, failed_at = NULL,
failure_reason = NULL,
external_job_id = NULL,
attempts = 0 attempts = 0
""", (source, full_text, orientation, json.dumps(metadata), """, (source, full_text, orientation, json.dumps(metadata),
state_type, state_type_confidence, supersedes_prior_state, state_type, state_type_confidence, supersedes_prior_state,
+118 -6
View File
@@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing
+ Encoder Instructions (v1.0)
Polls stage_3_queue, routes each row to one of two ingest pathways based on Polls stage_3_queue, routes each row to one of two ingest pathways based on
state-type classification produced by Stage 2: state-type classification produced by Stage 2:
@@ -12,6 +13,18 @@ state-type classification produced by Stage 2:
confidence in {medium, high}. Per-chunk POST to /episodes with shared confidence in {medium, high}. Per-chunk POST to /episodes with shared
saga tag, full edge invalidation, per-chunk timeout/retry independence. saga tag, full edge invalidation, per-chunk timeout/retry independence.
Both pathways pass EXTRACTION_INSTRUCTIONS_V1 to the sidecar via
custom_extraction_instructions, which graphiti-core inserts into entity
and edge extraction prompts (NOT dedup prompts — that's intentional under
the encoder-stays-naive commitment).
Architectural posture: the encoder is content-naïve. It does not draw on
prior knowledge of the user, the substrate, or the cycle's accumulated
work. Schema and personality live in the cycle's consolidated substrate,
where the dream phase shapes them. The encoder produces source-grounded
ground truth for the cycle to work from. See EXTRACTION_INSTRUCTIONS_V1
below for the extraction guidance text.
Routing rationale: the single-episode pathway is the correct API per Routing rationale: the single-episode pathway is the correct API per
graphiti-core's docs for content that supersedes prior facts (it does graphiti-core's docs for content that supersedes prior facts (it does
edge invalidation that bulk skips). It costs more per chunk because of edge invalidation that bulk skips). It costs more per chunk because of
@@ -67,7 +80,7 @@ HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat")
RETRY_ATTEMPTS = 2 RETRY_ATTEMPTS = 2
POLL_INTERVAL = 5 POLL_INTERVAL = 5
INGEST_TIMEOUT = 600 INGEST_TIMEOUT = 600
WORKER_VERSION = "2.3" WORKER_VERSION = "2.4"
# Match Stage 1 chunking parameters # Match Stage 1 chunking parameters
CHUNK_SIZE_WORDS = 500 CHUNK_SIZE_WORDS = 500
@@ -84,6 +97,87 @@ MAX_CHUNKS_PER_SAGA = 10
# the expensive pathway. # the expensive pathway.
HIGH_TRUST_CONFIDENCE = ("medium", "high") HIGH_TRUST_CONFIDENCE = ("medium", "high")
# Encoder extraction guidance v1.0 — see module docstring for posture rationale.
# Passed to graphiti-core via custom_extraction_instructions on both ingest
# pathways. Inserted into entity-extraction and edge-extraction prompts only;
# does NOT enter dedup prompts. Encoder-stays-naïve commitment is structural,
# not versioned: this text gets refined over time but the encoder does not
# acquire substrate context as the cycle matures.
EXTRACTION_INSTRUCTIONS_V1 = """\
EXTRACTION GUIDANCE — BirdAI cascade
The encoder's job is faithful capture from this chunk's text. It does
not draw on prior knowledge of the user, the substrate, or the cycle's
accumulated work. Schema, personality, and inferred context live in
the cycle's consolidated substrate, where the dream phase shapes them
through prediction-error replay and speculation. The encoder stays
content-naïve so the cycle has source-grounded ground truth to work
from.
The orientation produced by an upstream pass describes content shape,
not content interpretation. Use it as forward-facing guidance for what
to attend to in this document. Do not let it bound or limit what you
extract.
PREDICATE NAMING
Produce semantic predicates that describe the actual relationship the
text states. Use verbs or verb phrases — "wrote", "advised", "founded",
"works at", "led to", "contradicts", "is autobiographical to" — not
generic placeholders. Reserve generic forms (for example, "relates to"
or "mentions") for cases where the text genuinely does not specify a
more particular relationship. The verb is the load-bearing part of
the fact; preserving it is what makes the relationship queryable later.
EXTRACTION POSTURE
Extract from this chunk's text as if each entity is encountered fresh.
Do not try to reconcile entities you find here with entities that
might already exist elsewhere in the graph. Redundant entity instances
are acceptable. Cross-document entity resolution is downstream cycle
work, not extraction work.
When the same entity appears multiple times within this chunk with
slightly different spellings — a common artifact of voice transcription —
prefer the more frequent or more canonical-looking form. Do not invent
canonical forms; choose among the variants the text actually contains.
EXTRACT FROM THE SOURCE
Extract relationships the text states or strongly implies through
direct linguistic markers ("X led to Y", "X works for Y", "X met Y at
Z"). Do not extend extraction to relationships the text neither states
nor directly implies. Inferred relationships are produced by the
cycle's dream phase as speculative edges with explicit low-confidence
tagging, where they can be evaluated and either ratified or pruned by
subsequent cycle work. Encoding-time inference, mixed in with source-
grounded extraction, would lose the speculation/source distinction the
cycle's consolidation work relies on.
DO NOT PRE-EMPT CYCLE WORK
Do not omit relationships because they seem redundant with prior
extractions or with the existing graph. Cross-document entity
resolution and edge consolidation are downstream cycle operations;
redundant extraction at this stage is intentional. Extracting the
same fact from multiple sources gives the cycle's consolidation work
the recurrence signal it relies on.
EXTRACTION DEPTH
Use the orientation's frame_relationships and extraction_orientation
fields to inform what to attend to. If the orientation describes
cross-domain relational content, look for relationships that bridge
those domains explicitly, with named predicates for the bridging.
If the orientation describes single-domain technical content, look
for the structural relationships internal to that domain.
Extract every entity and every relationship the text states. Do not
summarize, do not filter, do not omit content because it seems
incidental. The orientation tells you what to look for; the source
text tells you what is there.
"""
def get_pg(): def get_pg():
return psycopg2.connect(PG_DSN) return psycopg2.connect(PG_DSN)
@@ -193,6 +287,8 @@ def ingest_bulk(source, full_text, orientation):
- Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of - Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga
tag so Graphiti links them as one document unit tag so Graphiti links them as one document unit
All three sub-paths pass EXTRACTION_INSTRUCTIONS_V1 to the sidecar.
""" """
char_length = len(full_text) char_length = len(full_text)
@@ -204,7 +300,11 @@ def ingest_bulk(source, full_text, orientation):
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
}] }]
log.info(f" [bulk] Single episode ({char_length} chars)") log.info(f" [bulk] Single episode ({char_length} chars)")
return post_bulk({"episodes": episodes, "group_id": "aaron"}) return post_bulk({
"episodes": episodes,
"group_id": "aaron",
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
})
chunks = chunk_text(full_text) chunks = chunk_text(full_text)
total_chunks = len(chunks) total_chunks = len(chunks)
@@ -220,9 +320,12 @@ def ingest_bulk(source, full_text, orientation):
for i, chunk in enumerate(chunks) for i, chunk in enumerate(chunks)
] ]
log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)") log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)")
return post_bulk( return post_bulk({
{"episodes": episodes, "group_id": "aaron", "saga": source} "episodes": episodes,
) "group_id": "aaron",
"saga": source,
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
})
# Large document: split into batches sharing the same saga tag # Large document: split into batches sharing the same saga tag
batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA
@@ -247,7 +350,12 @@ def ingest_bulk(source, full_text, orientation):
batch_label = f"batch {batch_idx + 1}/{batch_count} (chunks {start + 1}-{end})" batch_label = f"batch {batch_idx + 1}/{batch_count} (chunks {start + 1}-{end})"
log.info(f" {batch_label} starting") log.info(f" {batch_label} starting")
last_result = post_bulk( last_result = post_bulk(
{"episodes": episodes, "group_id": "aaron", "saga": source}, {
"episodes": episodes,
"group_id": "aaron",
"saga": source,
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
},
batch_label=batch_label, batch_label=batch_label,
) )
log.info(f" {batch_label} committed") log.info(f" {batch_label} committed")
@@ -261,6 +369,8 @@ def ingest_single_episode(row_id, source, full_text, orientation):
with shared saga tag. Each call independent: own timeout, own retry with shared saga tag. Each call independent: own timeout, own retry
envelope, own failure semantics. envelope, own failure semantics.
Each chunk POST passes EXTRACTION_INSTRUCTIONS_V1 to the sidecar.
Partial-success behavior: if chunk N of total fails, chunks 1..N-1 Partial-success behavior: if chunk N of total fails, chunks 1..N-1
stay committed (graphiti has already accepted them) and the function stay committed (graphiti has already accepted them) and the function
raises with detail about which chunk failed and how many succeeded. raises with detail about which chunk failed and how many succeeded.
@@ -281,6 +391,7 @@ def ingest_single_episode(row_id, source, full_text, orientation):
"source_description": orientation, "source_description": orientation,
"group_id": "aaron", "group_id": "aaron",
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
} }
log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)") log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)")
return post_episode(payload, episode_label="single-ep") return post_episode(payload, episode_label="single-ep")
@@ -302,6 +413,7 @@ def ingest_single_episode(row_id, source, full_text, orientation):
"group_id": "aaron", "group_id": "aaron",
"saga": source, "saga": source,
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1,
} }
try: try:
post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}") post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}")