Compare commits

...

4 Commits

Author SHA1 Message Date
aaron 30beeb3a26 migrations: retroactively track stage_3_queue routing columns
Adds migrations/ directory with README documenting the convention
(timestamped filenames, idempotent SQL, forward-only, single change per file).

First migration is the Stage 3 queue routing columns added live during
Phase A patches today: state_type, state_type_confidence,
supersedes_prior_state, state_type_rationale, plus index on supersedes.
Required by stage2_worker.py >= 2.2 and stage3_worker.py >= 2.3.

Idempotent (IF NOT EXISTS), safe to re-apply. Verified by re-applying
against the live DB — no changes, no errors.

Closes a reproducibility gap: a fresh DB provisioned from git would crash
on first Stage 2 enqueue without these columns. Now the SQL travels with
the code.
2026-05-01 19:11:09 +00:00
aaron e7de7fb64b stage3_worker: v2.3 — bulk-vs-single-episode routing on Stage 2 state-type
Reads new routing columns from stage_3_queue (state_type, state_type_confidence,
supersedes_prior_state, state_type_rationale) and dispatches each row to one of
two ingest pathways:

  - BULK pathway (existing, renamed from ingest_to_graphiti to ingest_bulk):
    safer-cheaper default. Used when supersedes=false OR confidence=low OR
    routing fields are NULL (legacy rows). Skips edge invalidation per
    graphiti-core's bulk semantics.

  - SINGLE-EPISODE pathway (new, ingest_single_episode): used only when
    supersedes_prior_state=true AND confidence in {medium, high}. Per-chunk
    POST to /episodes (singular endpoint) with shared saga tag. Each call
    independent — own timeout, own retry envelope.

Routing decision isolated in should_route_single_episode() with unit-tested
truth table covering all eight (supersedes × confidence) combinations.

Per-chunk heartbeat (heartbeat_row): single-episode pathway updates
stage_3_queue.started_at after each successful chunk POST so a long-running
document doesn't cross the 10-minute stale threshold mid-process and get
re-dequeued. started_at semantics now: 'last activity timestamp' rather
than 'began at'. Best-effort; failures logged not raised.

Partial-success on chunk failure: previously-committed chunks stay in the
graph; the function raises with detail (single_episode_partial: chunk N/M
failed, succeeded K). The row is marked failed_at with that detail. Re-
ingestion would re-POST chunks 1..N-1 against the graph; graphiti's dedup
handles them as no-ops.

DB connection scoping: process_one no longer holds one Postgres connection
across the whole ingest call (which can run an hour for long single-episode
documents). Each DB write gets a short-lived connection.

Phase A item 3 of three. Closes the mechanical-patches block. Item 4
(custom_extraction_instructions text design) is the remaining intellectual
work; sidecar and worker plumbing is now ready for it.
2026-05-01 19:07:41 +00:00
aaron 70e87e3ab5 stage2_worker: v2.2 — add state-type classification for Stage 3 routing
Mistral pass now produces two concerns in a single flat JSON output:
  (a) orientation context (existing four fields, unchanged semantics)
  (b) state-type classification: state_type (current/reference/historical),
      state_type_confidence (low/medium/high), supersedes_prior_state (bool),
      state_type_rationale (text)

Routing fields written as explicit columns on stage_3_queue (separate
ALTER TABLE migration adds them: state_type, state_type_confidence,
supersedes_prior_state, state_type_rationale + index on supersedes).

Safe-cheap defaults on malformed Mistral output: state_type='reference',
confidence='low', supersedes=false. All defaults route to bulk pathway
(no temporal invalidation cost) so Mistral parse drift can't accidentally
trigger expensive single-episode ingest.

Phase A item 2 of three. Sidecar (item 1, commit 8b0a163) already plumbs
custom_extraction_instructions through to /episodes/bulk. Stage 3 routing
logic (item 3) follows.
2026-05-01 19:02:11 +00:00
aaron 8b0a163670 graphiti_service: expose custom_extraction_instructions on /episodes/bulk; add saga on /episodes
- BulkEpisodeRequest: new optional custom_extraction_instructions field
  with comment noting graphiti-core inserts it into extract_nodes/extract_edges
  prompts only, NOT dedupe prompts (verified by reading prompts directory)
- EpisodeRequest: new optional saga field, plumbed through to add_episode
  for upcoming Stage 3 single-episode pathway
- Both handlers use conditional kwargs construction so existing callers
  see no behavioral change

Phase A item 1 of three. Items 2 (stage2_worker) and 3 (stage3_worker) follow.
2026-05-01 18:57:31 +00:00
5 changed files with 418 additions and 53 deletions
@@ -0,0 +1,23 @@
-- 20260501-001 — Stage 3 queue routing columns for Phase A bulk-vs-single-episode routing
--
-- Adds four columns and one index to stage_3_queue, written by Stage 2 v2.2
-- and read by Stage 3 v2.3 to choose between bulk and single-episode ingest
-- pathways. See architecture doc and Phase A handoff (2026-05-01) for design.
--
-- Required by:
-- scripts/stage2_worker.py >= 2.2
-- scripts/stage3_worker.py >= 2.3
--
-- Idempotent: safe to re-apply against a database where the columns already
-- exist (was applied live before this file was created).
ALTER TABLE stage_3_queue
ADD COLUMN IF NOT EXISTS state_type TEXT,
ADD COLUMN IF NOT EXISTS state_type_confidence TEXT,
ADD COLUMN IF NOT EXISTS supersedes_prior_state BOOLEAN,
ADD COLUMN IF NOT EXISTS state_type_rationale TEXT;
-- Index on the routing signal — Stage 3 reads this on every dequeue,
-- and observability queries (item 6: routing_decisions) will filter on it.
CREATE INDEX IF NOT EXISTS stage_3_queue_supersedes_idx
ON stage_3_queue (supersedes_prior_state);
+37
View File
@@ -0,0 +1,37 @@
# BirdAI database migrations
Schema changes applied to the BirdAI Postgres database, in chronological order.
Filenames are YYYYMMDD-NNN_short_description.sql where NNN is a sequence number
within the day for ordering when multiple migrations land same-day.
## Conventions
- Each file is idempotent: uses IF NOT EXISTS / IF EXISTS so it can be
re-run safely against a database that already has the change applied. This
matters because we don't track which migrations a given DB has applied (no
migrations table yet — that's its own future migration).
- Each file is a single logical change: one feature, one rollout. Don't pile
unrelated DDL into one file.
- Each file documents what it's for and which worker version requires it
in a header comment, so the relationship between schema and code is legible
from either side.
- Migrations are forward-only. No down-migrations. If a change is wrong,
write a new migration that fixes it.
## Applying
Against the live DB:
psql "$PG_DSN" -f migrations/YYYYMMDD-NNN_name.sql
Against a fresh DB (disaster recovery, dev clone), apply all files in order:
for f in migrations/*.sql; do
echo "Applying $f"
psql "$PG_DSN" -f "$f"
done
## Pending: migrations tracking table
There is no schema_migrations table yet. Adding one is itself a migration —
deferred until a second migration after this one lands and the need is real.
+19 -2
View File
@@ -92,6 +92,12 @@ class BulkEpisodeRequest(BaseModel):
episodes: list[BulkEpisodeItem]
group_id: str | None = None
saga: str | None = None
# Batch-level extraction guidance. graphiti-core inserts this into the
# entity-extraction and edge-extraction prompts only — NOT into dedup
# prompts. Use to bias *what* gets extracted, not *how* dedup runs.
# Verified 2026-05-01 by reading extract_nodes.py, extract_edges.py,
# dedupe_nodes.py, dedupe_edges.py in graphiti-core.
custom_extraction_instructions: str | None = None
class EpisodeRequest(BaseModel):
@@ -101,6 +107,7 @@ class EpisodeRequest(BaseModel):
timestamp: str | None = None
group_id: str | None = None
custom_extraction_instructions: str | None = None
saga: str | None = None
@app.get("/health")
async def health():
@@ -112,7 +119,7 @@ async def add_episode(req: EpisodeRequest):
from graphiti_core.nodes import EpisodeType
try:
ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now()
await g.add_episode(
kwargs = dict(
name=req.name,
episode_body=req.content,
source=EpisodeType.text,
@@ -121,6 +128,11 @@ async def add_episode(req: EpisodeRequest):
group_id=req.group_id or GROUP_ID,
custom_extraction_instructions=req.custom_extraction_instructions,
)
# Saga is supported on graphiti-core add_episode but kept optional
# so older callers don't need to know about it.
if req.saga is not None:
kwargs["saga"] = req.saga
await g.add_episode(**kwargs)
return {"ok": True}
except Exception as e:
log.error(f"Episode ingestion failed: {e}\n{traceback.format_exc()}")
@@ -142,11 +154,16 @@ async def add_episodes_bulk(req: BulkEpisodeRequest):
reference_time=ref_time,
))
try:
result = await g.add_episode_bulk(
kwargs = dict(
bulk_episodes=raw_episodes,
group_id=req.group_id or GROUP_ID,
saga=req.saga or None,
)
# Pass-through only when set, so callers that don't supply
# instructions get graphiti-core's default behavior unchanged.
if req.custom_extraction_instructions is not None:
kwargs["custom_extraction_instructions"] = req.custom_extraction_instructions
result = await g.add_episode_bulk(**kwargs)
return {"ok": True, "count": len(raw_episodes)}
except Exception as e:
log.error(f"Bulk ingestion failed: {e}\n{traceback.format_exc()}")
+137 -22
View File
@@ -1,12 +1,19 @@
#!/usr/bin/env python3
"""
Stage 2 Worker — Taxonomy-Free Mistral Orientation
Polls stage_2_queue, runs Mistral taxonomy-free pass, enqueues Stage 3.
Runs as systemd service: aaronai-stage2.service
Stage 2 Worker — Taxonomy-Free Mistral Orientation + State-Type Classification
Polls stage_2_queue, runs Mistral pass that produces:
(a) orientation context (active frames, frame relationships, extraction focus)
(b) state-type classification for Stage 3 routing (current/reference/historical,
supersedes_prior_state boolean, confidence, rationale)
Enqueues Stage 3 with both concerns as explicit columns.
Routing:
- char_length < 2000 → skip Stage 3, mark complete (sparse content, cascade no benefit)
- char_length >= 2000 → enqueue Stage 3 with orientation metadata
- char_length >= 2000 → enqueue Stage 3 with orientation + routing metadata
Runs as systemd service: aaronai-stage2.service
"""
import os, json, time, subprocess, logging, requests
@@ -33,22 +40,68 @@ CHAR_LENGTH_THRESHOLD = 2000
REQUEST_TIMEOUT = 300
RETRY_ATTEMPTS = 2
POLL_INTERVAL = 5
WORKER_VERSION = "2.1"
WORKER_VERSION = "2.2"
# Valid values for state-type fields. Mistral output validated against these;
# anything outside falls through to safe-cheap defaults (bulk routing).
VALID_STATE_TYPES = ("current", "reference", "historical")
VALID_CONFIDENCE = ("low", "medium", "high")
# Safe-cheap defaults applied when Mistral output is missing or malformed.
# All route to bulk pathway (no temporal invalidation cost) per Phase A
# routing rule: route to single-episode only on supersedes_prior_state=true
# AND confidence in {medium, high}.
DEFAULT_STATE_TYPE = "reference"
DEFAULT_CONFIDENCE = "low"
DEFAULT_SUPERSEDES = False
DEFAULT_RATIONALE = "mistral output missing or malformed; default applied"
TAXFREE_PROMPT = (
"You are a metadata extraction system. Given a document, describe its content "
"shape for use as orientation context in a knowledge graph extraction pass.\n\n"
"Do not summarize content. Do not extract entities. Do not assign a single category label.\n\n"
"Instead, describe:\n"
"- What domains or frames are active in this content (there may be several simultaneously)\n"
"- How those frames relate to each other in this specific document\n"
"- What kind of relational content a knowledge graph extractor should look for\n\n"
"Output JSON only. No prose, no explanation, no markdown.\n\n"
"Schema:\n"
"You are a metadata extraction system. Given a document, produce a JSON object "
"describing two distinct concerns about the document. Output JSON only — no prose, "
"no explanation, no markdown.\n\n"
"CONCERN 1 — ORIENTATION CONTEXT (for downstream knowledge-graph extraction):\n"
"Describe the content shape. Do not summarize content. Do not extract entities. "
"Do not assign a single category label.\n"
" - active_frames: which domains or frames are active in this content (there may "
"be several simultaneously)\n"
" - frame_relationships: how those frames relate in this specific document, "
"one sentence\n"
" - extraction_orientation: what kind of relational content a knowledge-graph "
"extractor should look for, one sentence\n"
" - one_sentence_summary: a single-sentence content summary\n\n"
"CONCERN 2 — STATE-TYPE CLASSIFICATION (for ingest routing):\n"
"Classify the document's relationship to time and prior facts. This is independent "
"of orientation: a document can be in a 'reference frame' (orientation) while "
"describing 'current state' (state-type), or vice versa. Judge the document's "
"ROLE, not its topic.\n"
" - state_type: one of\n"
" 'current' — describes the author's present state, recent decisions, or "
"ongoing situations as of the document's date\n"
" 'reference' — timeless or slow-changing material: external books, "
"documentation, technical reference, conceptual writing\n"
" 'historical' — describes past events, prior states, or archived material "
"the author is recording but not living in\n"
" - state_type_confidence: 'low' | 'medium' | 'high' — how confident you are in "
"the classification. Use 'low' when genuinely uncertain.\n"
" - supersedes_prior_state: true if this document describes facts that should "
"REPLACE previously-recorded facts about the same subjects (e.g. a journal entry "
"saying 'I no longer work at X', a status update, a corrected belief). false "
"otherwise. Default to false when uncertain.\n"
" - state_type_rationale: one sentence explaining the classification\n\n"
"Output schema (flat, all eight fields at the top level):\n"
'{"active_frames": ["<frame 1>", "<frame 2>"], '
'"frame_relationships": "<one sentence>", '
'"extraction_orientation": "<one sentence>", '
'"one_sentence_summary": "<one sentence>"}\n\n'
'"one_sentence_summary": "<one sentence>", '
'"state_type": "current|reference|historical", '
'"state_type_confidence": "low|medium|high", '
'"supersedes_prior_state": true|false, '
'"state_type_rationale": "<one sentence>"}\n\n'
"Document:\n"
)
@@ -100,6 +153,38 @@ def run_mistral(doc_text):
return {"error": "parse_failed", "raw": raw[:200]}
def normalize_state_fields(meta):
"""Validate and normalize the four state-type fields from Mistral output.
Anything missing or malformed falls through to safe-cheap defaults that
route to bulk pathway (no temporal invalidation work)."""
raw_state_type = meta.get("state_type")
if isinstance(raw_state_type, str) and raw_state_type.lower() in VALID_STATE_TYPES:
state_type = raw_state_type.lower()
else:
state_type = DEFAULT_STATE_TYPE
raw_conf = meta.get("state_type_confidence")
if isinstance(raw_conf, str) and raw_conf.lower() in VALID_CONFIDENCE:
confidence = raw_conf.lower()
else:
confidence = DEFAULT_CONFIDENCE
raw_supersedes = meta.get("supersedes_prior_state")
if isinstance(raw_supersedes, bool):
supersedes = raw_supersedes
else:
supersedes = DEFAULT_SUPERSEDES
raw_rationale = meta.get("state_type_rationale")
if isinstance(raw_rationale, str) and raw_rationale.strip():
rationale = raw_rationale.strip()[:1000]
else:
rationale = DEFAULT_RATIONALE
return state_type, confidence, supersedes, rationale
def build_orientation(meta):
frames = ", ".join(meta.get("active_frames", []))
rel = meta.get("frame_relationships", "")
@@ -108,20 +193,39 @@ def build_orientation(meta):
return f"Active frames: {frames}. Frame relationships: {rel} Extraction focus: {orient} Summary: {summary}"
def enqueue_stage3(pg, source, full_text, orientation, metadata):
def enqueue_stage3(pg, source, full_text, orientation, metadata,
state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale):
"""Write Stage 3 queue row with orientation + explicit routing columns.
Routing columns (state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale) are first-class queue properties for Phase A.
Stage 3 reads them on every dequeue to choose bulk vs single-episode pathway.
The full Mistral metadata blob is also retained in stage2_metadata JSON for
debugging and future cycle work."""
cur = pg.cursor()
cur.execute("""
INSERT INTO stage_3_queue (source, full_text, orientation, stage2_metadata)
VALUES (%s, %s, %s, %s)
INSERT INTO stage_3_queue (
source, full_text, orientation, stage2_metadata,
state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (source) DO UPDATE SET
full_text = EXCLUDED.full_text,
orientation = EXCLUDED.orientation,
stage2_metadata = EXCLUDED.stage2_metadata,
state_type = EXCLUDED.state_type,
state_type_confidence = EXCLUDED.state_type_confidence,
supersedes_prior_state = EXCLUDED.supersedes_prior_state,
state_type_rationale = EXCLUDED.state_type_rationale,
enqueued_at = NOW(),
completed_at = NULL,
failed_at = NULL,
attempts = 0
""", (source, full_text, orientation, json.dumps(metadata)))
""", (source, full_text, orientation, json.dumps(metadata),
state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale))
pg.commit()
@@ -144,7 +248,7 @@ def process_one(row):
return True
# Run Mistral
log.info(f" Running Mistral taxonomy-free pass...")
log.info(f" Running Mistral taxonomy-free + state-type pass...")
try:
meta = run_mistral(full_text)
except requests.exceptions.Timeout:
@@ -177,14 +281,25 @@ def process_one(row):
frames = meta.get("active_frames", [])
log.info(f" Frames: {frames}")
# Normalize state-type fields with safe-cheap defaults on malformed output.
# Note: Mistral may return valid orientation but malformed state-type;
# we accept the orientation and default the routing rather than fail
# the whole row, since defaults route to bulk (cheap, safe).
state_type, confidence, supersedes, rationale = normalize_state_fields(meta)
log.info(
f" State-type: {state_type} (conf={confidence}, "
f"supersedes={supersedes})"
)
orientation = build_orientation(meta)
meta["_model"] = "mistral:latest"
meta["_worker_version"] = WORKER_VERSION
meta["_generated_at"] = datetime.now().isoformat()
meta["char_length"] = char_length
# Enqueue Stage 3
enqueue_stage3(pg, source, full_text, orientation, meta)
# Enqueue Stage 3 with explicit routing columns
enqueue_stage3(pg, source, full_text, orientation, meta,
state_type, confidence, supersedes, rationale)
cur.execute("UPDATE stage_2_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
pg.commit()
pg.close()
+202 -29
View File
@@ -1,22 +1,45 @@
#!/usr/bin/env python3
"""
Stage 3 Worker — Graphiti Ingest with Taxonomy-Free Orientation
Polls stage_3_queue, chunks documents, ingests as episodic saga to Graphiti.
Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing
Chunking rationale: Large documents sent as single episodes cause FalkorDB
write lock contention during entity deduplication. Chunking at ~500 words
(matching Stage 1) produces smaller deduplication passes that don't block.
Each document's chunks are linked via Graphiti's saga mechanism, preserving
document structure in the graph.
Polls stage_3_queue, routes each row to one of two ingest pathways based on
state-type classification produced by Stage 2:
Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed sagas of
17 and 19 chunks deadlock the sidecar's Python-side coordination. Documents
producing more than MAX_CHUNKS_PER_SAGA chunks are split into multiple bulk
commits, each tagged with the same saga value so Graphiti still links them.
- BULK pathway (existing): supersedes_prior_state=false OR confidence=low
OR routing fields missing. Fast, no temporal invalidation.
Wedge detection: 2026-05-01 incident also surfaced the asymmetry with Stage 2 —
Stage 3 had no recovery path when the sidecar deadlocked. Now mirrors Stage 2's
consecutive_failures pattern with sidecar restart on threshold.
- SINGLE-EPISODE pathway (new): supersedes_prior_state=true AND
confidence in {medium, high}. Per-chunk POST to /episodes with shared
saga tag, full edge invalidation, per-chunk timeout/retry independence.
Routing rationale: the single-episode pathway is the correct API per
graphiti-core's docs for content that supersedes prior facts (it does
edge invalidation that bulk skips). It costs more per chunk because of
the resolve_edge LLM call; the routing rule keeps that cost bounded to
content that actually needs it.
Chunking rationale (preserved from prior versions): Large documents sent
as single episodes cause FalkorDB write lock contention during entity
deduplication. Chunking at ~500 words (matching Stage 1) produces smaller
deduplication passes that don't block. Each document's chunks are linked
via Graphiti's saga mechanism, preserving document structure in the graph.
Per-chunk heartbeat: single-episode pathway updates stage_3_queue.started_at
after each successful chunk POST so a long-running document doesn't cross
the 10-minute stale threshold mid-process and get re-dequeued by another
worker (or the same worker on next loop iteration). started_at thus means
"last activity timestamp" rather than "began at" — semantics that match
the dequeue query's intent (catch dead workers, not slow ones).
Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed bulk
sagas of 17 and 19 chunks deadlock the sidecar's Python-side coordination.
Documents producing more than MAX_CHUNKS_PER_SAGA chunks on the bulk
pathway are split into multiple bulk commits, each tagged with the same
saga value so Graphiti still links them. The single-episode pathway
doesn't need this split since each chunk is its own POST.
Wedge detection: mirrors Stage 2's consecutive_failures pattern with
sidecar restart on threshold.
Runs as systemd service: aaronai-stage3.service
"""
@@ -44,17 +67,23 @@ HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat")
RETRY_ATTEMPTS = 2
POLL_INTERVAL = 5
INGEST_TIMEOUT = 600
WORKER_VERSION = "2.2"
WORKER_VERSION = "2.3"
# Match Stage 1 chunking parameters
CHUNK_SIZE_WORDS = 500
CHUNK_OVERLAP_WORDS = 50
# Documents under this threshold ingested as single episode (no chunking overhead)
SINGLE_EPISODE_THRESHOLD = 1500
# Sagas larger than this many chunks split into multiple commits
# Bulk-pathway sagas larger than this many chunks split into multiple commits
# (2026-05-01 incident: 17 and 19 chunk sagas deadlocked sidecar)
MAX_CHUNKS_PER_SAGA = 10
# Routing rule: single-episode pathway requires both signals positive.
# Anything else (false, NULL, low confidence) routes to bulk — the
# safer-cheaper default. Mistral parse drift can't accidentally trigger
# the expensive pathway.
HIGH_TRUST_CONFIDENCE = ("medium", "high")
def get_pg():
return psycopg2.connect(PG_DSN)
@@ -109,6 +138,22 @@ def chunk_text(text, chunk_size=CHUNK_SIZE_WORDS, overlap=CHUNK_OVERLAP_WORDS):
return chunks
def heartbeat_row(row_id):
"""Refresh stage_3_queue.started_at to NOW() so a long-running single-episode
ingest doesn't cross the 10-minute stale threshold mid-process. Called
after each successful chunk POST. Best-effort: failures are logged but
don't fail the chunk — the worst case is a stale-threshold re-dequeue,
which graphiti's dedup will handle as a no-op."""
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("UPDATE stage_3_queue SET started_at = NOW() WHERE id = %s", (row_id,))
pg.commit()
pg.close()
except Exception as e:
log.warning(f" Heartbeat update failed (continuing): {e}")
def post_bulk(payload, batch_label=""):
"""Single POST to /episodes/bulk with consistent error handling."""
resp = requests.post(
@@ -122,16 +167,32 @@ def post_bulk(payload, batch_label=""):
return resp.json()
def ingest_to_graphiti(source, full_text, orientation):
"""
Ingest document to Graphiti as chunked episodes linked by saga.
def post_episode(payload, episode_label=""):
"""Single POST to /episodes (singular) with consistent error handling.
Used by the single-episode pathway, one call per chunk."""
resp = requests.post(
f"{GRAPHITI_URL}/episodes",
json=payload,
timeout=INGEST_TIMEOUT
)
if not resp.ok:
prefix = f"{episode_label} " if episode_label else ""
raise RuntimeError(f"{prefix}Sidecar {resp.status_code}: {resp.text[:500]}")
return resp.json()
def ingest_bulk(source, full_text, orientation):
"""
Bulk-pathway ingest: documents that don't supersede prior state.
Skips edge invalidation. Cheap. Three sub-paths by document size:
Three paths:
- Short documents (<SINGLE_EPISODE_THRESHOLD): single episode, no saga
[note: 'single episode' here means one bulk call with one item, NOT
the single-episode-pathway; naming overlap is unfortunate but local]
- Medium documents (chunks <= MAX_CHUNKS_PER_SAGA): one bulk commit, saga-linked
- Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga tag
so Graphiti links them as one document unit
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga
tag so Graphiti links them as one document unit
"""
char_length = len(full_text)
@@ -142,7 +203,7 @@ def ingest_to_graphiti(source, full_text, orientation):
"source_description": orientation,
"timestamp": datetime.now().isoformat(),
}]
log.info(f" Single episode ({char_length} chars)")
log.info(f" [bulk] Single episode ({char_length} chars)")
return post_bulk({"episodes": episodes, "group_id": "aaron"})
chunks = chunk_text(full_text)
@@ -158,7 +219,7 @@ def ingest_to_graphiti(source, full_text, orientation):
}
for i, chunk in enumerate(chunks)
]
log.info(f" Chunked into {total_chunks} episodes ({char_length} chars)")
log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)")
return post_bulk(
{"episodes": episodes, "group_id": "aaron", "saga": source}
)
@@ -166,7 +227,7 @@ def ingest_to_graphiti(source, full_text, orientation):
# Large document: split into batches sharing the same saga tag
batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA
log.info(
f" Chunked into {total_chunks} episodes ({char_length} chars); "
f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars); "
f"splitting into {batch_count} batches of up to {MAX_CHUNKS_PER_SAGA}"
)
last_result = None
@@ -193,9 +254,110 @@ def ingest_to_graphiti(source, full_text, orientation):
return last_result
def ingest_single_episode(row_id, source, full_text, orientation):
"""
Single-episode pathway: documents that supersede prior state with
medium-or-high confidence. Each chunk is its own POST to /episodes
with shared saga tag. Each call independent: own timeout, own retry
envelope, own failure semantics.
Partial-success behavior: if chunk N of total fails, chunks 1..N-1
stay committed (graphiti has already accepted them) and the function
raises with detail about which chunk failed and how many succeeded.
The caller marks the row failed_at with that detail; the operator
decides whether to re-enqueue. Re-ingestion will re-POST chunks 1..N-1
against the graph; graphiti's dedup will handle them as no-ops.
Heartbeats stage_3_queue.started_at after each successful chunk so the
row doesn't cross the 10-minute stale threshold while actively progressing.
"""
char_length = len(full_text)
# Short documents: one POST, no chunking, no saga
if char_length < SINGLE_EPISODE_THRESHOLD:
payload = {
"name": source,
"content": full_text,
"source_description": orientation,
"group_id": "aaron",
"timestamp": datetime.now().isoformat(),
}
log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)")
return post_episode(payload, episode_label="single-ep")
chunks = chunk_text(full_text)
total_chunks = len(chunks)
log.info(
f" [single-ep] Chunked into {total_chunks} episodes ({char_length} chars); "
f"per-chunk POSTs with shared saga"
)
succeeded = 0
for i, chunk in enumerate(chunks):
chunk_num = i + 1
payload = {
"name": f"{source} [{chunk_num}/{total_chunks}]",
"content": chunk,
"source_description": orientation,
"group_id": "aaron",
"saga": source,
"timestamp": datetime.now().isoformat(),
}
try:
post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}")
succeeded += 1
log.info(f" chunk {chunk_num}/{total_chunks} committed")
heartbeat_row(row_id)
except Exception as e:
# Annotate the exception with partial-success detail so the
# caller can write a clean failure_reason. Re-raise to abort
# the document; previously-committed chunks stay in the graph.
raise RuntimeError(
f"single_episode_partial: chunk {chunk_num}/{total_chunks} failed "
f"(succeeded: {succeeded}); error: {str(e)[:300]}"
) from e
log.info(f" [single-ep] All {total_chunks} chunks committed")
return {"ok": True, "chunks_committed": total_chunks}
def should_route_single_episode(supersedes_prior_state, state_type_confidence):
"""Routing decision for Phase A.
Single-episode pathway requires BOTH:
- supersedes_prior_state is true (Mistral judged it temporally superseding)
- confidence is medium or high (Mistral was confident enough to trust)
Anything else routes to bulk: false supersedes, NULL fields (legacy rows
pre-dating Stage 2 v2.2), low confidence even on supersedes=true. This
is the safer-cheaper default — bulk skips temporal invalidation, which
is the right behavior when we're not confident the content needs it.
"""
if not supersedes_prior_state:
return False
if state_type_confidence not in HIGH_TRUST_CONFIDENCE:
return False
return True
def process_one(row):
row_id, source, full_text, orientation = row
log.info(f"Ingesting to Graphiti: {source}")
(row_id, source, full_text, orientation,
state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale) = row
# Route decision
use_single_episode = should_route_single_episode(
supersedes_prior_state, state_type_confidence
)
pathway = "single-episode" if use_single_episode else "bulk"
log.info(
f"Ingesting to Graphiti: {source} "
f"[pathway={pathway}, state_type={state_type}, "
f"conf={state_type_confidence}, supersedes={supersedes_prior_state}]"
)
if state_type_rationale:
log.info(f" rationale: {state_type_rationale[:200]}")
pg = get_pg()
cur = pg.cursor()
@@ -204,9 +366,16 @@ def process_one(row):
(row_id,)
)
pg.commit()
pg.close()
try:
result = ingest_to_graphiti(source, full_text, orientation)
if use_single_episode:
result = ingest_single_episode(row_id, source, full_text, orientation)
else:
result = ingest_bulk(source, full_text, orientation)
pg = get_pg()
cur = pg.cursor()
cur.execute("UPDATE stage_3_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
pg.commit()
pg.close()
@@ -214,6 +383,8 @@ def process_one(row):
return True
except Exception as e:
log.error(f" Graphiti ingest failed for {source}: {e}")
pg = get_pg()
cur = pg.cursor()
cur.execute("""
UPDATE stage_3_queue
SET failed_at = NOW(), failure_reason = %s
@@ -235,7 +406,9 @@ def run():
pg = get_pg()
cur = pg.cursor()
cur.execute("""
SELECT id, source, full_text, orientation
SELECT id, source, full_text, orientation,
state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale
FROM stage_3_queue
WHERE completed_at IS NULL
AND failed_at IS NULL