Compare commits

..

11 Commits

Author SHA1 Message Date
aaron 7b77794319 api.py: enable PRAGMA foreign_keys=ON in _connect helper; clean up 2 message orphans
The messages table declares FOREIGN KEY (conversation_id) REFERENCES
conversations(id), but PRAGMA foreign_keys was never enabled — SQLite
defaults it to OFF per connection, and _connect() did not set it. Two
orphan rows existed in messages (conversation_id='test123' pointing at
a never-existing conversation; both rows from one ~11-second test event
on 2026-04-26).

Audit before changing the PRAGMA:
- All FOREIGN KEY declarations across both DBs (conversations.db,
  sessions.db) accounted for via PRAGMA foreign_key_list on each
  table. Only one FK exists: messages.conversation_id ->
  conversations.id, ON DELETE NO ACTION.
- All tables enumerated via sqlite_master. Two tables in
  conversations.db (conversations, messages); one in sessions.db
  (sessions). No surprises.
- PRAGMA foreign_key_check confirmed exactly the 2 known orphans and
  zero violations elsewhere.

Both delete paths in api.py (delete_conversation at :471, and
clear_all_conversations at :986) already delete from messages BEFORE
conversations, so cascade behavior was correct in code. The orphan
state was caused by a direct INSERT against a non-existent
conversation_id at chat-test time, which an unenforced FK silently
accepted. Turning the PRAGMA on prevents this class of bug at insert
time, not delete time — no delete-path code changes were needed.

Order of operations followed the constraint that orphan cleanup must
precede PRAGMA-on (SQLite would not retroactively delete orphans, but
foreign_key_check would surface them confusingly on any future
operation that touched the messages table):
1. DELETE FROM messages WHERE conversation_id NOT IN (SELECT id FROM
   conversations) — removed the 2 known orphans.
2. Added PRAGMA foreign_keys=ON to _connect() so every connection
   from _connect_conversations() and _connect_sessions() gets FK
   enforcement (SQLite requires per-connection setting).
3. Restarted aaronai.service.

Verification:
- Smoke: GET /api/conversations and /api/conversations/{id}/messages
  both return 200 with expected payloads against the live api.
- E2E single-delete: synthetic conversation + 2 messages inserted via
  the api's _connect helper (FK on); DELETE /api/conversations/{id}
  via the live endpoint removed both rows from both tables.
- Clear-all e2e: skipped on live DB (destructive) — code shape is
  structurally identical to single-delete, no FK-relevant logic
  difference.
- Load-bearing negative test: INSERT into messages with a
  non-existent conversation_id via _connect_conversations() raised
  sqlite3.IntegrityError("FOREIGN KEY constraint failed"). This is
  what proves the PRAGMA actually took effect, not just that we set
  it.

Final counts: 7 conversations, 290 messages (down from 292 by the 2
orphans cleaned up).

Note: an explicit BEGIN/COMMIT around the two-execute delete paths
was considered and skipped. SQLite's implicit-transactional default
already gives the atomicity needed; explicit transactions would be
clarity-only and belong in a separate commit.
2026-05-04 16:41:55 +00:00
aaron d985f9e91e dream.py: raise_for_status on manifest writes; total_chunks as actual corpus count
Two correctness bugs in dream_pipeline manifest assembly.

write_manifest at lines 487-491 swallowed HTTP 4xx/5xx responses
silently. requests.put() only raises on transport-level errors (DNS,
connection refused, timeout); 401/403/500/507 come back as Response
objects and never trigger the except. The code printed "Manifest
written" while the manifest never persisted. The same file's deliver()
function at line 434 already used response.raise_for_status() — the
pattern was already established, write_manifest just skipped it.

Fix: bind the response and call raise_for_status() before the success
print. The except message changes from "(non-critical)" to "manifest
not persisted" because HTTP failure now means manifest data was lost,
which is critical, not quiet.

corpus_data["total_chunks"] at lines 621-622 stored
delta["new_chunks"], duplicating the sibling field
new_chunks_since_last_dream. The field name claimed absolute corpus
size; the value was a delta of recently-touched files. Verified in
live manifests: total_chunks: 0 while pgvector held 11,379+ document
embeddings.

Fix: query SELECT COUNT(*) FROM embeddings inside dream_pipeline,
store as total_chunks. Tightly-scoped one-shot connect via the
existing get_pg() helper. Telemetry query failure is treated as
non-critical and falls back to 0 — pgvector hiccup should not crash
an otherwise successful dream pipeline.

Bonus finding (not fixed in this commit): new_chunks_since_last_dream
is itself misnamed. observe_corpus() reads the watcher's mtime cache
and counts files (not chunks) whose mtime is newer than last_dream.
Both fields were "files touched since last dream" duplicated under
two different names; this commit fixes only the total_chunks
semantics. Renaming new_chunks_since_last_dream is out of scope —
manifests are write-only telemetry today, no consumer reads either
field, and the rename is a separate decision.

Verification: real pipeline run produced manifest with total_chunks
matching SELECT COUNT(*) directly; doubled as a smoke test for the
embedder cache (single Loading weights line), type_distribution
propagation, and the manifest write success path.
2026-05-04 16:29:04 +00:00
aaron b9eea6cb62 watcher.py: extend lockfile filter to catch UTF-8-mangled ~$ prefixes
Three rows in ingest_failures were Office lockfile leftovers whose
filename starts with ~� (~ followed by the UTF-8 replacement
character) instead of ~$. Somewhere in the Nextcloud sync chain the $
byte was lost or replaced; the file now lives on disk as a real file
with this corrupted name. The watcher's ("~$", ".") prefix filter
didn't match, so each cycle tried to ingest these as pptx, hit
BadZipFile inside python-pptx (lockfiles aren't real Office documents),
and they ended up permanently in ingest_failures.

Three filter sites in watcher.py applied the lockfile prefix check:
  - ingest_file() at :127
  - get_changed_files() at :200
  - IngestHandler._should_ignore() at :290

All three now match ("~$", "~", ".") — broadened to catch any tilde
prefix, not just ~$. The cross-check against pgvector embeddings and
disk found zero legitimate tilde-prefixed files in the corpus, so the
broader filter has no false-positive risk in this corpus.

Cleanup: 3 ingest_failures rows resolved (filepath LIKE '%/~%').
Unresolved count drops 97 → 94.

If a fourth filter site is ever added, the right shape is consolidating
the lockfile prefix check to a shared function or constant. Three
parallel sites with three different tuple orderings is acceptable for
now but worth normalizing if the surface grows.
2026-05-04 16:19:56 +00:00
aaron 93c0d89308 encoding.py: extend docx and pptx extractors to walk tables, headers/footers, text-boxes, group shapes, and notes
The previous extractors walked only top-level body paragraphs (docx) and
top-level shape.text (pptx). Diagnostic on the 17 non-PDF "no_text"
ingest failures revealed that 13 docx files in the failure cohort have
100% of their content in tables (paras_with_text=0, table_cells=6-108).
These are syllabi, rosters, rubrics, and homework worksheets structured
as a single document-wide table — high-value academic content the corpus
was silently missing.

docx walker now covers:
- body paragraphs (existing)
- tables, including nested tables in cells (recursive helper)
- header and footer paragraphs per section
- text-box content via XPath against w:txbxContent (no first-class API
  in python-docx; future-proofing — none of the current failure cohort
  has text-boxes)

pptx walker now covers:
- top-level shape text (existing)
- recursive descent into group shapes
- table cell text via shape.has_table / shape.table.iter_cells()
- speaker notes via slide.notes_slide.notes_text_frame.text

Out of scope: SmartArt diagrams, chart titles/labels, OLE objects,
content controls. None of the current failure cohort has these.

Recovery: 13 of 17 failures now ingest successfully. The 4 remaining are
image-only pptx files (Renders.pptx, Ribbon Cutting Slideshow.pptx, two
GH Slicer Notes variants — all PICTURE-shape decks with no text in any
walkable structure). They stay in ingest_failures unresolved, awaiting
OCR or path exclusion.

Side effect worth noting: the regression check on 4 known-good files
that were already producing embeddings showed all four gained content
under the new walker — a Mod03 pptx grew from 23,993 to 57,462 chars
(+33,469), Braskem Report docx grew 33,050 to 38,977 (+5,927), DDF MA
program docx grew 37,210 to 47,603 (+10,393), SUNY PIF GRANT pptx grew
22,259 to 23,546 (+1,287). These files have been in the corpus all
along with table or notes content silently dropped. They will surface
the additional content on next re-ingest, improving retrieval quality
for any future query that touches them.

Cleanup: ingest_file already calls resolve_ingest_failure on successful
ingest, so the 13 recovered files were marked resolved=TRUE during the
retry pass. No separate cleanup SQL was needed.
2026-05-04 16:12:56 +00:00
aaron f18fb64fe5 watcher.py: exclude generative-graphic folders and zero-byte files
Two-sample diagnostic of the 128 ingest_failures rows surfaced two
folders whose contents are exclusively non-text PDFs (iText-produced
generative graphics from Processing sketches and computational design
sketches) and three zero-byte test artifacts. None of these have ever
produced an embedding chunk, and they have nothing extractable to
contribute. Excluding them removes 19 / 128 (15%) of the locked-out
failures from the cohort and prevents future versions of the same
patterns from re-failing.

Folder exclusions use path.parts membership rather than substring
matching — eliminates false-match risk if similarly-named folders
appear elsewhere in the corpus (e.g. an unrelated "Generative Design"
or "Computational Design 2017" directory created later). The existing
"Admin/Backups" / "Journal/Media" substring checks are looser, but
new exclusions take the tighter pattern.

Zero-byte filter goes in get_changed_files() only — the actual
ingestion gate. Adding stat() to _should_ignore() (the FS-event noise
filter) would introduce a race where the file is gone between event
fire and stat call. Empty files briefly trigger pending=True but
produce no work after debounce; cosmetic only.

Cleanup applied separately via UPDATE: 19 ingest_failures rows for
these paths marked resolved=TRUE. Unresolved-failure count: 129 -> 110.

Verified: get_changed_files() with empty state returns 1418 changed
files; all 5 excluded probes (2 folder-matched + 3 zero-byte) absent
from the result, control file present. Watcher service restarted
clean; startup scan reports no missed files.
2026-05-04 06:24:08 +00:00
aaron 72e07afc03 watcher.py: do not mark failed ingests as successfully ingested
ingest_files() updated state[path] = mtime unconditionally after every
ingest_file() call. ingest_file() returns 0 when text extraction fails,
embedding fails, no chunks are produced, or the pgvector write fails —
in every one of those cases, the path was still recorded as ingested
at the current mtime. On the next pass, get_changed_files() saw the
mtime match and skipped the file, locking it out of the corpus until
something modified it on disk.

record_ingest_failure() writes to a UI-visible failures table, but
nothing reads that table to retry. So failures accumulated silently:
the file was simultaneously logged as failed AND tracked in
watcher_state as up-to-date, and the second condition won.

Fix: only update watcher_state when ingest_file returns count > 0.
Failed ingests will be retried on the next watcher cycle until they
succeed or are explicitly excluded.

Diagnostic at fix time: 129 rows in ingest_failures, 128 currently
locked out of the corpus (filepath in watcher_state with mtime matching
current disk). 128/129 are text_extraction failures, mostly scanned
PDFs (106 .pdf, 13 .docx, 7 .pptx, 2 .md, 1 .txt). 1 source no longer
exists on disk. 0 have had their disk mtime change since failing — i.e.
without this fix, none of them would ever retry. Cross-check shows
watcher_state has 1466 paths vs. 1061 distinct sources in pgvector
embeddings, leaving a residual silent-gap of ~276 files after
accounting for failures.

Historical cleanup of files already locked out by this bug is tracked
separately. New failures from this commit forward will retry naturally.
2026-05-04 03:52:01 +00:00
aaron c3011c80a5 api.py: route all sqlite3.connect() through helpers; enable synchronous=NORMAL per-conn
Followup to 4204806 (WAL + index + backup.sh). The previous commit
deferred synchronous=NORMAL because it's a per-connection PRAGMA and
api.py has 16 sqlite3.connect() call sites — setting it once at init
would have applied to nothing afterwards.

Adds three helpers near the *_DB constants:
- _connect(path): inner; sets PRAGMA synchronous=NORMAL and uses
  timeout=5.0 (5000ms busy_timeout) on every new connection.
- _connect_conversations(), _connect_sessions(): named wrappers so call
  sites read explicitly.

Mechanical replacement at all 16 call sites: 4 sessions, 12 conversations.
No semantic change beyond the PRAGMA + busy_timeout — every site still
opens-then-closes, no held-open connections.

busy_timeout=5000ms is cheap insurance: under WAL with api.py as sole
writer, contention should be near-zero, but the backup.sh online-backup
path briefly holds a read lock on the source, and any future second
writer would otherwise hit SQLITE_BUSY immediately on contention.

Combined effect with WAL: per-write fsync count drops from ~2 to ~1
(WAL alone) further reduced by synchronous=NORMAL deferring fsyncs to
checkpoint boundaries. No durability loss for the use case (single
host, app crash tolerated, OS crash gives at most one lost transaction).

Not included: foreign_keys=ON. Audit found 2 orphan rows in messages
(conversation_id pointing to deleted conversations) and untested write
paths that could begin raising IntegrityError. Tracked as separate
followup: inspect orphans, identify the delete path that didn't
cascade, clean up, then enable enforcement and test chat delete flow
end-to-end.
2026-05-04 03:39:13 +00:00
aaron 4204806c80 conversations.db, sessions.db: enable WAL, add message index; update backup.sh
Both databases ran with journal_mode=delete — every write rewrote the
rollback journal per transaction. WAL eliminates the journal-rewrite and
lets readers run without blocking writers.

Index on messages(conversation_id, timestamp DESC) is preventive — only
280 rows today, but the access pattern (load conversation history in
order) is exactly what a composite index serves, and we don't want to
re-revisit this when the table grows.

backup.sh updated in the same commit because WAL changes the on-disk
layout: a bare `cp` of just the .db file can miss recently-committed
transactions that still live in the -wal sidecar, and can race with
concurrent writes to produce a torn file. Switched to the SQLite Online
Backup API via python3 -c "...src.backup(dst)..." — same mechanism as
the sqlite3 CLI's `.backup` (which isn't installed on this host),
handles WAL correctly without forcing a checkpoint, and is non-locking
from the writer's perspective. Verified backup integrity_check returns
ok and row counts match.

Note: synchronous=NORMAL was considered but deferred — it's a
per-connection PRAGMA, and applying it correctly requires a connect
helper that wraps every sqlite3.connect() call site in api.py (~14
sites). Out of scope for this commit; tracked as a follow-up. WAL alone
delivers the journal-rewrite elimination and reader/writer concurrency
improvements; the additional fsync reduction from synchronous=NORMAL is
a smaller marginal win on top.

Confirmed via concurrency audit that api.py is the sole writer to both
databases. ingest_conversations.py and dream.py are read-only consumers
of conversations.db; nothing else touches sessions.db.
2026-05-04 03:24:51 +00:00
aaron c5fc517fef ingest_conversations.py: lazy-load embedder to match ingest.py pattern
Embedder was instantiated at module import (~30-60s, ~200MB) regardless
of whether new conversations existed. On nights with no new content
(most nights per the logs), the script paid the load cost and exited
immediately. ingest.py:134 already uses lazy loading; this brings the
two ingest scripts into a consistent shape.
2026-05-04 03:13:45 +00:00
aaron b35d44ef58 dream.py: cache the SentenceTransformer embedder across retrieve() calls
Pipeline mode calls retrieve() three times (NREM, Early REM, Late REM).
Previously each call re-imported and re-instantiated SentenceTransformer
("all-MiniLM-L6-v2"), allocating ~200MB and spending 30-60s on disk->CPU
init three times sequentially. lru_cache(maxsize=1) makes the load happen
once per process.

Expected: pipeline runtime drops ~100-180s, removes 2x redundant 200MB
allocations, and reduces transient memory pressure during the same window
when other nightly jobs may run.
2026-05-04 03:11:22 +00:00
aaron a27f22ceaf api.py: switch whisper to distil-large-v3, beam_size=1, cpu_threads=4
Three changes to reduce voice-note transcription latency on the VPS:
- Model: large-v3 -> distil-large-v3 (~6x faster, near-identical English
  accuracy; language is already hardcoded "en").
- beam_size: 5 (default) -> 1 (~3-4x faster on clean audio).
- cpu_threads: 8 -> 4 (the box has 8 cores running api, dreamer, watcher,
  nextcloud concurrently; ctranslate2's inter-op pool plus context switching
  makes 4 effectively faster than 8 here).

Combined effect expected ~10-15x over prior config. No accuracy regression
expected for the voice-note use case (English, clean audio, domain terms
already supplied via initial_prompt).
2026-05-04 01:00:32 +00:00
6 changed files with 132 additions and 38 deletions
+36 -19
View File
@@ -38,6 +38,19 @@ load_dotenv(Path.home() / "aaronai" / ".env")
MEMORY_PATH = Path.home() / "aaronai" / "memory.md" MEMORY_PATH = Path.home() / "aaronai" / "memory.md"
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db") CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
def _connect(path):
conn = sqlite3.connect(path, timeout=5.0)
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA foreign_keys=ON")
return conn
def _connect_conversations():
return _connect(CONVERSATIONS_DB)
def _connect_sessions():
return _connect(SESSIONS_DB)
SETTINGS_PATH = Path.home() / "aaronai" / "settings.json" SETTINGS_PATH = Path.home() / "aaronai" / "settings.json"
WATCHER_LOG = str(Path.home() / "aaronai" / "watcher.log") WATCHER_LOG = str(Path.home() / "aaronai" / "watcher.log")
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json") WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
@@ -73,7 +86,7 @@ WHISPER_PROMPT = (
whisper_model = None whisper_model = None
if HAS_WHISPER: if HAS_WHISPER:
try: try:
whisper_model = WhisperModel("large-v3", device="cpu", compute_type="int8", cpu_threads=8) whisper_model = WhisperModel("distil-large-v3", device="cpu", compute_type="int8", cpu_threads=4)
print("Whisper model loaded") print("Whisper model loaded")
except Exception as e: except Exception as e:
print(f"Whisper not available: {e}") print(f"Whisper not available: {e}")
@@ -122,8 +135,9 @@ SESSION_PASSWORD = os.getenv("AARON_AI_PASSWORD", "changeme")
SESSIONS_DB = str(Path.home() / "aaronai" / "sessions.db") SESSIONS_DB = str(Path.home() / "aaronai" / "sessions.db")
def _init_sessions(): def _init_sessions():
conn = sqlite3.connect(SESSIONS_DB) conn = _connect_sessions()
conn.execute("CREATE TABLE IF NOT EXISTS sessions (token TEXT PRIMARY KEY, created_at TEXT)") conn.execute("CREATE TABLE IF NOT EXISTS sessions (token TEXT PRIMARY KEY, created_at TEXT)")
conn.execute("PRAGMA journal_mode=WAL")
conn.commit() conn.commit()
conn.close() conn.close()
@@ -136,19 +150,19 @@ def hash_password(password: str) -> str:
return hashlib.sha256(password.encode()).hexdigest() return hashlib.sha256(password.encode()).hexdigest()
def save_session(token: str): def save_session(token: str):
conn = sqlite3.connect(SESSIONS_DB) conn = _connect_sessions()
conn.execute("INSERT OR REPLACE INTO sessions VALUES (?, ?)", (token, datetime.now().isoformat())) conn.execute("INSERT OR REPLACE INTO sessions VALUES (?, ?)", (token, datetime.now().isoformat()))
conn.commit() conn.commit()
conn.close() conn.close()
def delete_session(token: str): def delete_session(token: str):
conn = sqlite3.connect(SESSIONS_DB) conn = _connect_sessions()
conn.execute("DELETE FROM sessions WHERE token = ?", (token,)) conn.execute("DELETE FROM sessions WHERE token = ?", (token,))
conn.commit() conn.commit()
conn.close() conn.close()
def session_exists(token: str) -> bool: def session_exists(token: str) -> bool:
conn = sqlite3.connect(SESSIONS_DB) conn = _connect_sessions()
row = conn.execute("SELECT 1 FROM sessions WHERE token = ?", (token,)).fetchone() row = conn.execute("SELECT 1 FROM sessions WHERE token = ?", (token,)).fetchone()
conn.close() conn.close()
return row is not None return row is not None
@@ -163,7 +177,7 @@ def require_auth(request: Request):
return token return token
def init_conversations_db(): def init_conversations_db():
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS conversations ( c.execute('''CREATE TABLE IF NOT EXISTS conversations (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
@@ -182,6 +196,8 @@ def init_conversations_db():
timestamp TEXT NOT NULL, timestamp TEXT NOT NULL,
FOREIGN KEY (conversation_id) REFERENCES conversations(id) FOREIGN KEY (conversation_id) REFERENCES conversations(id)
)''') )''')
c.execute("PRAGMA journal_mode=WAL")
c.execute("CREATE INDEX IF NOT EXISTS idx_messages_conv_ts ON messages(conversation_id, timestamp DESC)")
conn.commit() conn.commit()
conn.close() conn.close()
@@ -250,7 +266,7 @@ def retrieve_context(query, n_results=8):
return context_pieces, sources return context_pieces, sources
def get_conversation_history(conversation_id, limit=20): def get_conversation_history(conversation_id, limit=20):
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
c.execute('''SELECT role, content FROM messages c.execute('''SELECT role, content FROM messages
WHERE conversation_id = ? WHERE conversation_id = ?
@@ -260,7 +276,7 @@ def get_conversation_history(conversation_id, limit=20):
return [{"role": r[0], "content": r[1]} for r in reversed(rows)] return [{"role": r[0], "content": r[1]} for r in reversed(rows)]
def save_message(conversation_id, role, content, sources=None): def save_message(conversation_id, role, content, sources=None):
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
msg_id = hashlib.md5(f"{conversation_id}{role}{datetime.now().isoformat()}".encode()).hexdigest() msg_id = hashlib.md5(f"{conversation_id}{role}{datetime.now().isoformat()}".encode()).hexdigest()
timestamp = datetime.now().isoformat() timestamp = datetime.now().isoformat()
@@ -274,7 +290,7 @@ def save_message(conversation_id, role, content, sources=None):
conn.close() conn.close()
def create_conversation(title="New conversation"): def create_conversation(title="New conversation"):
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
conv_id = hashlib.md5(f"{datetime.now().isoformat()}".encode()).hexdigest()[:16] conv_id = hashlib.md5(f"{datetime.now().isoformat()}".encode()).hexdigest()[:16]
now = datetime.now().isoformat() now = datetime.now().isoformat()
@@ -409,7 +425,7 @@ async def update_settings(request: Request, auth: str = Depends(require_auth)):
@app.get("/api/conversations") @app.get("/api/conversations")
async def list_conversations(auth: str = Depends(require_auth)): async def list_conversations(auth: str = Depends(require_auth)):
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
c.execute('''SELECT id, title, created_at, updated_at, message_count c.execute('''SELECT id, title, created_at, updated_at, message_count
FROM conversations ORDER BY updated_at DESC LIMIT 100''') FROM conversations ORDER BY updated_at DESC LIMIT 100''')
@@ -429,7 +445,7 @@ async def new_conversation(request: Request, auth: str = Depends(require_auth)):
@app.get("/api/conversations/{conv_id}/messages") @app.get("/api/conversations/{conv_id}/messages")
async def get_messages(conv_id: str, auth: str = Depends(require_auth)): async def get_messages(conv_id: str, auth: str = Depends(require_auth)):
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
c.execute('''SELECT role, content, sources, timestamp FROM messages c.execute('''SELECT role, content, sources, timestamp FROM messages
WHERE conversation_id = ? ORDER BY timestamp ASC''', (conv_id,)) WHERE conversation_id = ? ORDER BY timestamp ASC''', (conv_id,))
@@ -446,7 +462,7 @@ async def rename_conversation(conv_id: str, request: Request, auth: str = Depend
title = data.get("title", "") title = data.get("title", "")
if not title: if not title:
return JSONResponse({"error": "Title required"}, status_code=400) return JSONResponse({"error": "Title required"}, status_code=400)
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
c.execute("UPDATE conversations SET title = ? WHERE id = ?", (title, conv_id)) c.execute("UPDATE conversations SET title = ? WHERE id = ?", (title, conv_id))
conn.commit() conn.commit()
@@ -455,7 +471,7 @@ async def rename_conversation(conv_id: str, request: Request, auth: str = Depend
@app.delete("/api/conversations/{conv_id}") @app.delete("/api/conversations/{conv_id}")
async def delete_conversation(conv_id: str, auth: str = Depends(require_auth)): async def delete_conversation(conv_id: str, auth: str = Depends(require_auth)):
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
c.execute("DELETE FROM messages WHERE conversation_id = ?", (conv_id,)) c.execute("DELETE FROM messages WHERE conversation_id = ?", (conv_id,))
c.execute("DELETE FROM conversations WHERE id = ?", (conv_id,)) c.execute("DELETE FROM conversations WHERE id = ?", (conv_id,))
@@ -500,14 +516,14 @@ async def chat_endpoint(request: Request, auth: str = Depends(require_auth)):
save_message(conversation_id, "user", user_message) save_message(conversation_id, "user", user_message)
# Auto-title conversation from first message # Auto-title conversation from first message
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
c.execute("SELECT message_count, title FROM conversations WHERE id = ?", (conversation_id,)) c.execute("SELECT message_count, title FROM conversations WHERE id = ?", (conversation_id,))
row = c.fetchone() row = c.fetchone()
conn.close() conn.close()
if row and row[0] <= 1 and row[1] == "New conversation": if row and row[0] <= 1 and row[1] == "New conversation":
auto_title = user_message[:60] + ("..." if len(user_message) > 60 else "") auto_title = user_message[:60] + ("..." if len(user_message) > 60 else "")
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
c.execute("UPDATE conversations SET title = ? WHERE id = ?", (auto_title, conversation_id)) c.execute("UPDATE conversations SET title = ? WHERE id = ?", (auto_title, conversation_id))
conn.commit() conn.commit()
@@ -587,7 +603,7 @@ async def get_status(auth: str = Depends(require_auth)):
pass pass
# Conversation count # Conversation count
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
c.execute("SELECT COUNT(*) FROM conversations") c.execute("SELECT COUNT(*) FROM conversations")
conv_count = c.fetchone()[0] conv_count = c.fetchone()[0]
@@ -623,6 +639,7 @@ async def transcribe_audio(request: Request, audio: UploadFile = File(...), auth
tmp_path, tmp_path,
language="en", language="en",
vad_filter=True, vad_filter=True,
beam_size=1,
initial_prompt=WHISPER_PROMPT initial_prompt=WHISPER_PROMPT
) )
transcript = " ".join(s.text.strip() for s in segments) transcript = " ".join(s.text.strip() for s in segments)
@@ -674,7 +691,7 @@ def transcribe_and_save(tmp_path, timestamp, nextcloud_url, nextcloud_user, next
nc_auth = (nextcloud_user, nextcloud_password) nc_auth = (nextcloud_user, nextcloud_password)
try: try:
segments, _ = whisper_model.transcribe( segments, _ = whisper_model.transcribe(
tmp_path, language="en", vad_filter=True, initial_prompt=WHISPER_PROMPT tmp_path, language="en", vad_filter=True, beam_size=1, initial_prompt=WHISPER_PROMPT
) )
transcript = " ".join(s.text.strip() for s in segments).strip() transcript = " ".join(s.text.strip() for s in segments).strip()
os.unlink(tmp_path) os.unlink(tmp_path)
@@ -760,7 +777,7 @@ async def capture_endpoint(
tmp.write(audio_bytes) tmp.write(audio_bytes)
tmp_audio_path = tmp.name tmp_audio_path = tmp.name
segments, _ = whisper_model.transcribe( segments, _ = whisper_model.transcribe(
tmp_audio_path, language="en", vad_filter=True, initial_prompt=WHISPER_PROMPT tmp_audio_path, language="en", vad_filter=True, beam_size=1, initial_prompt=WHISPER_PROMPT
) )
voice_annotation = " ".join(s.text.strip() for s in segments).strip() or None voice_annotation = " ".join(s.text.strip() for s in segments).strip() or None
os.unlink(tmp_audio_path) os.unlink(tmp_audio_path)
@@ -969,7 +986,7 @@ async def reindex_status(auth: str = Depends(require_auth)):
@app.delete("/api/conversations") @app.delete("/api/conversations")
async def clear_all_conversations(auth: str = Depends(require_auth)): async def clear_all_conversations(auth: str = Depends(require_auth)):
conn = sqlite3.connect(CONVERSATIONS_DB) conn = _connect_conversations()
c = conn.cursor() c = conn.cursor()
c.execute("DELETE FROM messages") c.execute("DELETE FROM messages")
c.execute("DELETE FROM conversations") c.execute("DELETE FROM conversations")
+1 -1
View File
@@ -6,7 +6,7 @@ mkdir -p "$BACKUP_DIR"
# Copy critical files # Copy critical files
cp ~/aaronai/memory.md "$BACKUP_DIR/memory-$DATE.md" cp ~/aaronai/memory.md "$BACKUP_DIR/memory-$DATE.md"
cp ~/aaronai/settings.json "$BACKUP_DIR/settings-$DATE.json" cp ~/aaronai/settings.json "$BACKUP_DIR/settings-$DATE.json"
cp ~/aaronai/conversations.db "$BACKUP_DIR/conversations-$DATE.db" python3 -c "import sqlite3, sys; src = sqlite3.connect('$HOME/aaronai/conversations.db'); dst = sqlite3.connect('$BACKUP_DIR/conversations-$DATE.db'); src.backup(dst); dst.close(); src.close()"
# Keep only last 7 days # Keep only last 7 days
find "$BACKUP_DIR" -name "*.md" -mtime +7 -delete find "$BACKUP_DIR" -name "*.md" -mtime +7 -delete
+23 -5
View File
@@ -16,6 +16,7 @@ import os
import json import json
import sqlite3 import sqlite3
import argparse import argparse
from functools import lru_cache
from collections import Counter from collections import Counter
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -283,6 +284,11 @@ def retrieve_graphiti(mode, task=None, n_results=8, excluded_sources=None):
print(f"[Graphiti retrieval error: {e}] — falling back to empty.") print(f"[Graphiti retrieval error: {e}] — falling back to empty.")
return [] return []
@lru_cache(maxsize=1)
def _get_embedder():
from sentence_transformers import SentenceTransformer
return SentenceTransformer("all-MiniLM-L6-v2")
def retrieve(mode, task=None, n_results=8, excluded_sources=None, type_filter=None): def retrieve(mode, task=None, n_results=8, excluded_sources=None, type_filter=None):
# E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search # E3 experiment: DREAMER_SUBSTRATE=graphiti routes retrieval to Graphiti /search
# Default behavior: pgvector similarity search (unchanged) # Default behavior: pgvector similarity search (unchanged)
@@ -291,8 +297,7 @@ def retrieve(mode, task=None, n_results=8, excluded_sources=None, type_filter=No
substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector") substrate = os.getenv("DREAMER_SUBSTRATE", "pgvector")
if substrate == "graphiti": if substrate == "graphiti":
return retrieve_graphiti(mode, task=task, n_results=n_results, excluded_sources=excluded_sources) return retrieve_graphiti(mode, task=task, n_results=n_results, excluded_sources=excluded_sources)
from sentence_transformers import SentenceTransformer embedder = _get_embedder()
embedder = SentenceTransformer("all-MiniLM-L6-v2")
low, high = MODE_RANGES[mode] low, high = MODE_RANGES[mode]
if task: if task:
@@ -480,10 +485,11 @@ def write_manifest(date_str, stage_data, corpus_data):
auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD) auth = (NEXTCLOUD_USER, NEXTCLOUD_PASSWORD)
url = f"{DREAMS_WEBDAV}/dream-manifest-{date_str}.json" url = f"{DREAMS_WEBDAV}/dream-manifest-{date_str}.json"
try: try:
requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30) response = requests.put(url, data=content.encode("utf-8"), auth=auth, timeout=30)
response.raise_for_status()
print(f"Manifest written: Journal/Dreams/dream-manifest-{date_str}.json") print(f"Manifest written: Journal/Dreams/dream-manifest-{date_str}.json")
except Exception as e: except Exception as e:
print(f"Manifest write failed (non-critical): {e}") print(f"Manifest write failed — manifest not persisted: {e}")
def dream_pipeline(type_filter=None): def dream_pipeline(type_filter=None):
@@ -613,8 +619,20 @@ def dream_pipeline(type_filter=None):
# Write manifest # Write manifest
all_session_sources = list(session_retrieved) all_session_sources = list(session_retrieved)
all_session_folders = list({extract_folder(s) for s in all_session_sources}) all_session_folders = list({extract_folder(s) for s in all_session_sources})
total_chunks = 0
pg = None
try:
pg = get_pg()
cur = pg.cursor()
cur.execute("SELECT COUNT(*) FROM embeddings")
total_chunks = cur.fetchone()[0]
except Exception as e:
print(f"total_chunks query failed (non-critical): {e}")
finally:
if pg is not None:
pg.close()
corpus_data = { corpus_data = {
"total_chunks": delta.get("new_chunks", 0), "total_chunks": total_chunks,
"new_chunks_since_last_dream": delta.get("new_chunks", 0), "new_chunks_since_last_dream": delta.get("new_chunks", 0),
"days_since_last_dream": round(delta.get("days_since_dream", 0), 2), "days_since_last_dream": round(delta.get("days_since_dream", 0), 2),
"substrate": "pgvector", "substrate": "pgvector",
+48 -6
View File
@@ -25,6 +25,30 @@ DEFAULT_CHUNK_SIZE = 500
DEFAULT_CHUNK_OVERLAP = 50 DEFAULT_CHUNK_OVERLAP = 50
def _docx_cell_paragraphs(cell):
yield from (p for p in cell.paragraphs if p.text.strip())
for nested in cell.tables:
for row in nested.rows:
for c in row.cells:
yield from _docx_cell_paragraphs(c)
def _pptx_shape_text(shape):
from pptx.enum.shapes import MSO_SHAPE_TYPE
parts = []
if shape.shape_type == MSO_SHAPE_TYPE.GROUP:
for sub in shape.shapes:
parts.extend(_pptx_shape_text(sub))
return parts
if hasattr(shape, "text") and shape.text.strip():
parts.append(shape.text)
if getattr(shape, "has_table", False):
for cell in shape.table.iter_cells():
if cell.text.strip():
parts.append(cell.text)
return parts
def extract_text(filepath: Path) -> str: def extract_text(filepath: Path) -> str:
"""Return the text of a supported file. Returns "" on any failure or """Return the text of a supported file. Returns "" on any failure or
unsupported extension. Does not write to ingest_failures — caller decides.""" unsupported extension. Does not write to ingest_failures — caller decides."""
@@ -32,7 +56,21 @@ def extract_text(filepath: Path) -> str:
try: try:
if suffix == ".docx": if suffix == ".docx":
doc = DocxDocument(filepath) doc = DocxDocument(filepath)
return "\n".join(p.text for p in doc.paragraphs if p.text.strip()) parts = [p.text for p in doc.paragraphs if p.text.strip()]
for tbl in doc.tables:
for row in tbl.rows:
for cell in row.cells:
parts.extend(p.text for p in _docx_cell_paragraphs(cell))
for section in doc.sections:
parts.extend(p.text for p in section.header.paragraphs if p.text.strip())
parts.extend(p.text for p in section.footer.paragraphs if p.text.strip())
from docx.oxml.ns import qn
for txbx in doc.element.body.findall(".//" + qn("w:txbxContent")):
for p in txbx.findall(".//" + qn("w:p")):
text = "".join(t.text or "" for t in p.findall(".//" + qn("w:t")))
if text.strip():
parts.append(text)
return "\n".join(parts)
elif suffix == ".pdf": elif suffix == ".pdf":
reader = PdfReader(filepath) reader = PdfReader(filepath)
return "".join( return "".join(
@@ -41,11 +79,15 @@ def extract_text(filepath: Path) -> str:
) )
elif suffix == ".pptx": elif suffix == ".pptx":
prs = Presentation(filepath) prs = Presentation(filepath)
return "\n".join( parts = []
shape.text for slide in prs.slides for slide in prs.slides:
for shape in slide.shapes for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip() parts.extend(_pptx_shape_text(shape))
) if slide.has_notes_slide:
notes = slide.notes_slide.notes_text_frame.text
if notes.strip():
parts.append(notes)
return "\n".join(parts)
elif suffix in {".txt", ".md"}: elif suffix in {".txt", ".md"}:
return filepath.read_text(encoding="utf-8", errors="ignore") return filepath.read_text(encoding="utf-8", errors="ignore")
except Exception as e: except Exception as e:
+9 -3
View File
@@ -18,8 +18,14 @@ CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
PG_DSN = os.getenv("PG_DSN") PG_DSN = os.getenv("PG_DSN")
MIN_EXCHANGES = 3 MIN_EXCHANGES = 3
print("Loading embedding model...") _embedder = None
embedder = SentenceTransformer("all-MiniLM-L6-v2")
def get_embedder():
global _embedder
if _embedder is None:
print("Loading embedding model...")
_embedder = SentenceTransformer("all-MiniLM-L6-v2")
return _embedder
def get_conversations(): def get_conversations():
conn = sqlite3.connect(CONVERSATIONS_DB) conn = sqlite3.connect(CONVERSATIONS_DB)
@@ -123,7 +129,7 @@ def run():
# Embed and insert # Embed and insert
texts = [c[1] for c in new_chunks] texts = [c[1] for c in new_chunks]
embeddings = embedder.encode(texts, show_progress_bar=False).tolist() embeddings = get_embedder().encode(texts, show_progress_bar=False).tolist()
for (chunk_id, chunk_text, meta), embedding in zip(new_chunks, embeddings): for (chunk_id, chunk_text, meta), embedding in zip(new_chunks, embeddings):
if not meta.get("type"): if not meta.get("type"):
+14 -3
View File
@@ -124,7 +124,7 @@ def resolve_ingest_failure(source: str):
def ingest_file(filepath: Path, embedder) -> int: def ingest_file(filepath: Path, embedder) -> int:
if filepath.name.startswith(("~$", ".")): if filepath.name.startswith(("~$", "~", ".")):
return 0 return 0
if filepath.suffix.lower() not in SUPPORTED: if filepath.suffix.lower() not in SUPPORTED:
return 0 return 0
@@ -168,6 +168,7 @@ def ingest_files(paths: list, embedder, state: dict) -> dict:
for path in paths: for path in paths:
count = ingest_file(path, embedder) count = ingest_file(path, embedder)
total += count total += count
if count > 0:
state[str(path)] = str(path.stat().st_mtime) state[str(path)] = str(path.stat().st_mtime)
log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.") log.info(f"Ingestion complete. {total} chunks across {len(paths)} files.")
return state return state
@@ -196,12 +197,18 @@ def get_changed_files(state: dict) -> list:
continue continue
if path.suffix.lower() not in SUPPORTED: if path.suffix.lower() not in SUPPORTED:
continue continue
if path.name.startswith((".", "~$")): if path.name.startswith((".", "~$", "~")):
continue continue
if "Admin/Backups" in str(path) or "Backups" in path.parts: if "Admin/Backups" in str(path) or "Backups" in path.parts:
continue continue
if "Journal/Media" in str(path): if "Journal/Media" in str(path):
continue continue
if "Generative Design" in path.parts and "Processing" in path.parts:
continue
if "Computational Design 2017" in path.parts and "Student Work" in path.parts:
continue
if path.stat().st_size == 0:
continue
if state.get(str(path)) != str(path.stat().st_mtime): if state.get(str(path)) != str(path.stat().st_mtime):
changed.append(path) changed.append(path)
return changed return changed
@@ -280,12 +287,16 @@ class IngestHandler(FileSystemEventHandler):
self.last_event = 0 self.last_event = 0
def _should_ignore(self, path: Path) -> bool: def _should_ignore(self, path: Path) -> bool:
if path.name.startswith((".", "~$")): if path.name.startswith((".", "~$", "~")):
return True return True
if "Admin/Backups" in str(path) or "Backups" in path.parts: if "Admin/Backups" in str(path) or "Backups" in path.parts:
return True return True
if "Journal/Media" in str(path): if "Journal/Media" in str(path):
return True return True
if "Generative Design" in path.parts and "Processing" in path.parts:
return True
if "Computational Design 2017" in path.parts and "Student Work" in path.parts:
return True
return False return False
def on_created(self, event): def on_created(self, event):