From 8b0a1636708760a7f073f0eba1a53fa8f6a96074 Mon Sep 17 00:00:00 2001 From: Aaron Nelson Date: Fri, 1 May 2026 18:57:31 +0000 Subject: [PATCH] graphiti_service: expose custom_extraction_instructions on /episodes/bulk; add saga on /episodes - BulkEpisodeRequest: new optional custom_extraction_instructions field with comment noting graphiti-core inserts it into extract_nodes/extract_edges prompts only, NOT dedupe prompts (verified by reading prompts directory) - EpisodeRequest: new optional saga field, plumbed through to add_episode for upcoming Stage 3 single-episode pathway - Both handlers use conditional kwargs construction so existing callers see no behavioral change Phase A item 1 of three. Items 2 (stage2_worker) and 3 (stage3_worker) follow. --- scripts/graphiti_service.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/scripts/graphiti_service.py b/scripts/graphiti_service.py index 6154eaf..7dddf28 100644 --- a/scripts/graphiti_service.py +++ b/scripts/graphiti_service.py @@ -92,6 +92,12 @@ class BulkEpisodeRequest(BaseModel): episodes: list[BulkEpisodeItem] group_id: str | None = None saga: str | None = None + # Batch-level extraction guidance. graphiti-core inserts this into the + # entity-extraction and edge-extraction prompts only — NOT into dedup + # prompts. Use to bias *what* gets extracted, not *how* dedup runs. + # Verified 2026-05-01 by reading extract_nodes.py, extract_edges.py, + # dedupe_nodes.py, dedupe_edges.py in graphiti-core. + custom_extraction_instructions: str | None = None class EpisodeRequest(BaseModel): @@ -101,6 +107,7 @@ class EpisodeRequest(BaseModel): timestamp: str | None = None group_id: str | None = None custom_extraction_instructions: str | None = None + saga: str | None = None @app.get("/health") async def health(): @@ -112,7 +119,7 @@ async def add_episode(req: EpisodeRequest): from graphiti_core.nodes import EpisodeType try: ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now() - await g.add_episode( + kwargs = dict( name=req.name, episode_body=req.content, source=EpisodeType.text, @@ -121,6 +128,11 @@ async def add_episode(req: EpisodeRequest): group_id=req.group_id or GROUP_ID, custom_extraction_instructions=req.custom_extraction_instructions, ) + # Saga is supported on graphiti-core add_episode but kept optional + # so older callers don't need to know about it. + if req.saga is not None: + kwargs["saga"] = req.saga + await g.add_episode(**kwargs) return {"ok": True} except Exception as e: log.error(f"Episode ingestion failed: {e}\n{traceback.format_exc()}") @@ -142,11 +154,16 @@ async def add_episodes_bulk(req: BulkEpisodeRequest): reference_time=ref_time, )) try: - result = await g.add_episode_bulk( + kwargs = dict( bulk_episodes=raw_episodes, group_id=req.group_id or GROUP_ID, saga=req.saga or None, ) + # Pass-through only when set, so callers that don't supply + # instructions get graphiti-core's default behavior unchanged. + if req.custom_extraction_instructions is not None: + kwargs["custom_extraction_instructions"] = req.custom_extraction_instructions + result = await g.add_episode_bulk(**kwargs) return {"ok": True, "count": len(raw_episodes)} except Exception as e: log.error(f"Bulk ingestion failed: {e}\n{traceback.format_exc()}")