stage3_worker: v2.3 — bulk-vs-single-episode routing on Stage 2 state-type

Reads new routing columns from stage_3_queue (state_type, state_type_confidence,
supersedes_prior_state, state_type_rationale) and dispatches each row to one of
two ingest pathways:

  - BULK pathway (existing, renamed from ingest_to_graphiti to ingest_bulk):
    safer-cheaper default. Used when supersedes=false OR confidence=low OR
    routing fields are NULL (legacy rows). Skips edge invalidation per
    graphiti-core's bulk semantics.

  - SINGLE-EPISODE pathway (new, ingest_single_episode): used only when
    supersedes_prior_state=true AND confidence in {medium, high}. Per-chunk
    POST to /episodes (singular endpoint) with shared saga tag. Each call
    independent — own timeout, own retry envelope.

Routing decision isolated in should_route_single_episode() with unit-tested
truth table covering all eight (supersedes × confidence) combinations.

Per-chunk heartbeat (heartbeat_row): single-episode pathway updates
stage_3_queue.started_at after each successful chunk POST so a long-running
document doesn't cross the 10-minute stale threshold mid-process and get
re-dequeued. started_at semantics now: 'last activity timestamp' rather
than 'began at'. Best-effort; failures logged not raised.

Partial-success on chunk failure: previously-committed chunks stay in the
graph; the function raises with detail (single_episode_partial: chunk N/M
failed, succeeded K). The row is marked failed_at with that detail. Re-
ingestion would re-POST chunks 1..N-1 against the graph; graphiti's dedup
handles them as no-ops.

DB connection scoping: process_one no longer holds one Postgres connection
across the whole ingest call (which can run an hour for long single-episode
documents). Each DB write gets a short-lived connection.

Phase A item 3 of three. Closes the mechanical-patches block. Item 4
(custom_extraction_instructions text design) is the remaining intellectual
work; sidecar and worker plumbing is now ready for it.
This commit is contained in:
2026-05-01 19:07:41 +00:00
parent 70e87e3ab5
commit e7de7fb64b
+202 -29
View File
@@ -1,22 +1,45 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Stage 3 Worker — Graphiti Ingest with Taxonomy-Free Orientation Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing
Polls stage_3_queue, chunks documents, ingests as episodic saga to Graphiti.
Chunking rationale: Large documents sent as single episodes cause FalkorDB Polls stage_3_queue, routes each row to one of two ingest pathways based on
write lock contention during entity deduplication. Chunking at ~500 words state-type classification produced by Stage 2:
(matching Stage 1) produces smaller deduplication passes that don't block.
Each document's chunks are linked via Graphiti's saga mechanism, preserving
document structure in the graph.
Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed sagas of - BULK pathway (existing): supersedes_prior_state=false OR confidence=low
17 and 19 chunks deadlock the sidecar's Python-side coordination. Documents OR routing fields missing. Fast, no temporal invalidation.
producing more than MAX_CHUNKS_PER_SAGA chunks are split into multiple bulk
commits, each tagged with the same saga value so Graphiti still links them.
Wedge detection: 2026-05-01 incident also surfaced the asymmetry with Stage 2 — - SINGLE-EPISODE pathway (new): supersedes_prior_state=true AND
Stage 3 had no recovery path when the sidecar deadlocked. Now mirrors Stage 2's confidence in {medium, high}. Per-chunk POST to /episodes with shared
consecutive_failures pattern with sidecar restart on threshold. saga tag, full edge invalidation, per-chunk timeout/retry independence.
Routing rationale: the single-episode pathway is the correct API per
graphiti-core's docs for content that supersedes prior facts (it does
edge invalidation that bulk skips). It costs more per chunk because of
the resolve_edge LLM call; the routing rule keeps that cost bounded to
content that actually needs it.
Chunking rationale (preserved from prior versions): Large documents sent
as single episodes cause FalkorDB write lock contention during entity
deduplication. Chunking at ~500 words (matching Stage 1) produces smaller
deduplication passes that don't block. Each document's chunks are linked
via Graphiti's saga mechanism, preserving document structure in the graph.
Per-chunk heartbeat: single-episode pathway updates stage_3_queue.started_at
after each successful chunk POST so a long-running document doesn't cross
the 10-minute stale threshold mid-process and get re-dequeued by another
worker (or the same worker on next loop iteration). started_at thus means
"last activity timestamp" rather than "began at" — semantics that match
the dequeue query's intent (catch dead workers, not slow ones).
Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed bulk
sagas of 17 and 19 chunks deadlock the sidecar's Python-side coordination.
Documents producing more than MAX_CHUNKS_PER_SAGA chunks on the bulk
pathway are split into multiple bulk commits, each tagged with the same
saga value so Graphiti still links them. The single-episode pathway
doesn't need this split since each chunk is its own POST.
Wedge detection: mirrors Stage 2's consecutive_failures pattern with
sidecar restart on threshold.
Runs as systemd service: aaronai-stage3.service Runs as systemd service: aaronai-stage3.service
""" """
@@ -44,17 +67,23 @@ HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat")
RETRY_ATTEMPTS = 2 RETRY_ATTEMPTS = 2
POLL_INTERVAL = 5 POLL_INTERVAL = 5
INGEST_TIMEOUT = 600 INGEST_TIMEOUT = 600
WORKER_VERSION = "2.2" WORKER_VERSION = "2.3"
# Match Stage 1 chunking parameters # Match Stage 1 chunking parameters
CHUNK_SIZE_WORDS = 500 CHUNK_SIZE_WORDS = 500
CHUNK_OVERLAP_WORDS = 50 CHUNK_OVERLAP_WORDS = 50
# Documents under this threshold ingested as single episode (no chunking overhead) # Documents under this threshold ingested as single episode (no chunking overhead)
SINGLE_EPISODE_THRESHOLD = 1500 SINGLE_EPISODE_THRESHOLD = 1500
# Sagas larger than this many chunks split into multiple commits # Bulk-pathway sagas larger than this many chunks split into multiple commits
# (2026-05-01 incident: 17 and 19 chunk sagas deadlocked sidecar) # (2026-05-01 incident: 17 and 19 chunk sagas deadlocked sidecar)
MAX_CHUNKS_PER_SAGA = 10 MAX_CHUNKS_PER_SAGA = 10
# Routing rule: single-episode pathway requires both signals positive.
# Anything else (false, NULL, low confidence) routes to bulk — the
# safer-cheaper default. Mistral parse drift can't accidentally trigger
# the expensive pathway.
HIGH_TRUST_CONFIDENCE = ("medium", "high")
def get_pg(): def get_pg():
return psycopg2.connect(PG_DSN) return psycopg2.connect(PG_DSN)
@@ -109,6 +138,22 @@ def chunk_text(text, chunk_size=CHUNK_SIZE_WORDS, overlap=CHUNK_OVERLAP_WORDS):
return chunks return chunks
def heartbeat_row(row_id):
"""Refresh stage_3_queue.started_at to NOW() so a long-running single-episode
ingest doesn't cross the 10-minute stale threshold mid-process. Called
after each successful chunk POST. Best-effort: failures are logged but
don't fail the chunk — the worst case is a stale-threshold re-dequeue,
which graphiti's dedup will handle as a no-op."""
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("UPDATE stage_3_queue SET started_at = NOW() WHERE id = %s", (row_id,))
pg.commit()
pg.close()
except Exception as e:
log.warning(f" Heartbeat update failed (continuing): {e}")
def post_bulk(payload, batch_label=""): def post_bulk(payload, batch_label=""):
"""Single POST to /episodes/bulk with consistent error handling.""" """Single POST to /episodes/bulk with consistent error handling."""
resp = requests.post( resp = requests.post(
@@ -122,16 +167,32 @@ def post_bulk(payload, batch_label=""):
return resp.json() return resp.json()
def ingest_to_graphiti(source, full_text, orientation): def post_episode(payload, episode_label=""):
""" """Single POST to /episodes (singular) with consistent error handling.
Ingest document to Graphiti as chunked episodes linked by saga. Used by the single-episode pathway, one call per chunk."""
resp = requests.post(
f"{GRAPHITI_URL}/episodes",
json=payload,
timeout=INGEST_TIMEOUT
)
if not resp.ok:
prefix = f"{episode_label} " if episode_label else ""
raise RuntimeError(f"{prefix}Sidecar {resp.status_code}: {resp.text[:500]}")
return resp.json()
def ingest_bulk(source, full_text, orientation):
"""
Bulk-pathway ingest: documents that don't supersede prior state.
Skips edge invalidation. Cheap. Three sub-paths by document size:
Three paths:
- Short documents (<SINGLE_EPISODE_THRESHOLD): single episode, no saga - Short documents (<SINGLE_EPISODE_THRESHOLD): single episode, no saga
[note: 'single episode' here means one bulk call with one item, NOT
the single-episode-pathway; naming overlap is unfortunate but local]
- Medium documents (chunks <= MAX_CHUNKS_PER_SAGA): one bulk commit, saga-linked - Medium documents (chunks <= MAX_CHUNKS_PER_SAGA): one bulk commit, saga-linked
- Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of - Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga tag MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga
so Graphiti links them as one document unit tag so Graphiti links them as one document unit
""" """
char_length = len(full_text) char_length = len(full_text)
@@ -142,7 +203,7 @@ def ingest_to_graphiti(source, full_text, orientation):
"source_description": orientation, "source_description": orientation,
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
}] }]
log.info(f" Single episode ({char_length} chars)") log.info(f" [bulk] Single episode ({char_length} chars)")
return post_bulk({"episodes": episodes, "group_id": "aaron"}) return post_bulk({"episodes": episodes, "group_id": "aaron"})
chunks = chunk_text(full_text) chunks = chunk_text(full_text)
@@ -158,7 +219,7 @@ def ingest_to_graphiti(source, full_text, orientation):
} }
for i, chunk in enumerate(chunks) for i, chunk in enumerate(chunks)
] ]
log.info(f" Chunked into {total_chunks} episodes ({char_length} chars)") log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)")
return post_bulk( return post_bulk(
{"episodes": episodes, "group_id": "aaron", "saga": source} {"episodes": episodes, "group_id": "aaron", "saga": source}
) )
@@ -166,7 +227,7 @@ def ingest_to_graphiti(source, full_text, orientation):
# Large document: split into batches sharing the same saga tag # Large document: split into batches sharing the same saga tag
batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA
log.info( log.info(
f" Chunked into {total_chunks} episodes ({char_length} chars); " f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars); "
f"splitting into {batch_count} batches of up to {MAX_CHUNKS_PER_SAGA}" f"splitting into {batch_count} batches of up to {MAX_CHUNKS_PER_SAGA}"
) )
last_result = None last_result = None
@@ -193,9 +254,110 @@ def ingest_to_graphiti(source, full_text, orientation):
return last_result return last_result
def ingest_single_episode(row_id, source, full_text, orientation):
"""
Single-episode pathway: documents that supersede prior state with
medium-or-high confidence. Each chunk is its own POST to /episodes
with shared saga tag. Each call independent: own timeout, own retry
envelope, own failure semantics.
Partial-success behavior: if chunk N of total fails, chunks 1..N-1
stay committed (graphiti has already accepted them) and the function
raises with detail about which chunk failed and how many succeeded.
The caller marks the row failed_at with that detail; the operator
decides whether to re-enqueue. Re-ingestion will re-POST chunks 1..N-1
against the graph; graphiti's dedup will handle them as no-ops.
Heartbeats stage_3_queue.started_at after each successful chunk so the
row doesn't cross the 10-minute stale threshold while actively progressing.
"""
char_length = len(full_text)
# Short documents: one POST, no chunking, no saga
if char_length < SINGLE_EPISODE_THRESHOLD:
payload = {
"name": source,
"content": full_text,
"source_description": orientation,
"group_id": "aaron",
"timestamp": datetime.now().isoformat(),
}
log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)")
return post_episode(payload, episode_label="single-ep")
chunks = chunk_text(full_text)
total_chunks = len(chunks)
log.info(
f" [single-ep] Chunked into {total_chunks} episodes ({char_length} chars); "
f"per-chunk POSTs with shared saga"
)
succeeded = 0
for i, chunk in enumerate(chunks):
chunk_num = i + 1
payload = {
"name": f"{source} [{chunk_num}/{total_chunks}]",
"content": chunk,
"source_description": orientation,
"group_id": "aaron",
"saga": source,
"timestamp": datetime.now().isoformat(),
}
try:
post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}")
succeeded += 1
log.info(f" chunk {chunk_num}/{total_chunks} committed")
heartbeat_row(row_id)
except Exception as e:
# Annotate the exception with partial-success detail so the
# caller can write a clean failure_reason. Re-raise to abort
# the document; previously-committed chunks stay in the graph.
raise RuntimeError(
f"single_episode_partial: chunk {chunk_num}/{total_chunks} failed "
f"(succeeded: {succeeded}); error: {str(e)[:300]}"
) from e
log.info(f" [single-ep] All {total_chunks} chunks committed")
return {"ok": True, "chunks_committed": total_chunks}
def should_route_single_episode(supersedes_prior_state, state_type_confidence):
"""Routing decision for Phase A.
Single-episode pathway requires BOTH:
- supersedes_prior_state is true (Mistral judged it temporally superseding)
- confidence is medium or high (Mistral was confident enough to trust)
Anything else routes to bulk: false supersedes, NULL fields (legacy rows
pre-dating Stage 2 v2.2), low confidence even on supersedes=true. This
is the safer-cheaper default — bulk skips temporal invalidation, which
is the right behavior when we're not confident the content needs it.
"""
if not supersedes_prior_state:
return False
if state_type_confidence not in HIGH_TRUST_CONFIDENCE:
return False
return True
def process_one(row): def process_one(row):
row_id, source, full_text, orientation = row (row_id, source, full_text, orientation,
log.info(f"Ingesting to Graphiti: {source}") state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale) = row
# Route decision
use_single_episode = should_route_single_episode(
supersedes_prior_state, state_type_confidence
)
pathway = "single-episode" if use_single_episode else "bulk"
log.info(
f"Ingesting to Graphiti: {source} "
f"[pathway={pathway}, state_type={state_type}, "
f"conf={state_type_confidence}, supersedes={supersedes_prior_state}]"
)
if state_type_rationale:
log.info(f" rationale: {state_type_rationale[:200]}")
pg = get_pg() pg = get_pg()
cur = pg.cursor() cur = pg.cursor()
@@ -204,9 +366,16 @@ def process_one(row):
(row_id,) (row_id,)
) )
pg.commit() pg.commit()
pg.close()
try: try:
result = ingest_to_graphiti(source, full_text, orientation) if use_single_episode:
result = ingest_single_episode(row_id, source, full_text, orientation)
else:
result = ingest_bulk(source, full_text, orientation)
pg = get_pg()
cur = pg.cursor()
cur.execute("UPDATE stage_3_queue SET completed_at = NOW() WHERE id = %s", (row_id,)) cur.execute("UPDATE stage_3_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
pg.commit() pg.commit()
pg.close() pg.close()
@@ -214,6 +383,8 @@ def process_one(row):
return True return True
except Exception as e: except Exception as e:
log.error(f" Graphiti ingest failed for {source}: {e}") log.error(f" Graphiti ingest failed for {source}: {e}")
pg = get_pg()
cur = pg.cursor()
cur.execute(""" cur.execute("""
UPDATE stage_3_queue UPDATE stage_3_queue
SET failed_at = NOW(), failure_reason = %s SET failed_at = NOW(), failure_reason = %s
@@ -235,7 +406,9 @@ def run():
pg = get_pg() pg = get_pg()
cur = pg.cursor() cur = pg.cursor()
cur.execute(""" cur.execute("""
SELECT id, source, full_text, orientation SELECT id, source, full_text, orientation,
state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale
FROM stage_3_queue FROM stage_3_queue
WHERE completed_at IS NULL WHERE completed_at IS NULL
AND failed_at IS NULL AND failed_at IS NULL