feat: stage2/3 pipeline, taxonomy-free cascade, E1.8/E4 experiments, corpus migration state

This commit is contained in:
2026-04-30 04:04:31 +00:00
parent 62b5b5453a
commit 2b9a1782c1
14 changed files with 6145 additions and 5 deletions
+31 -3
View File
@@ -11,7 +11,7 @@ from docx import Document
from pypdf import PdfReader
from pptx import Presentation
load_dotenv(Path.home() / "aaronai" / ".env")
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
@@ -63,11 +63,34 @@ def make_id(filepath, chunk_index):
path_hash = hashlib.md5(str(filepath).encode()).hexdigest()[:8]
return f"{path_hash}_{chunk_index}"
def enqueue_stage2(source, full_text):
"""Enqueue document for Stage 2 (Mistral orientation) → Stage 3 (Graphiti ingest).
TEMPORARY: this queue feed will be removed when pgvector is decommissioned
and the watcher calls Stage 2 directly.
"""
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("""
INSERT INTO stage_2_queue (source, full_text, char_length)
VALUES (%s, %s, %s)
ON CONFLICT (source) DO UPDATE SET
full_text = EXCLUDED.full_text,
char_length = EXCLUDED.char_length,
enqueued_at = NOW(),
completed_at = NULL,
failed_at = NULL,
attempts = 0
""", (source, full_text[:50000], len(full_text)))
pg.commit()
pg.close()
except Exception as e:
print(f" Stage 2 queue insert failed (non-fatal): {e}")
def ingest_file(filepath):
path = Path(filepath)
suffix = path.suffix.lower()
# Skip temp files
if path.name.startswith("~$") or path.name.startswith("."):
return 0
@@ -98,6 +121,7 @@ def ingest_file(filepath):
"folder": str(path.parent.relative_to(Path(sys.argv[1]) if len(sys.argv) > 1 else path.parent))
} for _ in chunks]
# STAGE 1: Write to pgvector (TEMPORARY — remove when chat agent migrates to Graphiti)
pg = get_pg()
cur = pg.cursor()
for chunk_id, chunk, embedding, meta in zip(ids, chunks, embeddings, metadatas):
@@ -111,12 +135,16 @@ def ingest_file(filepath):
metadata = EXCLUDED.metadata
""", (
chunk_id, chunk, embedding,
meta.get('source'), 'document', None,
meta.get("source"), "document", None,
json.dumps(meta)
))
pg.commit()
pg.close()
print(f" Indexed {len(chunks)} chunks: {path.name}")
# Enqueue for Stage 2 → Stage 3 (Graphiti pipeline)
enqueue_stage2(path.name, text)
return len(chunks)
except Exception as e:
+226
View File
@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""
Stage 2 Worker — Taxonomy-Free Mistral Orientation
Polls stage_2_queue, runs Mistral taxonomy-free pass, enqueues Stage 3.
Runs as systemd service: aaronai-stage2.service
Routing:
- char_length < 2000 → skip Stage 3, mark complete (sparse content, cascade no benefit)
- char_length >= 2000 → enqueue Stage 3 with orientation metadata
"""
import os, json, time, subprocess, logging, requests
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
import psycopg2
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [stage2] %(levelname)s %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("/var/log/aaronai/stage2.log", mode="a"),
]
)
log = logging.getLogger("stage2")
PG_DSN = os.getenv("PG_DSN")
OLLAMA_URL = "http://localhost:11434"
HEARTBEAT_FILE = Path("/var/log/aaronai/stage2-heartbeat")
CHAR_LENGTH_THRESHOLD = 2000
REQUEST_TIMEOUT = 300
RETRY_ATTEMPTS = 2
POLL_INTERVAL = 5
WORKER_VERSION = "2.0"
TAXFREE_PROMPT = (
"You are a metadata extraction system. Given a document, describe its content "
"shape for use as orientation context in a knowledge graph extraction pass.\n\n"
"Do not summarize content. Do not extract entities. Do not assign a single category label.\n\n"
"Instead, describe:\n"
"- What domains or frames are active in this content (there may be several simultaneously)\n"
"- How those frames relate to each other in this specific document\n"
"- What kind of relational content a knowledge graph extractor should look for\n\n"
"Output JSON only. No prose, no explanation, no markdown.\n\n"
"Schema:\n"
'{"active_frames": ["<frame 1>", "<frame 2>"], '
'"frame_relationships": "<one sentence>", '
'"extraction_orientation": "<one sentence>", '
'"one_sentence_summary": "<one sentence>"}\n\n'
"Document:\n"
)
def get_pg():
return psycopg2.connect(PG_DSN)
def write_heartbeat():
try:
HEARTBEAT_FILE.parent.mkdir(parents=True, exist_ok=True)
HEARTBEAT_FILE.write_text(datetime.now().isoformat())
except Exception:
pass
def recover_wedge():
log.warning("Mistral wedge detected — restarting Ollama")
subprocess.run(["sudo", "systemctl", "restart", "ollama"], capture_output=True)
time.sleep(30)
for _ in range(3):
try:
r = requests.get(f"{OLLAMA_URL}/api/tags", timeout=10)
if r.status_code == 200:
log.info("Ollama recovered")
return True
except Exception:
time.sleep(5)
log.error("Ollama recovery failed")
return False
def run_mistral(doc_text):
payload = {
"model": "mistral:latest",
"prompt": TAXFREE_PROMPT + doc_text[:12000],
"stream": False,
"format": "json",
}
resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()
raw = resp.json().get("response", "{}")
try:
return json.loads(raw)
except Exception:
return {"error": "parse_failed", "raw": raw[:200]}
def build_orientation(meta):
frames = ", ".join(meta.get("active_frames", []))
rel = meta.get("frame_relationships", "")
orient = meta.get("extraction_orientation", "")
summary = meta.get("one_sentence_summary", "")
return f"Active frames: {frames}. Frame relationships: {rel} Extraction focus: {orient} Summary: {summary}"
def enqueue_stage3(pg, source, full_text, orientation, metadata):
cur = pg.cursor()
cur.execute("""
INSERT INTO stage_3_queue (source, full_text, orientation, stage2_metadata)
VALUES (%s, %s, %s, %s)
ON CONFLICT (source) DO UPDATE SET
full_text = EXCLUDED.full_text,
orientation = EXCLUDED.orientation,
stage2_metadata = EXCLUDED.stage2_metadata,
enqueued_at = NOW(),
completed_at = NULL,
failed_at = NULL,
attempts = 0
""", (source, full_text, orientation, json.dumps(metadata)))
pg.commit()
def process_one(row):
row_id, source, full_text, char_length = row
log.info(f"Processing: {source} ({char_length} chars)")
# Mark started
pg = get_pg()
cur = pg.cursor()
cur.execute("UPDATE stage_2_queue SET started_at = NOW(), attempts = attempts + 1 WHERE id = %s", (row_id,))
pg.commit()
# Routing: sparse content skips Stage 3
if char_length < CHAR_LENGTH_THRESHOLD:
log.info(f" Skipping Stage 3 (char_length={char_length} < {CHAR_LENGTH_THRESHOLD})")
cur.execute("UPDATE stage_2_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
pg.commit()
pg.close()
return True
# Run Mistral
log.info(f" Running Mistral taxonomy-free pass...")
try:
meta = run_mistral(full_text)
except requests.exceptions.Timeout:
log.warning(f" Mistral timeout on {source}")
pg.close()
return False
except Exception as e:
log.error(f" Mistral error on {source}: {e}")
cur.execute("UPDATE stage_2_queue SET failed_at = NOW(), failure_reason = %s WHERE id = %s",
(str(e)[:500], row_id))
pg.commit()
pg.close()
return False
frames = meta.get("active_frames", [])
log.info(f" Frames: {frames}")
orientation = build_orientation(meta)
meta["_model"] = "mistral:latest"
meta["_worker_version"] = WORKER_VERSION
meta["_generated_at"] = datetime.now().isoformat()
meta["char_length"] = char_length
# Enqueue Stage 3
enqueue_stage3(pg, source, full_text, orientation, meta)
cur.execute("UPDATE stage_2_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
pg.commit()
pg.close()
log.info(f" Enqueued Stage 3: {source}")
return True
def run():
log.info(f"Stage 2 worker starting (v{WORKER_VERSION})")
consecutive_failures = 0
while True:
write_heartbeat()
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("""
SELECT id, source, full_text, char_length
FROM stage_2_queue
WHERE completed_at IS NULL
AND failed_at IS NULL
AND (started_at IS NULL OR started_at < NOW() - INTERVAL '10 minutes')
AND attempts < %s
ORDER BY enqueued_at ASC
LIMIT 1
""", (RETRY_ATTEMPTS + 1,))
row = cur.fetchone()
pg.close()
if not row:
consecutive_failures = 0
time.sleep(POLL_INTERVAL)
continue
success = process_one(row)
if not success:
consecutive_failures += 1
if consecutive_failures >= 2:
log.warning("Multiple consecutive failures — checking for Mistral wedge")
recovered = recover_wedge()
if recovered:
consecutive_failures = 0
time.sleep(10)
else:
consecutive_failures = 0
time.sleep(1)
except Exception as e:
log.error(f"Worker loop error: {e}")
time.sleep(10)
if __name__ == "__main__":
run()
+298
View File
@@ -0,0 +1,298 @@
#!/usr/bin/env python3
"""
Tier 1 Graphiti Migration — pgvector to Graphiti for ~300 most-recent sources.
Resumable via state file at ~/aaronai/experiments/tier1_migration_state.json.
Usage:
python3 ~/aaronai/scripts/tier1_migration.py --dry-run
python3 ~/aaronai/scripts/tier1_migration.py
python3 ~/aaronai/scripts/tier1_migration.py --reset
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path
import psycopg2
import requests
from dotenv import load_dotenv
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
GRAPHITI_URL = "http://localhost:8001"
PG_DSN = os.environ["PG_DSN"]
MAX_SOURCES = 300
BATCH_SIZE = 4
BATCH_DELAY_S = 5
LONG_DOC_THRESHOLD = 5000
LONG_DOC_BATCH_SIZE = 2
EXPERIMENTS = Path.home() / "aaronai" / "experiments"
STATE_FILE = EXPERIMENTS / "tier1_migration_state.json"
RESULTS_FILE = EXPERIMENTS / "tier1_migration_results.json"
EXPERIMENTS.mkdir(parents=True, exist_ok=True)
def load_state():
if STATE_FILE.exists():
data = json.loads(STATE_FILE.read_text())
return set(data.get("ingested", [])), data.get("started_at"), data.get("total_cost_estimate", 0)
return set(), None, 0
def save_state(ingested, started_at, total_cost_estimate):
STATE_FILE.write_text(json.dumps({
"ingested": sorted(ingested),
"started_at": started_at,
"last_updated": time.strftime("%Y-%m-%dT%H:%M:%S"),
"total_cost_estimate": round(total_cost_estimate, 4),
"count": len(ingested),
}, indent=2))
def fetch_tier1_sources(cur, max_sources, exclude_set):
cur.execute("""
SELECT column_name FROM information_schema.columns
WHERE table_name = 'embeddings'
""")
columns = {r[0] for r in cur.fetchall()}
has_created_at = "created_at" in columns
if has_created_at:
order_clause = "MAX(created_at) DESC NULLS LAST"
else:
order_clause = "MAX(id) DESC"
cur.execute(f"""
SELECT source,
STRING_AGG(document, E'\n\n' ORDER BY id) AS full_doc,
{("MAX(created_at)" if has_created_at else "NULL")} AS most_recent
FROM embeddings
GROUP BY source
ORDER BY {order_clause}
LIMIT %s
""", (max_sources * 2,))
candidates = cur.fetchall()
selected = []
for source, doc, recent in candidates:
if not doc:
continue
if source in exclude_set:
continue
selected.append((source, doc, recent))
if len(selected) >= max_sources:
break
return selected
def submit_batch(batch):
payload = {
"episodes": [
{
"name": source,
"content": doc[:12000],
"source_description": "tier1_migration",
"timestamp": "2026-04-28T00:00:00",
}
for source, doc in batch
]
}
t0 = time.time()
try:
r = requests.post(f"{GRAPHITI_URL}/episodes/bulk", json=payload, timeout=900)
elapsed = time.time() - t0
return {
"ok": r.ok,
"status_code": r.status_code,
"elapsed_s": round(elapsed, 2),
"error": None if r.ok else r.text[:500],
"sources": [s for s, _ in batch],
}
except Exception as e:
return {
"ok": False,
"status_code": None,
"elapsed_s": round(time.time() - t0, 2),
"error": str(e)[:500],
"sources": [s for s, _ in batch],
}
def chunk_for_batches(sources, base_batch_size, long_threshold, long_batch_size):
batch = []
current_size_target = base_batch_size
for source, doc, _ in sources:
is_long = len(doc) >= long_threshold
target_size = long_batch_size if is_long else base_batch_size
if batch and target_size != current_size_target:
yield batch
batch = []
current_size_target = target_size
batch.append((source, doc))
if len(batch) >= current_size_target:
yield batch
batch = []
if batch:
yield batch
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--max-sources", type=int, default=MAX_SOURCES)
parser.add_argument("--reset", action="store_true")
args = parser.parse_args()
if args.reset and STATE_FILE.exists():
confirm = input(f"Delete state file {STATE_FILE}? [y/N] ")
if confirm.lower() == "y":
STATE_FILE.unlink()
print("State file deleted. Resuming from scratch.")
else:
print("Aborted.")
return
print("=" * 70)
print("Tier 1 Graphiti Migration")
print("=" * 70)
try:
r = requests.get(f"{GRAPHITI_URL}/health", timeout=10)
if not r.ok:
print(f"ERROR: sidecar /health returned {r.status_code}")
return
print(f"Sidecar: {r.json()}")
except Exception as e:
print(f"ERROR: sidecar unreachable: {e}")
return
ingested, started_at, prior_cost = load_state()
if started_at:
print(f"Resuming run started at {started_at}")
print(f" {len(ingested)} sources already ingested")
print(f" Estimated cost so far: ${prior_cost:.2f}")
else:
started_at = time.strftime("%Y-%m-%dT%H:%M:%S")
print(f"Fresh run starting at {started_at}")
print()
print(f"Fetching tier 1 sources from pgvector (max={args.max_sources})...")
conn = psycopg2.connect(PG_DSN)
cur = conn.cursor()
sources = fetch_tier1_sources(cur, args.max_sources, ingested)
cur.close()
conn.close()
print(f" {len(sources)} sources to ingest (excluding {len(ingested)} already done)")
if not sources:
print()
print("Nothing to do — all tier 1 sources already ingested.")
return
short = sum(1 for _, d, _ in sources if len(d) < 1000)
medium = sum(1 for _, d, _ in sources if 1000 <= len(d) < 5000)
long_ = sum(1 for _, d, _ in sources if len(d) >= 5000)
print(f" Distribution: short={short} medium={medium} long={long_}")
print()
batches = list(chunk_for_batches(sources, BATCH_SIZE, LONG_DOC_THRESHOLD, LONG_DOC_BATCH_SIZE))
print(f" Will submit {len(batches)} batches (delay {BATCH_DELAY_S}s between)")
print()
if args.dry_run:
print("DRY RUN - first 10 batches:")
for i, batch in enumerate(batches[:10], 1):
print(f" [{i}] n={len(batch)} sources={[s[:40] for s, _ in batch]}")
print(f" ... and {max(0, len(batches) - 10)} more batches")
print()
print("Estimated cost: ${:.2f}".format(0.103 * sum(len(b) for b in batches)))
print("Estimated runtime: {:.1f} hours".format(
(sum(len(b) for b in batches) * 8 + len(batches) * BATCH_DELAY_S) / 3600
))
return
total_start = time.time()
batch_results = []
successful_episodes = 0
failed_episodes = 0
estimated_cost = prior_cost
for i, batch in enumerate(batches, 1):
avg_chars = int(sum(len(d) for _, d in batch) / len(batch))
bucket = "long" if avg_chars >= LONG_DOC_THRESHOLD else ("medium" if avg_chars >= 1000 else "short")
print(f"[{i:3d}/{len(batches)}] [{bucket:6s}] n={len(batch)} avg={avg_chars:6d}c", end=" ", flush=True)
result = submit_batch(batch)
batch_results.append(result)
if result["ok"]:
print(f" 200 {result['elapsed_s']}s")
for source, _ in batch:
ingested.add(source)
successful_episodes += len(batch)
estimated_cost += 0.103 * len(batch)
save_state(ingested, started_at, estimated_cost)
else:
err = (result["error"] or "")[:80]
print(f" FAIL: {err}")
failed_episodes += len(batch)
save_state(ingested, started_at, estimated_cost)
if "Max pending queries" in (result["error"] or ""):
print(f" FalkorDB queue overflow - pausing 30s")
time.sleep(30)
elif "timed out" in (result["error"] or "").lower():
print(f" Query timeout - pausing 15s")
time.sleep(15)
elif "rate" in (result["error"] or "").lower() or "429" in (result["error"] or ""):
print(f" Rate limited - pausing 60s")
time.sleep(60)
if i < len(batches):
time.sleep(BATCH_DELAY_S)
total_elapsed = time.time() - total_start
summary = {
"started_at": started_at,
"completed_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
"total_elapsed_s": round(total_elapsed, 1),
"total_elapsed_hours": round(total_elapsed / 3600, 2),
"n_batches": len(batches),
"successful_episodes": successful_episodes,
"failed_episodes": failed_episodes,
"total_ingested_now": len(ingested),
"estimated_total_cost": round(estimated_cost, 2),
"batch_results": batch_results,
}
RESULTS_FILE.write_text(json.dumps(summary, indent=2))
print()
print("=" * 70)
print("TIER 1 MIGRATION COMPLETE")
print("=" * 70)
print(f"Successful episodes: {successful_episodes}/{successful_episodes + failed_episodes}")
print(f"Failed episodes: {failed_episodes}")
print(f"Total ingested now: {len(ingested)}")
print(f"Wall-clock: {total_elapsed/3600:.2f} hours")
print(f"Estimated cost: ${estimated_cost:.2f}")
print()
print(f"State file: {STATE_FILE}")
print(f"Results file: {RESULTS_FILE}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print()
print("Interrupted. State saved. Re-run to resume.")
sys.exit(130)