Files
aaronAI/scripts/orientation_indexer.py
aaron e96bf40b2f plan B: search_facts chat tool + orientation indexer (read-only Graphiti)
After establishing that single-episode Graphiti writes take ~20 min against
the existing graph (the dedup loop is structurally slow regardless of the
patches, the bridge, or the LLM model), the salvage plan is to stop trying
to write to Graphiti and instead:

  1. Use the existing 4,300-entity graph as a read-only fact layer at chat
     time via a new search_facts tool. Graphiti's /search endpoint is fast
     (~15ms direct, ~400ms over HTTP); the graph is stale-as-of-early-May
     but covers most biographical / relational content that "write me a bio"
     and similar queries care about.

  2. Pipe Stage 2's document-level orientations into pgvector via a new
     orientation_indexer worker. Stage 2 already runs and writes orientation
     text to stage_3_queue for every Mistral-processed document; the worker
     reads those, embeds them, and writes one row per source to embeddings
     with metadata->>'kind'='orientation'. retrieve_documents now ranks
     against both chunk text and document-level concept summaries.

Idempotent: the indexer's "is this already indexed" check is an EXISTS
subquery against embeddings, so restarts and partial runs are safe.

Out of scope (deliberately): no Graphiti writes from chat, no Stage 2 ->
Graphiti bridge, no draining the 711-item stage_3_queue backlog into
Graphiti. Rich-extraction posture stays a BirdAI concern.
2026-05-20 05:00:03 +00:00

137 lines
4.2 KiB
Python

"""
Orientation Indexer — feeds Stage 2's document-level orientations into pgvector
so they're searchable alongside chunk text by the retrieve_documents tool.
Each completed row in stage_3_queue has an `orientation` string (active_frames
+ frame_relationships + extraction_orientation + one_sentence_summary) that
describes the document at a conceptual level. Indexing it as its own row in
the embeddings table gives the cross-encoder a second surface to rank against
"what is this document about" rather than just "what does this chunk say."
This worker is part of the "read-only Graphiti + orientation-into-pgvector"
plan B that replaced the Stage 3 → Graphiti write path. The graph layer is
queried directly via the search_facts chat tool; orientations land here.
State tracking: a row is considered indexed if the embeddings table already
holds a row with source=<source> and metadata->>'kind'='orientation'. The
worker is idempotent — restart-safe, resumable.
Runs as systemd: aaronai-orientation-indexer.service
"""
import logging
import os
import sys
import time
from pathlib import Path
from dotenv import load_dotenv
import psycopg2
from sentence_transformers import SentenceTransformer
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
sys.path.insert(0, str(Path(__file__).parent))
from encoding import write_embeddings_batch
PG_DSN = os.getenv("PG_DSN")
EMBED_MODEL = "all-MiniLM-L6-v2"
BATCH_SIZE = 25
POLL_INTERVAL_SECS = 30
LOG_FILE = "/var/log/aaronai/orientation-indexer.log"
HEARTBEAT_FILE = "/var/log/aaronai/orientation-indexer-heartbeat"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [orientation-indexer] %(levelname)s %(message)s",
handlers=[logging.FileHandler(LOG_FILE, mode="a")],
)
log = logging.getLogger("orientation-indexer")
def get_pg():
return psycopg2.connect(PG_DSN)
def fetch_unindexed(cur, limit):
"""Pull stage_3_queue rows with a non-null orientation whose orientation
hasn't been written to the embeddings table yet."""
cur.execute(
"""
SELECT s.source, s.orientation
FROM stage_3_queue s
WHERE s.orientation IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM embeddings e
WHERE e.source = s.source
AND e.metadata->>'kind' = 'orientation'
)
ORDER BY s.enqueued_at
LIMIT %s
""",
(limit,),
)
return cur.fetchall()
def _row_for(source: str, orientation: str, embedding) -> dict:
"""Build an embeddings row for the orientation. id is deterministic so
re-runs don't create duplicates if the unique check above ever races."""
import hashlib
chunk_id = hashlib.md5(f"orientation:{source}".encode()).hexdigest()[:8] + "_orient"
return {
"id": chunk_id,
"document": orientation,
"embedding": embedding,
"source": source,
"type": "document",
"metadata": {
"source": source,
"kind": "orientation",
},
}
def write_heartbeat():
try:
Path(HEARTBEAT_FILE).write_text(str(time.time()))
except Exception:
pass
def main():
log.info("Orientation indexer starting...")
log.info(f"Loading embedding model: {EMBED_MODEL}")
embedder = SentenceTransformer(EMBED_MODEL)
log.info("Embedding model ready.")
while True:
write_heartbeat()
try:
pg = get_pg()
try:
cur = pg.cursor()
rows = fetch_unindexed(cur, BATCH_SIZE)
if not rows:
pg.close()
time.sleep(POLL_INTERVAL_SECS)
continue
orientations = [r[1] for r in rows]
embeddings = embedder.encode(orientations).tolist()
batch = [
_row_for(source, orient, emb)
for (source, orient), emb in zip(rows, embeddings)
]
write_embeddings_batch(pg, batch)
log.info(f"Indexed {len(batch)} orientation(s)")
finally:
pg.close()
except Exception as e:
log.error(f"Indexing loop iteration failed: {e}")
time.sleep(POLL_INTERVAL_SECS)
if __name__ == "__main__":
main()