From 7f07972109eb3d3382d2b6307cd318606e0d525c Mon Sep 17 00:00:00 2001 From: Aaron Nelson Date: Sat, 2 May 2026 05:20:14 +0000 Subject: [PATCH] stage2_worker: ON CONFLICT clause resets all run-state fields on re-enqueue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug: when a row in stage_3_queue gets re-enqueued (same source ingested again after Stage 2 re-runs), the ON CONFLICT (source) DO UPDATE clause updated content fields and reset enqueued_at, completed_at, failed_at, attempts — but did not reset started_at, failure_reason, or external_job_id. Stale started_at from a prior attempt makes the row invisible to the Stage 3 worker's claim filter (which uses started_at IS NULL). The row sits queued forever; Stage 3 never picks it up; the source effectively fails silently after a re-trigger. Discovered tonight while testing the bulk pathway after the substrate fix: a journal entry that had been ingested earlier (and manually marked completed during recovery from a worker timeout) showed enqueued_at from the new touch but started_at from the original 01:40 attempt. The fix extends the upsert clause to reset all run-state fields (started_at, failure_reason, and external_job_id are now set to NULL alongside the existing resets) so that a re-enqueue behaves as a fresh attempt. After the fix, the re-triggered journal entry routed cleanly through Stage 2 → Stage 3 → bulk pathway → sidecar bulk job → 60 ms commit (worst-case dedup against already-known content). --- scripts/stage2_worker.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/scripts/stage2_worker.py b/scripts/stage2_worker.py index b8df56f..61064d6 100644 --- a/scripts/stage2_worker.py +++ b/scripts/stage2_worker.py @@ -220,8 +220,15 @@ def enqueue_stage3(pg, source, full_text, orientation, metadata, supersedes_prior_state = EXCLUDED.supersedes_prior_state, state_type_rationale = EXCLUDED.state_type_rationale, enqueued_at = NOW(), + -- Reset all run-state fields on re-enqueue. 
Without this, + -- stale started_at from a prior attempt makes the row + -- invisible to the Stage 3 worker's claim filter (which + -- typically uses started_at IS NULL). + started_at = NULL, completed_at = NULL, failed_at = NULL, + failure_reason = NULL, + external_job_id = NULL, attempts = 0 """, (source, full_text, orientation, json.dumps(metadata), state_type, state_type_confidence, supersedes_prior_state,