Compare commits
6 Commits
e38d283e59
...
8d560f9f5e
| Author | SHA1 | Date | |
|---|---|---|---|
| 8d560f9f5e | |||
| 732e450d21 | |||
| 63c58b5bb3 | |||
| 6c2af55e7e | |||
| 5b4a299414 | |||
| b09e35892c |
@@ -8,6 +8,7 @@ dreamer_state.json
|
|||||||
corpus_integrity_report.json
|
corpus_integrity_report.json
|
||||||
watcher_state.json
|
watcher_state.json
|
||||||
watcher_status.json
|
watcher_status.json
|
||||||
|
reindex_status.json
|
||||||
|
|
||||||
# Logs (these belong in /var/log/)
|
# Logs (these belong in /var/log/)
|
||||||
*.log
|
*.log
|
||||||
|
|||||||
+214
-49
@@ -1,12 +1,13 @@
|
|||||||
import os
|
import os
|
||||||
|
import re
|
||||||
import json
|
import json
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import subprocess
|
import subprocess
|
||||||
import hashlib
|
import hashlib
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from datetime import datetime
|
from datetime import datetime, timedelta
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from sentence_transformers import SentenceTransformer
|
from sentence_transformers import SentenceTransformer, CrossEncoder
|
||||||
import anthropic
|
import anthropic
|
||||||
from fastapi import FastAPI, Request, Response, Depends, HTTPException, BackgroundTasks
|
from fastapi import FastAPI, Request, Response, Depends, HTTPException, BackgroundTasks
|
||||||
import psycopg2
|
import psycopg2
|
||||||
@@ -91,6 +92,7 @@ if HAS_WHISPER:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Whisper not available: {e}")
|
print(f"Whisper not available: {e}")
|
||||||
embedder = SentenceTransformer("all-MiniLM-L6-v2")
|
embedder = SentenceTransformer("all-MiniLM-L6-v2")
|
||||||
|
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
|
||||||
# ChromaDB removed — using pgvector
|
# ChromaDB removed — using pgvector
|
||||||
anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
|
anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
|
||||||
|
|
||||||
@@ -132,6 +134,7 @@ When making factual claims about Aaron — his history, credentials, locations,
|
|||||||
# Auth configuration
|
# Auth configuration
|
||||||
import os
|
import os
|
||||||
SESSION_PASSWORD = os.getenv("AARON_AI_PASSWORD", "changeme")
|
SESSION_PASSWORD = os.getenv("AARON_AI_PASSWORD", "changeme")
|
||||||
|
SESSION_MAX_AGE_SECONDS = 60 * 60 * 24 * 365
|
||||||
SESSIONS_DB = str(Path.home() / "aaronai" / "sessions.db")
|
SESSIONS_DB = str(Path.home() / "aaronai" / "sessions.db")
|
||||||
|
|
||||||
def _init_sessions():
|
def _init_sessions():
|
||||||
@@ -163,7 +166,10 @@ def delete_session(token: str):
|
|||||||
|
|
||||||
def session_exists(token: str) -> bool:
    """Return True if `token` names a session newer than the max-age cutoff.

    Every call first deletes rows whose `created_at` is older than
    SESSION_MAX_AGE_SECONDS, then looks the token up with the same cutoff.

    The connection is closed in a `finally` so a failing DELETE/SELECT
    (locked database, missing table) no longer leaks the SQLite handle.
    The exception itself still propagates to the caller as before.
    """
    # NOTE(review): the comparison is a string comparison against an ISO-8601
    # timestamp; this assumes created_at is stored as naive local
    # datetime.now().isoformat() — confirm against the session-creation code.
    cutoff = (datetime.now() - timedelta(seconds=SESSION_MAX_AGE_SECONDS)).isoformat()
    conn = _connect_sessions()
    try:
        conn.execute("DELETE FROM sessions WHERE created_at < ?", (cutoff,))
        conn.commit()
        row = conn.execute(
            "SELECT 1 FROM sessions WHERE token = ? AND created_at >= ?",
            (token, cutoff),
        ).fetchone()
    finally:
        conn.close()
    return row is not None
|
||||||
|
|
||||||
@@ -239,30 +245,140 @@ def remove_from_memory(item):
|
|||||||
save_memory("\n".join(filtered))
|
save_memory("\n".join(filtered))
|
||||||
return len(lines) - len(filtered)
|
return len(lines) - len(filtered)
|
||||||
|
|
||||||
def retrieve_context(query, n_results=8):
|
HYBRID_CANDIDATES = 30
|
||||||
"""Pure semantic retrieval over pgvector. Top-N by cosine similarity, threshold 0.3.
|
RRF_K = 60
|
||||||
No CV pinning, no keyword routing — see architecture doc substrate-dependency section.
|
FINAL_LIMIT = 8
|
||||||
Substrate-level workarounds (entity-keyed routing, hybrid retrieval) live at the
|
|
||||||
Graphiti layer, not as wrapper logic above pgvector."""
|
_TSQUERY_SANITIZE_RE = re.compile(r"[^\w\s\"'-]")
|
||||||
|
|
||||||
|
CONVERSATION_TYPES = ["chatgpt_conversation", "claude_conversation", "aaronai_conversation"]
|
||||||
|
DOCUMENT_TYPES = ["document"]
|
||||||
|
MEMORY_TYPES = ["claude_memory"]
|
||||||
|
|
||||||
|
_CONVO_SIGNALS = (
|
||||||
|
"what did i tell", "what did we discuss", "what did we talk",
|
||||||
|
"in our conversation", "you mentioned", "we talked about",
|
||||||
|
"earlier you said", "earlier i said", "did i tell you",
|
||||||
|
"did i say", "what did chatgpt", "what did claude",
|
||||||
|
)
|
||||||
|
_DOC_SIGNALS = (
|
||||||
|
"write me a bio", "draft a bio", "my bio", "my cv", "my resume",
|
||||||
|
"my professional", "my work history", "my exhibitions",
|
||||||
|
"my publications", "my syllabi", "my courses", "my teaching",
|
||||||
|
"my philosophy", "about my career", "draft a cover letter",
|
||||||
|
"draft my", "write a bio", "professional bio",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _websearch_query(text: str) -> str:
    """Prepare free text for use as a websearch_to_tsquery argument.

    Every character matched by _TSQUERY_SANITIZE_RE is replaced with a single
    space, then leading/trailing whitespace is trimmed. Quoted phrases and
    'or' are left in place for websearch_to_tsquery to interpret.
    """
    scrubbed = _TSQUERY_SANITIZE_RE.sub(" ", text)
    return scrubbed.strip()
|
||||||
|
|
||||||
|
|
||||||
|
def classify_retrieval_intent(query: str):
    """Map a user query to the `type` values retrieval should be limited to.

    Returns a list of type strings, or None when no signal phrase matches
    (meaning: search every type).

    This is a deliberately simple substring classifier over the lower-cased
    query. It is meant to be tuned or swapped out, e.g. for an LLM classifier
    that returns the same shape (a list of valid type strings or None).

    Precedence: conversation signals are checked before document signals, so
    "what did I tell you about my CV" routes to conversations, not to the CV.
    """
    lowered = query.lower()
    # Order of this tuple encodes the precedence described above.
    routing = (
        (_CONVO_SIGNALS, CONVERSATION_TYPES),
        (_DOC_SIGNALS, DOCUMENT_TYPES),
    )
    for phrases, type_values in routing:
        for phrase in phrases:
            if phrase in lowered:
                return type_values
    return None
|
||||||
|
|
||||||
|
|
||||||
|
def _rerank(query: str, candidates: list[tuple]) -> list[tuple]:
    """Reorder candidates by cross-encoder relevance to `query`.

    Each candidate is an (id, document, source) tuple; the document text
    (index 1) is paired with the query and scored by the module-level
    `reranker`. The same tuples are returned, highest score first. An empty
    candidate list short-circuits without calling the model.
    """
    if not candidates:
        return []
    relevance = reranker.predict([(query, cand[1]) for cand in candidates])
    scored = list(zip(candidates, relevance))
    # Stable sort: equal scores keep their incoming relative order.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [cand for cand, _score in scored]
|
||||||
|
|
||||||
|
|
||||||
|
def retrieve_context(query, n_results=FINAL_LIMIT, type_filter=None):
    """Hybrid retrieval (dense + lexical, RRF fused) followed by cross-encoder rerank.

    - Dense (pgvector) handles paraphrase / semantic similarity.
    - Lexical (tsvector) catches rare named tokens (FWN3D, Sono-Tek, course codes)
      the embedding model has no signal for.
    - RRF combines the two rankings without calibrating score scales.
    - Cross-encoder rerank scores each (query, chunk) pair jointly, bridging
      semantic gaps that bi-encoders can't (e.g., "write me a bio" -> CV chunk).

    Args:
        query: the user's text; embedded for the dense pass and sanitized via
            _websearch_query() for the lexical pass.
        n_results: maximum number of chunks returned after reranking.
        type_filter: optional list of `type` values to restrict the candidate
            pool to. If None (or empty), retrieves from all types. Use
            classify_retrieval_intent() to derive.

    Returns:
        (context_pieces, sources): two parallel lists of document texts and
        their source labels ("unknown" when the source is empty). Both are
        empty if retrieval fails; the error is printed, not raised.
    """
    query_embedding = embedder.encode([query]).tolist()[0]
    ts_query = _websearch_query(query)

    context_pieces = []
    sources = []

    where_sql = ""
    type_param = ()
    if type_filter:
        where_sql = "WHERE type = ANY(%s)"
        type_param = (list(type_filter),)

    try:
        pg = get_pg()
        # Close the connection even when a query raises; previously a failing
        # execute() jumped straight to the except clause and leaked it.
        try:
            cur = pg.cursor()

            # Only the fixed WHERE fragment is interpolated; every value is bound.
            cur.execute(f"""
                SELECT id, document, source
                FROM embeddings
                {where_sql}
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (*type_param, query_embedding, HYBRID_CANDIDATES))
            dense_hits = cur.fetchall()

            lexical_hits = []
            if ts_query:  # sanitizing can leave nothing to search for
                lex_where = "to_tsvector('english', document) @@ websearch_to_tsquery('english', %s)"
                full_where = (f"WHERE {lex_where} AND type = ANY(%s)"
                              if type_filter else f"WHERE {lex_where}")
                lex_params = ((ts_query, list(type_filter)) if type_filter else (ts_query,))
                cur.execute(f"""
                    SELECT id, document, source
                    FROM embeddings
                    {full_where}
                    ORDER BY ts_rank(to_tsvector('english', document),
                                     websearch_to_tsquery('english', %s)) DESC
                    LIMIT %s
                """, (*lex_params, ts_query, HYBRID_CANDIDATES))
                lexical_hits = cur.fetchall()
        finally:
            pg.close()

        # Reciprocal Rank Fusion: each list contributes 1 / (RRF_K + rank),
        # rank starting at 1; a row found by both passes gets both terms.
        scores = {}
        rows_by_id = {}
        for hits in (dense_hits, lexical_hits):
            for rank, row in enumerate(hits):
                scores[row[0]] = scores.get(row[0], 0) + 1.0 / (RRF_K + rank + 1)
                rows_by_id[row[0]] = row

        rrf_ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
        candidates = [rows_by_id[doc_id] for doc_id, _ in rrf_ranked]

        for _id, doc, source in _rerank(query, candidates)[:n_results]:
            context_pieces.append(doc)
            sources.append(source or "unknown")

    except Exception as e:
        # Best-effort by design: the chat path continues without context.
        print(f"hybrid retrieval error: {e}")

    return context_pieces, sources
|
||||||
|
|
||||||
def get_conversation_history(conversation_id, limit=20):
|
def get_conversation_history(conversation_id, limit=20):
|
||||||
@@ -302,7 +418,8 @@ def create_conversation(title="New conversation"):
|
|||||||
|
|
||||||
def chat(user_message, conversation_id, settings, client_time=None):
|
def chat(user_message, conversation_id, settings, client_time=None):
|
||||||
memory = load_memory()
|
memory = load_memory()
|
||||||
context_pieces, sources = retrieve_context(user_message)
|
type_filter = classify_retrieval_intent(user_message)
|
||||||
|
context_pieces, sources = retrieve_context(user_message, type_filter=type_filter)
|
||||||
history = get_conversation_history(conversation_id)
|
history = get_conversation_history(conversation_id)
|
||||||
|
|
||||||
context_parts = []
|
context_parts = []
|
||||||
@@ -381,7 +498,7 @@ async def login(request: Request, response: Response):
|
|||||||
httponly=True,
|
httponly=True,
|
||||||
secure=True,
|
secure=True,
|
||||||
samesite="lax",
|
samesite="lax",
|
||||||
max_age=60 * 60 * 24 * 30
|
max_age=SESSION_MAX_AGE_SECONDS
|
||||||
)
|
)
|
||||||
response.body = b'{"ok": true}'
|
response.body = b'{"ok": true}'
|
||||||
response.status_code = 200
|
response.status_code = 200
|
||||||
@@ -686,44 +803,92 @@ async def run_dreamer(request: Request, auth: str = Depends(require_auth)):
|
|||||||
return JSONResponse({"started": False, "error": str(e)})
|
return JSONResponse({"started": False, "error": str(e)})
|
||||||
|
|
||||||
def transcribe_and_save(tmp_path, timestamp, nextcloud_url, nextcloud_user, nextcloud_password):
    """Background task — transcribes audio and saves to Nextcloud after endpoint returns.

    Flow:
      1. Transcribe `tmp_path` with the module-level Whisper model.
      2. Attempt to upload the audio to Journal/Media/<YYYY-MM>/ and delete
         the temp file (the temp file is removed whether or not the upload
         succeeded).
      3. Write one markdown record to Journal/Captures/ whose status is
         `saved`, `empty_transcript`, or `failed_transcription`.

    Both WebDAV PUT responses are now checked with raise_for_status().
    Before, a 4xx/5xx upload (bad credentials, quota, missing parent) was
    treated as success: the record pointed at an `audio_path` that did not
    exist, and a `capture_saved` event was emitted for a file never written.

    Nothing is returned and nothing is raised; every failure is printed.
    """
    import requests as req_lib
    nc_auth = (nextcloud_user, nextcloud_password)
    month_dir = timestamp[:7]  # assumes timestamp starts with "YYYY-MM" — TODO confirm
    audio_ext = os.path.splitext(tmp_path)[1] or ".webm"
    audio_filename = f"{timestamp}-voice{audio_ext}"
    audio_relpath = f"Journal/Media/{month_dir}/{audio_filename}"

    def archive_audio() -> bool:
        """Upload the temp audio to Journal/Media; True only if the PUT succeeded."""
        try:
            with open(tmp_path, "rb") as f:
                audio_bytes = f.read()
            media_parent = f"{nextcloud_url}/remote.php/dav/files/{nextcloud_user}/Journal/Media"
            media_dir = f"{media_parent}/{month_dir}"
            # MKCOL status is intentionally not checked: the collection
            # usually exists already and the server then answers with an error.
            req_lib.request("MKCOL", media_parent, auth=nc_auth, timeout=10)
            req_lib.request("MKCOL", media_dir, auth=nc_auth, timeout=10)
            upload = req_lib.put(f"{media_dir}/{audio_filename}", data=audio_bytes, auth=nc_auth, timeout=60)
            upload.raise_for_status()
            return True
        except Exception as e:
            print(f"Audio archival failed for {timestamp}: {e}")
            return False
        finally:
            # NOTE(review): the temp file is removed even when the upload
            # failed, so the audio is unrecoverable in that case.
            if os.path.exists(tmp_path):
                os.unlink(tmp_path)

    def write_capture(filename: str, content_md: str, status: str):
        """PUT the markdown record, then best-effort notify the two event endpoints."""
        captures_dir = f"{nextcloud_url}/remote.php/dav/files/{nextcloud_user}/Journal/Captures"
        try:
            req_lib.request("MKCOL", captures_dir, auth=nc_auth, timeout=10)
            saved = req_lib.put(f"{captures_dir}/{filename}", data=content_md.encode("utf-8"), auth=nc_auth, timeout=30)
            saved.raise_for_status()
        except Exception as e:
            print(f"Capture markdown write failed for {timestamp}: {e}")
            return
        try:
            payload = {"type": "capture_saved", "filename": filename, "timestamp": timestamp, "status": status}
            req_lib.post("http://localhost:8000/api/events/notify", json=payload, timeout=3)
            req_lib.post("http://localhost:8000/api/captures/events/notify", json=payload, timeout=3)
        except Exception:
            # Notifications are advisory; the capture is already stored.
            pass

    transcript = ""
    transcribe_error = None
    try:
        segments, _ = whisper_model.transcribe(
            tmp_path, language="en", vad_filter=True, beam_size=1, initial_prompt=WHISPER_PROMPT
        )
        transcript = " ".join(s.text.strip() for s in segments).strip()
    except Exception as e:
        transcribe_error = str(e)

    # Archive after transcription: archive_audio() deletes the temp file.
    audio_archived = archive_audio()
    audio_line = f"**audio_path:** {audio_relpath}\n" if audio_archived else "**audio_archive_failed:** true\n"

    if transcribe_error is not None:
        filename = f"{timestamp}-voice-failed.md"
        content_md = (
            f"# Capture — {timestamp}\n\n"
            f"**type:** voice\n**modality:** audio\n**status:** failed_transcription\n"
            f"{audio_line}"
            f"**error:** {transcribe_error}\n"
        )
        write_capture(filename, content_md, "failed_transcription")
        print(f"Async transcription failed for {timestamp}: {transcribe_error}")
        return

    if not transcript:
        filename = f"{timestamp}-voice-empty.md"
        content_md = (
            f"# Capture — {timestamp}\n\n"
            f"**type:** voice\n**modality:** audio\n**status:** empty_transcript\n"
            f"{audio_line}"
        )
        write_capture(filename, content_md, "empty_transcript")
        print(f"Async transcription empty for {timestamp}: audio archived")
        return

    filename = f"{timestamp}-voice.md"
    content_md = (
        f"# Capture — {timestamp}\n\n"
        f"**type:** voice\n**modality:** audio\n**status:** saved\n"
        f"{audio_line}\n---\n\n{transcript}\n"
    )
    write_capture(filename, content_md, "saved")
    print(f"Async transcription saved: {filename}")
|
||||||
|
|
||||||
|
|
||||||
@app.post("/api/capture")
|
@app.post("/api/capture")
|
||||||
@@ -830,7 +995,7 @@ Keep the full description to 150-250 words. Do not speculate beyond what is visi
|
|||||||
|
|
||||||
**type:** {capture_type}
|
**type:** {capture_type}
|
||||||
**modality:** {modality}
|
**modality:** {modality}
|
||||||
**status:** unprocessed
|
**status:** saved
|
||||||
**media:** {media_path}
|
**media:** {media_path}
|
||||||
{f"**project:** {project}" if project else ""}
|
{f"**project:** {project}" if project else ""}
|
||||||
|
|
||||||
|
|||||||
+70
-4
@@ -12,6 +12,7 @@ Replaces four separate extract reimplementations and two extract-chunk-embed pat
|
|||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import re
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from docx import Document as DocxDocument
|
from docx import Document as DocxDocument
|
||||||
@@ -24,6 +25,62 @@ SUPPORTED = {".docx", ".pdf", ".pptx", ".txt", ".md"}
|
|||||||
DEFAULT_CHUNK_SIZE = 500
|
DEFAULT_CHUNK_SIZE = 500
|
||||||
DEFAULT_CHUNK_OVERLAP = 50
|
DEFAULT_CHUNK_OVERLAP = 50
|
||||||
|
|
||||||
|
# Matches the start of a capture-style metadata line such as "**type:** voice".
_BOLD_KV_RE = re.compile(r"^\*\*[\w +/-]+?:\*\*")


def _strip_md_frontmatter(text: str) -> str:
    """Strip a leading frontmatter block from markdown, if present.

    Recognizes two formats:
      - YAML-style: file's first non-empty line is `---`, terminated by `---`.
        Only triggered when no heading precedes — guards against `---`
        horizontal rules that follow an H1.
      - Capture-style: optional H1 heading, then one or more `**key:** value`
        lines (and blanks), terminated by `---`. The H1 is preserved; the
        key/value block + separator are removed.

    Body `---` rules and body `**bold:**` lines are never touched — the scan
    aborts as soon as a non-frontmatter line appears in the leading block.

    A leading UTF-8 byte-order mark is ignored while detecting the block.
    str.strip() does not treat U+FEFF as whitespace, so without this a
    BOM-prefixed file never matched `---` or `# ` and kept its frontmatter.

    Returns `text` unchanged (BOM included) whenever no complete frontmatter
    block is found. When one is found, the result is the body with leading
    newlines removed, prefixed by the H1 and a blank line if there was one.
    """
    probe = text[1:] if text.startswith("\ufeff") else text
    lines = probe.splitlines()
    n = len(lines)

    # Skip leading blank lines.
    i = 0
    while i < n and not lines[i].strip():
        i += 1

    # Optional H1, followed by any number of blank lines.
    heading = None
    if i < n and lines[i].startswith("# "):
        heading = lines[i]
        i += 1
        while i < n and not lines[i].strip():
            i += 1
    if i >= n:
        return text

    first = lines[i].strip()
    if heading is None and first == "---":
        # YAML-style: find the closing fence; no fence means no frontmatter.
        j = i + 1
        while j < n and lines[j].strip() != "---":
            j += 1
        if j >= n:
            return text
        body_start = j + 1
    elif _BOLD_KV_RE.match(first):
        # Capture-style: only blanks and key/value lines may precede the `---`.
        j = i
        while j < n:
            s = lines[j].strip()
            if not s or _BOLD_KV_RE.match(s):
                j += 1
                continue
            if s == "---":
                body_start = j + 1
                break
            return text  # ordinary content before any separator
        else:
            return text  # ran out of lines without a separator
    else:
        return text

    body = "\n".join(lines[body_start:]).lstrip("\n")
    return f"{heading}\n\n{body}" if heading else body
|
||||||
|
|
||||||
|
|
||||||
def _docx_cell_paragraphs(cell):
|
def _docx_cell_paragraphs(cell):
|
||||||
yield from (p for p in cell.paragraphs if p.text.strip())
|
yield from (p for p in cell.paragraphs if p.text.strip())
|
||||||
@@ -89,7 +146,10 @@ def extract_text(filepath: Path) -> str:
|
|||||||
parts.append(notes)
|
parts.append(notes)
|
||||||
return "\n".join(parts)
|
return "\n".join(parts)
|
||||||
elif suffix in {".txt", ".md"}:
|
elif suffix in {".txt", ".md"}:
|
||||||
return filepath.read_text(encoding="utf-8", errors="ignore")
|
text = filepath.read_text(encoding="utf-8", errors="ignore")
|
||||||
|
if suffix == ".md":
|
||||||
|
return _strip_md_frontmatter(text)
|
||||||
|
return text
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.warning(f"Text extraction failed for {filepath.name}: {e}")
|
log.warning(f"Text extraction failed for {filepath.name}: {e}")
|
||||||
return ""
|
return ""
|
||||||
@@ -142,8 +202,8 @@ def chunk_and_embed(text: str,
|
|||||||
return rows
|
return rows
|
||||||
|
|
||||||
|
|
||||||
def write_embeddings_batch(conn, batch: list[dict]) -> int:
|
def write_embeddings_batch(conn, batch: list[dict], commit: bool = True) -> int:
|
||||||
"""Single canonical INSERT. Sets created_at = NOW() server-side. Commits.
|
"""Single canonical INSERT. Sets created_at = NOW() server-side.
|
||||||
|
|
||||||
Every row dict must supply 'type'. created_at is SQL-supplied (NOW()), so
|
Every row dict must supply 'type'. created_at is SQL-supplied (NOW()), so
|
||||||
callers do not need to provide it. The application-layer assertion is the
|
callers do not need to provide it. The application-layer assertion is the
|
||||||
@@ -151,6 +211,11 @@ def write_embeddings_batch(conn, batch: list[dict]) -> int:
|
|||||||
historical NULLs were resolved by the Improvement #2 backfill, and a
|
historical NULLs were resolved by the Improvement #2 backfill, and a
|
||||||
Python-level raise gives a faster, more debuggable failure than a
|
Python-level raise gives a faster, more debuggable failure than a
|
||||||
Postgres constraint error.
|
Postgres constraint error.
|
||||||
|
|
||||||
|
When commit=True (default), this function commits the connection itself.
|
||||||
|
When commit=False, the caller is responsible for committing. Use
|
||||||
|
commit=False when composing this write with other writes that must land
|
||||||
|
atomically in the same transaction.
|
||||||
"""
|
"""
|
||||||
if not batch:
|
if not batch:
|
||||||
return 0
|
return 0
|
||||||
@@ -173,5 +238,6 @@ def write_embeddings_batch(conn, batch: list[dict]) -> int:
|
|||||||
metadata = EXCLUDED.metadata
|
metadata = EXCLUDED.metadata
|
||||||
""", (row["id"], row["document"], row["embedding"],
|
""", (row["id"], row["document"], row["embedding"],
|
||||||
row["source"], row["type"], json.dumps(row["metadata"])))
|
row["source"], row["type"], json.dumps(row["metadata"])))
|
||||||
conn.commit()
|
if commit:
|
||||||
|
conn.commit()
|
||||||
return len(batch)
|
return len(batch)
|
||||||
|
|||||||
@@ -0,0 +1,135 @@
|
|||||||
|
"""One-off: re-ingest docx+pptx after the 2026-05-04 extractor upgrade (commit 93c0d89).
|
||||||
|
|
||||||
|
Pre-upgrade extraction missed tables, headers/footers, text boxes, group shapes,
|
||||||
|
and pptx notes — leaving CVs/dossiers as section-header skeletons in the index.
|
||||||
|
|
||||||
|
Steps when run with --apply:
|
||||||
|
1. DELETE all embeddings rows where source ends in .docx or .pptx
|
||||||
|
2. Walk NEXTCLOUD_PATH and re-ingest every .docx/.pptx via _ingest_one
|
||||||
|
3. Stage 2 enqueue is suppressed (SKIP_STAGE2_ENQUEUE=1)
|
||||||
|
|
||||||
|
Without --apply: dry-run. Counts files and chunks, prints a sample, writes nothing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
os.environ["SKIP_STAGE2_ENQUEUE"] = "1"
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
|
||||||
|
|
||||||
|
import psycopg2
|
||||||
|
from sentence_transformers import SentenceTransformer
|
||||||
|
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent))
|
||||||
|
from ingest import _ingest_one, get_pg
|
||||||
|
|
||||||
|
NEXTCLOUD_PATH = Path("/home/aaron/nextcloud/data/data/aaron/files")
|
||||||
|
TARGET_EXTS = {".docx", ".pptx"}
|
||||||
|
|
||||||
|
APPLY = "--apply" in sys.argv
|
||||||
|
|
||||||
|
|
||||||
|
def count_stale():
    """Summarize the docx/pptx rows currently in `embeddings`.

    Returns a list of (ext, files, chunks) rows, one per lower-cased file
    extension, ordered by extension: `files` is the number of distinct
    sources and `chunks` the number of rows. Read-only.

    The connection is closed in a `finally` so a failing query does not
    leak it.
    """
    pg = get_pg()
    try:
        cur = pg.cursor()
        cur.execute(
            "SELECT lower(substring(source from '\\.[^.]+$')) AS ext, "
            "COUNT(DISTINCT source) AS files, COUNT(*) AS chunks "
            "FROM embeddings WHERE lower(source) ~ '\\.(docx|pptx)$' "
            "GROUP BY 1 ORDER BY 1"
        )
        rows = cur.fetchall()
    finally:
        pg.close()
    return rows
|
||||||
|
|
||||||
|
|
||||||
|
def delete_stale():
    """Delete every `embeddings` row whose source ends in .docx or .pptx.

    Commits the delete and returns the number of rows removed. The
    connection is closed in a `finally`; if the DELETE or COMMIT raises,
    nothing is committed and the exception propagates.
    """
    pg = get_pg()
    try:
        cur = pg.cursor()
        cur.execute("DELETE FROM embeddings WHERE lower(source) ~ '\\.(docx|pptx)$'")
        deleted = cur.rowcount
        pg.commit()
    finally:
        pg.close()
    return deleted
|
||||||
|
|
||||||
|
|
||||||
|
def find_files():
    """List every re-ingest candidate under NEXTCLOUD_PATH.

    A candidate is a regular file whose lower-cased suffix is in TARGET_EXTS
    and whose name does not start with "~$" or ".". Returned in the order
    rglob yields them.
    """
    skip_prefixes = ("~$", ".")
    return [
        path
        for path in NEXTCLOUD_PATH.rglob("*")
        if path.is_file()
        and path.suffix.lower() in TARGET_EXTS
        and not path.name.startswith(skip_prefixes)
    ]
|
||||||
|
|
||||||
|
|
||||||
|
def main():
    """Report what would be re-ingested; with --apply, delete and re-ingest.

    Dry-run (default): prints the stale row counts, the files found on disk
    and a random sample of up to five paths, then exits without writing.

    Apply: deletes all docx/pptx rows, then calls _ingest_one on every file
    found, printing progress every 25 files. A file for which _ingest_one
    returns 0 or less is counted as failed.

    Two safety changes to the apply path:
      - it aborts before deleting when no files were found on disk (e.g. the
        Nextcloud path is not mounted), which would otherwise wipe the
        docx/pptx index with nothing to replace it;
      - the embedding model is loaded before the DELETE, so a failed model
        load can no longer leave the index empty.
    """
    print(f"Mode: {'APPLY (destructive)' if APPLY else 'DRY-RUN (no writes)'}")
    print(f"Target: {NEXTCLOUD_PATH}")
    print(f"Extensions: {sorted(TARGET_EXTS)}")
    print(f"SKIP_STAGE2_ENQUEUE={os.environ.get('SKIP_STAGE2_ENQUEUE')}")
    print()

    print("Stale chunks currently in DB:")
    for ext, files, chunks in count_stale():
        print(f"  {ext}: {files} files, {chunks} chunks")
    print()

    files = find_files()
    by_ext = {}
    for f in files:
        by_ext.setdefault(f.suffix.lower(), []).append(f)
    print("Files on disk to re-ingest:")
    for ext, lst in sorted(by_ext.items()):
        print(f"  {ext}: {len(lst)} files")
    print(f"  total: {len(files)}")
    print()
    print("Sample (5 random):")
    import random
    for f in random.sample(files, min(5, len(files))):
        print(f"  {f}")
    print()

    if not APPLY:
        print("Dry-run only. Re-run with --apply to delete + re-ingest.")
        return

    if not files:
        print("No files found on disk; refusing to delete. Check that the target path is mounted.")
        return

    # Load the model first: everything fallible that can run before the
    # DELETE should, so a failure leaves the existing index untouched.
    print("Loading embedder...")
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    print()

    print("Deleting stale chunks...")
    n = delete_stale()
    print(f"  deleted {n} rows")
    print()

    print(f"Re-ingesting {len(files)} files...")
    started = time.time()
    ingested = failed = total_chunks = 0
    for i, f in enumerate(files, 1):
        n = _ingest_one(f, embedder, root=NEXTCLOUD_PATH)
        if n > 0:
            ingested += 1
            total_chunks += n
        else:
            failed += 1
        if i % 25 == 0 or i == len(files):
            elapsed = time.time() - started
            rate = i / elapsed if elapsed else 0
            print(f"  [{i}/{len(files)}] ingested={ingested} failed={failed} "
                  f"chunks={total_chunks} ({rate:.1f} files/s)")
    elapsed = time.time() - started
    print()
    print(f"Done in {elapsed:.0f}s: {ingested} ingested, {failed} failed, "
          f"{total_chunks} chunks written.")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@@ -0,0 +1,58 @@
|
|||||||
|
"""End-to-end test of retrieve_context with intent routing + reranking.
|
||||||
|
|
||||||
|
Avoids loading the full FastAPI app; replicates the chat-handler retrieval
|
||||||
|
call shape and prints classifier output + final ranked sources for each query.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
|
||||||
|
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent))
|
||||||
|
|
||||||
|
# Install a placeholder `anthropic` module (unless one is already loaded) so
# that api.py's `import anthropic` succeeds; its client factory returns None.
# Only retrieve_context and classify_retrieval_intent are exercised here.
import types
anthropic_stub = sys.modules.setdefault("anthropic", types.ModuleType("anthropic"))
anthropic_stub.Anthropic = lambda **kw: None

# Likewise register an empty faster_whisper module if none is loaded yet.
if "faster_whisper" not in sys.modules:
    sys.modules["faster_whisper"] = types.ModuleType("faster_whisper")

# Load api.py from this directory under the module name "api". The whole
# module is executed; an exception raised part-way through is reported and
# ignored, so names defined before the failure remain available on `api`.
import importlib.util
api_path = Path(__file__).parent / "api.py"
spec = importlib.util.spec_from_file_location("api", api_path)
api = importlib.util.module_from_spec(spec)
try:
    spec.loader.exec_module(api)
except Exception as e:
    print(f"(continuing despite api.py side-effect error: {e})")

retrieve_context = api.retrieve_context
classify_retrieval_intent = api.classify_retrieval_intent

QUERIES = [
    "write me a bio",
    "my professional bio",
    "draft a bio for the Utah application",
    "Aaron Nelson CV consulting and design work",
    "FWN3D consulting",
    "syllabi I have taught",
    "philosophy of teaching",
    "what did I tell Claude about FWN3D",
    "what did we discuss about the Utah job",
    "Hudson Valley Additive Manufacturing Center",
]

# For each query: classify, retrieve with that filter, print ranked sources.
for q in QUERIES:
    intent = classify_retrieval_intent(q)
    pieces, sources = retrieve_context(q, type_filter=intent)
    print(f"\n=== {q!r} ===")
    print(f"  intent: {intent}")
    for position, src in enumerate(sources, start=1):
        print(f"  {position}. {src}")
|
||||||
Reference in New Issue
Block a user