diff --git a/scripts/stage3_worker.py b/scripts/stage3_worker.py index f64ce0e..4f8ccb0 100644 --- a/scripts/stage3_worker.py +++ b/scripts/stage3_worker.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 """ Stage 3 Worker — Graphiti Ingest with Bulk-vs-Single-Episode Routing + + Encoder Instructions (v1.0) Polls stage_3_queue, routes each row to one of two ingest pathways based on state-type classification produced by Stage 2: @@ -12,6 +13,18 @@ state-type classification produced by Stage 2: confidence in {medium, high}. Per-chunk POST to /episodes with shared saga tag, full edge invalidation, per-chunk timeout/retry independence. +Both pathways pass EXTRACTION_INSTRUCTIONS_V1 to the sidecar via +custom_extraction_instructions, which graphiti-core inserts into entity +and edge extraction prompts (NOT dedup prompts — that's intentional under +the encoder-stays-naïve commitment). + +Architectural posture: the encoder is content-naïve. It does not draw on +prior knowledge of the user, the substrate, or the cycle's accumulated +work. Schema and personality live in the cycle's consolidated substrate, +where the dream phase shapes them. The encoder produces source-grounded +ground truth for the cycle to work from. See EXTRACTION_INSTRUCTIONS_V1 +below for the extraction guidance text. + Routing rationale: the single-episode pathway is the correct API per graphiti-core's docs for content that supersedes prior facts (it does edge invalidation that bulk skips). It costs more per chunk because of @@ -67,7 +80,7 @@ HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat") RETRY_ATTEMPTS = 2 POLL_INTERVAL = 5 INGEST_TIMEOUT = 600 -WORKER_VERSION = "2.3" +WORKER_VERSION = "2.4" # Match Stage 1 chunking parameters CHUNK_SIZE_WORDS = 500 @@ -84,6 +97,87 @@ MAX_CHUNKS_PER_SAGA = 10 # the expensive pathway. HIGH_TRUST_CONFIDENCE = ("medium", "high") +# Encoder extraction guidance v1.0 — see module docstring for posture rationale. 
+# Passed to graphiti-core via custom_extraction_instructions on both ingest +# pathways. Inserted into entity-extraction and edge-extraction prompts only; +# does NOT enter dedup prompts. Encoder-stays-naïve commitment is structural, +# not versioned: this text gets refined over time but the encoder does not +# acquire substrate context as the cycle matures. +EXTRACTION_INSTRUCTIONS_V1 = """\ +EXTRACTION GUIDANCE — BirdAI cascade + +The encoder's job is faithful capture from this chunk's text. It does +not draw on prior knowledge of the user, the substrate, or the cycle's +accumulated work. Schema, personality, and inferred context live in +the cycle's consolidated substrate, where the dream phase shapes them +through prediction-error replay and speculation. The encoder stays +content-naïve so the cycle has source-grounded ground truth to work +from. + +The orientation produced by an upstream pass describes content shape, +not content interpretation. Use it as forward-facing guidance for what +to attend to in this document. Do not let it bound or limit what you +extract. + +PREDICATE NAMING + +Produce semantic predicates that describe the actual relationship the +text states. Use verbs or verb phrases — "wrote", "advised", "founded", +"works at", "led to", "contradicts", "is autobiographical to" — not +generic placeholders. Reserve generic forms (for example, "relates to" +or "mentions") for cases where the text genuinely does not specify a +more particular relationship. The verb is the load-bearing part of +the fact; preserving it is what makes the relationship queryable later. + +EXTRACTION POSTURE + +Extract from this chunk's text as if each entity is encountered fresh. +Do not try to reconcile entities you find here with entities that +might already exist elsewhere in the graph. Redundant entity instances +are acceptable. Cross-document entity resolution is downstream cycle +work, not extraction work. 
+ +When the same entity appears multiple times within this chunk with +slightly different spellings — a common artifact of voice transcription — +prefer the more frequent or more canonical-looking form. Do not invent +canonical forms; choose among the variants the text actually contains. + +EXTRACT FROM THE SOURCE + +Extract relationships the text states or strongly implies through +direct linguistic markers ("X led to Y", "X works for Y", "X met Y at +Z"). Do not extend extraction to relationships the text neither states +nor directly implies. Inferred relationships are produced by the +cycle's dream phase as speculative edges with explicit low-confidence +tagging, where they can be evaluated and either ratified or pruned by +subsequent cycle work. Encoding-time inference, mixed in with source- +grounded extraction, would lose the speculation/source distinction the +cycle's consolidation work relies on. + +DO NOT PRE-EMPT CYCLE WORK + +Do not omit relationships because they seem redundant with prior +extractions or with the existing graph. Cross-document entity +resolution and edge consolidation are downstream cycle operations; +redundant extraction at this stage is intentional. Extracting the +same fact from multiple sources gives the cycle's consolidation work +the recurrence signal it relies on. + +EXTRACTION DEPTH + +Use the orientation's frame_relationships and extraction_orientation +fields to inform what to attend to. If the orientation describes +cross-domain relational content, look for relationships that bridge +those domains explicitly, with named predicates for the bridging. +If the orientation describes single-domain technical content, look +for the structural relationships internal to that domain. + +Extract every entity and every relationship the text states. Do not +summarize, do not filter, do not omit content because it seems +incidental. The orientation tells you what to look for; the source +text tells you what is there. 
+""" + def get_pg(): return psycopg2.connect(PG_DSN) @@ -193,6 +287,8 @@ def ingest_bulk(source, full_text, orientation): - Large documents (chunks > MAX_CHUNKS_PER_SAGA): split into batches of MAX_CHUNKS_PER_SAGA, each its own bulk commit, all sharing the same saga tag so Graphiti links them as one document unit + + All three sub-paths pass EXTRACTION_INSTRUCTIONS_V1 to the sidecar. """ char_length = len(full_text) @@ -204,7 +300,11 @@ def ingest_bulk(source, full_text, orientation): "timestamp": datetime.now().isoformat(), }] log.info(f" [bulk] Single episode ({char_length} chars)") - return post_bulk({"episodes": episodes, "group_id": "aaron"}) + return post_bulk({ + "episodes": episodes, + "group_id": "aaron", + "custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1, + }) chunks = chunk_text(full_text) total_chunks = len(chunks) @@ -220,9 +320,12 @@ def ingest_bulk(source, full_text, orientation): for i, chunk in enumerate(chunks) ] log.info(f" [bulk] Chunked into {total_chunks} episodes ({char_length} chars)") - return post_bulk( - {"episodes": episodes, "group_id": "aaron", "saga": source} - ) + return post_bulk({ + "episodes": episodes, + "group_id": "aaron", + "saga": source, + "custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1, + }) # Large document: split into batches sharing the same saga tag batch_count = (total_chunks + MAX_CHUNKS_PER_SAGA - 1) // MAX_CHUNKS_PER_SAGA @@ -247,7 +350,12 @@ def ingest_bulk(source, full_text, orientation): batch_label = f"batch {batch_idx + 1}/{batch_count} (chunks {start + 1}-{end})" log.info(f" {batch_label} starting") last_result = post_bulk( - {"episodes": episodes, "group_id": "aaron", "saga": source}, + { + "episodes": episodes, + "group_id": "aaron", + "saga": source, + "custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1, + }, batch_label=batch_label, ) log.info(f" {batch_label} committed") @@ -261,6 +369,8 @@ def ingest_single_episode(row_id, source, full_text, orientation): with shared 
saga tag. Each call independent: own timeout, own retry envelope, own failure semantics. + Each chunk POST passes EXTRACTION_INSTRUCTIONS_V1 to the sidecar. + Partial-success behavior: if chunk N of total fails, chunks 1..N-1 stay committed (graphiti has already accepted them) and the function raises with detail about which chunk failed and how many succeeded. @@ -281,6 +391,7 @@ def ingest_single_episode(row_id, source, full_text, orientation): "source_description": orientation, "group_id": "aaron", "timestamp": datetime.now().isoformat(), + "custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1, } log.info(f" [single-ep] Single episode, no chunking ({char_length} chars)") return post_episode(payload, episode_label="single-ep") @@ -302,6 +413,7 @@ def ingest_single_episode(row_id, source, full_text, orientation): "group_id": "aaron", "saga": source, "timestamp": datetime.now().isoformat(), + "custom_extraction_instructions": EXTRACTION_INSTRUCTIONS_V1, } try: post_episode(payload, episode_label=f"chunk {chunk_num}/{total_chunks}")