plan B: search_facts chat tool + orientation indexer (read-only Graphiti)

After establishing that single-episode Graphiti writes take ~20 min against
the existing graph (the dedup loop is structurally slow regardless of the
patches, the bridge, or the LLM model), the salvage plan is to stop trying
to write to Graphiti and instead:

  1. Use the existing 4,300-entity graph as a read-only fact layer at chat
     time via a new search_facts tool. Graphiti's /search endpoint is fast
     (~15ms direct, ~400ms over HTTP); the graph is stale-as-of-early-May
     but covers most biographical / relational content that "write me a bio"
     and similar queries care about.

  2. Pipe Stage 2's document-level orientations into pgvector via a new
     orientation_indexer worker. Stage 2 already runs and writes orientation
     text to stage_3_queue for every Mistral-processed document; the worker
     reads those, embeds them, and writes one row per source to embeddings
     with metadata->>'kind'='orientation'. retrieve_documents now ranks
     against both chunk text and document-level concept summaries.
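
A minimal sketch of the fact formatting that search_facts applies to a
/search response, written as a pure function so it can be run without a
Graphiti instance. The field names (fact, valid_at, invalid_at) come from
the diff below; the function name format_facts and the sample payload are
hypothetical and only for illustration.

```python
def format_facts(results):
    """Render fact dicts as numbered lines with their validity window."""
    lines = []
    for i, f in enumerate(results, 1):
        fact = (f.get("fact") or "").strip()
        valid_at = f.get("valid_at") or "?"
        invalid_at = f.get("invalid_at")
        validity = f"valid {valid_at}"
        # Only mention supersession when an end timestamp is present.
        if invalid_at and invalid_at != "None":
            validity += f" → superseded {invalid_at}"
        lines.append(f"[{i}] {fact} ({validity})")
    return "\n".join(lines)


# Hypothetical payload: names and dates are placeholders, not graph data.
sample = [
    {"fact": "Alice consulted for ExampleCorp",
     "valid_at": "2019-01-01", "invalid_at": "2021-06-30"},
    {"fact": "Alice works on ProjectX", "valid_at": None, "invalid_at": None},
]
print(format_facts(sample))
```

A fact with an invalid_at value is rendered as superseded, which is what
lets the model separate current state from history in a stale graph.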

Idempotent: the indexer's "is this already indexed" check is an EXISTS
subquery against embeddings, so restarts and partial runs are safe.
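
The idempotency claim can be illustrated with a small self-contained
sketch. It assumes a simplified schema in an in-memory SQLite database: a
plain kind column stands in for the real metadata->>'kind' JSON lookup, and
index_once is a hypothetical stand-in for one pass of the worker with the
embedding step omitted.

```python
import sqlite3


def fetch_unindexed(conn, limit):
    """Queue rows with an orientation that has no orientation row yet."""
    return conn.execute(
        """
        SELECT s.source, s.orientation
        FROM stage_3_queue s
        WHERE s.orientation IS NOT NULL
          AND NOT EXISTS (
              SELECT 1 FROM embeddings e
              WHERE e.source = s.source AND e.kind = 'orientation'
          )
        ORDER BY s.enqueued_at
        LIMIT ?
        """,
        (limit,),
    ).fetchall()


def index_once(conn, limit=25):
    """One worker pass: write a row per unindexed source, return the count."""
    rows = fetch_unindexed(conn, limit)
    conn.executemany(
        "INSERT INTO embeddings (source, kind, document) "
        "VALUES (?, 'orientation', ?)",
        rows,
    )
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE stage_3_queue (source TEXT, orientation TEXT, enqueued_at INTEGER);
    CREATE TABLE embeddings (source TEXT, kind TEXT, document TEXT);
    INSERT INTO stage_3_queue VALUES
        ('a.pdf', 'about A', 1), ('b.pdf', NULL, 2), ('c.pdf', 'about C', 3);
""")
first = index_once(conn)   # indexes a.pdf and c.pdf; b.pdf has no orientation
second = index_once(conn)  # nothing left, so a restart repeats no work
print(first, second)
```

Because the "already indexed" state lives in the target table itself, there
is no separate progress marker that could drift out of sync after a crash.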

Out of scope (deliberately): no Graphiti writes from chat, no Stage 2 ->
Graphiti bridge, no draining the 711-item stage_3_queue backlog into
Graphiti. Rich-extraction posture stays a BirdAI concern.
2026-05-20 05:00:03 +00:00
parent 313c0f0341
commit e96bf40b2f
2 changed files with 221 additions and 1 deletions
+85 -1
@@ -141,6 +141,19 @@ consulting" not "my work." Results are unfiltered and ranked by
semantic similarity; judge each chunk for relevance and ignore
irrelevant hits rather than forcing them into the answer.
You also have a search_facts tool that queries a knowledge graph of
atomic facts about Aaron's entities and their relationships. The graph
was populated through early May 2026 and is not currently being
updated; treat it as a *historical* layer that holds biographical
content (career, projects, consulting), exhibition records, key
people, dossier-era claims, and time-stamped facts with explicit
validity windows. For biographical or relational questions ("write
me a bio", "what's the FWN3D / HVAMC relationship", "who did I
consult for at IBM"), call search_facts *in addition to*
retrieve_documents — the two return complementary shapes (atomic
facts vs. document passages). For current-state questions, the
persistent memory file is more authoritative than the graph.
When Aaron asks for a document file — bio, cover letter, statement,
CV section, anything he wants to send or edit outside chat — produce
the full text as your chat reply first. NEVER call save_document on
@@ -440,6 +453,70 @@ DRAFTS_WEBDAV = f"{NEXTCLOUD_URL}/remote.php/dav/files/{NEXTCLOUD_USER}/Drafts"
_FILENAME_SAFE_RE = re.compile(r"[^A-Za-z0-9_\-\. ]")
GRAPHITI_URL = os.getenv("GRAPHITI_URL", "http://localhost:8001")
GRAPHITI_GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "aaron")

SEARCH_FACTS_TOOL = {
    "name": "search_facts",
    "description": (
        "Search Aaron's knowledge graph for atomic facts about entities and "
        "their relationships. The graph holds time-stamped facts captured up "
        "to early May 2026 — biographical content (career, projects, "
        "consulting), exhibition history, key relationships, dossier-era "
        "claims. Returns short sentence-shaped facts with valid_at / "
        "invalid_at timestamps so you can distinguish current state from "
        "superseded history. Useful for: bios, 'who did I consult for', "
        "'what's the relationship between X and Y', any question shaped like "
        "a relational lookup. Complements retrieve_documents (which returns "
        "longer chunk passages). Call this *in addition to* retrieve_documents "
        "for biographical or relational questions — the two return "
        "different shapes of evidence. The graph hasn't been updated since "
        "early May 2026; for current-state questions, the persistent memory "
        "file or recent documents are more authoritative."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "The fact-shaped query. Concrete entity names work best.",
            },
        },
        "required": ["query"],
    },
}

def _execute_search_facts(tool_input):
    """Hit Graphiti /search, format the results as text for Claude."""
    # Tolerate a missing or null query instead of raising on .strip().
    query = ((tool_input or {}).get("query") or "").strip()
    if not query:
        return "No query provided."
    try:
        r = requests.get(
            f"{GRAPHITI_URL}/search",
            params={"query": query, "limit": 8, "group_id": GRAPHITI_GROUP_ID},
            timeout=15,
        )
    except requests.RequestException as e:
        return f"search_facts: Graphiti unreachable ({e})."
    if r.status_code != 200:
        return f"search_facts: Graphiti returned {r.status_code}."
    try:
        results = r.json().get("results", [])
    except ValueError:
        # A 200 with a non-JSON body should degrade to a message, not raise.
        return "search_facts: Graphiti returned a non-JSON response."
    if not results:
        return f"No facts found for {query!r}."
    lines = []
    for i, f in enumerate(results, 1):
        fact = (f.get("fact") or "").strip()
        valid_at = f.get("valid_at") or "?"
        invalid_at = f.get("invalid_at")
        # Treat both a missing value and the literal string "None" as open-ended.
        validity = f"valid {valid_at}"
        if invalid_at and invalid_at != "None":
            validity += f" → superseded {invalid_at}"
        lines.append(f"[{i}] {fact} ({validity})")
    return "\n".join(lines)

SAVE_DOCUMENT_TOOL = {
    "name": "save_document",
    "description": (
@@ -633,7 +710,7 @@ def chat(user_message, conversation_id, settings, client_time=None):
    messages = history + [{"role": "user", "content": full_message}]
-    tools = [RETRIEVE_DOCUMENTS_TOOL, SAVE_DOCUMENT_TOOL]
+    tools = [RETRIEVE_DOCUMENTS_TOOL, SEARCH_FACTS_TOOL, SAVE_DOCUMENT_TOOL]
    if settings.get("web_search", True):
        tools.append({"type": "web_search_20250305", "name": "web_search"})
@@ -672,6 +749,13 @@ def chat(user_message, conversation_id, settings, client_time=None):
                    "tool_use_id": block.id,
                    "content": result_text,
                })
            elif block.name == "search_facts":
                result_text = _execute_search_facts(block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": result_text,
                })
            elif block.name == "save_document":
                result_text = _execute_save_document(block.input)
                tool_results.append({
+136
@@ -0,0 +1,136 @@
"""
Orientation Indexer — feeds Stage 2's document-level orientations into pgvector
so they're searchable alongside chunk text by the retrieve_documents tool.

Each completed row in stage_3_queue has an `orientation` string (active_frames
+ frame_relationships + extraction_orientation + one_sentence_summary) that
describes the document at a conceptual level. Indexing it as its own row in
the embeddings table gives the cross-encoder a second surface to rank against
"what is this document about" rather than just "what does this chunk say."

This worker is part of the "read-only Graphiti + orientation-into-pgvector"
plan B that replaced the Stage 3 → Graphiti write path. The graph layer is
queried directly via the search_facts chat tool; orientations land here.

State tracking: a row is considered indexed if the embeddings table already
holds a row with source=<source> and metadata->>'kind'='orientation'. The
worker is idempotent — restart-safe, resumable.

Runs as systemd: aaronai-orientation-indexer.service
"""
import hashlib
import logging
import os
import sys
import time
from pathlib import Path

from dotenv import load_dotenv
import psycopg2
from sentence_transformers import SentenceTransformer

load_dotenv(Path.home() / "aaronai" / ".env", override=True)

# encoding.py lives next to this file; make it importable under systemd.
sys.path.insert(0, str(Path(__file__).parent))
from encoding import write_embeddings_batch

PG_DSN = os.getenv("PG_DSN")
EMBED_MODEL = "all-MiniLM-L6-v2"
BATCH_SIZE = 25
POLL_INTERVAL_SECS = 30
LOG_FILE = "/var/log/aaronai/orientation-indexer.log"
HEARTBEAT_FILE = "/var/log/aaronai/orientation-indexer-heartbeat"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [orientation-indexer] %(levelname)s %(message)s",
    handlers=[logging.FileHandler(LOG_FILE, mode="a")],
)
log = logging.getLogger("orientation-indexer")


def get_pg():
    return psycopg2.connect(PG_DSN)


def fetch_unindexed(cur, limit):
    """Pull stage_3_queue rows with a non-null orientation whose orientation
    hasn't been written to the embeddings table yet."""
    cur.execute(
        """
        SELECT s.source, s.orientation
        FROM stage_3_queue s
        WHERE s.orientation IS NOT NULL
          AND NOT EXISTS (
              SELECT 1 FROM embeddings e
              WHERE e.source = s.source
                AND e.metadata->>'kind' = 'orientation'
          )
        ORDER BY s.enqueued_at
        LIMIT %s
        """,
        (limit,),
    )
    return cur.fetchall()


def _row_for(source: str, orientation: str, embedding) -> dict:
    """Build an embeddings row for the orientation. id is deterministic so
    re-runs don't create duplicates if the unique check above ever races."""
    # First 8 hex chars of the MD5 of the source, used as a stable id.
    chunk_id = hashlib.md5(f"orientation:{source}".encode()).hexdigest()[:8] + "_orient"
    return {
        "id": chunk_id,
        "document": orientation,
        "embedding": embedding,
        "source": source,
        "type": "document",
        "metadata": {
            "source": source,
            "kind": "orientation",
        },
    }


def write_heartbeat():
    try:
        Path(HEARTBEAT_FILE).write_text(str(time.time()))
    except OSError:
        # A missing or unwritable heartbeat file must not stop indexing.
        pass


def main():
    log.info("Orientation indexer starting...")
    log.info(f"Loading embedding model: {EMBED_MODEL}")
    embedder = SentenceTransformer(EMBED_MODEL)
    log.info("Embedding model ready.")
    while True:
        write_heartbeat()
        idle = True  # sleep unless this iteration indexed a batch
        try:
            pg = get_pg()
            try:
                with pg.cursor() as cur:
                    rows = fetch_unindexed(cur, BATCH_SIZE)
                if rows:
                    orientations = [r[1] for r in rows]
                    embeddings = embedder.encode(orientations).tolist()
                    batch = [
                        _row_for(source, orient, emb)
                        for (source, orient), emb in zip(rows, embeddings)
                    ]
                    write_embeddings_batch(pg, batch)
                    log.info(f"Indexed {len(batch)} orientation(s)")
                    idle = False
            finally:
                # Single close point; the connection is released before sleeping.
                pg.close()
        except Exception:
            # Log the traceback and retry after the poll interval.
            log.exception("Indexing loop iteration failed")
        if idle:
            time.sleep(POLL_INTERVAL_SECS)


if __name__ == "__main__":
    main()