Files
aaronAI/scripts/e1_run_cascade_corrected.py
T

182 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""E1 corrected re-run — cascade orientation passed via custom_extraction_instructions."""
import json
import os
import requests
import time
from pathlib import Path
import psycopg2
from dotenv import load_dotenv
load_dotenv(Path.home() / "aaronai" / ".env")
EXPERIMENTS = Path.home() / "aaronai" / "experiments"
SAMPLE_FILE = EXPERIMENTS / "cascade_reextract_sample.json"
RESULTS_FILE = EXPERIMENTS / "cascade_reextract_results.json"
PG_DSN = os.environ["PG_DSN"]
SIDECAR_URL = "http://localhost:8001"
TEST_GROUP_ID = "aaron_cascade_test"
MAX_DOC_CHARS = 12000
METADATA_PROMPT = """You are a metadata extraction system. Given a document, produce structural and content metadata in strict JSON format.
Do not summarize the content beyond the one-sentence summary field. Do not extract entities or relationships. Do not interpret meaning. Produce only the metadata schema below.
Output JSON only. No prose, no explanation, no markdown code fences.
Schema:
{
"language": "<ISO 639-1 code>",
"char_length": <integer>,
"primary_format": "<prose|slides|code|structured|mixed>",
"structural_signals": {
"has_headings": <boolean>,
"has_bullet_lists": <boolean>,
"has_numbered_lists": <boolean>,
"has_tables": <boolean>,
"has_code_blocks": <boolean>,
"has_dates": <boolean>
},
"content_signals": {
"has_named_people": <boolean>,
"has_institutional_language": <boolean>,
"has_technical_terminology": <boolean>,
"has_first_person": <boolean>,
"has_quotations": <boolean>
},
"domain_class": "<technical|administrative|educational|personal|conversational>",
"one_sentence_summary": "<one sentence describing what the document is about>"
}
Document:
"""
def get_pg():
return psycopg2.connect(PG_DSN)
def fetch_source_text(source):
conn = get_pg()
cur = conn.cursor()
cur.execute("""
SELECT STRING_AGG(document, E'\n\n' ORDER BY id) AS full_doc
FROM embeddings WHERE source = %s
""", (source,))
row = cur.fetchone()
conn.close()
if row is None or row[0] is None:
return None
return row[0]
def run_mistral_metadata(text):
truncated = text[:MAX_DOC_CHARS]
prompt = METADATA_PROMPT + truncated
response = requests.post(
"http://localhost:11434/api/generate",
json={"model": "mistral:latest", "prompt": prompt, "stream": False, "format": "json"},
timeout=180,
)
response.raise_for_status()
raw = response.json()["response"]
try:
metadata = json.loads(raw)
metadata["char_length"] = len(truncated)
return metadata
except json.JSONDecodeError:
return {"error": "JSON parse failed", "raw": raw[:500]}
def format_metadata_as_orientation(metadata):
"""Format metadata as orient-not-bound extraction instructions."""
if "error" in metadata:
return None
summary = metadata.get("one_sentence_summary", "")
domain = metadata.get("domain_class", "unknown")
fmt = metadata.get("primary_format", "unknown")
return (
f"This is a {domain} document in {fmt} format. "
f"Summary: {summary} "
f"This metadata is provided to orient your extraction, not to constrain it. "
f"Extract entities and relationships freely from the document text itself; "
f"the metadata is descriptive context, not a checklist."
)
def submit_episode_singular(name, content, custom_instructions):
"""Submit episode to Graphiti's singular /episodes endpoint with cascade orientation."""
payload = {
"name": name,
"content": content[:MAX_DOC_CHARS],
"source_description": "e1_corrected_run", # neutral label, not the cascade text
"timestamp": "2026-04-28T00:00:00",
"group_id": TEST_GROUP_ID,
"custom_extraction_instructions": custom_instructions,
}
response = requests.post(f"{SIDECAR_URL}/episodes", json=payload, timeout=300)
response.raise_for_status()
return response.json()
def main():
with open(SAMPLE_FILE) as f:
sample = json.load(f)
selected = sample["selected"]
print(f"E1 CORRECTED re-run — {len(selected)} episodes via /episodes (singular)")
print(f"Cascade orientation passed in custom_extraction_instructions.\n")
results = []
for i, ep in enumerate(selected, 1):
name = ep["name"]
bucket = ep["bucket"]
print(f"[{i}/{len(selected)}] [{bucket}] {name}")
record = {"name": name, "bucket": bucket, "tier1_entities": ep["entities"]}
print(f" Fetching source text...", end=" ", flush=True)
text = fetch_source_text(name)
if text is None:
print("FAILED — no chunks in pgvector")
record["error"] = "no source text"
results.append(record)
continue
record["doc_chars"] = len(text)
print(f"{len(text)} chars")
print(f" Generating Mistral metadata...", end=" ", flush=True)
t0 = time.time()
metadata = run_mistral_metadata(text)
elapsed = time.time() - t0
record["metadata"] = metadata
record["metadata_elapsed_s"] = round(elapsed, 1)
if "error" in metadata:
print(f"FAILED in {elapsed:.1f}s")
else:
print(f"{elapsed:.1f}s — domain={metadata.get('domain_class')}, format={metadata.get('primary_format')}")
custom_instructions = format_metadata_as_orientation(metadata)
record["custom_extraction_instructions"] = custom_instructions
print(f" Submitting via /episodes (singular) with custom_extraction_instructions...", end=" ", flush=True)
t0 = time.time()
try:
result = submit_episode_singular(name, text, custom_instructions)
elapsed = time.time() - t0
print(f"{elapsed:.1f}s — OK")
record["submit_elapsed_s"] = round(elapsed, 1)
record["submit_result"] = result
except Exception as e:
elapsed = time.time() - t0
print(f"{elapsed:.1f}s — FAILED: {e}")
record["submit_error"] = str(e)
results.append(record)
with open(RESULTS_FILE, "w") as f:
json.dump({"results": results}, f, indent=2, default=str)
print()
print(f"\nDone. Results saved to {RESULTS_FILE}")
if __name__ == "__main__":
main()