Compare commits

..

6 Commits

Author SHA1 Message Date
aaron 8d560f9f5e api.py: hybrid retrieval with intent routing and cross-encoder rerank
Replaces pure-dense top-8 retrieval with a three-stage pipeline:
- BM25 (tsvector + websearch_to_tsquery) and dense (pgvector) in parallel,
  fused with Reciprocal Rank Fusion
- Optional type filter driven by classify_retrieval_intent() so questions
  about prior conversations don't pull documents and vice versa
- Cross-encoder rerank (ms-marco-MiniLM-L-6-v2) over RRF candidates before
  taking final top-N

Also adds scripts/reindex_docx_pptx.py — one-off re-ingest used to recover
table/header/text-box content in docx and pptx after the 93c0d89 extractor
upgrade — and scripts/test_retrieval.py to exercise the new pipeline against
representative queries.

Schema: requires GIN index on to_tsvector('english', document) (already
created out-of-band via psql since Apache AGE in shared_preload_libraries
blocks ALTER TABLE on this database).
2026-05-19 21:11:15 +00:00
aaron 732e450d21 Stop silent data loss in voice capture pipeline
Empty transcripts and transcription failures previously
deleted the temp audio and returned without writing any
record to disk — violating parity-at-encode (raw content
is episodic context, not noise).

- Preserve audio in Journal/Media/YYYY-MM/ on all paths
  (success, empty, failure) instead of unlinking.
- Write a markdown entry to Journal/Captures/ on failure
  paths with status, audio_path, and error fields.
- Add status: saved to successful captures so frontmatter
  is uniform across success and failure.
- Fire SSE capture_saved events on all terminal paths,
  with status included.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 23:41:51 +00:00
aaron 63c58b5bb3 Extend session lifetime to 365 days
Single-user personal app threat model is theft-of-device, not
stolen-cookie. 30-day idle re-prompts created friction without
proportional security benefit. Server TTL and client max-age
remain in sync via shared constant.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 23:29:38 +00:00
aaron 6c2af55e7e Server-side session TTL enforcement
- session_exists() now rejects rows older than 30 days,
  matching the client cookie max-age.
- Opportunistic cleanup of expired rows on session_exists()
  calls, preventing unbounded growth of sessions.db from
  orphaned tokens (PWA reinstalls, manual cookie clears).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 23:28:39 +00:00
aaron 5b4a299414 encoding.py: write_embeddings_batch accepts commit parameter for transactional composition
Adds an optional commit=True parameter to write_embeddings_batch. When True
(default, matching prior behavior), the function commits the connection
after the per-row UPSERT loop. When False, the caller manages the
transaction.

This unblocks fix #1 (pgvector-bypass paths) and fix #2 (watcher
two-transaction pattern), both of which need to compose embeddings writes
with other database writes in the same transaction. Without this lever,
either fix would require duplicating the UPSERT logic outside this helper
or introducing a second commit boundary inside an otherwise atomic
operation.

No behavior change for existing callers — they all use the default
commit=True and continue working unchanged.
2026-05-05 02:52:33 +00:00
aaron b09e35892c encoding.py: strip frontmatter from .md at extraction time
The capture endpoint (api.py:702, 833) writes Journal/Captures/*.md
files with a markdown-bold-style header block (`**type:** voice`,
`**modality:** audio`, `**status:** unprocessed`, optional `**media:**`
and `**project:**`) followed by a `---` separator. extract_text for .md
was a bare filepath.read_text, so every capture-derived chunk in
pgvector embedded the frontmatter as raw text, polluting retrieval.

Fix adds _strip_md_frontmatter, called only for the .md branch:

- Capture-style: optional leading H1 (preserved), then consecutive
  `**key:** value` lines (and blanks), terminated by `---`. The H1 is
  retained; the key/value block + separator are removed.
- YAML-style: file's first non-empty line is `---`, terminated by `---`.
  Only triggered when no heading precedes — guards against the common
  `# Title` + `---` (horizontal rule under heading) pattern seen in
  Journal/aaronai-architecture.md and four other Journal/*.md files.

Body `**bold:**` lines (e.g. `**Visual description:**` in image
captures) and body `---` horizontal rules are never touched: the scan
aborts as soon as a non-frontmatter line appears in the leading block.

briefing_generator_v2.py's split("---", 1) heuristic was reviewed and
not reused — fragile on substring matches and on documents with
multiple `---` rules.

Verified against:
- 2026-04-26-22-44-voice.md: frontmatter stripped, body retained, H1
  retained.
- 2026-04-27-04-34-image.md: frontmatter stripped, `**Visual
  description:**` and `**Voice annotation:**` body bold-headers
  retained, trailing `---` not consumed.
- Journal/aaronai-architecture.md (5 body `---` rules): output
  byte-identical to read_text (96101 chars).
- Synthetic YAML doc: stripped correctly when no leading heading.
- Synthetic plain markdown with body `---` rules: untouched.
- Empty input + heading-only file: untouched.

Existing capture chunks in pgvector retain polluted text; the fix only
affects future extractions. Backfill decision deferred — the cleanest
path is `touch -h Journal/Captures/*.md` to bump mtime and let the
watcher re-ingest naturally on the next cycle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 02:20:55 +00:00
5 changed files with 478 additions and 53 deletions
+1
View File
@@ -8,6 +8,7 @@ dreamer_state.json
corpus_integrity_report.json
watcher_state.json
watcher_status.json
reindex_status.json
# Logs (these belong in /var/log/)
*.log
+214 -49
View File
@@ -1,12 +1,13 @@
import os
import re
import json
import sqlite3
import subprocess
import hashlib
from pathlib import Path
from datetime import datetime
from datetime import datetime, timedelta
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from sentence_transformers import SentenceTransformer, CrossEncoder
import anthropic
from fastapi import FastAPI, Request, Response, Depends, HTTPException, BackgroundTasks
import psycopg2
@@ -91,6 +92,7 @@ if HAS_WHISPER:
except Exception as e:
print(f"Whisper not available: {e}")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
# ChromaDB removed — using pgvector
anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
@@ -132,6 +134,7 @@ When making factual claims about Aaron — his history, credentials, locations,
# Auth configuration
import os
SESSION_PASSWORD = os.getenv("AARON_AI_PASSWORD", "changeme")
SESSION_MAX_AGE_SECONDS = 60 * 60 * 24 * 365
SESSIONS_DB = str(Path.home() / "aaronai" / "sessions.db")
def _init_sessions():
@@ -163,7 +166,10 @@ def delete_session(token: str):
def session_exists(token: str) -> bool:
conn = _connect_sessions()
row = conn.execute("SELECT 1 FROM sessions WHERE token = ?", (token,)).fetchone()
cutoff = (datetime.now() - timedelta(seconds=SESSION_MAX_AGE_SECONDS)).isoformat()
conn.execute("DELETE FROM sessions WHERE created_at < ?", (cutoff,))
conn.commit()
row = conn.execute("SELECT 1 FROM sessions WHERE token = ? AND created_at >= ?", (token, cutoff)).fetchone()
conn.close()
return row is not None
@@ -239,30 +245,140 @@ def remove_from_memory(item):
save_memory("\n".join(filtered))
return len(lines) - len(filtered)
def retrieve_context(query, n_results=8):
"""Pure semantic retrieval over pgvector. Top-N by cosine similarity, threshold 0.3.
No CV pinning, no keyword routing — see architecture doc substrate-dependency section.
Substrate-level workarounds (entity-keyed routing, hybrid retrieval) live at the
Graphiti layer, not as wrapper logic above pgvector."""
HYBRID_CANDIDATES = 30
RRF_K = 60
FINAL_LIMIT = 8
_TSQUERY_SANITIZE_RE = re.compile(r"[^\w\s\"'-]")
CONVERSATION_TYPES = ["chatgpt_conversation", "claude_conversation", "aaronai_conversation"]
DOCUMENT_TYPES = ["document"]
MEMORY_TYPES = ["claude_memory"]
_CONVO_SIGNALS = (
"what did i tell", "what did we discuss", "what did we talk",
"in our conversation", "you mentioned", "we talked about",
"earlier you said", "earlier i said", "did i tell you",
"did i say", "what did chatgpt", "what did claude",
)
_DOC_SIGNALS = (
"write me a bio", "draft a bio", "my bio", "my cv", "my resume",
"my professional", "my work history", "my exhibitions",
"my publications", "my syllabi", "my courses", "my teaching",
"my philosophy", "about my career", "draft a cover letter",
"draft my", "write a bio", "professional bio",
)
def _websearch_query(text: str) -> str:
"""Strip characters websearch_to_tsquery doesn't handle cleanly. Quoted
phrases and 'or' are preserved by the function itself."""
return _TSQUERY_SANITIZE_RE.sub(" ", text).strip()
def classify_retrieval_intent(query: str):
"""Return a list of `type` values to filter retrieval on, or None for all types.
Implementation is a low-effort keyword classifier — explicitly tunable and
swappable. For more nuanced routing, replace this with an LLM classifier call
that returns the same shape: a list of valid type strings or None.
Precedence: conversation signals win over document signals — a question like
"what did I tell you about my CV" is asking about the conversation, not the CV.
"""
q = query.lower()
if any(s in q for s in _CONVO_SIGNALS):
return CONVERSATION_TYPES
if any(s in q for s in _DOC_SIGNALS):
return DOCUMENT_TYPES
return None
def _rerank(query: str, candidates: list[tuple]) -> list[tuple]:
"""Cross-encoder rerank. Candidates are (id, document, source) tuples.
Returns the same tuples reordered by reranker score (highest first)."""
if not candidates:
return []
pairs = [(query, row[1]) for row in candidates]
scores = reranker.predict(pairs)
return [row for row, _ in sorted(zip(candidates, scores),
key=lambda x: x[1], reverse=True)]
def retrieve_context(query, n_results=FINAL_LIMIT, type_filter=None):
"""Hybrid retrieval (dense + lexical, RRF fused) followed by cross-encoder rerank.
- Dense (pgvector) handles paraphrase / semantic similarity.
- Lexical (tsvector) catches rare named tokens (FWN3D, Sono-Tek, course codes)
the embedding model has no signal for.
- RRF combines the two rankings without calibrating score scales.
- Cross-encoder rerank scores each (query, chunk) pair jointly, bridging
semantic gaps that bi-encoders can't (e.g., "write me a bio" -> CV chunk).
type_filter: optional list of `type` values to restrict the candidate pool to.
If None, retrieves from all types. Use classify_retrieval_intent() to derive."""
query_embedding = embedder.encode([query]).tolist()[0]
ts_query = _websearch_query(query)
context_pieces = []
sources = []
where_sql = ""
type_param = ()
if type_filter:
where_sql = "WHERE type = ANY(%s)"
type_param = (list(type_filter),)
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("""
SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
cur.execute(f"""
SELECT id, document, source
FROM embeddings
{where_sql}
ORDER BY embedding <=> %s::vector
LIMIT %s
""", (query_embedding, query_embedding, n_results))
for doc, source, similarity in cur.fetchall():
if similarity > 0.3:
context_pieces.append(doc)
sources.append(source or "unknown")
""", (*type_param, query_embedding, HYBRID_CANDIDATES))
dense_hits = cur.fetchall()
lexical_hits = []
if ts_query:
lex_where = "to_tsvector('english', document) @@ websearch_to_tsquery('english', %s)"
full_where = (f"WHERE {lex_where} AND type = ANY(%s)"
if type_filter else f"WHERE {lex_where}")
lex_params = ((ts_query, list(type_filter)) if type_filter else (ts_query,))
cur.execute(f"""
SELECT id, document, source
FROM embeddings
{full_where}
ORDER BY ts_rank(to_tsvector('english', document),
websearch_to_tsquery('english', %s)) DESC
LIMIT %s
""", (*lex_params, ts_query, HYBRID_CANDIDATES))
lexical_hits = cur.fetchall()
pg.close()
scores = {}
rows_by_id = {}
for rank, row in enumerate(dense_hits):
scores[row[0]] = scores.get(row[0], 0) + 1.0 / (RRF_K + rank + 1)
rows_by_id[row[0]] = row
for rank, row in enumerate(lexical_hits):
scores[row[0]] = scores.get(row[0], 0) + 1.0 / (RRF_K + rank + 1)
rows_by_id[row[0]] = row
rrf_ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
candidates = [rows_by_id[doc_id] for doc_id, _ in rrf_ranked]
for _id, doc, source in _rerank(query, candidates)[:n_results]:
context_pieces.append(doc)
sources.append(source or "unknown")
except Exception as e:
print(f"pgvector retrieval error: {e}")
print(f"hybrid retrieval error: {e}")
return context_pieces, sources
def get_conversation_history(conversation_id, limit=20):
@@ -302,7 +418,8 @@ def create_conversation(title="New conversation"):
def chat(user_message, conversation_id, settings, client_time=None):
memory = load_memory()
context_pieces, sources = retrieve_context(user_message)
type_filter = classify_retrieval_intent(user_message)
context_pieces, sources = retrieve_context(user_message, type_filter=type_filter)
history = get_conversation_history(conversation_id)
context_parts = []
@@ -381,7 +498,7 @@ async def login(request: Request, response: Response):
httponly=True,
secure=True,
samesite="lax",
max_age=60 * 60 * 24 * 30
max_age=SESSION_MAX_AGE_SECONDS
)
response.body = b'{"ok": true}'
response.status_code = 200
@@ -686,44 +803,92 @@ async def run_dreamer(request: Request, auth: str = Depends(require_auth)):
return JSONResponse({"started": False, "error": str(e)})
def transcribe_and_save(tmp_path, timestamp, nextcloud_url, nextcloud_user, nextcloud_password):
"""Background task — transcribes audio and saves to Nextcloud after endpoint returns."""
"""Background task — transcribes audio and saves to Nextcloud after endpoint returns.
Audio is preserved in Journal/Media/ on every terminal path; failed and empty-transcript
captures still produce a markdown record in Journal/Captures/ with a status field."""
import requests as req_lib
nc_auth = (nextcloud_user, nextcloud_password)
month_dir = timestamp[:7]
audio_ext = os.path.splitext(tmp_path)[1] or ".webm"
audio_filename = f"{timestamp}-voice{audio_ext}"
audio_relpath = f"Journal/Media/{month_dir}/{audio_filename}"
def archive_audio() -> bool:
try:
with open(tmp_path, "rb") as f:
audio_bytes = f.read()
media_parent = f"{nextcloud_url}/remote.php/dav/files/{nextcloud_user}/Journal/Media"
media_dir = f"{media_parent}/{month_dir}"
req_lib.request("MKCOL", media_parent, auth=nc_auth, timeout=10)
req_lib.request("MKCOL", media_dir, auth=nc_auth, timeout=10)
req_lib.put(f"{media_dir}/{audio_filename}", data=audio_bytes, auth=nc_auth, timeout=60)
return True
except Exception as e:
print(f"Audio archival failed for {timestamp}: {e}")
return False
finally:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
def write_capture(filename: str, content_md: str, status: str):
captures_dir = f"{nextcloud_url}/remote.php/dav/files/{nextcloud_user}/Journal/Captures"
try:
req_lib.request("MKCOL", captures_dir, auth=nc_auth, timeout=10)
req_lib.put(f"{captures_dir}/{filename}", data=content_md.encode("utf-8"), auth=nc_auth, timeout=30)
except Exception as e:
print(f"Capture markdown write failed for {timestamp}: {e}")
return
try:
payload = {"type": "capture_saved", "filename": filename, "timestamp": timestamp, "status": status}
req_lib.post("http://localhost:8000/api/events/notify", json=payload, timeout=3)
req_lib.post("http://localhost:8000/api/captures/events/notify", json=payload, timeout=3)
except Exception:
pass
transcript = ""
transcribe_error = None
try:
segments, _ = whisper_model.transcribe(
tmp_path, language="en", vad_filter=True, beam_size=1, initial_prompt=WHISPER_PROMPT
)
transcript = " ".join(s.text.strip() for s in segments).strip()
os.unlink(tmp_path)
if not transcript:
print(f"Async transcription empty for {timestamp} — nothing saved")
return
filename = f"{timestamp}-voice.md"
content_md = f"# Capture — {timestamp}\n\n**type:** voice\n**modality:** audio\n**status:** unprocessed\n\n---\n\n{transcript}\n"
captures_dir = f"{nextcloud_url}/remote.php/dav/files/{nextcloud_user}/Journal/Captures"
req_lib.request("MKCOL", captures_dir, auth=nc_auth, timeout=10)
url = f"{captures_dir}/{filename}"
req_lib.put(url, data=content_md.encode("utf-8"), auth=nc_auth, timeout=30)
print(f"Async transcription saved: {filename}")
# Notify SSE clients that transcription is complete
try:
import requests as _req
_req.post("http://localhost:8000/api/events/notify", json={
"type": "capture_saved",
"filename": filename,
"timestamp": timestamp,
}, timeout=3)
_req.post("http://localhost:8000/api/captures/events/notify", json={
"type": "capture_saved",
"filename": filename,
"timestamp": timestamp,
}, timeout=3)
except Exception:
pass
except Exception as e:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
print(f"Async transcription failed for {timestamp}: {e}")
transcribe_error = str(e)
audio_archived = archive_audio()
audio_line = f"**audio_path:** {audio_relpath}\n" if audio_archived else "**audio_archive_failed:** true\n"
if transcribe_error is not None:
filename = f"{timestamp}-voice-failed.md"
content_md = (
f"# Capture — {timestamp}\n\n"
f"**type:** voice\n**modality:** audio\n**status:** failed_transcription\n"
f"{audio_line}"
f"**error:** {transcribe_error}\n"
)
write_capture(filename, content_md, "failed_transcription")
print(f"Async transcription failed for {timestamp}: {transcribe_error}")
return
if not transcript:
filename = f"{timestamp}-voice-empty.md"
content_md = (
f"# Capture — {timestamp}\n\n"
f"**type:** voice\n**modality:** audio\n**status:** empty_transcript\n"
f"{audio_line}"
)
write_capture(filename, content_md, "empty_transcript")
print(f"Async transcription empty for {timestamp}: audio archived")
return
filename = f"{timestamp}-voice.md"
content_md = (
f"# Capture — {timestamp}\n\n"
f"**type:** voice\n**modality:** audio\n**status:** saved\n"
f"{audio_line}\n---\n\n{transcript}\n"
)
write_capture(filename, content_md, "saved")
print(f"Async transcription saved: {filename}")
@app.post("/api/capture")
@@ -830,7 +995,7 @@ Keep the full description to 150-250 words. Do not speculate beyond what is visi
**type:** {capture_type}
**modality:** {modality}
**status:** unprocessed
**status:** saved
**media:** {media_path}
{f"**project:** {project}" if project else ""}
+70 -4
View File
@@ -12,6 +12,7 @@ Replaces four separate extract reimplementations and two extract-chunk-embed pat
import hashlib
import json
import logging
import re
from pathlib import Path
from docx import Document as DocxDocument
@@ -24,6 +25,62 @@ SUPPORTED = {".docx", ".pdf", ".pptx", ".txt", ".md"}
DEFAULT_CHUNK_SIZE = 500
DEFAULT_CHUNK_OVERLAP = 50
_BOLD_KV_RE = re.compile(r"^\*\*[\w +/-]+?:\*\*")
def _strip_md_frontmatter(text: str) -> str:
"""Strip a leading frontmatter block from markdown, if present.
Recognizes two formats:
- YAML-style: file's first non-empty line is `---`, terminated by `---`.
Only triggered when no heading precedes — guards against `---`
horizontal rules that follow an H1.
- Capture-style: optional H1 heading, then one or more `**key:** value`
lines (and blanks), terminated by `---`. The H1 is preserved; the
key/value block + separator are removed.
Body `---` rules and body `**bold:**` lines are never touched — the scan
aborts as soon as a non-frontmatter line appears in the leading block.
"""
lines = text.splitlines()
n = len(lines)
i = 0
while i < n and not lines[i].strip():
i += 1
heading = None
if i < n and lines[i].startswith("# "):
heading = lines[i]
i += 1
while i < n and not lines[i].strip():
i += 1
if i >= n:
return text
first = lines[i].strip()
if heading is None and first == "---":
j = i + 1
while j < n and lines[j].strip() != "---":
j += 1
if j >= n:
return text
body_start = j + 1
elif _BOLD_KV_RE.match(first):
j = i
while j < n:
s = lines[j].strip()
if not s or _BOLD_KV_RE.match(s):
j += 1
continue
if s == "---":
body_start = j + 1
break
return text
else:
return text
else:
return text
body = "\n".join(lines[body_start:]).lstrip("\n")
return f"{heading}\n\n{body}" if heading else body
def _docx_cell_paragraphs(cell):
yield from (p for p in cell.paragraphs if p.text.strip())
@@ -89,7 +146,10 @@ def extract_text(filepath: Path) -> str:
parts.append(notes)
return "\n".join(parts)
elif suffix in {".txt", ".md"}:
return filepath.read_text(encoding="utf-8", errors="ignore")
text = filepath.read_text(encoding="utf-8", errors="ignore")
if suffix == ".md":
return _strip_md_frontmatter(text)
return text
except Exception as e:
log.warning(f"Text extraction failed for {filepath.name}: {e}")
return ""
@@ -142,8 +202,8 @@ def chunk_and_embed(text: str,
return rows
def write_embeddings_batch(conn, batch: list[dict]) -> int:
"""Single canonical INSERT. Sets created_at = NOW() server-side. Commits.
def write_embeddings_batch(conn, batch: list[dict], commit: bool = True) -> int:
"""Single canonical INSERT. Sets created_at = NOW() server-side.
Every row dict must supply 'type'. created_at is SQL-supplied (NOW()), so
callers do not need to provide it. The application-layer assertion is the
@@ -151,6 +211,11 @@ def write_embeddings_batch(conn, batch: list[dict]) -> int:
historical NULLs were resolved by the Improvement #2 backfill, and a
Python-level raise gives a faster, more debuggable failure than a
Postgres constraint error.
When commit=True (default), this function commits the connection itself.
When commit=False, the caller is responsible for committing. Use
commit=False when composing this write with other writes that must land
atomically in the same transaction.
"""
if not batch:
return 0
@@ -173,5 +238,6 @@ def write_embeddings_batch(conn, batch: list[dict]) -> int:
metadata = EXCLUDED.metadata
""", (row["id"], row["document"], row["embedding"],
row["source"], row["type"], json.dumps(row["metadata"])))
conn.commit()
if commit:
conn.commit()
return len(batch)
+135
View File
@@ -0,0 +1,135 @@
"""One-off: re-ingest docx+pptx after the 2026-05-04 extractor upgrade (commit 93c0d89).
Pre-upgrade extraction missed tables, headers/footers, text boxes, group shapes,
and pptx notes — leaving CVs/dossiers as section-header skeletons in the index.
Steps when run with --apply:
1. DELETE all embeddings rows where source ends in .docx or .pptx
2. Walk NEXTCLOUD_PATH and re-ingest every .docx/.pptx via _ingest_one
3. Stage 2 enqueue is suppressed (SKIP_STAGE2_ENQUEUE=1)
Without --apply: dry-run. Counts files and chunks, prints a sample, writes nothing.
"""
import os
import sys
import time
from pathlib import Path
os.environ["SKIP_STAGE2_ENQUEUE"] = "1"
from dotenv import load_dotenv
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
import psycopg2
from sentence_transformers import SentenceTransformer
sys.path.insert(0, str(Path(__file__).parent))
from ingest import _ingest_one, get_pg
NEXTCLOUD_PATH = Path("/home/aaron/nextcloud/data/data/aaron/files")
TARGET_EXTS = {".docx", ".pptx"}
APPLY = "--apply" in sys.argv
def count_stale():
pg = get_pg()
cur = pg.cursor()
cur.execute(
"SELECT lower(substring(source from '\\.[^.]+$')) AS ext, "
"COUNT(DISTINCT source) AS files, COUNT(*) AS chunks "
"FROM embeddings WHERE lower(source) ~ '\\.(docx|pptx)$' "
"GROUP BY 1 ORDER BY 1"
)
rows = cur.fetchall()
pg.close()
return rows
def delete_stale():
pg = get_pg()
cur = pg.cursor()
cur.execute("DELETE FROM embeddings WHERE lower(source) ~ '\\.(docx|pptx)$'")
deleted = cur.rowcount
pg.commit()
pg.close()
return deleted
def find_files():
files = []
for f in NEXTCLOUD_PATH.rglob("*"):
if not f.is_file():
continue
if f.suffix.lower() not in TARGET_EXTS:
continue
if f.name.startswith(("~$", ".")):
continue
files.append(f)
return files
def main():
print(f"Mode: {'APPLY (destructive)' if APPLY else 'DRY-RUN (no writes)'}")
print(f"Target: {NEXTCLOUD_PATH}")
print(f"Extensions: {sorted(TARGET_EXTS)}")
print(f"SKIP_STAGE2_ENQUEUE={os.environ.get('SKIP_STAGE2_ENQUEUE')}")
print()
print("Stale chunks currently in DB:")
for ext, files, chunks in count_stale():
print(f" {ext}: {files} files, {chunks} chunks")
print()
files = find_files()
by_ext = {}
for f in files:
by_ext.setdefault(f.suffix.lower(), []).append(f)
print(f"Files on disk to re-ingest:")
for ext, lst in sorted(by_ext.items()):
print(f" {ext}: {len(lst)} files")
print(f" total: {len(files)}")
print()
print("Sample (5 random):")
import random
for f in random.sample(files, min(5, len(files))):
print(f" {f}")
print()
if not APPLY:
print("Dry-run only. Re-run with --apply to delete + re-ingest.")
return
print("Deleting stale chunks...")
n = delete_stale()
print(f" deleted {n} rows")
print()
print("Loading embedder...")
embedder = SentenceTransformer("all-MiniLM-L6-v2")
print()
print(f"Re-ingesting {len(files)} files...")
started = time.time()
ingested = failed = total_chunks = 0
for i, f in enumerate(files, 1):
n = _ingest_one(f, embedder, root=NEXTCLOUD_PATH)
if n > 0:
ingested += 1
total_chunks += n
else:
failed += 1
if i % 25 == 0 or i == len(files):
elapsed = time.time() - started
rate = i / elapsed if elapsed else 0
print(f" [{i}/{len(files)}] ingested={ingested} failed={failed} "
f"chunks={total_chunks} ({rate:.1f} files/s)")
elapsed = time.time() - started
print()
print(f"Done in {elapsed:.0f}s: {ingested} ingested, {failed} failed, "
f"{total_chunks} chunks written.")
if __name__ == "__main__":
main()
+58
View File
@@ -0,0 +1,58 @@
"""End-to-end test of retrieve_context with intent routing + reranking.
Avoids loading the full FastAPI app; replicates the chat-handler retrieval
call shape and prints classifier output + final ranked sources for each query.
"""
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
load_dotenv(Path.home() / "aaronai" / ".env", override=True)
sys.path.insert(0, str(Path(__file__).parent))
# Stub anthropic so api.py import doesn't fail without the SDK loaded.
# We only need retrieve_context + classify_retrieval_intent.
import types
sys.modules.setdefault("anthropic", types.ModuleType("anthropic"))
sys.modules["anthropic"].Anthropic = lambda **kw: None
# Same for whisper if present
if "faster_whisper" not in sys.modules:
sys.modules["faster_whisper"] = types.ModuleType("faster_whisper")
import importlib.util
spec = importlib.util.spec_from_file_location("api", Path(__file__).parent / "api.py")
api = importlib.util.module_from_spec(spec)
# Don't execute the whole module (it starts FastAPI). Instead, exec only definitions.
# Easier: just import the functions we need by exec'ing the file but catching errors.
try:
spec.loader.exec_module(api)
except Exception as e:
print(f"(continuing despite api.py side-effect error: {e})")
retrieve_context = api.retrieve_context
classify_retrieval_intent = api.classify_retrieval_intent
QUERIES = [
"write me a bio",
"my professional bio",
"draft a bio for the Utah application",
"Aaron Nelson CV consulting and design work",
"FWN3D consulting",
"syllabi I have taught",
"philosophy of teaching",
"what did I tell Claude about FWN3D",
"what did we discuss about the Utah job",
"Hudson Valley Additive Manufacturing Center",
]
for q in QUERIES:
intent = classify_retrieval_intent(q)
pieces, sources = retrieve_context(q, type_filter=intent)
print(f"\n=== {q!r} ===")
print(f" intent: {intent}")
for i, src in enumerate(sources, 1):
print(f" {i}. {src}")