From 151c756b89e99964ba6645d28e1fac447846774f Mon Sep 17 00:00:00 2001 From: Aaron Nelson Date: Wed, 20 May 2026 05:08:07 +0000 Subject: [PATCH] api.py: async chat-turn push to Graphiti MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After chat() returns, fire-and-forget background thread POSTs the (user message + assistant response) as one episode to /episodes. Default extraction (Sonnet). Errors logged, never raised — chat is not gated on the write. Wall-clock cost in the background is ~20 min per episode against the current ~4,300-entity graph. The chat experience is unaffected; the graph catches up with a delay. Search_facts queries reflect new turns once the sidecar has finished processing them. Kill-switch: SKIP_GRAPHITI_CHAT_PUSH=1 in the api service environment disables the push without code changes. Useful if dedup contention surfaces under sustained load. Companions to this commit: search_facts tool (e96bf40), orientation indexer worker (e96bf40), FalkorDB vector index patches (d2ec20e, 313c0f0). --- scripts/api.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/scripts/api.py b/scripts/api.py index 5f90909..d64eed1 100644 --- a/scripts/api.py +++ b/scripts/api.py @@ -488,6 +488,47 @@ SEARCH_FACTS_TOOL = { } +def _push_chat_turn_to_graphiti(conversation_id, user_message, assistant_message): + """Async fire-and-forget push of a chat turn into Graphiti. Single episode, + default extraction, no custom_extraction_instructions. Takes ~20 min in + the background against the current ~4,300-entity graph; the chat caller + is not gated on this. Errors are logged, never raised.""" + if os.getenv("SKIP_GRAPHITI_CHAT_PUSH"): + return + if not (user_message or "").strip() and not (assistant_message or "").strip(): + return + import threading + from datetime import datetime as _dt + + def _work(): + try: + episode_name = f"chat-{conversation_id[:8]}-{_dt.now().strftime('%Y%m%dT%H%M%S')}" + content = ( + f"User: {user_message}\n\n" + f"Assistant: {assistant_message}" + ) + payload = { + "name": episode_name, + "content": content, + "source_description": f"chat turn (conversation {conversation_id})", + "timestamp": _dt.now().isoformat(), + "group_id": GRAPHITI_GROUP_ID, + } + # Long timeout — sidecar add_episode against the current graph + # is empirically ~20 min wall-clock. We're patient; chat isn't. + r = requests.post(f"{GRAPHITI_URL}/episodes", json=payload, timeout=1800) + if r.status_code == 200: + print(f"[graphiti-push] turn ingested: {episode_name}") + else: + print(f"[graphiti-push] non-200 ({r.status_code}) for {episode_name}: {r.text[:200]}") + except requests.RequestException as e: + print(f"[graphiti-push] request failed: {e}") + except Exception as e: + print(f"[graphiti-push] unexpected error: {e}") + + threading.Thread(target=_work, daemon=True).start() + + def _execute_search_facts(tool_input): """Hit Graphiti /search, format the results as text for Claude.""" query = (tool_input or {}).get("query", "").strip() @@ -775,6 +816,11 @@ def chat(user_message, conversation_id, settings, client_time=None): for block in response.content: if hasattr(block, "text"): assistant_message += block.text + # Async fire-and-forget into Graphiti so the turn lands in the + # graph as a single episode for future search_facts queries to + # find. Takes ~20 min wall-clock in the background; chat returns + # immediately. Disable via SKIP_GRAPHITI_CHAT_PUSH=1 if needed. + _push_chat_turn_to_graphiti(conversation_id, user_message, assistant_message) # Cap citations: accumulated_sources can grow large across multiple # retrieve_documents calls and not every chunk that came back was # actually used in the answer. Insertion order preserves rank