diff --git a/scripts/api.py b/scripts/api.py index b882503..5f90909 100644 --- a/scripts/api.py +++ b/scripts/api.py @@ -141,6 +141,19 @@ consulting" not "my work." Results are unfiltered and ranked by semantic similarity; judge each chunk for relevance and ignore irrelevant hits rather than forcing them into the answer. +You also have a search_facts tool that queries a knowledge graph of +atomic facts about Aaron's entities and their relationships. The graph +was populated through early May 2026 and is not currently being +updated; treat it as a *historical* layer that holds biographical +content (career, projects, consulting), exhibition records, key +people, dossier-era claims, and time-stamped facts with explicit +validity windows. For biographical or relational questions ("write +me a bio", "what's the FWN3D / HVAMC relationship", "who did I +consult for at IBM"), call search_facts *in addition to* +retrieve_documents — the two return complementary shapes (atomic +facts vs. document passages). For current-state questions, the +persistent memory file is more authoritative than the graph. + When Aaron asks for a document file — bio, cover letter, statement, CV section, anything he wants to send or edit outside chat — produce the full text as your chat reply first. NEVER call save_document on @@ -440,6 +453,70 @@ DRAFTS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Drafts" _FILENAME_SAFE_RE = re.compile(r"[^A-Za-z0-9_\-\. ]") +GRAPHITI_URL = os.getenv("GRAPHITI_URL", "http://localhost:8001") +GRAPHITI_GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron") + + +SEARCH_FACTS_TOOL = { + "name": "search_facts", + "description": ( + "Search Aaron's knowledge graph for atomic facts about entities and " + "their relationships. The graph holds time-stamped facts captured up " + "to early May 2026 — biographical content (career, projects, " + "consulting), exhibition history, key relationships, dossier-era " + "claims. Returns short sentence-shaped facts with valid_at / " + "invalid_at timestamps so you can distinguish current state from " + "superseded history. Useful for: bios, 'who did I consult for', " + "'what's the relationship between X and Y', any question shaped like " + "a relational lookup. Complements retrieve_documents (which returns " + "longer chunk passages). Call this *in addition to* retrieve_documents " + "for biographical or relational questions — the two return " + "different shapes of evidence. The graph hasn't been updated since " + "early May 2026; for current-state questions, the persistent memory " + "file or recent documents are more authoritative." + ), + "input_schema": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The fact-shaped query. Concrete entity names work best.", + }, + }, + "required": ["query"], + }, +} + + +def _execute_search_facts(tool_input): + """Hit Graphiti /search, format the results as text for Claude.""" + query = (tool_input or {}).get("query", "").strip() + if not query: + return "No query provided." + try: + r = requests.get( + f"{GRAPHITI_URL}/search", + params={"query": query, "limit": 8, "group_id": GRAPHITI_GROUP_ID}, + timeout=15, + ) + except requests.RequestException as e: + return f"search_facts: Graphiti unreachable ({e})." + if r.status_code != 200: + return f"search_facts: Graphiti returned {r.status_code}." + results = r.json().get("results", []) + if not results: + return f"No facts found for {query!r}." + lines = [] + for i, f in enumerate(results, 1): + fact = f.get("fact", "").strip() + valid_at = f.get("valid_at") or "?" + invalid_at = f.get("invalid_at") + validity = (f"valid {valid_at}" + (f" → superseded {invalid_at}" + if invalid_at and invalid_at != "None" else "")) + lines.append(f"[{i}] {fact} ({validity})") + return "\n".join(lines) + + SAVE_DOCUMENT_TOOL = { "name": "save_document", "description": ( @@ -633,7 +710,7 @@ def chat(user_message, conversation_id, settings, client_time=None): messages = history + [{"role": "user", "content": full_message}] - tools = [RETRIEVE_DOCUMENTS_TOOL, SAVE_DOCUMENT_TOOL] + tools = [RETRIEVE_DOCUMENTS_TOOL, SEARCH_FACTS_TOOL, SAVE_DOCUMENT_TOOL] if settings.get("web_search", True): tools.append({"type": "web_search_20250305", "name": "web_search"}) @@ -672,6 +749,13 @@ def chat(user_message, conversation_id, settings, client_time=None): "tool_use_id": block.id, "content": result_text, }) + elif block.name == "search_facts": + result_text = _execute_search_facts(block.input) + tool_results.append({ + "type": "tool_result", + "tool_use_id": block.id, + "content": result_text, + }) elif block.name == "save_document": result_text = _execute_save_document(block.input) tool_results.append({ diff --git a/scripts/orientation_indexer.py b/scripts/orientation_indexer.py new file mode 100644 index 0000000..25cd93a --- /dev/null +++ b/scripts/orientation_indexer.py @@ -0,0 +1,136 @@ +""" +Orientation Indexer — feeds Stage 2's document-level orientations into pgvector +so they're searchable alongside chunk text by the retrieve_documents tool. + +Each completed row in stage_3_queue has an `orientation` string (active_frames ++ frame_relationships + extraction_orientation + one_sentence_summary) that +describes the document at a conceptual level. Indexing it as its own row in +the embeddings table gives the cross-encoder a second surface to rank against +— "what is this document about" rather than just "what does this chunk say." + +This worker is part of the "read-only Graphiti + orientation-into-pgvector" +plan B that replaced the Stage 3 → Graphiti write path. The graph layer is +queried directly via the search_facts chat tool; orientations land here. + +State tracking: a row is considered indexed if the embeddings table already +holds a row with source= and metadata->>'kind'='orientation'. The +worker is idempotent — restart-safe, resumable. + +Runs as systemd: aaronai-orientation-indexer.service +""" + +import logging +import os +import sys +import time +from pathlib import Path + +from dotenv import load_dotenv +import psycopg2 +from sentence_transformers import SentenceTransformer + +load_dotenv(Path.home() / "aaronai" / ".env", override=True) + +sys.path.insert(0, str(Path(__file__).parent)) +from encoding import write_embeddings_batch + +PG_DSN = os.getenv("PG_DSN") +EMBED_MODEL = "all-MiniLM-L6-v2" +BATCH_SIZE = 25 +POLL_INTERVAL_SECS = 30 +LOG_FILE = "/var/log/aaronai/orientation-indexer.log" +HEARTBEAT_FILE = "/var/log/aaronai/orientation-indexer-heartbeat" + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [orientation-indexer] %(levelname)s %(message)s", + handlers=[logging.FileHandler(LOG_FILE, mode="a")], +) +log = logging.getLogger("orientation-indexer") + + +def get_pg(): + return psycopg2.connect(PG_DSN) + + +def fetch_unindexed(cur, limit): + """Pull stage_3_queue rows with a non-null orientation whose orientation + hasn't been written to the embeddings table yet.""" + cur.execute( + """ + SELECT s.source, s.orientation + FROM stage_3_queue s + WHERE s.orientation IS NOT NULL + AND NOT EXISTS ( + SELECT 1 FROM embeddings e + WHERE e.source = s.source + AND e.metadata->>'kind' = 'orientation' + ) + ORDER BY s.enqueued_at + LIMIT %s + """, + (limit,), + ) + return cur.fetchall() + + +def _row_for(source: str, orientation: str, embedding) -> dict: + """Build an embeddings row for the orientation. id is deterministic so + re-runs don't create duplicates if the unique check above ever races.""" + import hashlib + chunk_id = hashlib.md5(f"orientation:{source}".encode()).hexdigest()[:8] + "_orient" + return { + "id": chunk_id, + "document": orientation, + "embedding": embedding, + "source": source, + "type": "document", + "metadata": { + "source": source, + "kind": "orientation", + }, + } + + +def write_heartbeat(): + try: + Path(HEARTBEAT_FILE).write_text(str(time.time())) + except Exception: + pass + + +def main(): + log.info("Orientation indexer starting...") + log.info(f"Loading embedding model: {EMBED_MODEL}") + embedder = SentenceTransformer(EMBED_MODEL) + log.info("Embedding model ready.") + + while True: + write_heartbeat() + try: + pg = get_pg() + try: + cur = pg.cursor() + rows = fetch_unindexed(cur, BATCH_SIZE) + if not rows: + pg.close() + time.sleep(POLL_INTERVAL_SECS) + continue + + orientations = [r[1] for r in rows] + embeddings = embedder.encode(orientations).tolist() + batch = [ + _row_for(source, orient, emb) + for (source, orient), emb in zip(rows, embeddings) + ] + write_embeddings_batch(pg, batch) + log.info(f"Indexed {len(batch)} orientation(s)") + finally: + pg.close() + except Exception as e: + log.error(f"Indexing loop iteration failed: {e}") + time.sleep(POLL_INTERVAL_SECS) + + +if __name__ == "__main__": + main()