From 62b5b5453a82dc96c46d0a95c20457c5934d79cd Mon Sep 17 00:00:00 2001 From: Aaron Nelson Date: Thu, 30 Apr 2026 04:01:02 +0000 Subject: [PATCH] fix: max_coroutines=2, saga support in sidecar; stage3 chunking; TIMEOUT_MAX 0 persistent in falkordb compose --- scripts/graphiti_service.py | 3 + scripts/stage3_worker.py | 181 ++++++++++++++++++++++++++++++++++++ 2 files changed, 184 insertions(+) create mode 100644 scripts/stage3_worker.py diff --git a/scripts/graphiti_service.py b/scripts/graphiti_service.py index 0a6c680..7e85a1f 100644 --- a/scripts/graphiti_service.py +++ b/scripts/graphiti_service.py @@ -65,6 +65,7 @@ async def lifespan(app: FastAPI): embedder=SentenceTransformerEmbedder(), cross_encoder=BGERerankerClient(), graph_driver=FalkorDriver(host=FALKORDB_HOST, port=FALKORDB_PORT, database="aaron"), + max_coroutines=2, ) await graphiti_instance.build_indices_and_constraints() log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}") @@ -83,6 +84,7 @@ class BulkEpisodeItem(BaseModel): class BulkEpisodeRequest(BaseModel): episodes: list[BulkEpisodeItem] group_id: str | None = None + saga: str | None = None class EpisodeRequest(BaseModel): @@ -136,6 +138,7 @@ async def add_episodes_bulk(req: BulkEpisodeRequest): result = await g.add_episode_bulk( bulk_episodes=raw_episodes, group_id=req.group_id or GROUP_ID, + saga=req.saga or None, ) return {"ok": True, "count": len(raw_episodes)} except Exception as e: diff --git a/scripts/stage3_worker.py b/scripts/stage3_worker.py new file mode 100644 index 0000000..eeb1f5f --- /dev/null +++ b/scripts/stage3_worker.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 +""" +Stage 3 Worker — Graphiti Ingest with Taxonomy-Free Orientation +Polls stage_3_queue, chunks documents, ingests as episodic saga to Graphiti. + +Chunking rationale: Large documents sent as single episodes cause FalkorDB +write lock contention during entity deduplication. Chunking at ~500 words +(matching Stage 1) produces smaller deduplication passes that don't block. +Each document's chunks are linked via Graphiti's saga mechanism, preserving +document structure in the graph. + +Runs as systemd service: aaronai-stage3.service +""" + +import os, json, time, logging, requests +from pathlib import Path +from datetime import datetime +from dotenv import load_dotenv +import psycopg2 + +load_dotenv(Path.home() / "aaronai" / ".env", override=True) + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [stage3] %(levelname)s %(message)s", + handlers=[ + logging.StreamHandler(), + logging.FileHandler("/var/log/aaronai/stage3.log", mode="a"), + ] +) +log = logging.getLogger("stage3") + +PG_DSN = os.getenv("PG_DSN") +GRAPHITI_URL = "http://localhost:8001" +HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat") +RETRY_ATTEMPTS = 2 +POLL_INTERVAL = 5 +INGEST_TIMEOUT = 600 +WORKER_VERSION = "2.0" + +# Match Stage 1 chunking parameters +CHUNK_SIZE_WORDS = 500 +CHUNK_OVERLAP_WORDS = 50 +# Documents under this threshold ingested as single episode (no chunking overhead) +SINGLE_EPISODE_THRESHOLD = 1500 + + +def get_pg(): + return psycopg2.connect(PG_DSN) + + +def write_heartbeat(): + try: + HEARTBEAT_FILE.parent.mkdir(parents=True, exist_ok=True) + HEARTBEAT_FILE.write_text(datetime.now().isoformat()) + except Exception: + pass + + +def chunk_text(text, chunk_size=CHUNK_SIZE_WORDS, overlap=CHUNK_OVERLAP_WORDS): + """Split text into word-based chunks matching Stage 1 chunking.""" + words = text.split() + chunks = [] + start = 0 + while start < len(words): + end = start + chunk_size + chunk = " ".join(words[start:end]) + if chunk.strip(): + chunks.append(chunk) + start += chunk_size - overlap + return chunks + + +def ingest_to_graphiti(source, full_text, orientation): + """ + Ingest document to Graphiti as chunked episodes linked by saga. + + Short documents (<1500 chars) are ingested as a single episode. + Long documents are chunked at 500 words (matching Stage 1) and + ingested as a bulk batch with saga=source linking them together. + """ + char_length = len(full_text) + + if char_length < SINGLE_EPISODE_THRESHOLD: + # Single episode — short enough that deduplication won't block + episodes = [{ + "name": source, + "content": full_text, + "source_description": orientation, + "timestamp": datetime.now().isoformat(), + }] + log.info(f" Single episode ({char_length} chars)") + payload = {"episodes": episodes, "group_id": "aaron"} + else: + # Chunk document — each chunk becomes a separate episode + chunks = chunk_text(full_text) + episodes = [ + { + "name": f"{source} [{i+1}/{len(chunks)}]", + "content": chunk, + "source_description": orientation, + "timestamp": datetime.now().isoformat(), + } + for i, chunk in enumerate(chunks) + ] + log.info(f" Chunked into {len(chunks)} episodes ({char_length} chars)") + # saga=source links all chunks into a document unit in the graph + payload = {"episodes": episodes, "group_id": "aaron", "saga": source} + + resp = requests.post(f"{GRAPHITI_URL}/episodes/bulk", json=payload, timeout=INGEST_TIMEOUT) + resp.raise_for_status() + return resp.json() + + +def process_one(row): + row_id, source, full_text, orientation = row + log.info(f"Ingesting to Graphiti: {source}") + + pg = get_pg() + cur = pg.cursor() + cur.execute( + "UPDATE stage_3_queue SET started_at = NOW(), attempts = attempts + 1 WHERE id = %s", + (row_id,) + ) + pg.commit() + + try: + result = ingest_to_graphiti(source, full_text, orientation) + cur.execute("UPDATE stage_3_queue SET completed_at = NOW() WHERE id = %s", (row_id,)) + pg.commit() + pg.close() + log.info(f" Committed to Graphiti: {source}") + return True + except Exception as e: + log.error(f" Graphiti ingest failed for {source}: {e}") + cur.execute(""" + UPDATE stage_3_queue + SET failed_at = NOW(), failure_reason = %s + WHERE id = %s + """, (str(e)[:500], row_id)) + pg.commit() + pg.close() + return False + + +def run(): + log.info(f"Stage 3 worker starting (v{WORKER_VERSION})") + + while True: + write_heartbeat() + + try: + pg = get_pg() + cur = pg.cursor() + cur.execute(""" + SELECT id, source, full_text, orientation + FROM stage_3_queue + WHERE completed_at IS NULL + AND failed_at IS NULL + AND (started_at IS NULL OR started_at < NOW() - INTERVAL '10 minutes') + AND attempts < %s + ORDER BY enqueued_at ASC + LIMIT 1 + """, (RETRY_ATTEMPTS + 1,)) + row = cur.fetchone() + pg.close() + + if not row: + time.sleep(POLL_INTERVAL) + continue + + process_one(row) + time.sleep(2) + + except Exception as e: + log.error(f"Worker loop error: {e}") + time.sleep(10) + + +if __name__ == "__main__": + run()