Files
aaronAI/scripts/graphiti_service.py
T
aaron 8b0a163670 graphiti_service: expose custom_extraction_instructions on /episodes/bulk; add saga on /episodes
- BulkEpisodeRequest: new optional custom_extraction_instructions field
  with comment noting graphiti-core inserts it into extract_nodes/extract_edges
  prompts only, NOT dedupe prompts (verified by reading prompts directory)
- EpisodeRequest: new optional saga field, plumbed through to add_episode
  for upcoming Stage 3 single-episode pathway
- Both handlers use conditional kwargs construction so existing callers
  see no behavioral change

Phase A item 1 of three. Items 2 (stage2_worker) and 3 (stage3_worker) follow.
2026-05-01 18:57:31 +00:00

201 lines
7.4 KiB
Python

"""
Aaron AI — Graphiti Sidecar Service
Wraps graphiti-core in a FastAPI service to avoid asyncio event loop conflicts.
Port 8001 (internal only). No OpenAI dependency.
"""
import os, logging, sys, traceback
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
load_dotenv(Path.home() / "aaronai" / ".env")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("/var/log/aaronai/graphiti-sidecar.log"),
logging.StreamHandler(),
]
)
log = logging.getLogger("graphiti-sidecar")
GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron")
FALKORDB_HOST = os.getenv("FALKORDB_HOST", "localhost")
FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
os.environ["EMBEDDING_DIM"] = "384"
def get_llm_client():
from graphiti_core.llm_client.config import LLMConfig
config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)
if LLM_PROVIDER == "anthropic":
from graphiti_core.llm_client.anthropic_client import AnthropicClient
return AnthropicClient(config)
elif LLM_PROVIDER == "openai":
from graphiti_core.llm_client.openai_client import OpenAIClient
return OpenAIClient(config)
elif LLM_PROVIDER == "gemini":
from graphiti_core.llm_client.gemini_client import GeminiClient
return GeminiClient(config)
elif LLM_PROVIDER == "groq":
from graphiti_core.llm_client.groq_client import GroqClient
return GroqClient(config)
raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")
graphiti_instance = None
async def get_graphiti():
if graphiti_instance is None:
raise HTTPException(status_code=503, detail="Graphiti not initialized")
return graphiti_instance
@asynccontextmanager
async def lifespan(app: FastAPI):
global graphiti_instance
sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
log.info("Loading embedding and reranker models...")
from st_embedder import SentenceTransformerEmbedder
from graphiti_core.cross_encoder.bge_reranker_client import BGERerankerClient
from graphiti_core.driver.falkordb_driver import FalkorDriver
from graphiti_core import Graphiti
log.info(f"Connecting to FalkorDB at {FALKORDB_HOST}:{FALKORDB_PORT}...")
graphiti_instance = Graphiti(
llm_client=get_llm_client(),
embedder=SentenceTransformerEmbedder(),
cross_encoder=BGERerankerClient(),
graph_driver=FalkorDriver(host=FALKORDB_HOST, port=FALKORDB_PORT, database="aaron"),
max_coroutines=2,
)
await graphiti_instance.build_indices_and_constraints()
log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
yield
await graphiti_instance.close()
app = FastAPI(title="Aaron AI Graphiti Sidecar", lifespan=lifespan)
class BulkEpisodeItem(BaseModel):
name: str
content: str
source_description: str = ""
timestamp: str | None = None
class BulkEpisodeRequest(BaseModel):
episodes: list[BulkEpisodeItem]
group_id: str | None = None
saga: str | None = None
# Batch-level extraction guidance. graphiti-core inserts this into the
# entity-extraction and edge-extraction prompts only — NOT into dedup
# prompts. Use to bias *what* gets extracted, not *how* dedup runs.
# Verified 2026-05-01 by reading extract_nodes.py, extract_edges.py,
# dedupe_nodes.py, dedupe_edges.py in graphiti-core.
custom_extraction_instructions: str | None = None
class EpisodeRequest(BaseModel):
name: str
content: str
source_description: str = ""
timestamp: str | None = None
group_id: str | None = None
custom_extraction_instructions: str | None = None
saga: str | None = None
@app.get("/health")
async def health():
return {"ok": True, "provider": LLM_PROVIDER, "group": GROUP_ID}
@app.post("/episodes")
async def add_episode(req: EpisodeRequest):
g = await get_graphiti()
from graphiti_core.nodes import EpisodeType
try:
ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now()
kwargs = dict(
name=req.name,
episode_body=req.content,
source=EpisodeType.text,
reference_time=ref_time,
source_description=req.source_description,
group_id=req.group_id or GROUP_ID,
custom_extraction_instructions=req.custom_extraction_instructions,
)
# Saga is supported on graphiti-core add_episode but kept optional
# so older callers don't need to know about it.
if req.saga is not None:
kwargs["saga"] = req.saga
await g.add_episode(**kwargs)
return {"ok": True}
except Exception as e:
log.error(f"Episode ingestion failed: {e}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/episodes/bulk")
async def add_episodes_bulk(req: BulkEpisodeRequest):
g = await get_graphiti()
from graphiti_core.nodes import EpisodeType
from graphiti_core.utils.bulk_utils import RawEpisode
raw_episodes = []
for ep in req.episodes:
ref_time = datetime.fromisoformat(ep.timestamp) if ep.timestamp else datetime.now()
raw_episodes.append(RawEpisode(
name=ep.name,
content=ep.content,
source_description=ep.source_description,
source=EpisodeType.text,
reference_time=ref_time,
))
try:
kwargs = dict(
bulk_episodes=raw_episodes,
group_id=req.group_id or GROUP_ID,
saga=req.saga or None,
)
# Pass-through only when set, so callers that don't supply
# instructions get graphiti-core's default behavior unchanged.
if req.custom_extraction_instructions is not None:
kwargs["custom_extraction_instructions"] = req.custom_extraction_instructions
result = await g.add_episode_bulk(**kwargs)
return {"ok": True, "count": len(raw_episodes)}
except Exception as e:
log.error(f"Bulk ingestion failed: {e}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/search")
async def search(query: str, limit: int = 8, group_id: str | None = None):
g = await get_graphiti()
try:
results = await g.search(
query=query,
num_results=limit,
group_ids=[group_id or GROUP_ID],
)
return {
"results": [
{
"fact": r.fact,
"source": getattr(r, "source_node_uuid", ""),
"score": getattr(r, "score", 0),
"valid_at": str(getattr(r, "valid_at", "")),
"invalid_at": str(getattr(r, "invalid_at", "")),
}
for r in results
]
}
except Exception as e:
log.error(f"Search failed: {e}\n{traceback.format_exc()}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info")