diff --git a/scripts/stage2_worker.py b/scripts/stage2_worker.py index fec01c4..b8df56f 100644 --- a/scripts/stage2_worker.py +++ b/scripts/stage2_worker.py @@ -1,12 +1,19 @@ #!/usr/bin/env python3 """ -Stage 2 Worker — Taxonomy-Free Mistral Orientation -Polls stage_2_queue, runs Mistral taxonomy-free pass, enqueues Stage 3. -Runs as systemd service: aaronai-stage2.service +Stage 2 Worker — Taxonomy-Free Mistral Orientation + State-Type Classification + +Polls stage_2_queue, runs Mistral pass that produces: + (a) orientation context (active frames, frame relationships, extraction focus) + (b) state-type classification for Stage 3 routing (current/reference/historical, + supersedes_prior_state boolean, confidence, rationale) + +Enqueues Stage 3 with both concerns as explicit columns. Routing: - char_length < 2000 → skip Stage 3, mark complete (sparse content, cascade no benefit) -- char_length >= 2000 → enqueue Stage 3 with orientation metadata +- char_length >= 2000 → enqueue Stage 3 with orientation + routing metadata + +Runs as systemd service: aaronai-stage2.service """ import os, json, time, subprocess, logging, requests @@ -33,22 +40,68 @@ CHAR_LENGTH_THRESHOLD = 2000 REQUEST_TIMEOUT = 300 RETRY_ATTEMPTS = 2 POLL_INTERVAL = 5 -WORKER_VERSION = "2.1" +WORKER_VERSION = "2.2" + +# Valid values for state-type fields. Mistral output validated against these; +# anything outside falls through to safe-cheap defaults (bulk routing). +VALID_STATE_TYPES = ("current", "reference", "historical") +VALID_CONFIDENCE = ("low", "medium", "high") + +# Safe-cheap defaults applied when Mistral output is missing or malformed. +# All route to bulk pathway (no temporal invalidation cost) per Phase A +# routing rule: route to single-episode only on supersedes_prior_state=true +# AND confidence in {medium, high}. +DEFAULT_STATE_TYPE = "reference" +DEFAULT_CONFIDENCE = "low" +DEFAULT_SUPERSEDES = False +DEFAULT_RATIONALE = "mistral output missing or malformed; default applied" TAXFREE_PROMPT = ( - "You are a metadata extraction system. Given a document, describe its content " - "shape for use as orientation context in a knowledge graph extraction pass.\n\n" - "Do not summarize content. Do not extract entities. Do not assign a single category label.\n\n" - "Instead, describe:\n" - "- What domains or frames are active in this content (there may be several simultaneously)\n" - "- How those frames relate to each other in this specific document\n" - "- What kind of relational content a knowledge graph extractor should look for\n\n" - "Output JSON only. No prose, no explanation, no markdown.\n\n" - "Schema:\n" + "You are a metadata extraction system. Given a document, produce a JSON object " + "describing two distinct concerns about the document. Output JSON only — no prose, " + "no explanation, no markdown.\n\n" + + "CONCERN 1 — ORIENTATION CONTEXT (for downstream knowledge-graph extraction):\n" + "Describe the content shape. Do not summarize content. Do not extract entities. " + "Do not assign a single category label.\n" + " - active_frames: which domains or frames are active in this content (there may " + "be several simultaneously)\n" + " - frame_relationships: how those frames relate in this specific document, " + "one sentence\n" + " - extraction_orientation: what kind of relational content a knowledge-graph " + "extractor should look for, one sentence\n" + " - one_sentence_summary: a single-sentence content summary\n\n" + + "CONCERN 2 — STATE-TYPE CLASSIFICATION (for ingest routing):\n" + "Classify the document's relationship to time and prior facts. This is independent " + "of orientation: a document can be in a 'reference frame' (orientation) while " + "describing 'current state' (state-type), or vice versa. Judge the document's " + "ROLE, not its topic.\n" + " - state_type: one of\n" + " 'current' — describes the author's present state, recent decisions, or " + "ongoing situations as of the document's date\n" + " 'reference' — timeless or slow-changing material: external books, " + "documentation, technical reference, conceptual writing\n" + " 'historical' — describes past events, prior states, or archived material " + "the author is recording but not living in\n" + " - state_type_confidence: 'low' | 'medium' | 'high' — how confident you are in " + "the classification. Use 'low' when genuinely uncertain.\n" + " - supersedes_prior_state: true if this document describes facts that should " + "REPLACE previously-recorded facts about the same subjects (e.g. a journal entry " + "saying 'I no longer work at X', a status update, a corrected belief). false " + "otherwise. Default to false when uncertain.\n" + " - state_type_rationale: one sentence explaining the classification\n\n" + + "Output schema (flat, all eight fields at the top level):\n" '{"active_frames": ["", ""], ' '"frame_relationships": "", ' '"extraction_orientation": "", ' - '"one_sentence_summary": ""}\n\n' + '"one_sentence_summary": "", ' + '"state_type": "current|reference|historical", ' + '"state_type_confidence": "low|medium|high", ' + '"supersedes_prior_state": true|false, ' + '"state_type_rationale": ""}\n\n' + "Document:\n" ) @@ -100,6 +153,38 @@ def run_mistral(doc_text): return {"error": "parse_failed", "raw": raw[:200]} +def normalize_state_fields(meta): + """Validate and normalize the four state-type fields from Mistral output. + Anything missing or malformed falls through to safe-cheap defaults that + route to bulk pathway (no temporal invalidation work).""" + + raw_state_type = meta.get("state_type") + if isinstance(raw_state_type, str) and raw_state_type.lower() in VALID_STATE_TYPES: + state_type = raw_state_type.lower() + else: + state_type = DEFAULT_STATE_TYPE + + raw_conf = meta.get("state_type_confidence") + if isinstance(raw_conf, str) and raw_conf.lower() in VALID_CONFIDENCE: + confidence = raw_conf.lower() + else: + confidence = DEFAULT_CONFIDENCE + + raw_supersedes = meta.get("supersedes_prior_state") + if isinstance(raw_supersedes, bool): + supersedes = raw_supersedes + else: + supersedes = DEFAULT_SUPERSEDES + + raw_rationale = meta.get("state_type_rationale") + if isinstance(raw_rationale, str) and raw_rationale.strip(): + rationale = raw_rationale.strip()[:1000] + else: + rationale = DEFAULT_RATIONALE + + return state_type, confidence, supersedes, rationale + + def build_orientation(meta): frames = ", ".join(meta.get("active_frames", [])) rel = meta.get("frame_relationships", "") @@ -108,20 +193,39 @@ def build_orientation(meta): return f"Active frames: {frames}. Frame relationships: {rel} Extraction focus: {orient} Summary: {summary}" -def enqueue_stage3(pg, source, full_text, orientation, metadata): +def enqueue_stage3(pg, source, full_text, orientation, metadata, + state_type, state_type_confidence, supersedes_prior_state, + state_type_rationale): + """Write Stage 3 queue row with orientation + explicit routing columns. + + Routing columns (state_type, state_type_confidence, supersedes_prior_state, + state_type_rationale) are first-class queue properties for Phase A. + Stage 3 reads them on every dequeue to choose bulk vs single-episode pathway. + The full Mistral metadata blob is also retained in stage2_metadata JSON for + debugging and future cycle work.""" cur = pg.cursor() cur.execute(""" - INSERT INTO stage_3_queue (source, full_text, orientation, stage2_metadata) - VALUES (%s, %s, %s, %s) + INSERT INTO stage_3_queue ( + source, full_text, orientation, stage2_metadata, + state_type, state_type_confidence, supersedes_prior_state, + state_type_rationale + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (source) DO UPDATE SET full_text = EXCLUDED.full_text, orientation = EXCLUDED.orientation, stage2_metadata = EXCLUDED.stage2_metadata, + state_type = EXCLUDED.state_type, + state_type_confidence = EXCLUDED.state_type_confidence, + supersedes_prior_state = EXCLUDED.supersedes_prior_state, + state_type_rationale = EXCLUDED.state_type_rationale, enqueued_at = NOW(), completed_at = NULL, failed_at = NULL, attempts = 0 - """, (source, full_text, orientation, json.dumps(metadata))) + """, (source, full_text, orientation, json.dumps(metadata), + state_type, state_type_confidence, supersedes_prior_state, + state_type_rationale)) pg.commit() @@ -144,7 +248,7 @@ def process_one(row): return True # Run Mistral - log.info(f" Running Mistral taxonomy-free pass...") + log.info(f" Running Mistral taxonomy-free + state-type pass...") try: meta = run_mistral(full_text) except requests.exceptions.Timeout: @@ -177,14 +281,25 @@ def process_one(row): frames = meta.get("active_frames", []) log.info(f" Frames: {frames}") + # Normalize state-type fields with safe-cheap defaults on malformed output. + # Note: Mistral may return valid orientation but malformed state-type; + # we accept the orientation and default the routing rather than fail + # the whole row, since defaults route to bulk (cheap, safe). + state_type, confidence, supersedes, rationale = normalize_state_fields(meta) + log.info( + f" State-type: {state_type} (conf={confidence}, " + f"supersedes={supersedes})" + ) + orientation = build_orientation(meta) meta["_model"] = "mistral:latest" meta["_worker_version"] = WORKER_VERSION meta["_generated_at"] = datetime.now().isoformat() meta["char_length"] = char_length - # Enqueue Stage 3 - enqueue_stage3(pg, source, full_text, orientation, meta) + # Enqueue Stage 3 with explicit routing columns + enqueue_stage3(pg, source, full_text, orientation, meta, + state_type, confidence, supersedes, rationale) cur.execute("UPDATE stage_2_queue SET completed_at = NOW() WHERE id = %s", (row_id,)) pg.commit() pg.close()