Add Claude conversation export ingestion script
This commit is contained in:
@@ -0,0 +1,135 @@
|
|||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime
|
||||||
|
from sentence_transformers import SentenceTransformer
|
||||||
|
import chromadb
|
||||||
|
|
||||||
|
# Storage locations: the Chroma database directory sits under the current
# user's home; the Claude export is read from a fixed folder.
db_path = str(Path.home().joinpath("aaronai", "db"))
EXPORT_DIR = "/home/aaron/nextcloud/data/data/aaron/files/Archive/Misc/Claude Export"

# The embedding model is loaded once at start-up and reused for every chunk.
print("Loading embedding model...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")

# Open the on-disk database and fetch the shared collection, creating it on
# first use; the metadata requests cosine space for the HNSW index.
client = chromadb.PersistentClient(path=db_path)
collection = client.get_or_create_collection(
    metadata={"hnsw:space": "cosine"},
    name="aaronai",
)
|
||||||
|
|
||||||
|
def extract_messages(convo):
    """Extract the human and assistant messages from a Claude conversation.

    Args:
        convo: One conversation object from the export, as a dict. Messages
            are read from its ``"chat_messages"`` entry.

    Returns:
        A list of ``(created_at, role, text)`` tuples in the order the
        messages appear, where ``role`` is ``"human"`` or ``"assistant"``
        and ``text`` is the stripped, non-empty message text. Messages from
        any other sender, malformed entries, and messages without text are
        left out.
    """
    messages = []
    # ``or []`` also covers a key that is present but null, which the
    # default argument of ``.get`` would not replace.
    for msg in convo.get("chat_messages") or []:
        if not isinstance(msg, dict):
            continue
        role = msg.get("sender", "")
        if role not in ("human", "assistant"):
            continue

        # Content is either a plain string or a list of content blocks; only
        # blocks of type "text" (or bare strings) contribute to the text.
        content = msg.get("content", [])
        if isinstance(content, str):
            text = content
        elif isinstance(content, list):
            parts = []
            for block in content:
                if isinstance(block, dict) and block.get("type") == "text":
                    # A null "text" value is treated as empty.
                    parts.append(block.get("text") or "")
                elif isinstance(block, str):
                    parts.append(block)
            text = "".join(parts)
        else:
            text = ""

        text = text.strip()
        if not text:
            continue
        messages.append((msg.get("created_at", ""), role, text))
    return messages
|
||||||
|
|
||||||
|
def chunk_conversation(convo):
    """Turn a conversation into indexable chunks.

    Messages are grouped into windows of up to three. A chunk is emitted
    whenever the window holds three messages or the final message has been
    reached; after each emit only the last message is carried over, so
    consecutive chunks overlap by one message.

    Args:
        convo: One conversation object from the export, as a dict. Reads
            its ``"name"``, ``"uuid"`` and ``"created_at"`` entries, plus
            the messages returned by ``extract_messages``.

    Returns:
        A list of ``(chunk_id, chunk_text, metadata)`` tuples. ``chunk_id``
        is ``claude_<uuid>_<index of the chunk's last message>``; the list
        is empty when the conversation has no usable messages.
    """
    # ``or`` fallbacks cover keys that are present but empty or null, which
    # the default argument of ``.get`` does not replace. Without this an
    # unnamed conversation would be labelled "Claude: " or "Claude: None".
    title = convo.get("name") or "Untitled conversation"
    uuid = convo.get("uuid", "")
    # Keeps the metadata value a string even when the export holds null.
    created_at = convo.get("created_at") or ""

    chunks = []
    messages = extract_messages(convo)
    if not messages:
        return chunks

    last_index = len(messages) - 1
    window = []
    for i, (_ts, role, text) in enumerate(messages):
        label = "You" if role == "human" else "Claude"
        window.append(f"{label}: {text}")

        if len(window) >= 3 or i == last_index:
            chunk_text = f"[Claude conversation: {title}]\n\n" + "\n\n".join(window)
            chunk_id = f"claude_{uuid}_{i}"
            chunks.append((chunk_id, chunk_text, {
                "source": f"Claude: {title}",
                "type": "claude_conversation",
                "created_at": created_at,
            }))
            window = window[-1:]  # overlap by 1

    return chunks
|
||||||
|
|
||||||
|
def ingest_file(jsonl_path):
    """Ingest one Claude export file into the module-level collection.

    The file is read line by line, each non-empty line being parsed as
    JSON. Every conversation is chunked with ``chunk_conversation``; chunks
    whose ids are already present in ``collection`` are left alone, and the
    remaining ones are embedded with ``embedder`` and added.

    Args:
        jsonl_path: ``pathlib.Path`` of the export file to read (UTF-8).

    Returns:
        The number of chunks newly added to the collection.
    """
    print(f"Processing {jsonl_path.name}...")

    conversations = []
    malformed = 0
    with open(jsonl_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                # Best effort: keep going, but remember the line was dropped.
                malformed += 1
                continue
            # A line normally holds one conversation object. A line holding
            # a JSON array of conversations is unpacked; anything else
            # cannot be chunked and is counted as malformed.
            if isinstance(record, dict):
                conversations.append(record)
            elif isinstance(record, list):
                conversations.extend(
                    item for item in record if isinstance(item, dict)
                )
            else:
                malformed += 1

    print(f"Found {len(conversations)} conversations")
    if malformed:
        print(f"Ignored {malformed} lines that were not valid conversation JSON")

    total_chunks = 0
    skipped = 0  # conversations that produced no chunks at all

    for convo in conversations:
        chunks = chunk_conversation(convo)
        if not chunks:
            skipped += 1
            continue

        # Only add chunks the collection does not already hold, so the
        # script can be re-run on the same export.
        existing = collection.get(ids=[chunk[0] for chunk in chunks])
        existing_ids = set(existing["ids"])
        new = [chunk for chunk in chunks if chunk[0] not in existing_ids]
        if not new:
            continue

        new_ids = [chunk_id for chunk_id, _, _ in new]
        new_texts = [chunk_text for _, chunk_text, _ in new]
        new_metas = [meta for _, _, meta in new]

        embeddings = embedder.encode(new_texts).tolist()
        collection.add(
            ids=new_ids,
            documents=new_texts,
            metadatas=new_metas,
            embeddings=embeddings,
        )
        total_chunks += len(new)

    print(f"Done. {total_chunks} chunks added, {skipped} conversations skipped.")
    return total_chunks
|
||||||
|
|
||||||
|
# Find the export file(s). The directory is created when missing so there is
# a place to drop the export into before the next run.
export_dir = Path(EXPORT_DIR)
export_dir.mkdir(parents=True, exist_ok=True)

# "**/*.jsonl" already matches files directly inside export_dir as well as
# files in its subdirectories, so a separate "*.jsonl" glob would list every
# top-level file twice and cause it to be read and checked a second time.
# Sorting makes the processing order deterministic.
jsonl_files = sorted(export_dir.glob("**/*.jsonl"))

if not jsonl_files:
    print(f"No .jsonl files found in {EXPORT_DIR}")
    print("Place your Claude export conversations.jsonl file there and run again.")
    sys.exit(0)

# Ingest every file found and report the combined number of new chunks.
total = 0
for f in jsonl_files:
    total += ingest_file(f)

print(f"\nTotal chunks added to corpus: {total}")
print(f"Database at: {db_path}")
|
||||||
Reference in New Issue
Block a user