Compare commits
4 Commits
main
...
30beeb3a26
| Author | SHA1 | Date | |
|---|---|---|---|
| 30beeb3a26 | |||
| e7de7fb64b | |||
| 70e87e3ab5 | |||
| 8b0a163670 |
@@ -0,0 +1,23 @@
|
|||||||
|
-- 20260501-001 — Stage 3 queue routing columns for Phase A bulk-vs-single-episode routing
|
||||||
|
--
|
||||||
|
-- Adds four columns and one index to stage_3_queue, written by Stage 2 v2.2
|
||||||
|
-- and read by Stage 3 v2.3 to choose between bulk and single-episode ingest
|
||||||
|
-- pathways. See architecture doc and Phase A handoff (2026-05-01) for design.
|
||||||
|
--
|
||||||
|
-- Required by:
|
||||||
|
-- scripts/stage2_worker.py >= 2.2
|
||||||
|
-- scripts/stage3_worker.py >= 2.3
|
||||||
|
--
|
||||||
|
-- Idempotent: safe to re-apply against a database where the columns already
|
||||||
|
-- exist (was applied live before this file was created).
|
||||||
|
|
||||||
|
ALTER TABLE stage_3_queue
|
||||||
|
ADD COLUMN IF NOT EXISTS state_type TEXT,
|
||||||
|
ADD COLUMN IF NOT EXISTS state_type_confidence TEXT,
|
||||||
|
ADD COLUMN IF NOT EXISTS supersedes_prior_state BOOLEAN,
|
||||||
|
ADD COLUMN IF NOT EXISTS state_type_rationale TEXT;
|
||||||
|
|
||||||
|
-- Index on the routing signal — Stage 3 reads this on every dequeue,
|
||||||
|
-- and observability queries (item 6: routing_decisions) will filter on it.
|
||||||
|
CREATE INDEX IF NOT EXISTS stage_3_queue_supersedes_idx
|
||||||
|
ON stage_3_queue (supersedes_prior_state);
|
||||||
@@ -0,0 +1,37 @@
|
|||||||
|
# BirdAI database migrations
|
||||||
|
|
||||||
|
Schema changes applied to the BirdAI Postgres database, in chronological order.
|
||||||
|
Filenames are YYYYMMDD-NNN_short_description.sql where NNN is a sequence number
|
||||||
|
within the day for ordering when multiple migrations land same-day.
|
||||||
|
|
||||||
|
## Conventions
|
||||||
|
|
||||||
|
- Each file is idempotent: uses IF NOT EXISTS / IF EXISTS so it can be
|
||||||
|
re-run safely against a database that already has the change applied. This
|
||||||
|
matters because we don't track which migrations a given DB has applied (no
|
||||||
|
migrations table yet — that's its own future migration).
|
||||||
|
- Each file is a single logical change: one feature, one rollout. Don't pile
|
||||||
|
unrelated DDL into one file.
|
||||||
|
- Each file documents what it's for and which worker version requires it
|
||||||
|
in a header comment, so the relationship between schema and code is legible
|
||||||
|
from either side.
|
||||||
|
- Migrations are forward-only. No down-migrations. If a change is wrong,
|
||||||
|
write a new migration that fixes it.
|
||||||
|
|
||||||
|
## Applying
|
||||||
|
|
||||||
|
Against the live DB:
|
||||||
|
|
||||||
|
psql "$PG_DSN" -v ON_ERROR_STOP=1 -f migrations/YYYYMMDD-NNN_name.sql
|
||||||
|
|
||||||
|
Against a fresh DB (disaster recovery, dev clone), apply all files in order:
|
||||||
|
|
||||||
|
for f in migrations/*.sql; do
|
||||||
|
echo "Applying $f"
|
||||||
|
psql "$PG_DSN" -v ON_ERROR_STOP=1 -f "$f" || { echo "Failed applying $f" >&2; break; }
|
||||||
|
done
|
||||||
|
|
||||||
|
## Pending: migrations tracking table
|
||||||
|
|
||||||
|
There is no schema_migrations table yet. Adding one is itself a migration —
|
||||||
|
deferred until a second migration after this one lands and the need is real.
|
||||||
@@ -92,6 +92,12 @@ class BulkEpisodeRequest(BaseModel):
|
|||||||
episodes: list[BulkEpisodeItem]
|
episodes: list[BulkEpisodeItem]
|
||||||
group_id: str | None = None
|
group_id: str | None = None
|
||||||
saga: str | None = None
|
saga: str | None = None
|
||||||
|
# Batch-level extraction guidance. graphiti-core inserts this into the
|
||||||
|
# entity-extraction and edge-extraction prompts only — NOT into dedup
|
||||||
|
# prompts. Use to bias *what* gets extracted, not *how* dedup runs.
|
||||||
|
# Verified 2026-05-01 by reading extract_nodes.py, extract_edges.py,
|
||||||
|
# dedupe_nodes.py, dedupe_edges.py in graphiti-core.
|
||||||
|
custom_extraction_instructions: str | None = None
|
||||||
|
|
||||||
|
|
||||||
class EpisodeRequest(BaseModel):
|
class EpisodeRequest(BaseModel):
|
||||||
@@ -101,6 +107,7 @@ class EpisodeRequest(BaseModel):
|
|||||||
timestamp: str | None = None
|
timestamp: str | None = None
|
||||||
group_id: str | None = None
|
group_id: str | None = None
|
||||||
custom_extraction_instructions: str | None = None
|
custom_extraction_instructions: str | None = None
|
||||||
|
saga: str | None = None
|
||||||
|
|
||||||
@app.get("/health")
|
@app.get("/health")
|
||||||
async def health():
|
async def health():
|
||||||
@@ -112,7 +119,7 @@ async def add_episode(req: EpisodeRequest):
|
|||||||
from graphiti_core.nodes import EpisodeType
|
from graphiti_core.nodes import EpisodeType
|
||||||
try:
|
try:
|
||||||
ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now()
|
ref_time = datetime.fromisoformat(req.timestamp) if req.timestamp else datetime.now()
|
||||||
await g.add_episode(
|
kwargs = dict(
|
||||||
name=req.name,
|
name=req.name,
|
||||||
episode_body=req.content,
|
episode_body=req.content,
|
||||||
source=EpisodeType.text,
|
source=EpisodeType.text,
|
||||||
@@ -121,6 +128,11 @@ async def add_episode(req: EpisodeRequest):
|
|||||||
group_id=req.group_id or GROUP_ID,
|
group_id=req.group_id or GROUP_ID,
|
||||||
custom_extraction_instructions=req.custom_extraction_instructions,
|
custom_extraction_instructions=req.custom_extraction_instructions,
|
||||||
)
|
)
|
||||||
|
# Saga is supported on graphiti-core add_episode but kept optional
|
||||||
|
# so older callers don't need to know about it.
|
||||||
|
if req.saga is not None:
|
||||||
|
kwargs["saga"] = req.saga
|
||||||
|
await g.add_episode(**kwargs)
|
||||||
return {"ok": True}
|
return {"ok": True}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error(f"Episode ingestion failed: {e}\n{traceback.format_exc()}")
|
log.error(f"Episode ingestion failed: {e}\n{traceback.format_exc()}")
|
||||||
@@ -142,11 +154,16 @@ async def add_episodes_bulk(req: BulkEpisodeRequest):
|
|||||||
reference_time=ref_time,
|
reference_time=ref_time,
|
||||||
))
|
))
|
||||||
try:
|
try:
|
||||||
result = await g.add_episode_bulk(
|
kwargs = dict(
|
||||||
bulk_episodes=raw_episodes,
|
bulk_episodes=raw_episodes,
|
||||||
group_id=req.group_id or GROUP_ID,
|
group_id=req.group_id or GROUP_ID,
|
||||||
saga=req.saga or None,
|
saga=req.saga or None,
|
||||||
)
|
)
|
||||||
|
# Pass-through only when set, so callers that don't supply
|
||||||
|
# instructions get graphiti-core's default behavior unchanged.
|
||||||
|
if req.custom_extraction_instructions is not None:
|
||||||
|
kwargs["custom_extraction_instructions"] = req.custom_extraction_instructions
|
||||||
|
result = await g.add_episode_bulk(**kwargs)
|
||||||
return {"ok": True, "count": len(raw_episodes)}
|
return {"ok": True, "count": len(raw_episodes)}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error(f"Bulk ingestion failed: {e}\n{traceback.format_exc()}")
|
log.error(f"Bulk ingestion failed: {e}\n{traceback.format_exc()}")
|
||||||
|
|||||||
+137
-22
@@ -1,12 +1,19 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Stage 2 Worker — Taxonomy-Free Mistral Orientation
|
Stage 2 Worker — Taxonomy-Free Mistral Orientation + State-Type Classification
|
||||||
Polls stage_2_queue, runs Mistral taxonomy-free pass, enqueues Stage 3.
|
|
||||||
Runs as systemd service: aaronai-stage2.service
|
Polls stage_2_queue, runs Mistral pass that produces:
|
||||||
|
(a) orientation context (active frames, frame relationships, extraction focus)
|
||||||
|
(b) state-type classification for Stage 3 routing (current/reference/historical,
|
||||||
|
supersedes_prior_state boolean, confidence, rationale)
|
||||||
|
|
||||||
|
Enqueues Stage 3 with both concerns as explicit columns.
|
||||||
|
|
||||||
Routing:
|
Routing:
|
||||||
- char_length < 2000 → skip Stage 3, mark complete (sparse content, cascade no benefit)
|
- char_length < 2000 → skip Stage 3, mark complete (sparse content, cascade no benefit)
|
||||||
- char_length >= 2000 → enqueue Stage 3 with orientation metadata
|
- char_length >= 2000 → enqueue Stage 3 with orientation + routing metadata
|
||||||
|
|
||||||
|
Runs as systemd service: aaronai-stage2.service
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import os, json, time, subprocess, logging, requests
|
import os, json, time, subprocess, logging, requests
|
||||||
@@ -33,22 +40,68 @@ CHAR_LENGTH_THRESHOLD = 2000
|
|||||||
REQUEST_TIMEOUT = 300
|
REQUEST_TIMEOUT = 300
|
||||||
RETRY_ATTEMPTS = 2
|
RETRY_ATTEMPTS = 2
|
||||||
POLL_INTERVAL = 5
|
POLL_INTERVAL = 5
|
||||||
WORKER_VERSION = "2.1"
|
WORKER_VERSION = "2.2"
|
||||||
|
|
||||||
|
# Valid values for state-type fields. Mistral output validated against these;
|
||||||
|
# anything outside falls through to safe-cheap defaults (bulk routing).
|
||||||
|
VALID_STATE_TYPES = ("current", "reference", "historical")
|
||||||
|
VALID_CONFIDENCE = ("low", "medium", "high")
|
||||||
|
|
||||||
|
# Safe-cheap defaults applied when Mistral output is missing or malformed.
|
||||||
|
# All route to bulk pathway (no temporal invalidation cost) per Phase A
|
||||||
|
# routing rule: route to single-episode only on supersedes_prior_state=true
|
||||||
|
# AND confidence in {medium, high}.
|
||||||
|
DEFAULT_STATE_TYPE = "reference"
|
||||||
|
DEFAULT_CONFIDENCE = "low"
|
||||||
|
DEFAULT_SUPERSEDES = False
|
||||||
|
DEFAULT_RATIONALE = "mistral output missing or malformed; default applied"
|
||||||
|
|
||||||
TAXFREE_PROMPT = (
|
TAXFREE_PROMPT = (
|
||||||
"You are a metadata extraction system. Given a document, describe its content "
|
"You are a metadata extraction system. Given a document, produce a JSON object "
|
||||||
"shape for use as orientation context in a knowledge graph extraction pass.\n\n"
|
"describing two distinct concerns about the document. Output JSON only — no prose, "
|
||||||
"Do not summarize content. Do not extract entities. Do not assign a single category label.\n\n"
|
"no explanation, no markdown.\n\n"
|
||||||
"Instead, describe:\n"
|
|
||||||
"- What domains or frames are active in this content (there may be several simultaneously)\n"
|
"CONCERN 1 — ORIENTATION CONTEXT (for downstream knowledge-graph extraction):\n"
|
||||||
"- How those frames relate to each other in this specific document\n"
|
"Describe the content shape. Do not summarize content. Do not extract entities. "
|
||||||
"- What kind of relational content a knowledge graph extractor should look for\n\n"
|
"Do not assign a single category label.\n"
|
||||||
"Output JSON only. No prose, no explanation, no markdown.\n\n"
|
" - active_frames: which domains or frames are active in this content (there may "
|
||||||
"Schema:\n"
|
"be several simultaneously)\n"
|
||||||
|
" - frame_relationships: how those frames relate in this specific document, "
|
||||||
|
"one sentence\n"
|
||||||
|
" - extraction_orientation: what kind of relational content a knowledge-graph "
|
||||||
|
"extractor should look for, one sentence\n"
|
||||||
|
" - one_sentence_summary: a single-sentence content summary\n\n"
|
||||||
|
|
||||||
|
"CONCERN 2 — STATE-TYPE CLASSIFICATION (for ingest routing):\n"
|
||||||
|
"Classify the document's relationship to time and prior facts. This is independent "
|
||||||
|
"of orientation: a document can be in a 'reference frame' (orientation) while "
|
||||||
|
"describing 'current state' (state-type), or vice versa. Judge the document's "
|
||||||
|
"ROLE, not its topic.\n"
|
||||||
|
" - state_type: one of\n"
|
||||||
|
" 'current' — describes the author's present state, recent decisions, or "
|
||||||
|
"ongoing situations as of the document's date\n"
|
||||||
|
" 'reference' — timeless or slow-changing material: external books, "
|
||||||
|
"documentation, technical reference, conceptual writing\n"
|
||||||
|
" 'historical' — describes past events, prior states, or archived material "
|
||||||
|
"the author is recording but not living in\n"
|
||||||
|
" - state_type_confidence: 'low' | 'medium' | 'high' — how confident you are in "
|
||||||
|
"the classification. Use 'low' when genuinely uncertain.\n"
|
||||||
|
" - supersedes_prior_state: true if this document describes facts that should "
|
||||||
|
"REPLACE previously-recorded facts about the same subjects (e.g. a journal entry "
|
||||||
|
"saying 'I no longer work at X', a status update, a corrected belief). false "
|
||||||
|
"otherwise. Default to false when uncertain.\n"
|
||||||
|
" - state_type_rationale: one sentence explaining the classification\n\n"
|
||||||
|
|
||||||
|
"Output schema (flat, all eight fields at the top level):\n"
|
||||||
'{"active_frames": ["<frame 1>", "<frame 2>"], '
|
'{"active_frames": ["<frame 1>", "<frame 2>"], '
|
||||||
'"frame_relationships": "<one sentence>", '
|
'"frame_relationships": "<one sentence>", '
|
||||||
'"extraction_orientation": "<one sentence>", '
|
'"extraction_orientation": "<one sentence>", '
|
||||||
'"one_sentence_summary": "<one sentence>"}\n\n'
|
'"one_sentence_summary": "<one sentence>", '
|
||||||
|
'"state_type": "current|reference|historical", '
|
||||||
|
'"state_type_confidence": "low|medium|high", '
|
||||||
|
'"supersedes_prior_state": true|false, '
|
||||||
|
'"state_type_rationale": "<one sentence>"}\n\n'
|
||||||
|
|
||||||
"Document:\n"
|
"Document:\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -100,6 +153,38 @@ def run_mistral(doc_text):
|
|||||||
return {"error": "parse_failed", "raw": raw[:200]}
|
return {"error": "parse_failed", "raw": raw[:200]}
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_choice(raw, allowed, default):
    """Return ``raw`` stripped and lowercased if it is one of ``allowed``, else ``default``."""
    if isinstance(raw, str):
        candidate = raw.strip().lower()
        if candidate in allowed:
            return candidate
    return default


def normalize_state_fields(meta):
    """Validate and normalize the four state-type fields from Mistral output.

    Anything missing or malformed falls through to safe-cheap defaults that
    route to bulk pathway (no temporal invalidation work).

    Args:
        meta: Parsed Mistral JSON output. Expected to be a dict; any other
            value (None, list, string, ...) is treated as if every field
            were missing, so the defaults are returned instead of raising.

    Returns:
        Tuple ``(state_type, confidence, supersedes, rationale)`` where
        ``state_type`` is one of VALID_STATE_TYPES, ``confidence`` is one of
        VALID_CONFIDENCE, ``supersedes`` is a bool, and ``rationale`` is a
        non-empty string of at most 1000 characters.
    """
    # A JSON document that parses to a non-object has no fields to read;
    # fall back to defaults rather than crash on .get().
    if not isinstance(meta, dict):
        meta = {}

    # Enum fields are compared case-insensitively and ignoring surrounding
    # whitespace, so " Current " or "HIGH\n" still validate.
    state_type = _normalize_choice(
        meta.get("state_type"), VALID_STATE_TYPES, DEFAULT_STATE_TYPE
    )
    confidence = _normalize_choice(
        meta.get("state_type_confidence"), VALID_CONFIDENCE, DEFAULT_CONFIDENCE
    )

    # Deliberately strict: only a real JSON boolean is accepted. Strings such
    # as "true" or numbers such as 1 fall back to the default so malformed
    # output cannot select the single-episode pathway.
    raw_supersedes = meta.get("supersedes_prior_state")
    supersedes = raw_supersedes if isinstance(raw_supersedes, bool) else DEFAULT_SUPERSEDES

    # Rationale is free text: trim it, cap it at 1000 characters, and replace
    # empty or non-string values with the default explanation.
    raw_rationale = meta.get("state_type_rationale")
    if isinstance(raw_rationale, str) and raw_rationale.strip():
        rationale = raw_rationale.strip()[:1000]
    else:
        rationale = DEFAULT_RATIONALE

    return state_type, confidence, supersedes, rationale
|
||||||
|
|
||||||
|
|
||||||
def build_orientation(meta):
|
def build_orientation(meta):
|
||||||
frames = ", ".join(meta.get("active_frames", []))
|
frames = ", ".join(meta.get("active_frames", []))
|
||||||
rel = meta.get("frame_relationships", "")
|
rel = meta.get("frame_relationships", "")
|
||||||
@@ -108,20 +193,39 @@ def build_orientation(meta):
|
|||||||
return f"Active frames: {frames}. Frame relationships: {rel} Extraction focus: {orient} Summary: {summary}"
|
return f"Active frames: {frames}. Frame relationships: {rel} Extraction focus: {orient} Summary: {summary}"
|
||||||
|
|
||||||
|
|
||||||
def enqueue_stage3(pg, source, full_text, orientation, metadata):
|
def enqueue_stage3(pg, source, full_text, orientation, metadata,
|
||||||
|
state_type, state_type_confidence, supersedes_prior_state,
|
||||||
|
state_type_rationale):
|
||||||
|
"""Write Stage 3 queue row with orientation + explicit routing columns.
|
||||||
|
|
||||||
|
Routing columns (state_type, state_type_confidence, supersedes_prior_state,
|
||||||
|
state_type_rationale) are first-class queue properties for Phase A.
|
||||||
|
Stage 3 reads them on every dequeue to choose bulk vs single-episode pathway.
|
||||||
|
The full Mistral metadata blob is also retained in stage2_metadata JSON for
|
||||||
|
debugging and future cycle work."""
|
||||||
cur = pg.cursor()
|
cur = pg.cursor()
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
INSERT INTO stage_3_queue (source, full_text, orientation, stage2_metadata)
|
INSERT INTO stage_3_queue (
|
||||||
VALUES (%s, %s, %s, %s)
|
source, full_text, orientation, stage2_metadata,
|
||||||
|
state_type, state_type_confidence, supersedes_prior_state,
|
||||||
|
state_type_rationale
|
||||||
|
)
|
||||||
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||||
ON CONFLICT (source) DO UPDATE SET
|
ON CONFLICT (source) DO UPDATE SET
|
||||||
full_text = EXCLUDED.full_text,
|
full_text = EXCLUDED.full_text,
|
||||||
orientation = EXCLUDED.orientation,
|
orientation = EXCLUDED.orientation,
|
||||||
stage2_metadata = EXCLUDED.stage2_metadata,
|
stage2_metadata = EXCLUDED.stage2_metadata,
|
||||||
|
state_type = EXCLUDED.state_type,
|
||||||
|
state_type_confidence = EXCLUDED.state_type_confidence,
|
||||||
|
supersedes_prior_state = EXCLUDED.supersedes_prior_state,
|
||||||
|
state_type_rationale = EXCLUDED.state_type_rationale,
|
||||||
enqueued_at = NOW(),
|
enqueued_at = NOW(),
|
||||||
completed_at = NULL,
|
completed_at = NULL,
|
||||||
failed_at = NULL,
|
failed_at = NULL,
|
||||||
attempts = 0
|
attempts = 0
|
||||||
""", (source, full_text, orientation, json.dumps(metadata)))
|
""", (source, full_text, orientation, json.dumps(metadata),
|
||||||
|
state_type, state_type_confidence, supersedes_prior_state,
|
||||||
|
state_type_rationale))
|
||||||
pg.commit()
|
pg.commit()
|
||||||
|
|
||||||
|
|
||||||
@@ -144,7 +248,7 @@ def process_one(row):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
# Run Mistral
|
# Run Mistral
|
||||||
log.info(f" Running Mistral taxonomy-free pass...")
|
log.info(f" Running Mistral taxonomy-free + state-type pass...")
|
||||||
try:
|
try:
|
||||||
meta = run_mistral(full_text)
|
meta = run_mistral(full_text)
|
||||||
except requests.exceptions.Timeout:
|
except requests.exceptions.Timeout:
|
||||||
@@ -177,14 +281,25 @@ def process_one(row):
|
|||||||
frames = meta.get("active_frames", [])
|
frames = meta.get("active_frames", [])
|
||||||
log.info(f" Frames: {frames}")
|
log.info(f" Frames: {frames}")
|
||||||
|
|
||||||
|
# Normalize state-type fields with safe-cheap defaults on malformed output.
|
||||||
|
# Note: Mistral may return valid orientation but malformed state-type;
|
||||||
|
# we accept the orientation and default the routing rather than fail
|
||||||
|
# the whole row, since defaults route to bulk (cheap, safe).
|
||||||
|
state_type, confidence, supersedes, rationale = normalize_state_fields(meta)
|
||||||
|
log.info(
|
||||||
|
f" State-type: {state_type} (conf={confidence}, "
|
||||||
|
f"supersedes={supersedes})"
|
||||||
|
)
|
||||||
|
|
||||||
orientation = build_orientation(meta)
|
orientation = build_orientation(meta)
|
||||||
meta["_model"] = "mistral:latest"
|
meta["_model"] = "mistral:latest"
|
||||||
meta["_worker_version"] = WORKER_VERSION
|
meta["_worker_version"] = WORKER_VERSION
|
||||||
meta["_generated_at"] = datetime.now().isoformat()
|
meta["_generated_at"] = datetime.now().isoformat()
|
||||||
meta["char_length"] = char_length
|
meta["char_length"] = char_length
|
||||||
|
|
||||||
# Enqueue Stage 3
|
# Enqueue Stage 3 with explicit routing columns
|
||||||
enqueue_stage3(pg, source, full_text, orientation, meta)
|
enqueue_stage3(pg, source, full_text, orientation, meta,
|
||||||
|
state_type, confidence, supersedes, rationale)
|
||||||
cur.execute("UPDATE stage_2_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
|
cur.execute("UPDATE stage_2_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
|
||||||
pg.commit()
|
pg.commit()
|
||||||
pg.close()
|
pg.close()
|
||||||
|
|||||||
+202
-29
@@ -1,22 +1,45 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Stage 3 Worker — Graphiti Ingest with Taxonomy-Free Orientation
|
Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing
|
||||||
Polls stage_3_queue, chunks documents, ingests as episodic saga to Graphiti.
|
|
||||||
|
|
||||||
Chunking rationale: Large documents sent as single episodes cause FalkorDB
|
Polls stage_3_queue, routes each row to one of two ingest pathways based on
|
||||||
write lock contention during entity deduplication. Chunking at ~500 words
|
state-type classification produced by Stage 2:
|
||||||
(matching Stage 1) produces smaller deduplication passes that don't block.
|
|
||||||
Each document's chunks are linked via Graphiti's saga mechanism, preserving
|
|
||||||
document structure in the graph.
|
|
||||||
|
|
||||||
Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed sagas of
|
- BULK pathway (existing): supersedes_prior_state=false OR confidence=low
|
||||||
17 and 19 chunks deadlock the sidecar's Python-side coordination. Documents
|
OR routing fields missing. Fast, no temporal invalidation.
|
||||||
producing more than MAX_CHUNKS_PER_SAGA chunks are split into multiple bulk
|
|
||||||
commits, each tagged with the same saga value so Graphiti still links them.
|
|
||||||
|
|
||||||
Wedge detection: 2026-05-01 incident also surfaced the asymmetry with Stage 2 —
|
- SINGLE-EPISODE pathway (new): supersedes_prior_state=true AND
|
||||||
Stage 3 had no recovery path when the sidecar deadlocked. Now mirrors Stage 2's
|
confidence in {medium, high}. Per-chunk POST to /episodes with shared
|
||||||
consecutive_failures pattern with sidecar restart on threshold.
|
saga tag, full edge invalidation, per-chunk timeout/retry independence.
|
||||||
|
|
||||||
|
Routing rationale: the single-episode pathway is the correct API per
|
||||||
|
graphiti-core's docs for content that supersedes prior facts (it does
|
||||||
|
edge invalidation that bulk skips). It costs more per chunk because of
|
||||||
|
the resolve_edge LLM call; the routing rule keeps that cost bounded to
|
||||||
|
content that actually needs it.
|
||||||
|
|
||||||
|
Chunking rationale (preserved from prior versions): Large documents sent
|
||||||
|
as single episodes cause FalkorDB write lock contention during entity
|
||||||
|
deduplication. Chunking at ~500 words (matching Stage 1) produces smaller
|
||||||
|
deduplication passes that don't block. Each document's chunks are linked
|
||||||
|
via Graphiti's saga mechanism, preserving document structure in the graph.
|
||||||
|
|
||||||
|
Per-chunk heartbeat: single-episode pathway updates stage_3_queue.started_at
|
||||||
|
after each successful chunk POST so a long-running document doesn't cross
|
||||||
|
the 10-minute stale threshold mid-process and get re-dequeued by another
|
||||||
|
worker (or the same worker on next loop iteration). started_at thus means
|
||||||
|
"last activity timestamp" rather than "began at" — semantics that match
|
||||||
|
the dequeue query's intent (catch dead workers, not slow ones).
|
||||||
|
|
||||||
|
Saga-size limit (MAX_CHUNKS_PER_SAGA): 2026-05-01 incident showed bulk
|
||||||
|
sagas of 17 and 19 chunks deadlock the sidecar's Python-side coordination.
|
||||||
|
Documents producing more than MAX_CHUNKS_PER_SAGA chunks on the bulk
|
||||||
|
pathway are split into multiple bulk commits, each tagged with the same
|
||||||
|
saga value so Graphiti still links them. The single-episode pathway
|
||||||
|
doesn't need this split since each chunk is its own POST.
|
||||||
|
|
||||||
|
Wedge detection: mirrors Stage 2's consecutive_failures pattern with
|
||||||
|
sidecar restart on threshold.
|
||||||
|
|
||||||
Runs as systemd service: aaronai-stage3.service
|
Runs as systemd service: aaronai-stage3.service
|
||||||
"""
|
"""
|
||||||
@@ -44,17 +67,23 @@ HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat")
|
|||||||
RETRY_ATTEMPTS = 2
|
RETRY_ATTEMPTS = 2
|
||||||
POLL_INTERVAL = 5
|
POLL_INTERVAL = 5
|
||||||
INGEST_TIMEOUT = 600
|
INGEST_TIMEOUT = 600
|
||||||
WORKER_VERSION = "2.2"
|
WORKER_VERSION = "2.3"
|
||||||
|
|
||||||
# Match Stage 1 chunking parameters
|
# Match Stage 1 chunking parameters
|
||||||
CHUNK_SIZE_WORDS = 500
|
CHUNK_SIZE_WORDS = 500
|
||||||
CHUNK_OVERLAP_WORDS = 50
|
CHUNK_OVERLAP_WORDS = 50
|
||||||
# Documents under this threshold ingested as single episode (no chunking overhead)
|
# Documents under this threshold ingested as single episode (no chunking overhead)
|
||||||
SINGLE_EPISODE_THRESHOLD = 1500
|
SINGLE_EPISODE_THRESHOLD = 1500
|
||||||
# Sagas larger than this many chunks split into multiple commits
|
# Bulk-pathway sagas larger than this many chunks split into multiple commits
|
||||||
# (2026-05-01 incident: 17 and 19 chunk sagas deadlocked sidecar)
|
# (2026-05-01 incident: 17 and 19 chunk sagas deadlocked sidecar)
|
||||||
MAX_CHUNKS_PER_SAGA = 10
|
MAX_CHUNKS_PER_SAGA = 10
|
||||||
|
|
||||||
|
# Routing rule: single-episode pathway requires both signals positive.
|
||||||
|
# Anything else (false, NULL, low confidence) routes to bulk — the
|
||||||
|
# safer-cheaper default. Mistral parse drift can't accidentally trigger
|
||||||
|
# the expensive pathway.
|
||||||
|
HIGH_TRUST_CONFIDENCE = ("medium", "high")
|
||||||
|
|
||||||
|
|
||||||
def get_pg():
|
def get_pg():
|
||||||
return psycopg2.connect(PG_DSN)
|
return psycopg2.connect(PG_DSN)
|
||||||
@@ -109,6 +138,22 @@ def chunk_text(text, chunk_size=CHUNK_SIZE_WORDS, overlap=CHUNK_OVERLAP_WORDS):
|
|||||||
return chunks
|
return chunks
|
||||||
|
|
||||||
|
|
||||||
|
def heartbeat_row(row_id):
    """Refresh stage_3_queue.started_at to NOW() for the given queue row.

    Called after each successful chunk POST on the single-episode pathway so
    a long-running ingest doesn't cross the 10-minute stale threshold
    mid-process.

    Best-effort: any failure (connect, execute, commit, close) is logged as a
    warning and swallowed so it never fails the chunk. The worst case is a
    stale-threshold re-dequeue of the row.

    Args:
        row_id: Primary key (id) of the stage_3_queue row to refresh.

    Returns:
        None.
    """
    try:
        pg = get_pg()
        try:
            cur = pg.cursor()
            cur.execute("UPDATE stage_3_queue SET started_at = NOW() WHERE id = %s", (row_id,))
            pg.commit()
        finally:
            # Always release the connection, including when execute/commit
            # raised; otherwise every failed heartbeat leaks a connection.
            pg.close()
    except Exception as e:
        log.warning(f" Heartbeat update failed (continuing): {e}")
|
||||||
|
|
||||||
|
|
||||||
def post_bulk(payload, batch_label=""):
|
def post_bulk(payload, batch_label=""):
|
||||||
"""Single POST to /episodes/bulk with consistent error handling."""
|
"""Single POST to /episodes/bulk with consistent error handling."""
|
||||||
resp = requests.post(
|
resp = requests.post(
|
||||||
@@ -122,16 +167,32 @@ def post_bulk(payload, batch_label=""):
|
|||||||
return resp.json()
|
return resp.json()
|
||||||
|
|
||||||
|
|
||||||
def ingest_to_graphiti(source, full_text, orientation):
|
def post_episode(payload, episode_label=""):
|
||||||
"""
|
"""Single POST to /episodes (singular) with consistent error handling.
|
||||||
Ingest document to Graphiti as chunked episodes linked by saga.
|
Used by the single-episode pathway, one call per chunk."""
|
||||||
|
resp = requests.post(
|
||||||
|
f"{GRAPHITI_URL}/episodes",
|
||||||
|
json=payload,
|
||||||
|
timeout=INGEST_TIMEOUT
|
||||||
|
)
|
||||||
|
if not resp.ok:
|
||||||
|
prefix = f"{episode_label} " if episode_label else ""
|
||||||
|
raise RuntimeError(f"{prefix}Sidecar {resp.status_code}: {resp.text[:500]}")
|
||||||
|
return resp.json()
|
||||||
|
|
||||||
|
|
||||||
|
def ingest_bulk(source, full_text, orientation):
|
||||||
|
"""
|
||||||
|
Bulk-pathway ingest: documents that don't supersede prior state.
|
||||||
|
Skips edge invalidation. Cheap. Three sub-paths by document size:
|
||||||
|
|
||||||
Three paths:
|
|
||||||
- Short documents (<SINGLE_EPISODE_THRESHOLD): single episode, no saga
|
- Short documents (<SINGLE_EPISODE_THRESHOLD): single episode, no saga
|
||||||
|
[note: 'single episode' here means one bulk call with one item, NOT
|
||||||
|
the single-episode-pathway; naming overlap is unfortunate but local]
|
||||||
- Medium documents (chunks <= MAX_CHUNKS_PER_SAGA): one bulk commit, saga-linked
|
- Medium documents (chunks <= MAX_CHUNKS_PER_SAGA): one bulk commit, saga-linked
|
||||||
- Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of
|
- Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of
|
||||||
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga tag
|
MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga
|
||||||
so Graphiti links them as one document unit
|
tag so Graphiti links them as one document unit
|
||||||
"""
|
"""
|
||||||
char_length = len(full_text)
|
char_length = len(full_text)
|
||||||
|
|
||||||
@@ -142,7 +203,7 @@ def ingest_to_graphiti(source, full_text, orientation):
|
|||||||
"source_description": orientation,
|
"source_description": orientation,
|
||||||
"timestamp": datetime.now().isoformat(),
|
"timestamp": datetime.now().isoformat(),
|
||||||
}]
|
}]
|
||||||
log.info(f" Single episode ({char_length} chars)")
|
log.info(f" [bulk] Single episode ({char_length} chars)")
|
||||||
return post_bulk({"episodes": episodes, "group_id": "aaron"})
|
return post_bulk({"episodes": episodes, "group_id": "aaron"})
|
||||||
|
|
||||||
chunks = chunk_text(full_text)
|
chunks = chunk_text(full_text)
|
||||||
@@ -158,7 +219,7 @@ def ingest_to_graphiti(source, full_text, orientation):
|
|||||||
}
|
}
|
||||||
for i, chunk in enumerate(chunks)
|
for i, chunk in enumerate(chunks)
|
||||||
]
|
]
|
||||||
log.info(f" Chunked into {total_chunks} episodes ({char_length} chars)")
|
log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)")
|
||||||
return post_bulk(
|
return post_bulk(
|
||||||
{"episodes": episodes, "group_id": "aaron", "saga": source}
|
{"episodes": episodes, "group_id": "aaron", "saga": source}
|
||||||
)
|
)
|
||||||
@@ -166,7 +227,7 @@ def ingest_to_graphiti(source, full_text, orientation):
|
|||||||
# Large document: split into batches sharing the same saga tag
|
# Large document: split into batches sharing the same saga tag
|
||||||
batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA
|
batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA
|
||||||
log.info(
|
log.info(
|
||||||
f" Chunked into {total_chunks} episodes ({char_length} chars); "
|
f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars); "
|
||||||
f"splitting into {batch_count} batches of up to {MAX_CHUNKS_PER_SAGA}"
|
f"splitting into {batch_count} batches of up to {MAX_CHUNKS_PER_SAGA}"
|
||||||
)
|
)
|
||||||
last_result = None
|
last_result = None
|
||||||
@@ -193,9 +254,110 @@ def ingest_to_graphiti(source, full_text, orientation):
|
|||||||
return last_result
|
return last_result
|
||||||
|
|
||||||
|
|
||||||
|
def ingest_single_episode(row_id, source, full_text, orientation):
    """
    Ingest one document via the single-episode pathway (one POST per chunk).

    Used for documents that supersede prior state with medium-or-high
    confidence. Text shorter than SINGLE_EPISODE_THRESHOLD is sent as one
    POST with no chunking and no saga. Longer text is split with
    chunk_text() and each chunk is sent through its own post_episode() call,
    all sharing ``saga = source``.

    Args:
        row_id: stage_3_queue row id, passed to heartbeat_row() after each
            committed chunk.
        source: Document name; used as the episode name and as the saga tag.
        full_text: Full document text to ingest.
        orientation: Sent as each episode's ``source_description``.

    Returns:
        For short documents, whatever post_episode() returns. For chunked
        documents, ``{"ok": True, "chunks_committed": <number of chunks>}``.

    Raises:
        RuntimeError: With a ``single_episode_partial:`` prefix when a chunk
            POST fails, or when the heartbeat following a committed chunk
            fails. The message states how many chunks were committed before
            the abort; the underlying exception is chained as ``__cause__``.
        Exception: On the short-document path, anything post_episode()
            raises propagates unchanged.

    Partial-success behavior: if chunk N fails, chunks 1..N-1 have already
    been accepted and are not rolled back here. The caller is expected to
    record the failure detail on the row; per the original design note,
    re-ingestion re-POSTs the earlier chunks and relies on graphiti's dedup
    to treat them as no-ops (not verifiable from this function).

    The heartbeat after each chunk exists so the row does not cross the
    stale threshold (10 minutes per the original design note) while the
    document is still actively progressing.
    """
    char_length = len(full_text)

    # Short documents: one POST, no chunking, no saga
    if char_length < SINGLE_EPISODE_THRESHOLD:
        payload = {
            "name": source,
            "content": full_text,
            "source_description": orientation,
            "group_id": "aaron",
            "timestamp": datetime.now().isoformat(),
        }
        log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)")
        return post_episode(payload, episode_label="single-ep")

    chunks = chunk_text(full_text)
    total_chunks = len(chunks)
    log.info(
        f" [single-ep] Chunked into {total_chunks} episodes ({char_length} chars); "
        f"per-chunk POSTs with shared saga"
    )

    succeeded = 0
    for chunk_num, chunk in enumerate(chunks, start=1):
        payload = {
            "name": f"{source} [{chunk_num}/{total_chunks}]",
            "content": chunk,
            "source_description": orientation,
            "group_id": "aaron",
            "saga": source,
            "timestamp": datetime.now().isoformat(),
        }

        # Only the POST is guarded here, so "chunk N failed" in the error
        # always means the POST itself failed and `succeeded` excludes it.
        try:
            post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}")
        except Exception as e:
            # Annotate with partial-success detail so the caller can write a
            # clean failure_reason. Re-raise to abort the document;
            # previously-committed chunks stay in the graph.
            raise RuntimeError(
                f"single_episode_partial: chunk {chunk_num}/{total_chunks} failed "
                f"(succeeded: {succeeded}); error: {str(e)[:300]}"
            ) from e

        succeeded += 1
        log.info(f" chunk {chunk_num}/{total_chunks} committed")

        # A heartbeat failure still aborts the document (same exception type
        # and prefix as before), but it is reported as a heartbeat failure:
        # the chunk above WAS committed and must not be described as failed.
        try:
            heartbeat_row(row_id)
        except Exception as e:
            raise RuntimeError(
                f"single_episode_partial: heartbeat failed after chunk "
                f"{chunk_num}/{total_chunks} (succeeded: {succeeded}); "
                f"error: {str(e)[:300]}"
            ) from e

    log.info(f" [single-ep] All {total_chunks} chunks committed")
    return {"ok": True, "chunks_committed": total_chunks}
|
||||||
|
|
||||||
|
|
||||||
|
def should_route_single_episode(supersedes_prior_state, state_type_confidence):
    """Decide whether a queue row takes the single-episode pathway (Phase A).

    Returns True only when BOTH conditions hold:
      - supersedes_prior_state is truthy
      - state_type_confidence is a member of HIGH_TRUST_CONFIDENCE

    Every other combination returns False, which the caller treats as the
    bulk pathway: a false or NULL supersedes flag (e.g. rows with unset
    routing fields), or a confidence value outside the trusted set even
    when the supersedes flag is set.
    """
    # Short-circuit: the confidence membership test only runs when the
    # supersedes flag is truthy, matching the original guard-clause order.
    return (
        bool(supersedes_prior_state)
        and state_type_confidence in HIGH_TRUST_CONFIDENCE
    )
|
||||||
|
|
||||||
|
|
||||||
def process_one(row):
|
def process_one(row):
|
||||||
row_id, source, full_text, orientation = row
|
(row_id, source, full_text, orientation,
|
||||||
log.info(f"Ingesting to Graphiti: {source}")
|
state_type, state_type_confidence, supersedes_prior_state,
|
||||||
|
state_type_rationale) = row
|
||||||
|
|
||||||
|
# Route decision
|
||||||
|
use_single_episode = should_route_single_episode(
|
||||||
|
supersedes_prior_state, state_type_confidence
|
||||||
|
)
|
||||||
|
pathway = "single-episode" if use_single_episode else "bulk"
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
f"Ingesting to Graphiti: {source} "
|
||||||
|
f"[pathway={pathway}, state_type={state_type}, "
|
||||||
|
f"conf={state_type_confidence}, supersedes={supersedes_prior_state}]"
|
||||||
|
)
|
||||||
|
if state_type_rationale:
|
||||||
|
log.info(f" rationale: {state_type_rationale[:200]}")
|
||||||
|
|
||||||
pg = get_pg()
|
pg = get_pg()
|
||||||
cur = pg.cursor()
|
cur = pg.cursor()
|
||||||
@@ -204,9 +366,16 @@ def process_one(row):
|
|||||||
(row_id,)
|
(row_id,)
|
||||||
)
|
)
|
||||||
pg.commit()
|
pg.commit()
|
||||||
|
pg.close()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = ingest_to_graphiti(source, full_text, orientation)
|
if use_single_episode:
|
||||||
|
result = ingest_single_episode(row_id, source, full_text, orientation)
|
||||||
|
else:
|
||||||
|
result = ingest_bulk(source, full_text, orientation)
|
||||||
|
|
||||||
|
pg = get_pg()
|
||||||
|
cur = pg.cursor()
|
||||||
cur.execute("UPDATE stage_3_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
|
cur.execute("UPDATE stage_3_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
|
||||||
pg.commit()
|
pg.commit()
|
||||||
pg.close()
|
pg.close()
|
||||||
@@ -214,6 +383,8 @@ def process_one(row):
|
|||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error(f" Graphiti ingest failed for {source}: {e}")
|
log.error(f" Graphiti ingest failed for {source}: {e}")
|
||||||
|
pg = get_pg()
|
||||||
|
cur = pg.cursor()
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
UPDATE stage_3_queue
|
UPDATE stage_3_queue
|
||||||
SET failed_at = NOW(), failure_reason = %s
|
SET failed_at = NOW(), failure_reason = %s
|
||||||
@@ -235,7 +406,9 @@ def run():
|
|||||||
pg = get_pg()
|
pg = get_pg()
|
||||||
cur = pg.cursor()
|
cur = pg.cursor()
|
||||||
cur.execute("""
|
cur.execute("""
|
||||||
SELECT id, source, full_text, orientation
|
SELECT id, source, full_text, orientation,
|
||||||
|
state_type, state_type_confidence, supersedes_prior_state,
|
||||||
|
state_type_rationale
|
||||||
FROM stage_3_queue
|
FROM stage_3_queue
|
||||||
WHERE completed_at IS NULL
|
WHERE completed_at IS NULL
|
||||||
AND failed_at IS NULL
|
AND failed_at IS NULL
|
||||||
|
|||||||
Reference in New Issue
Block a user