fix: max_coroutines=2, saga support in sidecar; stage3 chunking; TIMEOUT_MAX 0 persistent in falkordb compose

This commit is contained in:
2026-04-30 04:01:02 +00:00
parent 95d022ec64
commit 62b5b5453a
2 changed files with 184 additions and 0 deletions
+3
View File
@@ -65,6 +65,7 @@ async def lifespan(app: FastAPI):
embedder=SentenceTransformerEmbedder(),
cross_encoder=BGERerankerClient(),
graph_driver=FalkorDriver(host=FALKORDB_HOST, port=FALKORDB_PORT, database="aaron"),
max_coroutines=2,
)
await graphiti_instance.build_indices_and_constraints()
log.info(f"Graphiti ready — provider: {LLM_PROVIDER}, group: {GROUP_ID}")
@@ -83,6 +84,7 @@ class BulkEpisodeItem(BaseModel):
class BulkEpisodeRequest(BaseModel):
episodes: list[BulkEpisodeItem]
group_id: str | None = None
saga: str | None = None
class EpisodeRequest(BaseModel):
@@ -136,6 +138,7 @@ async def add_episodes_bulk(req: BulkEpisodeRequest):
result = await g.add_episode_bulk(
bulk_episodes=raw_episodes,
group_id=req.group_id or GROUP_ID,
saga=req.saga or None,
)
return {"ok": True, "count": len(raw_episodes)}
except Exception as e:
+181
View File
@@ -0,0 +1,181 @@
#!/usr/bin/env python3
"""
Stage 3 Worker — Graphiti Ingest with Taxonomy-Free Orientation
Polls stage_3_queue, chunks documents, ingests as episodic saga to Graphiti.
Chunking rationale: Large documents sent as single episodes cause FalkorDB
write lock contention during entity deduplication. Chunking at ~500 words
(matching Stage 1) produces smaller deduplication passes that don't block.
Each document's chunks are linked via Graphiti's saga mechanism, preserving
document structure in the graph.
Runs as systemd service: aaronai-stage3.service
"""
import os, json, time, logging, requests
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
import psycopg2
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [stage3] %(levelname)s %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler("/var/log/aaronai/stage3.log", mode="a"),
]
)
log = logging.getLogger("stage3")
PG_DSN = os.getenv("PG_DSN")
GRAPHITI_URL = "http://localhost:8001"
HEARTBEAT_FILE = Path("/var/log/aaronai/stage3-heartbeat")
RETRY_ATTEMPTS = 2
POLL_INTERVAL = 5
INGEST_TIMEOUT = 600
WORKER_VERSION = "2.0"
# Match Stage 1 chunking parameters
CHUNK_SIZE_WORDS = 500
CHUNK_OVERLAP_WORDS = 50
# Documents under this threshold ingested as single episode (no chunking overhead)
SINGLE_EPISODE_THRESHOLD = 1500
def get_pg():
return psycopg2.connect(PG_DSN)
def write_heartbeat():
try:
HEARTBEAT_FILE.parent.mkdir(parents=True, exist_ok=True)
HEARTBEAT_FILE.write_text(datetime.now().isoformat())
except Exception:
pass
def chunk_text(text, chunk_size=CHUNK_SIZE_WORDS, overlap=CHUNK_OVERLAP_WORDS):
"""Split text into word-based chunks matching Stage 1 chunking."""
words = text.split()
chunks = []
start = 0
while start < len(words):
end = start + chunk_size
chunk = " ".join(words[start:end])
if chunk.strip():
chunks.append(chunk)
start += chunk_size - overlap
return chunks
def ingest_to_graphiti(source, full_text, orientation):
"""
Ingest document to Graphiti as chunked episodes linked by saga.
Short documents (<1500 chars) are ingested as a single episode.
Long documents are chunked at 500 words (matching Stage 1) and
ingested as a bulk batch with saga=source linking them together.
"""
char_length = len(full_text)
if char_length < SINGLE_EPISODE_THRESHOLD:
# Single episode — short enough that deduplication won't block
episodes = [{
"name": source,
"content": full_text,
"source_description": orientation,
"timestamp": datetime.now().isoformat(),
}]
log.info(f" Single episode ({char_length} chars)")
payload = {"episodes": episodes, "group_id": "aaron"}
else:
# Chunk document — each chunk becomes a separate episode
chunks = chunk_text(full_text)
episodes = [
{
"name": f"{source} [{i+1}/{len(chunks)}]",
"content": chunk,
"source_description": orientation,
"timestamp": datetime.now().isoformat(),
}
for i, chunk in enumerate(chunks)
]
log.info(f" Chunked into {len(chunks)} episodes ({char_length} chars)")
# saga=source links all chunks into a document unit in the graph
payload = {"episodes": episodes, "group_id": "aaron", "saga": source}
resp = requests.post(f"{GRAPHITI_URL}/episodes/bulk", json=payload, timeout=INGEST_TIMEOUT)
resp.raise_for_status()
return resp.json()
def process_one(row):
row_id, source, full_text, orientation = row
log.info(f"Ingesting to Graphiti: {source}")
pg = get_pg()
cur = pg.cursor()
cur.execute(
"UPDATE stage_3_queue SET started_at = NOW(), attempts = attempts + 1 WHERE id = %s",
(row_id,)
)
pg.commit()
try:
result = ingest_to_graphiti(source, full_text, orientation)
cur.execute("UPDATE stage_3_queue SET completed_at = NOW() WHERE id = %s", (row_id,))
pg.commit()
pg.close()
log.info(f" Committed to Graphiti: {source}")
return True
except Exception as e:
log.error(f" Graphiti ingest failed for {source}: {e}")
cur.execute("""
UPDATE stage_3_queue
SET failed_at = NOW(), failure_reason = %s
WHERE id = %s
""", (str(e)[:500], row_id))
pg.commit()
pg.close()
return False
def run():
log.info(f"Stage 3 worker starting (v{WORKER_VERSION})")
while True:
write_heartbeat()
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("""
SELECT id, source, full_text, orientation
FROM stage_3_queue
WHERE completed_at IS NULL
AND failed_at IS NULL
AND (started_at IS NULL OR started_at < NOW() - INTERVAL '10 minutes')
AND attempts < %s
ORDER BY enqueued_at ASC
LIMIT 1
""", (RETRY_ATTEMPTS + 1,))
row = cur.fetchone()
pg.close()
if not row:
time.sleep(POLL_INTERVAL)
continue
process_one(row)
time.sleep(2)
except Exception as e:
log.error(f"Worker loop error: {e}")
time.sleep(10)
if __name__ == "__main__":
run()