"""
Aaron AI — Graphiti Sidecar Service (v2.0 — Pattern 1 async job model)

Wraps graphiti-core in a FastAPI service.

Pattern 1 architecture: ingest submission and completion are decoupled.
Submitters POST to /episodes or /episodes/bulk and receive a job_id; an
in-process background worker processes jobs serially against the graph;
submitters poll GET /jobs/{id} until the job reaches a terminal status.

Why Pattern 1: tonight's smoke test (2026-05-02) confirmed that bulk ingest
against the 4,222-entity graph commits successfully even when the worker's
HTTP read-timeout fires. The synchronous interface was producing
false-negative failures — the work succeeded, but the worker had stopped
listening. Pattern 1 separates submission from completion observation so
the worker can't produce false negatives this way.

Architectural commitments:
  - One in-flight job per sidecar (per graph). Concurrent jobs against the
    same graph would race on graphiti-core's _resolve_nodes_and_edges_bulk
    (no transaction boundary, no internal coordination). Concurrent
    multi-tenancy means "run multiple sidecars," not "make one sidecar
    concurrency-safe across graphs."
  - Postgres-backed job state that survives a sidecar restart. On startup
    the sidecar resets any 'running' rows to 'queued' (their previous run
    died); the background worker then picks them up naturally.
  - Both /episodes and /episodes/bulk are async-shaped for parity. The
    graphiti-core operations underneath (add_episode, add_episode_bulk)
    are unchanged.
  - The bulk pathway is preserved — load-bearing for first-run corpus
    migration. Single-episode ingest is preserved — load-bearing for
    state-superseding content per the Stage 2/3 routing rule.

Port 8001 (internal only). No OpenAI dependency.
"""
import asyncio
import json
import logging
import os
import sys
import traceback
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from pathlib import Path

import psycopg2
import psycopg2.extras
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# Pull secrets/config from ~/aaronai/.env before any os.getenv() below runs.
load_dotenv(Path.home() / "aaronai" / ".env")

# Log to both the sidecar's file and stderr.
_log_handlers = [
    logging.FileHandler("/var/log/aaronai/graphiti-sidecar.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=_log_handlers,
)
log = logging.getLogger("graphiti-sidecar")

# --- Graph / storage targets -----------------------------------------------
GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron")
FALKORDB_HOST = os.getenv("FALKORDB_HOST", "localhost")
FALKORDB_PORT = int(os.getenv("FALKORDB_PORT", "6379"))

# --- LLM selection ---------------------------------------------------------
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "anthropic")
LLM_MODEL = os.getenv("LLM_MODEL", "claude-sonnet-4-6")
# The provider-neutral variable wins; ANTHROPIC_API_KEY is the fallback.
LLM_API_KEY = os.getenv("LLM_API_KEY") or os.getenv("ANTHROPIC_API_KEY")

# --- Job-state store and sidecar identity ----------------------------------
PG_DSN = os.getenv("PG_DSN")
SIDECAR_NAME = os.getenv("SIDECAR_NAME", "graphiti-sidecar-1")

# Always forced to 384 (overrides any inherited value).
# NOTE(review): presumably must match the local embedder's vector width —
# confirm against st_embedder.SentenceTransformerEmbedder.
os.environ["EMBEDDING_DIM"] = "384"

# Background worker configuration. Polls Postgres for queued jobs every
# WORKER_POLL_INTERVAL seconds when idle. Single-job-at-a-time by design;
# no concurrency primitive beyond the serial loop. The sleep is brief
# enough to feel responsive but long enough to avoid burning CPU on an
# empty queue.
WORKER_POLL_INTERVAL = 2.0  # seconds between idle polls of graphiti_jobs


def get_llm_client():
    """Build the graphiti-core LLM client selected by LLM_PROVIDER.

    Each provider class is imported only inside its own branch, so providers
    that are not selected are never imported.

    Returns:
        A graphiti-core LLM client configured with LLM_API_KEY / LLM_MODEL.

    Raises:
        ValueError: LLM_PROVIDER is not anthropic, openai, gemini, or groq.
    """
    from graphiti_core.llm_client.config import LLMConfig

    config = LLMConfig(api_key=LLM_API_KEY, model=LLM_MODEL)
    if LLM_PROVIDER == "anthropic":
        from graphiti_core.llm_client.anthropic_client import AnthropicClient
        return AnthropicClient(config)
    elif LLM_PROVIDER == "openai":
        from graphiti_core.llm_client.openai_client import OpenAIClient
        return OpenAIClient(config)
    elif LLM_PROVIDER == "gemini":
        from graphiti_core.llm_client.gemini_client import GeminiClient
        return GeminiClient(config)
    elif LLM_PROVIDER == "groq":
        from graphiti_core.llm_client.groq_client import GroqClient
        return GroqClient(config)
    raise ValueError(f"Unsupported LLM provider: {LLM_PROVIDER}")


# Module-level singletons, populated by lifespan() at startup.
graphiti_instance = None  # Graphiti client; None until initialization finishes
worker_task = None  # asyncio.Task running background_worker()


# ---------------------------------------------------------------------------
# Postgres job-state helpers. Synchronous psycopg2 calls inside async
# functions: each call opens a fresh connection, runs one statement, closes.
# Acceptable here because traffic is low (single-digit jobs/min steady state)
# and the simplicity is worth more than connection pooling. If this ever
# becomes a bottleneck, swap to asyncpg or psycopg3 async.
#
# Every helper closes its connection in a `finally`, so a failing statement
# (bad DSN, constraint violation, server restart) cannot leak the
# connection. `with conn:` commits on success and rolls back on error;
# `with conn.cursor()` closes the cursor.
# ---------------------------------------------------------------------------

def _pg():
    """Open a new psycopg2 connection to PG_DSN. The caller must close it."""
    return psycopg2.connect(PG_DSN)


def _iso(value):
    """Return value.isoformat() for a datetime, or None for a NULL column."""
    return value.isoformat() if value else None


def _job_insert(job_id: str, job_type: str, payload: dict) -> None:
    """Write a new job row in 'queued' status, attributed to SIDECAR_NAME."""
    conn = _pg()
    try:
        with conn, conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO graphiti_jobs
                    (job_id, job_type, payload, status, submitted_by)
                VALUES (%s, %s, %s::jsonb, 'queued', %s)
                """,
                (job_id, job_type, json.dumps(payload), SIDECAR_NAME),
            )
    finally:
        conn.close()


def _job_get(job_id: str) -> dict | None:
    """Read a single job by id.

    Returns None if not found. The payload column is not selected, so
    pollers see status and metadata only. UUIDs and datetimes are converted
    to strings for JSON serialization.
    """
    conn = _pg()
    try:
        with conn, conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                """
                SELECT job_id, job_type, status, enqueued_at, started_at,
                       finished_at, error, summary, submitted_by
                FROM graphiti_jobs
                WHERE job_id = %s
                """,
                (job_id,),
            )
            row = cur.fetchone()
    finally:
        conn.close()

    if row is None:
        return None
    return {
        "job_id": str(row["job_id"]),
        "job_type": row["job_type"],
        "status": row["status"],
        "enqueued_at": _iso(row["enqueued_at"]),
        "started_at": _iso(row["started_at"]),
        "finished_at": _iso(row["finished_at"]),
        "error": row["error"],
        "summary": row["summary"],
        "submitted_by": row["submitted_by"],
    }


def _job_claim_next() -> dict | None:
    """Atomically claim the oldest queued job for processing.

    Uses SELECT ... FOR UPDATE SKIP LOCKED so multiple sidecar instances
    (future multi-tenant deployment) don't fight over the same row. For
    single-sidecar deployments this is just a clean atomic transition.

    Returns the full job row (including payload) or None if the queue is
    empty.
    """
    conn = _pg()
    try:
        with conn, conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                """
                WITH next_job AS (
                    SELECT job_id FROM graphiti_jobs
                    WHERE status = 'queued'
                    ORDER BY enqueued_at ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                )
                UPDATE graphiti_jobs g
                SET status = 'running', started_at = NOW()
                FROM next_job
                WHERE g.job_id = next_job.job_id
                RETURNING g.job_id, g.job_type, g.payload
                """
            )
            row = cur.fetchone()
    finally:
        conn.close()

    if row is None:
        return None
    return {
        "job_id": str(row["job_id"]),
        "job_type": row["job_type"],
        "payload": row["payload"],  # already a dict via JSONB
    }


def _job_complete(job_id: str, summary: dict) -> None:
    """Mark a job 'committed', stamping finished_at and storing summary."""
    conn = _pg()
    try:
        with conn, conn.cursor() as cur:
            cur.execute(
                """
                UPDATE graphiti_jobs
                SET status = 'committed', finished_at = NOW(),
                    summary = %s::jsonb
                WHERE job_id = %s
                """,
                (json.dumps(summary), job_id),
            )
    finally:
        conn.close()


def _job_fail(job_id: str, error: str) -> None:
    """Mark a job 'failed', stamping finished_at and storing the error text."""
    conn = _pg()
    try:
        with conn, conn.cursor() as cur:
            cur.execute(
                """
                UPDATE graphiti_jobs
                SET status = 'failed', finished_at = NOW(), error = %s
                WHERE job_id = %s
                """,
                (error[:2000], job_id),  # truncate to keep error column reasonable
            )
    finally:
        conn.close()


def _startup_recovery() -> int:
    """Reset any 'running' jobs to 'queued' on startup.

    Rationale: if the sidecar died while processing a job, that row is stuck
    in 'running' with no process advancing it. The right behavior on restart
    is to retry. graphiti-core's add_episode_bulk and add_episode are
    idempotent against the graph (dedup handles duplicate submission), so
    re-running a job is safe — at worst, a second run incurs API spend on
    resolve calls that no-op against an already-committed entity set.

    Returns the count of recovered jobs.
    """
    conn = _pg()
    try:
        with conn, conn.cursor() as cur:
            cur.execute(
                """
                UPDATE graphiti_jobs
                SET status = 'queued', started_at = NULL
                WHERE status = 'running'
                """
            )
            count = cur.rowcount
    finally:
        conn.close()
    return count


# ---------------------------------------------------------------------------
# Background worker — single asyncio task running for the sidecar lifetime.
# Processes one job at a time. No concurrency. Restart recovery is handled
# by _startup_recovery() before this task starts.
# ---------------------------------------------------------------------------

async def background_worker():
    """Serial job processor. Polls graphiti_jobs, processes one at a time.

    Outcome recording is split on purpose:
      * an exception while *processing* the job marks the row 'failed';
      * a processed job is then marked 'committed' outside that handler.
    If the graph write succeeded but recording 'committed' raises (e.g. a
    Postgres hiccup), the job must not be reported 'failed' — that is the
    false negative Pattern 1 exists to prevent. The error instead reaches
    the outer loop handler, the row stays 'running', and _startup_recovery
    re-queues it on the next restart (its docstring explains why re-running
    is safe).
    """
    log.info("Background worker started")
    from graphiti_core.nodes import EpisodeType
    from graphiti_core.utils.bulk_utils import RawEpisode

    while True:
        try:
            claimed = _job_claim_next()
            if claimed is None:
                await asyncio.sleep(WORKER_POLL_INTERVAL)
                continue

            job_id = claimed["job_id"]
            job_type = claimed["job_type"]
            payload = claimed["payload"]
            log.info(f"Processing job {job_id} (type={job_type})")
            start = datetime.now()

            try:
                if job_type == "bulk":
                    summary = await _process_bulk_job(payload, EpisodeType, RawEpisode)
                elif job_type == "single":
                    summary = await _process_single_job(payload, EpisodeType)
                else:
                    raise ValueError(f"Unknown job_type: {job_type}")
            except Exception as e:
                duration = (datetime.now() - start).total_seconds()
                err = f"{type(e).__name__}: {e}"
                log.error(f"Job {job_id} failed after {duration:.1f}s: {err}\n{traceback.format_exc()}")
                _job_fail(job_id, err)
                continue

            duration = (datetime.now() - start).total_seconds()
            summary["duration_seconds"] = duration
            # Deliberately outside the processing handler above (see docstring).
            _job_complete(job_id, summary)
            log.info(f"Committed job {job_id} in {duration:.1f}s — {summary}")

        except asyncio.CancelledError:
            log.info("Background worker cancelled")
            raise
        except Exception as e:
            # Defensive: don't let the worker loop die from an unexpected error.
            # Log it, sleep briefly, continue.
            log.error(f"Worker loop error: {e}\n{traceback.format_exc()}")
            await asyncio.sleep(5.0)


def _reference_time(timestamp: str | None) -> datetime:
    """Parse an episode timestamp with datetime.fromisoformat, or use now().

    A missing/empty timestamp falls back to datetime.now().
    NOTE(review): the fallback is naive local time, and a parsed value is
    naive unless the string carries an offset — confirm whether graphiti-core
    expects timezone-aware datetimes here.
    """
    return datetime.fromisoformat(timestamp) if timestamp else datetime.now()


async def _process_bulk_job(payload: dict, EpisodeType, RawEpisode) -> dict:
    """Run add_episode_bulk for a 'bulk' job. Payload mirrors BulkEpisodeRequest.

    EpisodeType / RawEpisode are passed in because background_worker imports
    them lazily. Returns a summary dict with episode/node/edge counts; the
    episode count falls back to the submitted count when the result carries
    none.
    """
    raw_episodes = [
        RawEpisode(
            name=ep["name"],
            content=ep["content"],
            source_description=ep.get("source_description", ""),
            source=EpisodeType.text,
            reference_time=_reference_time(ep.get("timestamp")),
        )
        for ep in payload["episodes"]
    ]

    kwargs = dict(
        bulk_episodes=raw_episodes,
        group_id=payload.get("group_id") or GROUP_ID,
        saga=payload.get("saga"),
    )
    # Only forwarded when set, so graphiti-core's own default applies otherwise.
    if payload.get("custom_extraction_instructions") is not None:
        kwargs["custom_extraction_instructions"] = payload["custom_extraction_instructions"]

    result = await graphiti_instance.add_episode_bulk(**kwargs)
    return {
        "type": "bulk",
        "episodes": len(result.episodes) if result and result.episodes else len(raw_episodes),
        "nodes": len(result.nodes) if result and result.nodes else 0,
        "edges": len(result.edges) if result and result.edges else 0,
    }


async def _process_single_job(payload: dict, EpisodeType) -> dict:
    """Run add_episode for a 'single' job. Payload mirrors EpisodeRequest.

    Returns a fixed summary {"type": "single", "episodes": 1}.
    """
    kwargs = dict(
        name=payload["name"],
        episode_body=payload["content"],
        source=EpisodeType.text,
        reference_time=_reference_time(payload.get("timestamp")),
        source_description=payload.get("source_description", ""),
        group_id=payload.get("group_id") or GROUP_ID,
        custom_extraction_instructions=payload.get("custom_extraction_instructions"),
    )
    if payload.get("saga") is not None:
        kwargs["saga"] = payload["saga"]
    await graphiti_instance.add_episode(**kwargs)
    return {"type": "single", "episodes": 1}


# ---------------------------------------------------------------------------
# Lifespan & app
# ---------------------------------------------------------------------------

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: build Graphiti, recover stale jobs, run the worker.

    Startup order matters:
      1. Graphiti is built first because job processing calls
         graphiti_instance.
      2. _startup_recovery() runs before the worker starts; run afterwards it
         could reset a row the fresh worker had just claimed as 'running'.
      3. The worker task starts last.
    Shutdown (in a finally, so it runs however the app stops) cancels the
    worker — an in-flight job stays 'running' and is re-queued on the next
    startup — and then closes Graphiti.
    """
    global graphiti_instance, worker_task

    sys.path.insert(0, str(Path.home() / "aaronai" / "scripts"))
    log.info("Loading embedding and reranker models...")
    from st_embedder import SentenceTransformerEmbedder
    from graphiti_core.cross_encoder.bge_reranker_client import BGERerankerClient
    from graphiti_core.driver.falkordb_driver import FalkorDriver
    from graphiti_core import Graphiti

    log.info(f"Connecting to FalkorDB at {FALKORDB_HOST}:{FALKORDB_PORT}...")
    graphiti_instance = Graphiti(
        llm_client=get_llm_client(),
        embedder=SentenceTransformerEmbedder(),
        cross_encoder=BGERerankerClient(),
        graph_driver=FalkorDriver(host=FALKORDB_HOST, port=FALKORDB_PORT, database="aaron"),
        max_coroutines=2,
    )
    await graphiti_instance.build_indices_and_constraints()

    # PATCHED 2026-05-02: bridge the per-driver SearchOperations to the
    # search_interface attribute that search_utils.py dispatches on.
    # graphiti-core 0.29.0 builds FalkorSearchOperations as driver._search_ops
    # but never assigns it to driver.search_interface — naming mismatch
    # between the two halves of the codebase. Without this, search_utils.py
    # falls through to interpreted-Cypher cosine math (full-table scan) even
    # when our patched FalkorSearchOperations exists. Setting search_interface
    # activates the per-driver vector-index path.
    # getattr() keeps this from raising if a driver lacks search_interface.
    driver = graphiti_instance.driver
    if hasattr(driver, "_search_ops") and getattr(driver, "search_interface", None) is None:
        driver.search_interface = driver._search_ops
        log.info("Wired driver.search_interface = driver._search_ops (vector index path active)")

    log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")

    # Recover any jobs left 'running' from a previous sidecar instance.
    # They become 'queued' again and the background worker picks them up.
    recovered = _startup_recovery()
    if recovered > 0:
        log.info(f"Startup recovery: reset {recovered} running job(s) to queued")

    # Start the background job worker.
    worker_task = asyncio.create_task(background_worker())
    log.info("Sidecar ready — accepting job submissions on :8001")

    try:
        yield
    finally:
        # Shutdown: cancel worker, close graphiti.
        if worker_task is not None:
            worker_task.cancel()
            try:
                await worker_task
            except asyncio.CancelledError:
                pass
        await graphiti_instance.close()


app = FastAPI(title="Aaron AI Graphiti Sidecar (Pattern 1)", lifespan=lifespan)


# ---------------------------------------------------------------------------
# Request models — preserved from v1.0 with no payload-shape changes. The
# only API change is the response shape: instead of blocking until
# graphiti-core returns, submission endpoints return a job_id immediately.
# ---------------------------------------------------------------------------


class BulkEpisodeItem(BaseModel):
    """One episode inside a bulk submission."""

    name: str
    content: str
    source_description: str = ""
    timestamp: str | None = None  # parsed with datetime.fromisoformat; absent -> now


class BulkEpisodeRequest(BaseModel):
    """Body of POST /episodes/bulk; stored verbatim as the job payload."""

    episodes: list[BulkEpisodeItem]
    group_id: str | None = None  # falls back to GROUP_ID when empty
    saga: str | None = None
    custom_extraction_instructions: str | None = None


class EpisodeRequest(BaseModel):
    """Body of POST /episodes; stored verbatim as the job payload."""

    name: str
    content: str
    source_description: str = ""
    timestamp: str | None = None  # parsed with datetime.fromisoformat; absent -> now
    group_id: str | None = None  # falls back to GROUP_ID when empty
    custom_extraction_instructions: str | None = None
    saga: str | None = None


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@app.get("/health")
async def health():
    """Liveness probe reporting this sidecar's static identity."""
    return {
        "ok": True,
        "provider": LLM_PROVIDER,
        "group": GROUP_ID,
        "sidecar": SIDECAR_NAME,
        "version": "2.0",
    }


def _enqueue(job_type: str, req: BaseModel) -> dict:
    """Persist req as a queued job of job_type and return the receipt.

    Shared by both submission endpoints so they behave identically.

    Raises:
        HTTPException(503): Graphiti has not finished initializing.
        HTTPException(500): the job row could not be written to Postgres.
    """
    if graphiti_instance is None:
        raise HTTPException(status_code=503, detail="Graphiti not initialized")
    job_id = str(uuid.uuid4())
    payload = req.model_dump()
    try:
        _job_insert(job_id, job_type, payload)
    except Exception as e:
        log.error(f"Failed to enqueue {job_type} job: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Job enqueue failed: {e}") from e
    return {"job_id": job_id, "status": "queued"}


@app.post("/episodes/bulk")
async def submit_bulk(req: BulkEpisodeRequest):
    """Submit a bulk ingest job. Returns job_id for polling.

    Job is processed serially by the sidecar's background worker; one
    bulk-or-single job at a time per graph. No HTTP read-timeout blocking.
    Submitter polls GET /jobs/{job_id} until terminal status.
    """
    return _enqueue("bulk", req)


@app.post("/episodes")
async def submit_single(req: EpisodeRequest):
    """Submit a single-episode ingest job. Returns job_id for polling."""
    return _enqueue("single", req)


@app.get("/jobs/{job_id}")
async def get_job(job_id: str):
    """Poll a job's status. Returns 404 if job not found.

    Malformed ids also get 404: every job_id is minted by uuid.uuid4() at
    submission, so a string that is not a UUID cannot name a job. Checking
    up front stops Postgres from rejecting the value (if job_id is a uuid
    column — confirm against the schema) and turning a client typo into a
    500. The id is canonicalized to the same str(UUID) form used at insert.
    """
    try:
        canonical_id = str(uuid.UUID(job_id))
    except ValueError:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found") from None
    job = _job_get(canonical_id)
    if job is None:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")
    return job


@app.get("/search")
async def search(query: str, limit: int = 8, group_id: str | None = None):
    """Search the graph for up to `limit` results in one group.

    group_id defaults to GROUP_ID. Each result exposes fact, source node
    uuid, score, valid_at and invalid_at; missing attributes become "" (or 0
    for score). Note that a None valid_at/invalid_at is rendered as the
    string "None" by str().
    """
    if graphiti_instance is None:
        raise HTTPException(status_code=503, detail="Graphiti not initialized")
    try:
        results = await graphiti_instance.search(
            query=query,
            num_results=limit,
            group_ids=[group_id or GROUP_ID],
        )
        return {
            "results": [
                {
                    "fact": r.fact,
                    "source": getattr(r, "source_node_uuid", ""),
                    "score": getattr(r, "score", 0),
                    "valid_at": str(getattr(r, "valid_at", "")),
                    "invalid_at": str(getattr(r, "invalid_at", "")),
                }
                for r in results
            ]
        }
    except Exception as e:
        log.error(f"Search failed: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=str(e)) from e


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8001, log_level="info")