diff --git a/scripts/stage2_worker.py b/scripts/stage2_worker.py index 3630bda..fec01c4 100644 --- a/scripts/stage2_worker.py +++ b/scripts/stage2_worker.py @@ -33,7 +33,7 @@ CHAR_LENGTH_THRESHOLD = 2000 REQUEST_TIMEOUT = 300 RETRY_ATTEMPTS = 2 POLL_INTERVAL = 5 -WORKER_VERSION = "2.0" +WORKER_VERSION = "2.1" TAXFREE_PROMPT = ( "You are a metadata extraction system. Given a document, describe its content " @@ -67,7 +67,10 @@ def write_heartbeat(): def recover_wedge(): log.warning("Mistral wedge detected — restarting Ollama") - subprocess.run(["sudo", "systemctl", "restart", "ollama"], capture_output=True) + result = subprocess.run(["/usr/bin/sudo", "/bin/systemctl", "restart", "ollama"], capture_output=True, text=True) + if result.returncode != 0: + log.error(f"Ollama restart failed (rc={result.returncode}): stdout={result.stdout!r} stderr={result.stderr!r}") + return False time.sleep(30) for _ in range(3): try: @@ -146,6 +149,11 @@ def process_one(row): meta = run_mistral(full_text) except requests.exceptions.Timeout: log.warning(f" Mistral timeout on {source}") + cur.execute( + "UPDATE stage_2_queue SET failed_at = NOW(), failure_reason = %s WHERE id = %s", + (f"mistral_timeout_after_{REQUEST_TIMEOUT}s", row_id) + ) + pg.commit() pg.close() return False except Exception as e: @@ -156,6 +164,16 @@ def process_one(row): pg.close() return False + if meta.get("error") == "parse_failed": + log.warning(f" Mistral parse failure on {source}: {meta.get('raw', '')[:100]}") + cur.execute( + "UPDATE stage_2_queue SET failed_at = NOW(), failure_reason = %s WHERE id = %s", + ("mistral_parse_failure", row_id) + ) + pg.commit() + pg.close() + return False + frames = meta.get("active_frames", []) log.info(f" Frames: {frames}") @@ -209,8 +227,9 @@ def run(): if consecutive_failures >= 2: log.warning("Multiple consecutive failures — checking for Mistral wedge") recovered = recover_wedge() - if recovered: - consecutive_failures = 0 + if not recovered: + log.error("Wedge recovery failed — continuing anyway") + consecutive_failures = 0 time.sleep(10) else: consecutive_failures = 0