diff --git a/scripts/stage3_worker.py b/scripts/stage3_worker.py index 0e802e0..f64ce0e 100644 --- a/scripts/stage3_worker.py +++ b/scripts/stage3_worker.py @@ -1,22 +1,45 @@ #!/usr/bin/env python3 """ -Stage 3 Worker — Graphiti Ingest with Taxonomy-Free Orientation -Polls stage_3_queue, chunks documents, ingests as episodic saga to Graphiti. +Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing -Chunking rationale: Large documents sent as single episodes cause FalkorDB -write lock contention during entity deduplication. Chunking at ~500 words -(matching Stage 1) produces smaller deduplication passes that don't block. -Each document's chunks are linked via Graphiti's saga mechanism, preserving -document structure in the graph. +Polls stage_3_queue, routes each row to one of two ingest pathways based on +state-type classification produced by Stage 2: -Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed sagas of -17 and 19 chunks deadlock the sidecar's Python-side coordination. Documents -producing more than MAX_CHUNKS_PER_SAGA chunks are split into multiple bulk -commits, each tagged with the same saga value so Graphiti still links them. + - BULK pathway (existing): supersedes_prior_state=false OR confidence=low + OR routing fields missing. Fast, no temporal invalidation. -Wedge detection: 2026-05-01 incident also surfaced the asymmetry with Stage 2 — -Stage 3 had no recovery path when the sidecar deadlocked. Now mirrors Stage 2's -consecutive_failures pattern with sidecar restart on threshold. + - SINGLE-EPISODE pathway (new): supersedes_prior_state=true AND + confidence in {medium, high}. Per-chunk POST to /episodes with shared + saga tag, full edge invalidation, per-chunk timeout/retry independence. + +Routing rationale: the single-episode pathway is the correct API per +graphiti-core's docs for content that supersedes prior facts (it does +edge invalidation that bulk skips). It costs more per chunk because of +the resolve_edge LLM call; the routing rule keeps that cost bounded to +content that actually needs it. + +Chunking rationale (preserved from prior versions): Large documents sent +as single episodes cause FalkorDB write lock contention during entity +deduplication. Chunking at ~500 words (matching Stage 1) produces smaller +deduplication passes that don't block. Each document's chunks are linked +via Graphiti's saga mechanism, preserving document structure in the graph. + +Per-chunk heartbeat: single-episode pathway updates stage_3_queue.started_at +after each successful chunk POST so a long-running document doesn't cross +the 10-minute stale threshold mid-process and get re-dequeued by another +worker (or the same worker on next loop iteration). started_at thus means +"last activity timestamp" rather than "began at" — semantics that match +the dequeue query's intent (catch dead workers, not slow ones). + +Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed bulk +sagas of 17 and 19 chunks deadlock the sidecar's Python-side coordination. +Documents producing more than MAX_CHUNKS_PER_SAGA chunks on the bulk +pathway are split into multiple bulk commits, each tagged with the same +saga value so Graphiti still links them. The single-episode pathway +doesn't need this split since each chunk is its own POST. + +Wedge detection: mirrors Stage 2's consecutive_failures pattern with +sidecar restart on threshold. Runs as systemd service: aaronai-stage3.service """ @@ -44,17 +67,23 @@ HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat") RETRY_ATTEMPTS = 2 POLL_INTERVAL = 5 INGEST_TIMEOUT = 600 -WORKER_VERSION = "2.2" +WORKER_VERSION = "2.3" # Match Stage 1 chunking parameters CHUNK_SIZE_WORDS = 500 CHUNK_OVERLAP_WORDS = 50 # Documents under this threshold ingested as single episode (no chunking overhead) SINGLE_EPISODE_THRESHOLD = 1500 -# Sagas larger than this many chunks split into multiple commits +# Bulk-pathway sagas larger than this many chunks split into multiple commits # (2026-05-01 incident: 17 and 19 chunk sagas deadlocked sidecar) MAX_CHUNKS_PER_SAGA = 10 +# Routing rule: single-episode pathway requires both signals positive. +# Anything else (false, NULL, low confidence) routes to bulk — the +# safer-cheaper default. Mistral parse drift can't accidentally trigger +# the expensive pathway. +HIGH_TRUST_CONFIDENCE = ("medium", "high") + def get_pg(): return psycopg2.connect(PG_DSN) @@ -109,6 +138,22 @@ def chunk_text(text, chunk_size=CHUNK_SIZE_WORDS, overlap=CHUNK_OVERLAP_WORDS): return chunks +def heartbeat_row(row_id): + """Refresh stage_3_queue.started_at to NOW() so a long-running single-episode + ingest doesn't cross the 10-minute stale threshold mid-process. Called + after each successful chunk POST. Best-effort: failures are logged but + don't fail the chunk — the worst case is a stale-threshold re-dequeue, + which graphiti's dedup will handle as a no-op.""" + try: + pg = get_pg() + cur = pg.cursor() + cur.execute("UPDATE stage_3_queue SET started_at = NOW() WHERE id = %s", (row_id,)) + pg.commit() + pg.close() + except Exception as e: + log.warning(f" Heartbeat update failed (continuing): {e}") + + def post_bulk(payload, batch_label=""): """Single POST to /episodes/bulk with consistent error handling.""" resp = requests.post( @@ -122,16 +167,32 @@ def post_bulk(payload, batch_label=""): return resp.json() -def ingest_to_graphiti(source, full_text, orientation): - """ - Ingest document to Graphiti as chunked episodes linked by saga. +def post_episode(payload, episode_label=""): + """Single POST to /episodes (singular) with consistent error handling. + Used by the single-episode pathway, one call per chunk.""" + resp = requests.post( + f"{GRAPHITI_URL}/episodes", + json=payload, + timeout=INGEST_TIMEOUT + ) + if not resp.ok: + prefix = f"{episode_label} " if episode_label else "" + raise RuntimeError(f"{prefix}Sidecar {resp.status_code}: {resp.text[:500]}") + return resp.json() + + +def ingest_bulk(source, full_text, orientation): + """ + Bulk-pathway ingest: documents that don't supersede prior state. + Skips edge invalidation. Cheap. Three sub-paths by document size: - Three paths: - Short documents ( MAX_CHUNKS_PER_SAGA): split into batches of - MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga tag - so Graphiti links them as one document unit + MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga + tag so Graphiti links them as one document unit """ char_length = len(full_text) @@ -142,7 +203,7 @@ def ingest_to_graphiti(source, full_text, orientation): "source_description": orientation, "timestamp": datetime.now().isoformat(), }] - log.info(f" Single episode ({char_length} chars)") + log.info(f" [bulk] Single episode ({char_length} chars)") return post_bulk({"episodes": episodes, "group_id": "aaron"}) chunks = chunk_text(full_text) @@ -158,7 +219,7 @@ def ingest_to_graphiti(source, full_text, orientation): } for i, chunk in enumerate(chunks) ] - log.info(f" Chunked into {total_chunks} episodes ({char_length} chars)") + log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)") return post_bulk( {"episodes": episodes, "group_id": "aaron", "saga": source} ) @@ -166,7 +227,7 @@ def ingest_to_graphiti(source, full_text, orientation): # Large document: split into batches sharing the same saga tag batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA log.info( - f" Chunked into {total_chunks} episodes ({char_length} chars); " + f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars); " f"splitting into {batch_count} batches of up to {MAX_CHUNKS_PER_SAGA}" ) last_result = None @@ -193,9 +254,110 @@ def ingest_to_graphiti(source, full_text, orientation): return last_result +def ingest_single_episode(row_id, source, full_text, orientation): + """ + Single-episode pathway: documents that supersede prior state with + medium-or-high confidence. Each chunk is its own POST to /episodes + with shared saga tag. Each call independent: own timeout, own retry + envelope, own failure semantics. + + Partial-success behavior: if chunk N of total fails, chunks 1..N-1 + stay committed (graphiti has already accepted them) and the function + raises with detail about which chunk failed and how many succeeded. + The caller marks the row failed_at with that detail; the operator + decides whether to re-enqueue. Re-ingestion will re-POST chunks 1..N-1 + against the graph; graphiti's dedup will handle them as no-ops. + + Heartbeats stage_3_queue.started_at after each successful chunk so the + row doesn't cross the 10-minute stale threshold while actively progressing. + """ + char_length = len(full_text) + + # Short documents: one POST, no chunking, no saga + if char_length < SINGLE_EPISODE_THRESHOLD: + payload = { + "name": source, + "content": full_text, + "source_description": orientation, + "group_id": "aaron", + "timestamp": datetime.now().isoformat(), + } + log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)") + return post_episode(payload, episode_label="single-ep") + + chunks = chunk_text(full_text) + total_chunks = len(chunks) + log.info( + f" [single-ep] Chunked into {total_chunks} episodes ({char_length} chars); " + f"per-chunk POSTs with shared saga" + ) + + succeeded = 0 + for i, chunk in enumerate(chunks): + chunk_num = i + 1 + payload = { + "name": f"{source} [{chunk_num}/{total_chunks}]", + "content": chunk, + "source_description": orientation, + "group_id": "aaron", + "saga": source, + "timestamp": datetime.now().isoformat(), + } + try: + post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}") + succeeded += 1 + log.info(f" chunk {chunk_num}/{total_chunks} committed") + heartbeat_row(row_id) + except Exception as e: + # Annotate the exception with partial-success detail so the + # caller can write a clean failure_reason. Re-raise to abort + # the document; previously-committed chunks stay in the graph. + raise RuntimeError( + f"single_episode_partial: chunk {chunk_num}/{total_chunks} failed " + f"(succeeded: {succeeded}); error: {str(e)[:300]}" + ) from e + + log.info(f" [single-ep] All {total_chunks} chunks committed") + return {"ok": True, "chunks_committed": total_chunks} + + +def should_route_single_episode(supersedes_prior_state, state_type_confidence): + """Routing decision for Phase A. + + Single-episode pathway requires BOTH: + - supersedes_prior_state is true (Mistral judged it temporally superseding) + - confidence is medium or high (Mistral was confident enough to trust) + + Anything else routes to bulk: false supersedes, NULL fields (legacy rows + pre-dating Stage 2 v2.2), low confidence even on supersedes=true. This + is the safer-cheaper default — bulk skips temporal invalidation, which + is the right behavior when we're not confident the content needs it. + """ + if not supersedes_prior_state: + return False + if state_type_confidence not in HIGH_TRUST_CONFIDENCE: + return False + return True + + def process_one(row): - row_id, source, full_text, orientation = row - log.info(f"Ingesting to Graphiti: {source}") + (row_id, source, full_text, orientation, + state_type, state_type_confidence, supersedes_prior_state, + state_type_rationale) = row + + # Route decision + use_single_episode = should_route_single_episode( + supersedes_prior_state, state_type_confidence + ) + pathway = "single-episode" if use_single_episode else "bulk" + + log.info( + f"Ingesting to Graphiti: {source} " + f"[pathway={pathway}, state_type={state_type}, " + f"conf={state_type_confidence}, supersedes={supersedes_prior_state}]" + ) + if state_type_rationale: + log.info(f" rationale: {state_type_rationale[:200]}") pg = get_pg() cur = pg.cursor() @@ -204,9 +366,16 @@ def process_one(row): (row_id,) ) pg.commit() + pg.close() try: - result = ingest_to_graphiti(source, full_text, orientation) + if use_single_episode: + result = ingest_single_episode(row_id, source, full_text, orientation) + else: + result = ingest_bulk(source, full_text, orientation) + + pg = get_pg() + cur = pg.cursor() cur.execute("UPDATE stage_3_queue SET completed_at = NOW() WHERE id = %s", (row_id,)) pg.commit() pg.close() @@ -214,6 +383,8 @@ def process_one(row): return True except Exception as e: log.error(f" Graphiti ingest failed for {source}: {e}") + pg = get_pg() + cur = pg.cursor() cur.execute(""" UPDATE stage_3_queue SET failed_at = NOW(), failure_reason = %s @@ -235,7 +406,9 @@ def run(): pg = get_pg() cur = pg.cursor() cur.execute(""" - SELECT id, source, full_text, orientation + SELECT id, source, full_text, orientation, + state_type, state_type_confidence, supersedes_prior_state, + state_type_rationale FROM stage_3_queue WHERE completed_at IS NULL AND failed_at IS NULL