stage2_worker: v2.2 — add state-type classification for Stage 3 routing

Mistral pass now produces two concerns in a single flat JSON output:
  (a) orientation context (existing four fields, unchanged semantics)
  (b) state-type classification: state_type (current/reference/historical),
      state_type_confidence (low/medium/high), supersedes_prior_state (bool),
      state_type_rationale (text)

Routing fields written as explicit columns on stage_3_queue (separate
ALTER TABLE migration adds them: state_type, state_type_confidence,
supersedes_prior_state, state_type_rationale + index on supersedes).

Safe-cheap defaults on malformed Mistral output: state_type='reference',
confidence='low', supersedes=false. All defaults route to bulk pathway
(no temporal invalidation cost) so Mistral parse drift can't accidentally
trigger expensive single-episode ingest.

Phase A item 2 of three. Sidecar (item 1, commit 8b0a163) already plumbs
custom_extraction_instructions through to /episodes/bulk. Stage 3 routing
logic (item 3) follows.
This commit is contained in:
2026-05-01 19:02:11 +00:00
parent 8b0a163670
commit 70e87e3ab5
+137 -22
View File
@@ -1,12 +1,19 @@
#!/usr/bin/env python3
"""
Stage 2 Worker — Taxonomy-Free Mistral Orientation
Polls stage_2_queue, runs Mistral taxonomy-free pass, enqueues Stage 3.
Runs as systemd service: aaronai-stage2.service
Stage 2 Worker — Taxonomy-Free Mistral Orientation + State-Type Classification
Polls stage_2_queue, runs Mistral pass that produces:
(a) orientation context (active frames, frame relationships, extraction focus)
(b) state-type classification for Stage 3 routing (current/reference/historical,
supersedes_prior_state boolean, confidence, rationale)
Enqueues Stage 3 with both concerns as explicit columns.
Routing:
- char_length < 2000 → skip Stage 3, mark complete (sparse content, cascade no benefit)
- char_length >= 2000 → enqueue Stage 3 with orientation metadata
- char_length >= 2000 → enqueue Stage 3 with orientation + routing metadata
Runs as systemd service: aaronai-stage2.service
"""
import os, json, time, subprocess, logging, requests
@@ -33,22 +40,68 @@ CHAR_LENGTH_THRESHOLD = 2000
REQUEST_TIMEOUT = 300
RETRY_ATTEMPTS = 2
POLL_INTERVAL = 5
WORKER_VERSION = "2.1"
WORKER_VERSION = "2.2"
# Valid values for state-type fields. Mistral output validated against these;
# anything outside falls through to safe-cheap defaults (bulk routing).
VALID_STATE_TYPES = ("current", "reference", "historical")
VALID_CONFIDENCE = ("low", "medium", "high")
# Safe-cheap defaults applied when Mistral output is missing or malformed.
# All route to bulk pathway (no temporal invalidation cost) per Phase A
# routing rule: route to single-episode only on supersedes_prior_state=true
# AND confidence in {medium, high}.
DEFAULT_STATE_TYPE = "reference"
DEFAULT_CONFIDENCE = "low"
DEFAULT_SUPERSEDES = False
DEFAULT_RATIONALE = "mistral output missing or malformed; default applied"
TAXFREE_PROMPT = (
"You are a metadata extraction system. Given a document, describe its content "
"shape for use as orientation context in a knowledge graph extraction pass.\n\n"
"Do not summarize content. Do not extract entities. Do not assign a single category label.\n\n"
"Instead, describe:\n"
"- What domains or frames are active in this content (there may be several simultaneously)\n"
"- How those frames relate to each other in this specific document\n"
"- What kind of relational content a knowledge graph extractor should look for\n\n"
"Output JSON only. No prose, no explanation, no markdown.\n\n"
"Schema:\n"
"You are a metadata extraction system. Given a document, produce a JSON object "
"describing two distinct concerns about the document. Output JSON only — no prose, "
"no explanation, no markdown.\n\n"
"CONCERN 1 — ORIENTATION CONTEXT (for downstream knowledge-graph extraction):\n"
"Describe the content shape. Do not summarize content. Do not extract entities. "
"Do not assign a single category label.\n"
" - active_frames: which domains or frames are active in this content (there may "
"be several simultaneously)\n"
" - frame_relationships: how those frames relate in this specific document, "
"one sentence\n"
" - extraction_orientation: what kind of relational content a knowledge-graph "
"extractor should look for, one sentence\n"
" - one_sentence_summary: a single-sentence content summary\n\n"
"CONCERN 2 — STATE-TYPE CLASSIFICATION (for ingest routing):\n"
"Classify the document's relationship to time and prior facts. This is independent "
"of orientation: a document can be in a 'reference frame' (orientation) while "
"describing 'current state' (state-type), or vice versa. Judge the document's "
"ROLE, not its topic.\n"
" - state_type: one of\n"
" 'current' — describes the author's present state, recent decisions, or "
"ongoing situations as of the document's date\n"
" 'reference' — timeless or slow-changing material: external books, "
"documentation, technical reference, conceptual writing\n"
" 'historical' — describes past events, prior states, or archived material "
"the author is recording but not living in\n"
" - state_type_confidence: 'low' | 'medium' | 'high' — how confident you are in "
"the classification. Use 'low' when genuinely uncertain.\n"
" - supersedes_prior_state: true if this document describes facts that should "
"REPLACE previously-recorded facts about the same subjects (e.g. a journal entry "
"saying 'I no longer work at X', a status update, a corrected belief). false "
"otherwise. Default to false when uncertain.\n"
" - state_type_rationale: one sentence explaining the classification\n\n"
"Output schema (flat, all eight fields at the top level):\n"
'{"active_frames": ["<frame 1>", "<frame 2>"], '
'"frame_relationships": "<one sentence>", '
'"extraction_orientation": "<one sentence>", '
'"one_sentence_summary": "<one sentence>"}\n\n'
'"one_sentence_summary": "<one sentence>", '
'"state_type": "current|reference|historical", '
'"state_type_confidence": "low|medium|high", '
'"supersedes_prior_state": true|false, '
'"state_type_rationale": "<one sentence>"}\n\n'
"Document:\n"
)
@@ -100,6 +153,38 @@ def run_mistral(doc_text):
return {"error": "parse_failed", "raw": raw[:200]}
def normalize_state_fields(meta):
"""Validate and normalize the four state-type fields from Mistral output.
Anything missing or malformed falls through to safe-cheap defaults that
route to bulk pathway (no temporal invalidation work)."""
raw_state_type = meta.get("state_type")
if isinstance(raw_state_type, str) and raw_state_type.lower() in VALID_STATE_TYPES:
state_type = raw_state_type.lower()
else:
state_type = DEFAULT_STATE_TYPE
raw_conf = meta.get("state_type_confidence")
if isinstance(raw_conf, str) and raw_conf.lower() in VALID_CONFIDENCE:
confidence = raw_conf.lower()
else:
confidence = DEFAULT_CONFIDENCE
raw_supersedes = meta.get("supersedes_prior_state")
if isinstance(raw_supersedes, bool):
supersedes = raw_supersedes
else:
supersedes = DEFAULT_SUPERSEDES
raw_rationale = meta.get("state_type_rationale")
if isinstance(raw_rationale, str) and raw_rationale.strip():
rationale = raw_rationale.strip()[:1000]
else:
rationale = DEFAULT_RATIONALE
return state_type, confidence, supersedes, rationale
def build_orientation(meta):
frames = ", ".join(meta.get("active_frames", []))
rel = meta.get("frame_relationships", "")
@@ -108,20 +193,39 @@ def build_orientation(meta):
return f"Active frames: {frames}. Frame relationships: {rel} Extraction focus: {orient} Summary: {summary}"
def enqueue_stage3(pg, source, full_text, orientation, metadata):
def enqueue_stage3(pg, source, full_text, orientation, metadata,
state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale):
"""Write Stage 3 queue row with orientation + explicit routing columns.
Routing columns (state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale) are first-class queue properties for Phase A.
Stage 3 reads them on every dequeue to choose bulk vs single-episode pathway.
The full Mistral metadata blob is also retained in stage2_metadata JSON for
debugging and future cycle work."""
cur = pg.cursor()
cur.execute("""
INSERT INTO stage_3_queue (source, full_text, orientation, stage2_metadata)
VALUES (%s, %s, %s, %s)
INSERT INTO stage_3_queue (
source, full_text, orientation, stage2_metadata,
state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (source) DO UPDATE SET
full_text = EXCLUDED.full_text,
orientation = EXCLUDED.orientation,
stage2_metadata = EXCLUDED.stage2_metadata,
state_type = EXCLUDED.state_type,
state_type_confidence = EXCLUDED.state_type_confidence,
supersedes_prior_state = EXCLUDED.supersedes_prior_state,
state_type_rationale = EXCLUDED.state_type_rationale,
enqueued_at = NOW(),
completed_at = NULL,
failed_at = NULL,
attempts = 0
""", (source, full_text, orientation, json.dumps(metadata)))
""", (source, full_text, orientation, json.dumps(metadata),
state_type, state_type_confidence, supersedes_prior_state,
state_type_rationale))
pg.commit()
@@ -144,7 +248,7 @@ def process_one(row):
return True
# Run Mistral
log.info(f" Running Mistral taxonomy-free pass...")
log.info(f" Running Mistral taxonomy-free + state-type pass...")
try:
meta = run_mistral(full_text)
except requests.exceptions.Timeout:
@@ -177,14 +281,25 @@ def process_one(row):
frames = meta.get("active_frames", [])
log.info(f" Frames: {frames}")
# Normalize state-type fields with safe-cheap defaults on malformed output.
# Note: Mistral may return valid orientation but malformed state-type;
# we accept the orientation and default the routing rather than fail
# the whole row, since defaults route to bulk (cheap, safe).
state_type, confidence, supersedes, rationale = normalize_state_fields(meta)
log.info(
f" State-type: {state_type} (conf={confidence}, "
f"supersedes={supersedes})"
)
orientation = build_orientation(meta)
meta["_model"] = "mistral:latest"
meta["_worker_version"] = WORKER_VERSION
meta["_generated_at"] = datetime.now().isoformat()
meta["char_length"] = char_length
# Enqueue Stage 3
enqueue_stage3(pg, source, full_text, orientation, meta)
# Enqueue Stage 3 with explicit routing columns
enqueue_stage3(pg, source, full_text, orientation, meta,
state_type, confidence, supersedes, rationale)
cur.execute("UPDATE stage_2_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
pg.commit()
pg.close()