7c7b649775
Writers now enforce type and created_at:
- encoding.py: ValueError raised at write_embeddings_batch if row dict lacks
'type'. created_at remains SQL-supplied (NOW() server-side). ON CONFLICT
DO UPDATE now also rewrites type=EXCLUDED.type and preserves the original
created_at via COALESCE(embeddings.created_at, EXCLUDED.created_at) — a
re-ingest re-classifies type but does not overwrite a backfilled mtime.
- ingest_conversations.py: same missing-'type' check (raises ValueError). ON CONFLICT intentionally keeps
EXCLUDED.created_at semantics (Aaron-AI conversation created_at tracks
convo.updated_at; re-runs should refresh).
- Column-level NOT NULL is not added; application-layer raise gives a
faster, more debuggable failure than a Postgres constraint error.
Retrieval propagates type into chunks:
- retrieve() SELECT now includes type; chunk dicts carry "type": etype.
- WHERE clause built dynamically from excluded_sources and the new
--type-filter CLI arg (experimental, default None, pgvector retrieval
only — Graphiti chunks have no embeddings.type to filter on).
- retrieve_graphiti unchanged; its chunks lack the type field.
Manifests carry type_distribution per stage:
- dream_pipeline writes stage_data[<stage>]["type_distribution"] for nrem,
early_rem, late_rem — a Counter over chunk types, filtering None so
Graphiti chunks (when DREAMER_SUBSTRATE=graphiti) don't pollute the
distribution. Pgvector chunks always carry type post-backfill; if None
appears, the backfill or writer enforcement has regressed.
Verification:
B1 force re-ingest of "Finite and infinite games -- James Carse.pdf":
all 84 chunks preserved created_at=2026-04-27T06:11:55Z
B2 missing-type check raises ValueError; no row leaked to embeddings
B3 ast.parse(*) clean; EXPLAIN renders for {no excl/no filter,
type_filter only, excl 2 elems, excl 1 elem edge case, both};
all five plans use HNSW index scan with correct Filter clauses
C1 retrieve("nrem") returns 8 chunks each carrying "type" key
C2 type_distribution = {'document': 5, 'chatgpt_conversation': 3} —
2 distinct types, 62.5/37.5 split (looser bar: >=2 types,
no single type >=90%)
The type and created_at fields are now load-bearing: every dream manifest
emits type_distribution per stage. Reverting the backfill makes the
distribution show NULLs at every dream run.
166 lines
5.4 KiB
Python
166 lines
5.4 KiB
Python
"""
|
|
Nightly ingestion of Aaron AI conversations into pgvector.
|
|
Chunks by exchange (user + assistant together), deduplicates by conversation_id + index.
|
|
Only indexes conversations with more than 3 exchanges.
|
|
"""
|
|
import sqlite3
|
|
import psycopg2
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
from sentence_transformers import SentenceTransformer
|
|
|
|
# Load the project-local .env first so PG_DSN below can come from it.
load_dotenv(Path.home().joinpath("aaronai", ".env"))

# SQLite database holding the Aaron AI conversations and messages (as a str path).
CONVERSATIONS_DB = str(Path.home().joinpath("aaronai", "conversations.db"))
# Postgres connection string for the pgvector store; None when unset.
PG_DSN = os.environ.get("PG_DSN")
# Minimum exchange count for a conversation to be indexed
# (get_conversations applies it as 2 * MIN_EXCHANGES messages).
MIN_EXCHANGES = 3

# Model is loaded once at import time and shared by every run() call.
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
|
|
|
|
def get_conversations():
    """Return the conversations that have enough exchanges to be indexed.

    The threshold is applied as a message count: a conversation qualifies
    when it has at least ``2 * MIN_EXCHANGES`` rows in ``messages`` (any role).

    Returns:
        list of sqlite3.Row with ``id``, ``title``, ``created_at``,
        ``updated_at`` and ``message_count``, newest ``updated_at`` first.
    """
    conn = sqlite3.connect(CONVERSATIONS_DB)
    try:
        conn.row_factory = sqlite3.Row
        cur = conn.execute("""
            SELECT c.id, c.title, c.created_at, c.updated_at,
                   COUNT(m.id) as message_count
            FROM conversations c
            JOIN messages m ON m.conversation_id = c.id
            GROUP BY c.id
            HAVING message_count >= ?
            ORDER BY c.updated_at DESC
        """, (MIN_EXCHANGES * 2,))
        return cur.fetchall()
    finally:
        # Always release the handle, even if the query raises. A sqlite3
        # connection's own context manager only commits or rolls back; it
        # does not close the connection.
        conn.close()
|
|
|
|
def get_messages(conversation_id):
    """Return every message of one conversation in chronological order.

    Args:
        conversation_id: value matched against ``messages.conversation_id``.

    Returns:
        list of sqlite3.Row with ``role``, ``content`` and ``timestamp``.
    """
    conn = sqlite3.connect(CONVERSATIONS_DB)
    try:
        conn.row_factory = sqlite3.Row
        # chunk_conversation pairs a user message with the message right
        # after it, so messages that share a timestamp need a deterministic
        # order. Without the id tiebreaker SQLite leaves ties unspecified.
        # NOTE(review): assumes id increases with insertion order — confirm
        # against the messages table schema.
        cur = conn.execute("""
            SELECT role, content, timestamp
            FROM messages
            WHERE conversation_id = ?
            ORDER BY timestamp ASC, id ASC
        """, (conversation_id,))
        return cur.fetchall()
    finally:
        # Close even if the query raises (the context manager would not).
        conn.close()
|
|
|
|
def chunk_conversation(convo, messages):
    """Split one conversation into embedding chunks.

    Messages are first grouped into (user, assistant) exchanges:

    * Each user message is paired with the message right after it when that
      message is from the assistant, and with '' otherwise.
    * Any other message not consumed as a reply is skipped.

    Exchanges are then grouped into non-overlapping windows of two. Each
    window becomes one chunk, and every message in it is truncated to
    500 characters.

    Args:
        convo: row exposing 'id', 'title' and 'updated_at'
            (e.g. a sqlite3.Row from get_conversations()).
        messages: iterable of rows exposing 'role' and 'content', in
            conversation order.

    Returns:
        list of (chunk_id, chunk_text, metadata) tuples. chunk_id is
        "aaronai_conv_<conversation id>_<index of the window's first
        exchange>", so per-conversation suffixes are 0, 2, 4, ...
    """
    title = convo['title'] or 'Untitled'
    conv_id = convo['id']
    msgs = list(messages)

    # Pair user + assistant exchanges.
    pairs = []
    i = 0
    while i < len(msgs):
        if msgs[i]['role'] != 'user':
            i += 1
            continue
        user_msg = msgs[i]['content']
        has_reply = i + 1 < len(msgs) and msgs[i + 1]['role'] == 'assistant'
        asst_msg = msgs[i + 1]['content'] if has_reply else ''
        pairs.append((user_msg, asst_msg))
        # Only consume the next message when it was used as the reply.
        # Advancing by 2 unconditionally dropped a user message that
        # directly followed another user message.
        i += 2 if has_reply else 1

    # Non-overlapping windows of 2 exchanges.
    chunks = []
    for idx in range(0, len(pairs), 2):
        text_parts = []
        for user, asst in pairs[idx:idx + 2]:
            if user:
                text_parts.append(f"You: {user[:500]}")
            if asst:
                text_parts.append(f"Aaron AI: {asst[:500]}")

        if not text_parts:
            continue

        chunk_text = f"[Aaron AI conversation: {title}]\n\n" + "\n\n".join(text_parts)
        chunk_id = f"aaronai_conv_{conv_id}_{idx}"
        chunks.append((chunk_id, chunk_text, {
            "source": f"Aaron AI: {title}",
            "type": "aaronai_conversation",
            "created_at": convo['updated_at'],
        }))

    return chunks
|
|
|
|
def run():
    """Embed and upsert Aaron AI conversation chunks into the pgvector table.

    For each qualifying conversation, a chunk is embedded and upserted when
    either:

    * its ID has no row in ``embeddings``, or
    * the stored ``document`` text differs from the freshly built text.

    Chunks whose text is unchanged are skipped, so re-runs do not re-embed
    the whole corpus. Work is committed once per conversation.

    Raises:
        ValueError: if a chunk's metadata has no 'type'. Nothing from the
            conversation being processed is committed. Conversations
            committed earlier in the same run stay committed.
    """
    pg = psycopg2.connect(PG_DSN)
    try:
        cur = pg.cursor()

        conversations = get_conversations()
        print(f"Found {len(conversations)} conversations with {MIN_EXCHANGES}+ exchanges")

        total_added = 0
        total_refreshed = 0
        total_skipped = 0

        for convo in conversations:
            chunks = chunk_conversation(convo, get_messages(convo['id']))
            if not chunks:
                continue

            ids = [c[0] for c in chunks]
            cur.execute("SELECT id, document FROM embeddings WHERE id = ANY(%s)", (ids,))
            stored_docs = dict(cur.fetchall())

            # Re-embed chunks that are new or whose text changed, e.g. the
            # trailing window of a conversation that has grown since the
            # last run. Skipping every existing ID made the ON CONFLICT
            # branch below unreachable and left such chunks stale.
            pending = [c for c in chunks if stored_docs.get(c[0]) != c[1]]
            total_skipped += len(chunks) - len(pending)
            if not pending:
                continue

            # Validate before embedding so a bad writer fails before any
            # embedding work is spent on this conversation.
            for chunk_id, _, meta in pending:
                if not meta.get("type"):
                    raise ValueError(
                        f"chunk {chunk_id!r} missing 'type'; writers must supply it "
                        f"(see Improvement #2 in docs/birdai-component-inventory)"
                    )

            texts = [c[1] for c in pending]
            embeddings = embedder.encode(texts, show_progress_bar=False).tolist()

            for (chunk_id, chunk_text, meta), embedding in zip(pending, embeddings):
                # ON CONFLICT intentionally overwrites created_at (unlike
                # encoding.py's COALESCE). An Aaron-AI conversation's
                # created_at tracks convo.updated_at, which advances on
                # activity. Only new or changed chunks reach this statement,
                # so their timestamp is refreshed; chunks with unchanged text
                # keep their stored created_at. metadata is refreshed too,
                # because it embeds source/type/created_at.
                cur.execute("""
                    INSERT INTO embeddings (id, document, embedding, source, type, created_at, metadata)
                    VALUES (%s, %s, %s::vector, %s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE SET
                        document = EXCLUDED.document,
                        embedding = EXCLUDED.embedding,
                        source = EXCLUDED.source,
                        type = EXCLUDED.type,
                        created_at = EXCLUDED.created_at,
                        metadata = EXCLUDED.metadata
                """, (
                    chunk_id, chunk_text, embedding,
                    meta['source'], meta['type'], meta['created_at'],
                    json.dumps(meta)
                ))

            pg.commit()
            refreshed = sum(1 for c in pending if c[0] in stored_docs)
            total_refreshed += refreshed
            total_added += len(pending) - refreshed

        cur.execute("SELECT COUNT(*) FROM embeddings")
        total = cur.fetchone()[0]
    finally:
        # Closing without commit discards any uncommitted work from a
        # conversation that failed mid-way.
        pg.close()

    print(f"Done: {total_added} chunks added, {total_refreshed} refreshed, {total_skipped} already indexed")
    print(f"Corpus total: {total} chunks")


if __name__ == "__main__":
    run()
|