api.py: async chat-turn push to Graphiti

After chat() returns, fire-and-forget background thread POSTs the (user
message + assistant response) as one episode to /episodes. Default extraction
(Sonnet). Errors logged, never raised — chat is not gated on the write.

Wall-clock cost in the background is ~20 min per episode against the
current ~4,300-entity graph. The chat experience is unaffected; the graph
catches up with a delay. Search_facts queries reflect new turns once the
sidecar has finished processing them.

Kill-switch: SKIP_GRAPHITI_CHAT_PUSH=1 in the api service environment
disables the push without code changes. Useful if dedup contention surfaces
under sustained load.

Companions to this commit: search_facts tool (e96bf40), orientation indexer
worker (e96bf40), FalkorDB vector index patches (d2ec20e, 313c0f0).
This commit is contained in:
2026-05-20 05:08:07 +00:00
parent e96bf40b2f
commit 151c756b89
+46
View File
@@ -488,6 +488,47 @@ SEARCH_FACTS_TOOL = {
} }
def _push_chat_turn_to_graphiti(conversation_id, user_message, assistant_message):
"""Async fire-and-forget push of a chat turn into Graphiti. Single episode,
default extraction, no custom_extraction_instructions. Takes ~20 min in
the background against the current ~4,300-entity graph; the chat caller
is not gated on this. Errors are logged, never raised."""
if os.getenv("SKIP_GRAPHITI_CHAT_PUSH"):
return
if not (user_message or "").strip() and not (assistant_message or "").strip():
return
import threading
from datetime import datetime as _dt
def _work():
try:
episode_name = f"chat-{conversation_id[:8]}-{_dt.now().strftime('%Y%m%dT%H%M%S')}"
content = (
f"User: {user_message}\n\n"
f"Assistant: {assistant_message}"
)
payload = {
"name": episode_name,
"content": content,
"source_description": f"chat turn (conversation {conversation_id})",
"timestamp": _dt.now().isoformat(),
"group_id": GRAPHITI_GROUP_ID,
}
# Long timeout — sidecar add_episode against the current graph
# is empirically ~20 min wall-clock. We're patient; chat isn't.
r = requests.post(f"{GRAPHITI_URL}/episodes", json=payload, timeout=1800)
if r.status_code == 200:
print(f"[graphiti-push] turn ingested: {episode_name}")
else:
print(f"[graphiti-push] non-200 ({r.status_code}) for {episode_name}: {r.text[:200]}")
except requests.RequestException as e:
print(f"[graphiti-push] request failed: {e}")
except Exception as e:
print(f"[graphiti-push] unexpected error: {e}")
threading.Thread(target=_work, daemon=True).start()
def _execute_search_facts(tool_input): def _execute_search_facts(tool_input):
"""Hit Graphiti /search, format the results as text for Claude.""" """Hit Graphiti /search, format the results as text for Claude."""
query = (tool_input or {}).get("query", "").strip() query = (tool_input or {}).get("query", "").strip()
@@ -775,6 +816,11 @@ def chat(user_message, conversation_id, settings, client_time=None):
for block in response.content: for block in response.content:
if hasattr(block, "text"): if hasattr(block, "text"):
assistant_message += block.text assistant_message += block.text
# Async fire-and-forget into Graphiti so the turn lands in the
# graph as a single episode for future search_facts queries to
# find. Takes ~20 min wall-clock in the background; chat returns
# immediately. Disable via SKIP_GRAPHITI_CHAT_PUSH=1 if needed.
_push_chat_turn_to_graphiti(conversation_id, user_message, assistant_message)
# Cap citations: accumulated_sources can grow large across multiple # Cap citations: accumulated_sources can grow large across multiple
# retrieve_documents calls and not every chunk that came back was # retrieve_documents calls and not every chunk that came back was
# actually used in the answer. Insertion order preserves rank # actually used in the answer. Insertion order preserves rank