From e96bf40b2f4d676da02140b8fa3ac8c3c296876e Mon Sep 17 00:00:00 2001 From: Aaron Nelson Date: Wed, 20 May 2026 05:00:03 +0000 Subject: [PATCH] plan B: search_facts chat tool + orientation indexer (read-only Graphiti) After establishing that single-episode Graphiti writes take ~20 min against the existing graph (the dedup loop is structurally slow regardless of the patches, the bridge, or the LLM model), the salvage plan is to stop trying to write to Graphiti and instead: 1. Use the existing 4,300-entity graph as a read-only fact layer at chat time via a new search_facts tool. Graphiti's /search endpoint is fast (~15ms direct, ~400ms over HTTP); the graph is stale-as-of-early-May but covers most biographical / relational content that "write me a bio" and similar queries care about. 2. Pipe Stage 2's document-level orientations into pgvector via a new orientation_indexer worker. Stage 2 already runs and writes orientation text to stage_3_queue for every Mistral-processed document; the worker reads those, embeds them, and writes one row per source to embeddings with metadata->>'kind'='orientation'. retrieve_documents now ranks against both chunk text and document-level concept summaries. Idempotent: the indexer's "is this already indexed" check is an EXISTS subquery against embeddings, so restarts and partial runs are safe. Out of scope (deliberately): no Graphiti writes from chat, no Stage 2 -> Graphiti bridge, no draining the 711-item stage_3_queue backlog into Graphiti. Rich-extraction posture stays a BirdAI concern. --- scripts/api.py | 86 ++++++++++++++++++++- scripts/orientation_indexer.py | 136 +++++++++++++++++++++++++++++++++ 2 files changed, 221 insertions(+), 1 deletion(-) create mode 100644 scripts/orientation_indexer.py diff --git a/scripts/api.py b/scripts/api.py index b882503..5f90909 100644 --- a/scripts/api.py +++ b/scripts/api.py @@ -141,6 +141,19 @@ consulting" not "my work." Results are unfiltered and ranked by semantic similarity; judge each chunk for relevance and ignore irrelevant hits rather than forcing them into the answer. +You also have a search_facts tool that queries a knowledge graph of +atomic facts about Aaron's entities and their relationships. The graph +was populated through early May 2026 and is not currently being +updated; treat it as a *historical* layer that holds biographical +content (career, projects, consulting), exhibition records, key +people, dossier-era claims, and time-stamped facts with explicit +validity windows. For biographical or relational questions ("write +me a bio", "what's the FWN3D / HVAMC relationship", "who did I +consult for at IBM"), call search_facts *in addition to* +retrieve_documents — the two return complementary shapes (atomic +facts vs. document passages). For current-state questions, the +persistent memory file is more authoritative than the graph. + When Aaron asks for a document file — bio, cover letter, statement, CV section, anything he wants to send or edit outside chat — produce the full text as your chat reply first. NEVER call save_document on @@ -440,6 +453,70 @@ DRAFTS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Drafts" _FILENAME_SAFE_RE = re.compile(r"[^A-Za-z0-9_\-\. ]") +GRAPHITI_URL = os.getenv("GRAPHITI_URL", "http://localhost:8001") +GRAPHITI_GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron") + + +SEARCH_FACTS_TOOL = { + "name": "search_facts", + "description": ( + "Search Aaron's knowledge graph for atomic facts about entities and " + "their relationships. The graph holds time-stamped facts captured up " + "to early May 2026 — biographical content (career, projects, " + "consulting), exhibition history, key relationships, dossier-era " + "claims. Returns short sentence-shaped facts with valid_at / " + "invalid_at timestamps so you can distinguish current state from " + "superseded history. Useful for: bios, 'who did I consult for', " + "'what's the relationship between X and Y', any question shaped like " + "a relational lookup. Complements retrieve_documents (which returns " + "longer chunk passages). Call this *in addition to* retrieve_documents " + "for biographical or relational questions — the two return " + "different shapes of evidence. The graph hasn't been updated since " + "early May 2026; for current-state questions, the persistent memory " + "file or recent documents are more authoritative." + ), + "input_schema": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The fact-shaped query. Concrete entity names work best.", + }, + }, + "required": ["query"], + }, +} + + +def _execute_search_facts(tool_input): + """Hit Graphiti /search, format the results as text for Claude.""" + query = (tool_input or {}).get("query", "").strip() + if not query: + return "No query provided." + try: + r = requests.get( + f"{GRAPHITI_URL}/search", + params={"query": query, "limit": 8, "group_id": GRAPHITI_GROUP_ID}, + timeout=15, + ) + except requests.RequestException as e: + return f"search_facts: Graphiti unreachable ({e})." + if r.status_code != 200: + return f"search_facts: Graphiti returned {r.status_code}." + results = r.json().get("results", []) + if not results: + return f"No facts found for {query!r}." + lines = [] + for i, f in enumerate(results, 1): + fact = f.get("fact", "").strip() + valid_at = f.get("valid_at") or "?" + invalid_at = f.get("invalid_at") + validity = (f"valid {valid_at}" + (f" → superseded {invalid_at}" + if invalid_at and invalid_at != "None" else "")) + lines.append(f"[{i}] {fact} ({validity})") + return "\n".join(lines) + + SAVE_DOCUMENT_TOOL = { "name": "save_document", "description": ( @@ -633,7 +710,7 @@ def chat(user_message, conversation_id, settings, client_time=None): messages = history + [{"role": "user", "content": full_message}] - tools = [RETRIEVE_DOCUMENTS_TOOL, SAVE_DOCUMENT_TOOL] + tools = [RETRIEVE_DOCUMENTS_TOOL, SEARCH_FACTS_TOOL, SAVE_DOCUMENT_TOOL] if settings.get("web_search", True): tools.append({"type": "web_search_20250305", "name": "web_search"}) @@ -672,6 +749,13 @@ def chat(user_message, conversation_id, settings, client_time=None): "tool_use_id": block.id, "content": result_text, }) + elif block.name == "search_facts": + result_text = _execute_search_facts(block.input) + tool_results.append({ + "type": "tool_result", + "tool_use_id": block.id, + "content": result_text, + }) elif block.name == "save_document": result_text = _execute_save_document(block.input) tool_results.append({ diff --git a/scripts/orientation_indexer.py b/scripts/orientation_indexer.py new file mode 100644 index 0000000..25cd93a --- /dev/null +++ b/scripts/orientation_indexer.py @@ -0,0 +1,136 @@ +""" +Orientation Indexer — feeds Stage 2's document-level orientations into pgvector +so they're searchable alongside chunk text by the retrieve_documents tool. + +Each completed row in stage_3_queue has an `orientation` string (active_frames ++ frame_relationships + extraction_orientation + one_sentence_summary) that +describes the document at a conceptual level. Indexing it as its own row in +the embeddings table gives the cross-encoder a second surface to rank against +— "what is this document about" rather than just "what does this chunk say." + +This worker is part of the "read-only Graphiti + orientation-into-pgvector" +plan B that replaced the Stage 3 → Graphiti write path. The graph layer is +queried directly via the search_facts chat tool; orientations land here. + +State tracking: a row is considered indexed if the embeddings table already +holds a row with source= and metadata->>'kind'='orientation'. The +worker is idempotent — restart-safe, resumable. + +Runs as systemd: aaronai-orientation-indexer.service +""" + +import logging +import os +import sys +import time +from pathlib import Path + +from dotenv import load_dotenv +import psycopg2 +from sentence_transformers import SentenceTransformer + +load_dotenv(Path.home() / "aaronai" / ".env", override=True) + +sys.path.insert(0, str(Path(__file__).parent)) +from encoding import write_embeddings_batch + +PG_DSN = os.getenv("PG_DSN") +EMBED_MODEL = "all-MiniLM-L6-v2" +BATCH_SIZE = 25 +POLL_INTERVAL_SECS = 30 +LOG_FILE = "/var/log/aaronai/orientation-indexer.log" +HEARTBEAT_FILE = "/var/log/aaronai/orientation-indexer-heartbeat" + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [orientation-indexer] %(levelname)s %(message)s", + handlers=[logging.FileHandler(LOG_FILE, mode="a")], +) +log = logging.getLogger("orientation-indexer") + + +def get_pg(): + return psycopg2.connect(PG_DSN) + + +def fetch_unindexed(cur, limit): + """Pull stage_3_queue rows with a non-null orientation whose orientation + hasn't been written to the embeddings table yet.""" + cur.execute( + """ + SELECT s.source, s.orientation + FROM stage_3_queue s + WHERE s.orientation IS NOT NULL + AND NOT EXISTS ( + SELECT 1 FROM embeddings e + WHERE e.source = s.source + AND e.metadata->>'kind' = 'orientation' + ) + ORDER BY s.enqueued_at + LIMIT %s + """, + (limit,), + ) + return cur.fetchall() + + +def _row_for(source: str, orientation: str, embedding) -> dict: + """Build an embeddings row for the orientation. id is deterministic so + re-runs don't create duplicates if the unique check above ever races.""" + import hashlib + chunk_id = hashlib.md5(f"orientation:{source}".encode()).hexdigest()[:8] + "_orient" + return { + "id": chunk_id, + "document": orientation, + "embedding": embedding, + "source": source, + "type": "document", + "metadata": { + "source": source, + "kind": "orientation", + }, + } + + +def write_heartbeat(): + try: + Path(HEARTBEAT_FILE).write_text(str(time.time())) + except Exception: + pass + + +def main(): + log.info("Orientation indexer starting...") + log.info(f"Loading embedding model: {EMBED_MODEL}") + embedder = SentenceTransformer(EMBED_MODEL) + log.info("Embedding model ready.") + + while True: + write_heartbeat() + try: + pg = get_pg() + try: + cur = pg.cursor() + rows = fetch_unindexed(cur, BATCH_SIZE) + if not rows: + pg.close() + time.sleep(POLL_INTERVAL_SECS) + continue + + orientations = [r[1] for r in rows] + embeddings = embedder.encode(orientations).tolist() + batch = [ + _row_for(source, orient, emb) + for (source, orient), emb in zip(rows, embeddings) + ] + write_embeddings_batch(pg, batch) + log.info(f"Indexed {len(batch)} orientation(s)") + finally: + pg.close() + except Exception as e: + log.error(f"Indexing loop iteration failed: {e}") + time.sleep(POLL_INTERVAL_SECS) + + +if __name__ == "__main__": + main()