aaronAI/scripts/graphiti_service.py
aaron f645b74b1c graphiti_service: v2.0 — Pattern 1 async job model + search_interface bridge
Major rewrite of the Graphiti sidecar. Two architectural changes:

PATTERN 1 ASYNC JOB MODEL

Submission and completion are decoupled. POST /episodes and
POST /episodes/bulk return job_id immediately; the actual graphiti-core
work happens in a background asyncio task. Submitters poll
GET /jobs/{job_id} until terminal status (committed | failed).
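Below is a minimal sketch of the submitter-side poll loop. It assumes the sidecar is reachable at `http://127.0.0.1:8001` (the address in its `__main__` block) and that `GET /jobs/{job_id}` returns a JSON object with a `status` field, as the handler below does. `wait_for_job` and `http_get_json` are hypothetical helper names and are not part of the sidecar. The status fetcher is injected, so the loop can be exercised without a live service.

```python
import json
import time
import urllib.request
from typing import Callable

TERMINAL = {"committed", "failed"}


def http_get_json(url: str, timeout: float = 10.0) -> dict:
    """GET a URL and decode its JSON body (stdlib only)."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def wait_for_job(
    job_id: str,
    fetch: Callable[[str], dict],
    interval: float = 2.0,
    deadline: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll fetch(job_id) until the status is terminal or the deadline passes.

    Each poll is a short request, so no single HTTP read has to outlive the
    ingest. The deadline counts sleep time only. Hitting it means "outcome
    unknown", not "failed": the job may still commit, so keep the job_id.
    """
    waited = 0.0
    while True:
        job = fetch(job_id)
        if job["status"] in TERMINAL:
            return job
        if waited >= deadline:
            raise TimeoutError(f"job {job_id} still {job['status']} after {waited:.0f}s")
        sleep(interval)
        waited += interval


# Usage against a running sidecar (not executed here):
#   base = "http://127.0.0.1:8001"
#   job = wait_for_job(job_id, lambda j: http_get_json(f"{base}/jobs/{j}"))
```

Raising `TimeoutError` rather than returning a failure is deliberate. Treating a client-side deadline as a failed ingest would reintroduce the false negative this design exists to remove.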

Why: tonight's smoke test confirmed that bulk ingest against the
4,222-entity graph was committing successfully even when the worker's
HTTP read-timeout fired. The synchronous interface was producing
false-negative failures — work succeeded but the worker stopped
listening at the 10-minute read-timeout. Three days of 'saga deadlock'
failures are better read as a scaling pathology of unindexed similarity
search than as substrate deadlocks. Pattern 1 separates submission from
completion observation so the worker can't false-negative this way.

Architectural commitments:

- One in-flight job per sidecar (per graph). Concurrent jobs against
  the same graph would race on graphiti-core's bulk-resolve path (no
  transaction boundary). Concurrent multi-tenancy is 'run multiple
  sidecars,' not 'make one sidecar concurrency-safe across graphs.'

- Postgres-backed job state. Survives sidecar restart. On startup the
  sidecar resets any 'running' rows to 'queued' (their previous run
  died); the background worker picks them up naturally.

- Both endpoints async-shaped for parity. Bulk pathway preserved —
  load-bearing for first-run corpus migration. Single-episode
  preserved — load-bearing for state-superseding content per the
  Stage 2/3 routing rule. graphiti-core's add_episode and
  add_episode_bulk are unchanged underneath; the async wrapper sits
  between the HTTP layer and the library call.

- Polling cadence: a flat 2s at the worker. Jobs are claimed with
  FOR UPDATE SKIP LOCKED, so the claim path is already safe for a future
  multi-sidecar deployment without changes.

Postgres helpers (_pg, _job_insert, _job_get, _job_claim_next,
_job_complete, _job_fail, _startup_recovery) replace the synchronous
graphiti.add_episode call with persistent job state. Background worker
loop catches everything, logs everything, never dies from an unexpected
error.
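The "catch everything, log everything, never die" contract can be sketched in plain asyncio. This is a simplified illustration, not the sidecar's `background_worker`. Here an `asyncio.Queue` stands in for the Postgres poll, and `run_job` is a hypothetical callable standing in for the graphiti-core calls. Per-job exceptions become failed results, while `CancelledError` is re-raised so shutdown still works.

```python
import asyncio
import logging

log = logging.getLogger("worker-sketch")


async def worker(queue: asyncio.Queue, run_job, results: dict) -> None:
    """Serial processor: one job at a time; a failing job never kills the loop."""
    while True:
        job_id, payload = await queue.get()  # cancellation here ends the loop
        try:
            results[job_id] = ("committed", await run_job(payload))
        except asyncio.CancelledError:
            raise  # shutdown must still propagate
        except Exception as e:  # job-level failure: record it, keep going
            log.error("job %s failed: %s", job_id, e)
            results[job_id] = ("failed", f"{type(e).__name__}: {e}")
        finally:
            queue.task_done()


async def demo() -> dict:
    async def run_job(payload: str) -> dict:
        # Hypothetical stand-in for add_episode / add_episode_bulk.
        if payload == "bad":
            raise RuntimeError("boom")
        await asyncio.sleep(0)
        return {"episodes": 1}

    queue: asyncio.Queue = asyncio.Queue()
    results: dict = {}
    task = asyncio.create_task(worker(queue, run_job, results))
    for i, payload in enumerate(["ok", "bad", "ok"]):
        queue.put_nowait((f"job-{i}", payload))
    await queue.join()  # returns once every job has been marked done
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return results
```

The sidecar's real loop adds an outer `except Exception` with a 5s back-off around the claim itself, so a Postgres outage turns into retries rather than a dead task. The sketch omits that layer. Since Python 3.8 `CancelledError` derives from `BaseException`, so `except Exception` would not swallow it anyway. The explicit clause documents the intent.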

SEARCH_INTERFACE BRIDGE

graphiti-core 0.29.0 builds FalkorSearchOperations as
driver._search_ops in FalkorDriver.__init__ but never assigns it to
driver.search_interface. search_utils.py:edge_similarity_search and
node_similarity_search check 'if driver.search_interface:' and
delegate when present, falling through to interpreted-Cypher cosine
math when not. The naming mismatch between the two halves of
graphiti-core means the per-driver implementation never gets used.

Bridge after Graphiti instance construction:
  driver.search_interface = driver._search_ops
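The mismatch and the bridge can be reproduced with stand-in classes. `FakeDriver`, `FakeSearchOps`, `similarity_search`, `bridge`, and the returned strings are hypothetical. Only the attribute names `_search_ops` and `search_interface` and the `if driver.search_interface:` check come from the description above. The sketch shows why the fallback always wins until the one-line assignment runs.

```python
class FakeSearchOps:
    """Stand-in for FalkorSearchOperations (the vector-index path)."""

    def node_similarity_search(self, query: str) -> str:
        return f"vector-index:{query}"


class FakeDriver:
    """Mimics the described 0.29.0 wiring: built under one name, read under another."""

    def __init__(self) -> None:
        self._search_ops = FakeSearchOps()
        self.search_interface = None


def similarity_search(driver, query: str) -> str:
    """Stand-in for the dispatch check in search_utils.py."""
    if driver.search_interface:
        return driver.search_interface.node_similarity_search(query)
    return f"cypher-cosine-scan:{query}"  # interpreted fallback


def bridge(driver) -> bool:
    """Apply the bridge using the same guard as the sidecar's lifespan."""
    if hasattr(driver, "_search_ops") and driver.search_interface is None:
        driver.search_interface = driver._search_ops
        return True
    return False
```

The guard fires only when `search_interface is None`. If a later graphiti-core release wires the attribute itself, the bridge becomes a no-op instead of overwriting it.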

This activates the per-driver path which (with our vendored patches)
uses db.idx.vector.queryNodes for FalkorDB's native vector index.
Empirical result: single-episode add_episode against a 4,277-entity
graph went from indefinite hang to 8.2 seconds.
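The sketch below gives some intuition for the hang by showing what an unindexed similarity lookup amounts to: an exhaustive cosine scan that scores every stored embedding on every query. That is O(N·d) work for N entities of dimension d. It is a plain-Python illustration, not graphiti-core's generated Cypher, and `top_k_cosine` is a hypothetical name. At the 4,277 entities and 384 dimensions (`EMBEDDING_DIM`) mentioned here, the dot products alone for one lookup come to about 1.64 million multiply-adds. Dedup during ingest can issue many such lookups. A native vector index queried through `db.idx.vector.queryNodes` avoids the full scan.

```python
import heapq
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def top_k_cosine(query: list[float], embeddings: dict[str, list[float]], k: int = 3):
    """Exhaustive scan: score every stored vector, keep the k best (score, name) pairs."""
    scored = ((cosine(query, vec), name) for name, vec in embeddings.items())
    return heapq.nlargest(k, scored)
```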

The bridge is also a candidate for an upstream PR — pick one name and
stick to it across the codebase. Tonight it's local.
2026-05-02 05:19:46 +00:00

551 lines
20 KiB
Python

"""
Aaron AI: Graphiti Sidecar Service (v2.0, Pattern 1 async job model)

Wraps graphiti-core in a FastAPI service. Pattern 1 architecture: ingest
submission and completion are decoupled. Submitters POST to /episodes or
/episodes/bulk and receive a job_id; an in-process background worker
processes jobs serially against the graph; submitters poll GET /jobs/{id}
until terminal status.

Why Pattern 1: tonight's smoke test (2026-05-02) confirmed that bulk
ingest against the 4,222-entity graph commits successfully even when the
worker's HTTP read-timeout fires. The synchronous interface was producing
false-negative failures: the work succeeded, but the worker had stopped
listening. Pattern 1 separates submission from completion observation so
the worker can't false-negative this way.

Architectural commitments:

- One in-flight job per sidecar (per graph). Concurrent jobs against the
  same graph would race on graphiti-core's _resolve_nodes_and_edges_bulk
  (no transaction boundary, no internal coordination). Concurrent
  multi-tenancy is "run multiple sidecars," not "make one sidecar
  concurrency-safe across graphs."
- Postgres-backed job state. Survives sidecar restart. On startup the
  sidecar resets any 'running' rows to 'queued' (their previous run died);
  the background worker picks them up naturally.
- Both /episodes and /episodes/bulk are async-shaped for parity. The
  graphiti-core operations underneath (add_episode, add_episode_bulk) are
  unchanged.
- The bulk pathway is preserved: it is load-bearing for first-run corpus
  migration. Single-episode is preserved: it is load-bearing for
  state-superseding content per the Stage 2/3 routing rule.

Port 8001 (internal only). No OpenAI dependency.
"""
import asyncio
import json
import logging
import os
import sys
import traceback
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path

import psycopg2
import psycopg2.extras
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

load_dotenv(Path.home() / "aaronai" / ".env")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("/var/log/aaronai/graphiti-sidecar.log"),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger("graphiti-sidecar")

GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron")
FALKORDB_HOST = os.getenv("FALKORDB_HOST", "localhost")
FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")
PG_DSN = os.getenv("PG_DSN")
SIDECAR_NAME = os.getenv("SIDECAR_NAME", "graphiti-sidecar-1")
os.environ["EMBEDDING_DIM"] = "384"

# Background worker configuration. The worker polls Postgres for queued
# jobs every WORKER_POLL_INTERVAL seconds when idle. Single-job-at-a-time
# by design; there is no concurrency primitive beyond the serial loop. The
# sleep is short enough to feel responsive but long enough to avoid
# burning CPU on an empty queue.
WORKER_POLL_INTERVAL = 2.0


def get_llm_client():
    from graphiti_core.llm_client.config import LLMConfig

    config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)
    if LLM_PROVIDER == "anthropic":
        from graphiti_core.llm_client.anthropic_client import AnthropicClient
        return AnthropicClient(config)
    elif LLM_PROVIDER == "openai":
        from graphiti_core.llm_client.openai_client import OpenAIClient
        return OpenAIClient(config)
    elif LLM_PROVIDER == "gemini":
        from graphiti_core.llm_client.gemini_client import GeminiClient
        return GeminiClient(config)
    elif LLM_PROVIDER == "groq":
        from graphiti_core.llm_client.groq_client import GroqClient
        return GroqClient(config)
    raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")


graphiti_instance = None
worker_task = None


# ---------------------------------------------------------------------------
# Postgres job-state helpers. Synchronous psycopg2 calls inside async
# functions: each call opens a fresh connection, runs one statement, and
# closes the connection in a finally block so a failed statement cannot
# leak it. Acceptable here because traffic is low (single-digit jobs/min
# steady state) and the simplicity is worth more than connection pooling.
# If this ever becomes a bottleneck, swap to asyncpg or psycopg3 async.
# ---------------------------------------------------------------------------
def _pg():
    return psycopg2.connect(PG_DSN)


def _job_insert(job_id: str, job_type: str, payload: dict) -> None:
    """Write a new job row in 'queued' status."""
    pg = _pg()
    try:
        cur = pg.cursor()
        cur.execute(
            """
            INSERT INTO graphiti_jobs (job_id, job_type, payload, status, submitted_by)
            VALUES (%s, %s, %s::jsonb, 'queued', %s)
            """,
            (job_id, job_type, json.dumps(payload), SIDECAR_NAME),
        )
        pg.commit()
    finally:
        pg.close()  # closing without commit rolls back on error


def _job_get(job_id: str) -> dict | None:
    """Read a single job by id. Returns None if not found."""
    pg = _pg()
    try:
        cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute(
            """
            SELECT job_id, job_type, status, enqueued_at, started_at, finished_at,
                   error, summary, submitted_by
            FROM graphiti_jobs
            WHERE job_id = %s
            """,
            (job_id,),
        )
        row = cur.fetchone()
    finally:
        pg.close()
    if row is None:
        return None
    # Convert the UUID and datetimes for JSON serialization.
    return {
        "job_id": str(row["job_id"]),
        "job_type": row["job_type"],
        "status": row["status"],
        "enqueued_at": row["enqueued_at"].isoformat() if row["enqueued_at"] else None,
        "started_at": row["started_at"].isoformat() if row["started_at"] else None,
        "finished_at": row["finished_at"].isoformat() if row["finished_at"] else None,
        "error": row["error"],
        "summary": row["summary"],
        "submitted_by": row["submitted_by"],
    }


def _job_claim_next() -> dict | None:
    """Atomically claim the oldest queued job for processing.

    Uses SELECT ... FOR UPDATE SKIP LOCKED so multiple sidecar instances
    (future multi-tenant deployment) don't fight over the same row. For
    single-sidecar deployments this is just a clean atomic transition.

    Returns the full job row (including payload) or None if the queue is empty.
    """
    pg = _pg()
    try:
        cur = pg.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        cur.execute(
            """
            WITH next_job AS (
                SELECT job_id
                FROM graphiti_jobs
                WHERE status = 'queued'
                ORDER BY enqueued_at ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            )
            UPDATE graphiti_jobs g
            SET status = 'running', started_at = NOW()
            FROM next_job
            WHERE g.job_id = next_job.job_id
            RETURNING g.job_id, g.job_type, g.payload
            """
        )
        row = cur.fetchone()
        pg.commit()
    finally:
        pg.close()
    if row is None:
        return None
    return {
        "job_id": str(row["job_id"]),
        "job_type": row["job_type"],
        "payload": row["payload"],  # already a dict via JSONB
    }


def _job_complete(job_id: str, summary: dict) -> None:
    pg = _pg()
    try:
        cur = pg.cursor()
        cur.execute(
            """
            UPDATE graphiti_jobs
            SET status = 'committed', finished_at = NOW(), summary = %s::jsonb
            WHERE job_id = %s
            """,
            (json.dumps(summary), job_id),
        )
        pg.commit()
    finally:
        pg.close()


def _job_fail(job_id: str, error: str) -> None:
    pg = _pg()
    try:
        cur = pg.cursor()
        cur.execute(
            """
            UPDATE graphiti_jobs
            SET status = 'failed', finished_at = NOW(), error = %s
            WHERE job_id = %s
            """,
            (error[:2000], job_id),  # truncate to keep the error column a reasonable size
        )
        pg.commit()
    finally:
        pg.close()


def _startup_recovery() -> int:
    """Reset any 'running' jobs to 'queued' on startup.

    Rationale: if the sidecar died while processing a job, that row is
    stuck in 'running' with no process advancing it. The right behavior
    on restart is to retry. graphiti-core's add_episode_bulk and
    add_episode are idempotent against the graph (dedup handles duplicate
    submission), so re-running a job is safe; at worst, a second run
    incurs API spend on resolve calls that no-op against an
    already-committed entity set.

    Returns the count of recovered jobs.
    """
    pg = _pg()
    try:
        cur = pg.cursor()
        cur.execute(
            """
            UPDATE graphiti_jobs
            SET status = 'queued', started_at = NULL
            WHERE status = 'running'
            """
        )
        count = cur.rowcount
        pg.commit()
    finally:
        pg.close()
    return count


# ---------------------------------------------------------------------------
# Background worker: a single asyncio task running for the sidecar lifetime.
# Processes one job at a time. No concurrency. Restart recovery is handled
# by _startup_recovery() before this task starts.
# ---------------------------------------------------------------------------
async def background_worker():
    """Serial job processor. Polls graphiti_jobs, processes one at a time."""
    log.info("Background worker started")
    from graphiti_core.nodes import EpisodeType
    from graphiti_core.utils.bulk_utils import RawEpisode

    while True:
        try:
            claimed = _job_claim_next()
            if claimed is None:
                await asyncio.sleep(WORKER_POLL_INTERVAL)
                continue

            job_id = claimed["job_id"]
            job_type = claimed["job_type"]
            payload = claimed["payload"]
            log.info(f"Processing job {job_id} (type={job_type})")
            start = datetime.now()
            try:
                if job_type == "bulk":
                    summary = await _process_bulk_job(payload, EpisodeType, RawEpisode)
                elif job_type == "single":
                    summary = await _process_single_job(payload, EpisodeType)
                else:
                    raise ValueError(f"Unknown job_type: {job_type}")
                duration = (datetime.now() - start).total_seconds()
                summary["duration_seconds"] = duration
                _job_complete(job_id, summary)
                log.info(f"Committed job {job_id} in {duration:.1f}s — {summary}")
            except Exception as e:
                duration = (datetime.now() - start).total_seconds()
                err = f"{type(e).__name__}: {e}"
                log.error(f"Job {job_id} failed after {duration:.1f}s: {err}\n{traceback.format_exc()}")
                _job_fail(job_id, err)
        except asyncio.CancelledError:
            log.info("Background worker cancelled")
            raise
        except Exception as e:
            # Defensive: don't let the worker loop die from an unexpected error.
            # Log it, sleep briefly, continue.
            log.error(f"Worker loop error: {e}\n{traceback.format_exc()}")
            await asyncio.sleep(5.0)


async def _process_bulk_job(payload: dict, EpisodeType, RawEpisode) -> dict:
    """Run add_episode_bulk for a 'bulk' job. Payload mirrors BulkEpisodeRequest."""
    raw_episodes = []
    for ep in payload["episodes"]:
        ref_time = (
            datetime.fromisoformat(ep["timestamp"])
            if ep.get("timestamp") else datetime.now()
        )
        raw_episodes.append(RawEpisode(
            name=ep["name"],
            content=ep["content"],
            source_description=ep.get("source_description", ""),
            source=EpisodeType.text,
            reference_time=ref_time,
        ))
    kwargs = dict(
        bulk_episodes=raw_episodes,
        group_id=payload.get("group_id") or GROUP_ID,
        saga=payload.get("saga"),
    )
    if payload.get("custom_extraction_instructions") is not None:
        kwargs["custom_extraction_instructions"] = payload["custom_extraction_instructions"]
    result = await graphiti_instance.add_episode_bulk(**kwargs)
    return {
        "type": "bulk",
        "episodes": len(result.episodes) if result and result.episodes else len(raw_episodes),
        "nodes": len(result.nodes) if result and result.nodes else 0,
        "edges": len(result.edges) if result and result.edges else 0,
    }


async def _process_single_job(payload: dict, EpisodeType) -> dict:
    """Run add_episode for a 'single' job. Payload mirrors EpisodeRequest."""
    ref_time = (
        datetime.fromisoformat(payload["timestamp"])
        if payload.get("timestamp") else datetime.now()
    )
    kwargs = dict(
        name=payload["name"],
        episode_body=payload["content"],
        source=EpisodeType.text,
        reference_time=ref_time,
        source_description=payload.get("source_description", ""),
        group_id=payload.get("group_id") or GROUP_ID,
        custom_extraction_instructions=payload.get("custom_extraction_instructions"),
    )
    if payload.get("saga") is not None:
        kwargs["saga"] = payload["saga"]
    await graphiti_instance.add_episode(**kwargs)
    return {"type": "single", "episodes": 1}


# ---------------------------------------------------------------------------
# Lifespan & app
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    global graphiti_instance, worker_task
    sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
    log.info("Loading embedding and reranker models...")
    from st_embedder import SentenceTransformerEmbedder
    from graphiti_core.cross_encoder.bge_reranker_client import BGERerankerClient
    from graphiti_core.driver.falkordb_driver import FalkorDriver
    from graphiti_core import Graphiti

    log.info(f"Connecting to FalkorDB at {FALKORDB_HOST}:{FALKORDB_PORT}...")
    graphiti_instance = Graphiti(
        llm_client=get_llm_client(),
        embedder=SentenceTransformerEmbedder(),
        cross_encoder=BGERerankerClient(),
        graph_driver=FalkorDriver(host=FALKORDB_HOST, port=FALKORDB_PORT, database="aaron"),
        max_coroutines=2,
    )
    await graphiti_instance.build_indices_and_constraints()

    # PATCHED 2026-05-02: bridge the per-driver SearchOperations to the
    # search_interface attribute that search_utils.py dispatches on.
    # graphiti-core 0.29.0 builds FalkorSearchOperations as driver._search_ops
    # but never assigns it to driver.search_interface (a naming mismatch
    # between the two halves of the codebase). Without this, search_utils.py
    # falls through to interpreted-Cypher cosine math (full-table scan) even
    # when our patched FalkorSearchOperations exists. Setting search_interface
    # activates the per-driver vector-index path.
    if hasattr(graphiti_instance.driver, '_search_ops') and graphiti_instance.driver.search_interface is None:
        graphiti_instance.driver.search_interface = graphiti_instance.driver._search_ops
        log.info("Wired driver.search_interface = driver._search_ops (vector index path active)")
    log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")

    # Recover any jobs left 'running' by a previous sidecar instance.
    # They become 'queued' again and the background worker picks them up.
    recovered = _startup_recovery()
    if recovered > 0:
        log.info(f"Startup recovery: reset {recovered} running job(s) to queued")

    # Start the background job worker.
    worker_task = asyncio.create_task(background_worker())
    log.info("Sidecar ready — accepting job submissions on :8001")

    yield

    # Shutdown: cancel the worker, then close graphiti.
    if worker_task is not None:
        worker_task.cancel()
        try:
            await worker_task
        except asyncio.CancelledError:
            pass
    await graphiti_instance.close()


app = FastAPI(title="Aaron AI Graphiti Sidecar (Pattern 1)", lifespan=lifespan)


# ---------------------------------------------------------------------------
# Request models, preserved from v1.0 with no payload-shape changes. The
# only API change is the response shape: instead of blocking until
# graphiti-core returns, submission endpoints return a job_id immediately.
# ---------------------------------------------------------------------------
class BulkEpisodeItem(BaseModel):
    name: str
    content: str
    source_description: str = ""
    timestamp: str | None = None


class BulkEpisodeRequest(BaseModel):
    episodes: list[BulkEpisodeItem]
    group_id: str | None = None
    saga: str | None = None
    custom_extraction_instructions: str | None = None


class EpisodeRequest(BaseModel):
    name: str
    content: str
    source_description: str = ""
    timestamp: str | None = None
    group_id: str | None = None
    custom_extraction_instructions: str | None = None
    saga: str | None = None


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/health")
async def health():
    return {
        "ok": True,
        "provider": LLM_PROVIDER,
        "group": GROUP_ID,
        "sidecar": SIDECAR_NAME,
        "version": "2.0",
    }


@app.post("/episodes/bulk")
async def submit_bulk(req: BulkEpisodeRequest):
    """Submit a bulk ingest job. Returns job_id for polling.

    The job is processed serially by the sidecar's background worker: one
    bulk-or-single job at a time per graph. No blocking on an HTTP read
    timeout. The submitter polls GET /jobs/{job_id} until terminal status.
    """
    if graphiti_instance is None:
        raise HTTPException(status_code=503, detail="Graphiti not initialized")
    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    try:
        _job_insert(job_id, "bulk", payload)
    except Exception as e:
        log.error(f"Failed to enqueue bulk job: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}")
    return {"job_id": job_id, "status": "queued"}


@app.post("/episodes")
async def submit_single(req: EpisodeRequest):
    """Submit a single-episode ingest job. Returns job_id for polling."""
    if graphiti_instance is None:
        raise HTTPException(status_code=503, detail="Graphiti not initialized")
    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    try:
        _job_insert(job_id, "single", payload)
    except Exception as e:
        log.error(f"Failed to enqueue single job: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}")
    return {"job_id": job_id, "status": "queued"}
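

@app.get("/jobs/{job_id}")
async def get_job(job_id: str):
    """Poll a job's status. Returns 404 if the job is not found."""
    # Every job_id is issued as str(uuid.uuid4()), so a string that does not
    # parse as a UUID cannot name a real job. Rejecting it here returns a
    # clean 404 instead of letting the query fail with a 500 if job_id is a
    # UUID-typed column, as the str() conversion in _job_get suggests.
    try:
        uuid.UUID(job_id)
    except ValueError:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
    job = _job_get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
    return job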


@app.get("/search")
async def search(query: str, limit: int = 8, group_id: str | None = None):
    if graphiti_instance is None:
        raise HTTPException(status_code=503, detail="Graphiti not initialized")
    try:
        results = await graphiti_instance.search(
            query=query,
            num_results=limit,
            group_ids=[group_id or GROUP_ID],
        )
        return {
            "results": [
                {
                    "fact": r.fact,
                    "source": getattr(r, "source_node_uuid", ""),
                    "score": getattr(r, "score", 0),
                    "valid_at": str(getattr(r, "valid_at", "")),
                    "invalid_at": str(getattr(r, "invalid_at", "")),
                }
                for r in results
            ]
        }
    except Exception as e:
        log.error(f"Search failed: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info")
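The header comment on the Postgres helpers accepts blocking psycopg2 calls inside async handlers and names asyncpg or psycopg3 async as the upgrade path. A lighter intermediate step is `asyncio.to_thread` (Python 3.9+), which runs a synchronous function on the default thread pool so the event loop keeps serving requests. This is an assumption about one possible approach, not something the sidecar currently does. `slow_db_call` is a hypothetical stand-in for a helper such as `_job_get`, which lets the sketch run without Postgres. The heartbeat task counts how often the loop got control while the call was in flight.

```python
import asyncio
import time


def slow_db_call(job_id: str) -> dict:
    """Hypothetical stand-in for a blocking helper such as _job_get."""
    time.sleep(0.2)  # simulates connect + query latency
    return {"job_id": job_id, "status": "queued"}


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    """Record a timestamp each time the event loop lets this task run."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def main() -> tuple[dict, int]:
    ticks: list[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    # Offloaded to a worker thread: the loop stays free while it blocks.
    row = await asyncio.to_thread(slow_db_call, "abc")
    stop.set()
    await hb
    return row, len(ticks)
```

If `main` called `slow_db_call` directly instead, it would hold the loop for the full 0.2s and the heartbeat would record nothing during the call. Thread offload does not make psycopg2 asynchronous. It only moves the blocking off the event loop, so the one-connection-per-call cost stays the same.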