Files
aaronAI/scripts/graphiti_service.py
T
aaron 313c0f0341 graphiti_service.py: bridge driver._search_ops to driver.search_interface
graphiti-core 0.29.0 builds FalkorSearchOperations as driver._search_ops in
FalkorDriver.__init__ but never assigns it to driver.search_interface.
search_utils.py dispatches on search_interface; without this one-line bridge
it falls back to interpreted-Cypher cosine math doing full table scans for
every entity dedup similarity check.

Combined with the vendored patches in graphiti_patches/ (restored in the
previous commit d2ec20e), this activates FalkorDB's native vector index for
the dedup similarity path. Empirical impact (per the original f645b74 commit
message): single-episode add_episode against a ~4,277-entity graph went from
indefinite hang to ~8.2 seconds.

Surgical restore: cherry-picks only the bridge code from f645b74 — not the
Pattern 1 async job model, not the v2.4 extraction instructions, neither of
which we want. Default extraction posture (taxonomy-naïve) stays the
operating mode. Rich-extraction story remains a BirdAI concern.
2026-05-20 04:06:46 +00:00

195 lines
7.2 KiB
Python

"""
Aaron AI — Graphiti Sidecar Service
Wraps graphiti-core in a FastAPI service to avoid asyncio event loop conflicts.
Port 8001 (internal only). No OpenAI dependency.
"""
import os, logging, sys, traceback
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
load_dotenv(Path.home() / "aaronai" / ".env")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("/var/log/aaronai/graphiti-sidecar.log"),
logging.StreamHandler(),
]
)
log = logging.getLogger("graphiti-sidecar")
GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron")
FALKORDB_HOST = os.getenv("FALKORDB_HOST", "localhost")
FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
os.environ["EMBEDDING_DIM"] = "384"
def get_llm_client():
from graphiti_core.llm_client.config import LLMConfig
config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)
if LLM_PROVIDER == "anthropic":
from graphiti_core.llm_client.anthropic_client import AnthropicClient
return AnthropicClient(config)
elif LLM_PROVIDER == "openai":
from graphiti_core.llm_client.openai_client import OpenAIClient
return OpenAIClient(config)
elif LLM_PROVIDER == "gemini":
from graphiti_core.llm_client.gemini_client import GeminiClient
return GeminiClient(config)
elif LLM_PROVIDER == "groq":
from graphiti_core.llm_client.groq_client import GroqClient
return GroqClient(config)
raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")
graphiti_instance = None
async def get_graphiti():
if graphiti_instance is None:
raise HTTPException(status_code=503, detail="Graphiti not initialized")
return graphiti_instance
@asynccontextmanager
async def lifespan(app: FastAPI):
global graphiti_instance
sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
log.info("Loading embedding and reranker models...")
from st_embedder import SentenceTransformerEmbedder
from graphiti_core.cross_encoder.bge_reranker_client import BGERerankerClient
from graphiti_core.driver.falkordb_driver import FalkorDriver
from graphiti_core import Graphiti
log.info(f"Connecting to FalkorDB at {FALKORDB_HOST}:{FALKORDB_PORT}...")
graphiti_instance = Graphiti(
llm_client=get_llm_client(),
embedder=SentenceTransformerEmbedder(),
cross_encoder=BGERerankerClient(),
graph_driver=FalkorDriver(host=FALKORDB_HOST, port=FALKORDB_PORT, database="aaron"),
max_coroutines=2,
)
await graphiti_instance.build_indices_and_constraints()
# Bridge driver._search_ops to driver.search_interface — graphiti-core 0.29.0
# builds FalkorSearchOperations as driver._search_ops in FalkorDriver.__init__
# but never assigns it to driver.search_interface. search_utils.py dispatches
# on driver.search_interface; without this assignment it falls back to
# interpreted-Cypher cosine math (full table scans). Together with the
# vendored patches in graphiti_patches/, this activates FalkorDB's native
# vector index for entity dedup similarity search.
if (hasattr(graphiti_instance.driver, "_search_ops")
and graphiti_instance.driver.search_interface is None):
graphiti_instance.driver.search_interface = graphiti_instance.driver._search_ops
log.info("Wired driver.search_interface = driver._search_ops (vector index path active)")
log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
yield
await graphiti_instance.close()
app = FastAPI(title="Aaron AI Graphiti Sidecar", lifespan=lifespan)
class BulkEpisodeItem(BaseModel):
name: str
content: str
source_description: str = ""
timestamp: str | None = None
class BulkEpisodeRequest(BaseModel):
episodes: list[BulkEpisodeItem]
group_id: str | None = None
saga: str | None = None
class EpisodeRequest(BaseModel):
name: str
content: str
source_description: str = ""
timestamp: str | None = None
group_id: str | None = None
custom_extraction_instructions: str | None = None
@app.get("/health")
async def health():
return {"ok": True, "provider": LLM_PROVIDER, "group": GROUP_ID}
@app.post("/episodes")
async def add_episode(req: EpisodeRequest):
g = await get_graphiti()
from graphiti_core.nodes import EpisodeType
try:
ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now()
await g.add_episode(
name=req.name,
episode_body=req.content,
source=EpisodeType.text,
reference_time=ref_time,
source_description=req.source_description,
group_id=req.group_id or GROUP_ID,
custom_extraction_instructions=req.custom_extraction_instructions,
)
return {"ok": True}
except Exception as e:
log.error(f"Episode ingestion failed: {e}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/episodes/bulk")
async def add_episodes_bulk(req: BulkEpisodeRequest):
g = await get_graphiti()
from graphiti_core.nodes import EpisodeType
from graphiti_core.utils.bulk_utils import RawEpisode
raw_episodes = []
for ep in req.episodes:
ref_time = datetime.fromisoformat(ep.timestamp) if ep.timestamp else datetime.now()
raw_episodes.append(RawEpisode(
name=ep.name,
content=ep.content,
source_description=ep.source_description,
source=EpisodeType.text,
reference_time=ref_time,
))
try:
result = await g.add_episode_bulk(
bulk_episodes=raw_episodes,
group_id=req.group_id or GROUP_ID,
saga=req.saga or None,
)
return {"ok": True, "count": len(raw_episodes)}
except Exception as e:
log.error(f"Bulk ingestion failed: {e}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/search")
async def search(query: str, limit: int = 8, group_id: str | None = None):
g = await get_graphiti()
try:
results = await g.search(
query=query,
num_results=limit,
group_ids=[group_id or GROUP_ID],
)
return {
"results": [
{
"fact": r.fact,
"source": getattr(r, "source_node_uuid", ""),
"score": getattr(r, "score", 0),
"valid_at": str(getattr(r, "valid_at", "")),
"invalid_at": str(getattr(r, "invalid_at", "")),
}
for r in results
]
}
except Exception as e:
log.error(f"Search failed: {e}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info")