Files
aaron 3f7fba7e0e scripts/: separate production from experimental and deprecated
Moves 28 experiment scripts to scripts/experiments/ (E1, E1.4, E1.6, E2,
base_class, cascade, cost_test, briefing, consistency, token series).
Moves 2 dissolved-layer scripts to scripts/deprecated/ (consolidator_v0_1.py,
tier1_migration.py — under the bespoke decision both target retired
substrate work).
Removes 19 .bak* files from disk (gitignored, never tracked; git history
is the durable record of every prior version).

The 11 production scripts remain in scripts/. All systemd ExecStart paths,
api.py subprocess calls, and cron jobs continue to resolve correctly —
verified by grep against /etc/systemd/system/aaronai-*.service, scripts/
references in api.py, and the user crontab.

Track 1 inventory cross-cutting finding: scripts/ mixed 11 production
files with 32 experimental scripts and ~20 .bak files. After this commit
a clean-room reader can identify the live workers from a directory listing
alone.

Found by Track 1 inventory 2026-05-02. See
~/aaronai/docs/scripts-reorg-plan-2026-05-02.md for full reasoning.

After commit, run:
1. git log --oneline -3 — show the new commit on top
2. git status — confirm clean working tree (modulo the docs/ untracked files which are intentional)
2026-05-02 23:28:24 +00:00

191 lines
6.6 KiB
Python

#!/usr/bin/env python3
"""E1 orchestration — fetch source text, run Mistral metadata, submit to Graphiti test group_id."""
import json
import os
import requests
import subprocess
import time
from pathlib import Path
import psycopg2
from dotenv import load_dotenv
load_dotenv(Path.home() / "aaronai" / ".env")
EXPERIMENTS = Path.home() / "aaronai" / "experiments"
SAMPLE_FILE = EXPERIMENTS / "cascade_reextract_sample.json"
RESULTS_FILE = EXPERIMENTS / "cascade_reextract_results.json"
PG_DSN = os.environ["PG_DSN"]
SIDECAR_URL = "http://localhost:8001"
TEST_GROUP_ID = "aaron_cascade_test"
MAX_DOC_CHARS = 12000 # Same cap as Tier 1 for parity
# Stage 2 metadata prompt — verbatim from stage-2-worker-spec.md
METADATA_PROMPT = """You are a metadata extraction system. Given a document, produce structural and content metadata in strict JSON format.
Do not summarize the content beyond the one-sentence summary field. Do not extract entities or relationships. Do not interpret meaning. Produce only the metadata schema below.
Output JSON only. No prose, no explanation, no markdown code fences.
Schema:
{
"language": "<ISO 639-1 code>",
"char_length": <integer>,
"primary_format": "<prose|slides|code|structured|mixed>",
"structural_signals": {
"has_headings": <boolean>,
"has_bullet_lists": <boolean>,
"has_numbered_lists": <boolean>,
"has_tables": <boolean>,
"has_code_blocks": <boolean>,
"has_dates": <boolean>
},
"content_signals": {
"has_named_people": <boolean>,
"has_institutional_language": <boolean>,
"has_technical_terminology": <boolean>,
"has_first_person": <boolean>,
"has_quotations": <boolean>
},
"domain_class": "<technical|administrative|educational|personal|conversational>",
"one_sentence_summary": "<one sentence describing what the document is about>"
}
Document:
"""
def get_pg():
return psycopg2.connect(PG_DSN)
def fetch_source_text(source):
"""Reassemble the full document from pgvector chunks, mirroring tier1_migration.py logic."""
conn = get_pg()
cur = conn.cursor()
cur.execute("""
SELECT STRING_AGG(document, E'\n\n' ORDER BY id) AS full_doc
FROM embeddings WHERE source = %s
""", (source,))
row = cur.fetchone()
conn.close()
if row is None or row[0] is None:
return None
return row[0]
def run_mistral_metadata(text):
"""Call local Mistral via Ollama for base-class metadata."""
truncated = text[:MAX_DOC_CHARS]
prompt = METADATA_PROMPT + truncated
response = requests.post(
"http://localhost:11434/api/generate",
json={"model": "mistral:latest", "prompt": prompt, "stream": False, "format": "json"},
timeout=180,
)
response.raise_for_status()
raw = response.json()["response"]
try:
metadata = json.loads(raw)
# Override char_length with python-computed value (per stage-2-worker-spec)
metadata["char_length"] = len(truncated)
return metadata
except json.JSONDecodeError:
return {"error": "JSON parse failed", "raw": raw[:500]}
def format_metadata_as_orientation(metadata):
"""Format the base-class metadata as a source_description for Graphiti, with orient-not-bound framing."""
if "error" in metadata:
return f"tier1_cascade_test (metadata generation failed: {metadata['error']})"
summary = metadata.get("one_sentence_summary", "")
domain = metadata.get("domain_class", "unknown")
fmt = metadata.get("primary_format", "unknown")
return (
f"This is a {domain} document in {fmt} format. "
f"Summary: {summary} "
f"This metadata is provided to orient your extraction, not to constrain it. "
f"Extract entities and relationships freely from the document text itself; "
f"the metadata is descriptive context, not a checklist."
)
def submit_episode(name, content, source_description):
"""Submit episode to Graphiti sidecar at the test group_id."""
payload = {
"episodes": [{
"name": name,
"content": content[:MAX_DOC_CHARS],
"source_description": source_description,
"timestamp": "2026-04-28T00:00:00",
}],
"group_id": TEST_GROUP_ID,
}
response = requests.post(f"{SIDECAR_URL}/episodes/bulk", json=payload, timeout=300)
response.raise_for_status()
return response.json()
def main():
with open(SAMPLE_FILE) as f:
sample = json.load(f)
selected = sample["selected"]
print(f"E1 cascade re-extraction starting — {len(selected)} episodes to test group_id={TEST_GROUP_ID}\n")
results = []
for i, ep in enumerate(selected, 1):
name = ep["name"]
bucket = ep["bucket"]
print(f"[{i}/{len(selected)}] [{bucket}] {name}")
record = {"name": name, "bucket": bucket, "tier1_entities": ep["entities"]}
# Fetch text
print(f" Fetching source text...", end=" ", flush=True)
text = fetch_source_text(name)
if text is None:
print("FAILED — no chunks in pgvector")
record["error"] = "no source text"
results.append(record)
continue
record["doc_chars"] = len(text)
print(f"{len(text)} chars")
# Mistral metadata
print(f" Generating Mistral metadata...", end=" ", flush=True)
t0 = time.time()
metadata = run_mistral_metadata(text)
elapsed = time.time() - t0
record["metadata"] = metadata
record["metadata_elapsed_s"] = round(elapsed, 1)
if "error" in metadata:
print(f"FAILED in {elapsed:.1f}s")
else:
print(f"{elapsed:.1f}s — domain={metadata.get('domain_class')}, format={metadata.get('primary_format')}")
# Submit to Graphiti
source_desc = format_metadata_as_orientation(metadata)
record["source_description"] = source_desc
print(f" Submitting to Graphiti test group...", end=" ", flush=True)
t0 = time.time()
try:
result = submit_episode(name, text, source_desc)
elapsed = time.time() - t0
print(f"{elapsed:.1f}s — OK")
record["submit_elapsed_s"] = round(elapsed, 1)
record["submit_result"] = result
except Exception as e:
elapsed = time.time() - t0
print(f"{elapsed:.1f}s — FAILED: {e}")
record["submit_error"] = str(e)
results.append(record)
# Save intermediate state after each episode
with open(RESULTS_FILE, "w") as f:
json.dump({"results": results}, f, indent=2, default=str)
print()
print(f"\nDone. Results saved to {RESULTS_FILE}")
if __name__ == "__main__":
main()