"""Aaron AI web backend.

FastAPI application exposing cookie-authenticated chat, memory, settings,
status, audio transcription/capture, and server-sent-event endpoints.
Conversations and sessions are stored in SQLite files; document retrieval
queries an ``embeddings`` table in PostgreSQL via pgvector.
"""

import asyncio
import hashlib
import json
import os
import secrets
import sqlite3
import subprocess
import tempfile
import time
import xml.etree.ElementTree as ET
from contextlib import closing
from datetime import datetime
from pathlib import Path

import anthropic
import psycopg2
import psycopg2.extras
import uvicorn
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, File, HTTPException, Request, Response, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from sentence_transformers import SentenceTransformer

try:
    from faster_whisper import WhisperModel
    HAS_WHISPER = True
except ImportError:
    HAS_WHISPER = False

load_dotenv(Path.home() / "aaronai" / ".env")

MEMORY_PATH = Path.home() / "aaronai" / "memory.md"
DB_PATH = str(Path.home() / "aaronai" / "db")
CONVERSATIONS_DB = str(Path.home() / "aaronai" / "conversations.db")
SETTINGS_PATH = Path.home() / "aaronai" / "settings.json"
WATCHER_LOG = str(Path.home() / "aaronai" / "watcher.log")
WATCHER_STATE = str(Path.home() / "aaronai" / "watcher_state.json")
NEXTCLOUD_PATH = "/home/aaron/nextcloud/data/data/aaron/files"
INGEST_SCRIPT = str(Path.home() / "aaronai" / "scripts" / "ingest.py")
PYTHON = str(Path.home() / "aaronai" / "venv" / "bin" / "python3")

DEFAULT_SETTINGS = {
    "theme": "light",
    "font_size": "medium",
    "web_search": True,
    "show_sources": True,
}

print("Loading Aaron AI...")

# NOTE(review): the fallback DSN embeds a database password in source; prefer
# requiring PG_DSN from the environment.
PG_DSN = os.getenv("PG_DSN", "dbname=aaronai user=aaronai password=aaronai_db_password host=localhost")


def get_pg():
    """Open a new psycopg2 connection using PG_DSN; the caller must close it."""
    return psycopg2.connect(PG_DSN)


# Vocabulary hint passed to Whisper as ``initial_prompt``.
WHISPER_PROMPT = (
    "Grasshopper, Rhino, PolyJet, SLA, FDM, DMLS, ChromaDB, "
    "HVAMC, FWN3D, Mossygear, Nextcloud, Gitea, computational design, "
    "additive manufacturing, parametric, fabrication"
)

whisper_model = None
if HAS_WHISPER:
    try:
        whisper_model = WhisperModel("small", device="cpu", compute_type="int8", cpu_threads=4)
        print("Whisper model loaded")
    except Exception as e:
        print(f"Whisper not available: {e}")

embedder = SentenceTransformer("all-MiniLM-L6-v2")
# ChromaDB removed — using pgvector
anthropic_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

SYSTEM_PROMPT = """You are the personal AI assistant of Aaron Nelson — computational designer, fabrication researcher, program builder, and creative practitioner based in the Hudson Valley.

Aaron's work sits at the intersection of computational geometry, additive manufacturing, and physical making. He resolves complex systems into physical reality — from Grasshopper definitions to large-scale steel structures, from archival photographs to 3D-printed architectural restorations, from product concepts to manufactured goods. This throughline — computation resolving into physical form — defines his practice across academic, consulting, and creative contexts.

He built the Hudson Valley Additive Manufacturing Center (HVAMC) from nothing and has directed it since 2016, alongside the DDF academic program at SUNY New Paltz. He has the skills of a founder — equipment selection, policy creation, client development, grant writing, curriculum design — operating within an academic structure. He consults with IBM, Braskem, Selux and others through FWN3D. He runs Mossygear as a product business. He makes large-scale fabricated art.

His communication style is direct, precise, and intolerant of padding or overclaiming. He flags inaccuracies immediately and expects the same standard from you. When helping him write, match his voice — economical, specific, never performative. When answering questions, cite sources and acknowledge uncertainty rather than filling gaps with plausible-sounding content.

You have access to his complete document corpus, conversation history, and a persistent memory file that carries his current context. Treat the memory file as ground truth for his present situation.

Use web search automatically when current information is needed. Never re-brief on context that's already in memory or documents."""

# Auth configuration
# NOTE(review): falls back to "changeme" when AARON_AI_PASSWORD is unset.
SESSION_PASSWORD = os.getenv("AARON_AI_PASSWORD", "changeme")
SESSIONS_DB = str(Path.home() / "aaronai" / "sessions.db")


def _init_sessions():
    """Create the sessions table if it does not exist."""
    with closing(sqlite3.connect(SESSIONS_DB)) as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS sessions (token TEXT PRIMARY KEY, created_at TEXT)")
        conn.commit()


_init_sessions()


def make_session_token() -> str:
    """Return a fresh random URL-safe session token."""
    return secrets.token_urlsafe(32)


def hash_password(password: str) -> str:
    """Return the hex SHA-256 digest of ``password``."""
    return hashlib.sha256(password.encode()).hexdigest()


def save_session(token: str):
    """Insert (or replace) ``token`` in the sessions table with the current time."""
    with closing(sqlite3.connect(SESSIONS_DB)) as conn:
        conn.execute("INSERT OR REPLACE INTO sessions VALUES (?, ?)", (token, datetime.now().isoformat()))
        conn.commit()


def delete_session(token: str):
    """Delete ``token`` from the sessions table."""
    with closing(sqlite3.connect(SESSIONS_DB)) as conn:
        conn.execute("DELETE FROM sessions WHERE token = ?", (token,))
        conn.commit()


def session_exists(token: str) -> bool:
    """Return True if ``token`` is present in the sessions table."""
    with closing(sqlite3.connect(SESSIONS_DB)) as conn:
        row = conn.execute("SELECT 1 FROM sessions WHERE token = ?", (token,)).fetchone()
    return row is not None


def get_session(request: Request) -> str | None:
    """Return the ``aaronai_session`` cookie value, or None if absent."""
    return request.cookies.get("aaronai_session")


def require_auth(request: Request):
    """FastAPI dependency: return the session token or raise HTTP 401."""
    token = get_session(request)
    if not token or not session_exists(token):
        raise HTTPException(status_code=401, detail="Not authenticated")
    return token


CV_SOURCES = ["Aaron Nelson CV 2024.pdf", "Aaron Nelson CV 2025.pdf", "Aaron Nelson - CV.docx"]


def init_conversations_db():
    """Create the conversations and messages tables if they do not exist."""
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS conversations (
            id TEXT PRIMARY KEY,
            title TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            model TEXT DEFAULT 'claude-sonnet-4-6',
            message_count INTEGER DEFAULT 0
        )''')
        c.execute('''CREATE TABLE IF NOT EXISTS messages (
            id TEXT PRIMARY KEY,
            conversation_id TEXT NOT NULL,
            role TEXT NOT NULL,
            content TEXT NOT NULL,
            sources TEXT DEFAULT '[]',
            timestamp TEXT NOT NULL,
            FOREIGN KEY (conversation_id) REFERENCES conversations(id)
        )''')
        conn.commit()


init_conversations_db()


def load_settings():
    """Return DEFAULT_SETTINGS overlaid with the settings file, if readable.

    An unreadable, malformed, or non-object settings file is ignored and a
    copy of the defaults is returned.
    """
    if SETTINGS_PATH.exists():
        try:
            s = json.loads(SETTINGS_PATH.read_text())
            return {**DEFAULT_SETTINGS, **s}
        except (OSError, ValueError, TypeError):
            # ValueError covers bad JSON; TypeError covers a non-mapping payload.
            pass
    return DEFAULT_SETTINGS.copy()


def save_settings(settings):
    """Write ``settings`` to the settings file as indented JSON."""
    SETTINGS_PATH.write_text(json.dumps(settings, indent=2))


def load_memory():
    """Return the memory file's text, or an empty string if it does not exist."""
    if MEMORY_PATH.exists():
        return MEMORY_PATH.read_text(encoding="utf-8")
    return ""


def save_memory(content):
    """Overwrite the memory file with ``content``."""
    MEMORY_PATH.write_text(content, encoding="utf-8")


def add_to_memory(item):
    """Append a dated bullet for ``item``, adding a "## Notes" heading if missing."""
    memory = load_memory()
    timestamp = datetime.now().strftime("%Y-%m-%d")
    note = f"\n- [{timestamp}] {item}"
    if "## Notes" not in memory:
        memory += "\n\n## Notes"
    memory += note
    save_memory(memory)


def remove_from_memory(item):
    """Remove every memory line containing ``item`` (case-insensitive).

    Returns the number of lines removed. A blank ``item`` removes nothing:
    an empty substring matches every line, which would wipe the whole file.
    """
    if not item or not item.strip():
        return 0
    needle = item.lower()
    lines = load_memory().split("\n")
    filtered = [line for line in lines if needle not in line.lower()]
    save_memory("\n".join(filtered))
    return len(lines) - len(filtered)


def get_pinned_cv_context():
    """Return (documents, metadatas) for all embedding rows whose source is a CV.

    Best-effort: any database error yields two empty lists.
    """
    try:
        with closing(get_pg()) as pg:
            cur = pg.cursor()
            cur.execute(
                "SELECT document, source FROM embeddings WHERE source = ANY(%s)",
                (CV_SOURCES,)
            )
            rows = cur.fetchall()
    except Exception as e:
        print(f"pinned CV retrieval error: {e}")
        return [], []
    docs = [r[0] for r in rows]
    metas = [{"source": r[1]} for r in rows]
    return docs, metas


def is_professional_query(query):
    """Return True if ``query`` contains any career/CV-related keyword (substring match)."""
    keywords = ["grant", "publication", "exhibition", "award", "fellowship",
                "experience", "position", "job", "career", "cv", "resume",
                "research", "work history", "accomplishment", "teaching",
                "course", "client", "consultation", "presentation", "workshop",
                "education", "degree", "institution", "service", "committee"]
    lowered = query.lower()
    return any(k in lowered for k in keywords)


def retrieve_context(query, n_results=8):
    """Return (context_pieces, sources) for ``query``.

    CV chunks are prepended for professional queries; then up to ``n_results``
    nearest non-CV chunks are fetched, keeping those with similarity > 0.3.
    Retrieval errors are printed and whatever was gathered so far is returned.
    """
    query_embedding = embedder.encode([query]).tolist()[0]
    context_pieces = []
    sources = []

    if is_professional_query(query):
        cv_docs, cv_metas = get_pinned_cv_context()
        for doc, meta in zip(cv_docs, cv_metas):
            context_pieces.append(f"[CV] {doc}")
            sources.append(meta.get("source", "CV"))

    try:
        with closing(get_pg()) as pg:
            cur = pg.cursor()
            cur.execute("""
                SELECT document, source, 1 - (embedding <=> %s::vector) as similarity
                FROM embeddings
                WHERE source NOT IN %s
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (query_embedding, tuple(CV_SOURCES) if CV_SOURCES else ('__none__',), query_embedding, n_results))
            for doc, source, similarity in cur.fetchall():
                if similarity > 0.3:
                    context_pieces.append(doc)
                    sources.append(source or "unknown")
    except Exception as e:
        print(f"pgvector retrieval error: {e}")

    return context_pieces, sources


def get_conversation_history(conversation_id, limit=20):
    """Return the most recent ``limit`` messages, oldest first, as role/content dicts."""
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        rows = conn.execute('''SELECT role, content FROM messages
                     WHERE conversation_id = ?
                     ORDER BY timestamp DESC LIMIT ?''', (conversation_id, limit)).fetchall()
    return [{"role": r[0], "content": r[1]} for r in reversed(rows)]


def save_message(conversation_id, role, content, sources=None):
    """Insert a message row and bump the conversation's updated_at/message_count."""
    # Random id (32 hex chars, same shape as the previous MD5 digest) so two
    # messages written within the same clock tick cannot collide.
    msg_id = secrets.token_hex(16)
    timestamp = datetime.now().isoformat()
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        c = conn.cursor()
        c.execute('''INSERT INTO messages (id, conversation_id, role, content, sources, timestamp)
                     VALUES (?, ?, ?, ?, ?, ?)''',
                  (msg_id, conversation_id, role, content, json.dumps(sources or []), timestamp))
        c.execute('''UPDATE conversations SET updated_at = ?, message_count = message_count + 1
                     WHERE id = ?''', (timestamp, conversation_id))
        conn.commit()


def create_conversation(title="New conversation"):
    """Insert a new conversation row and return its 16-hex-character id."""
    # Random id instead of a timestamp hash: concurrent creations cannot collide.
    conv_id = secrets.token_hex(8)
    now = datetime.now().isoformat()
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        conn.execute('''INSERT INTO conversations (id, title, created_at, updated_at)
                     VALUES (?, ?, ?, ?)''', (conv_id, title, now, now))
        conn.commit()
    return conv_id


def chat(user_message, conversation_id, settings):
    """Send ``user_message`` plus memory, retrieved context, and history to the model.

    Returns (assistant_text, unique_sources). Web search is offered as a tool
    when ``settings["web_search"]`` is truthy (default True).
    """
    memory = load_memory()
    context_pieces, sources = retrieve_context(user_message)
    history = get_conversation_history(conversation_id)

    context_parts = []
    if memory:
        context_parts.append(f"Aaron's persistent memory:\n\n{memory}")
    if context_pieces:
        context_str = "\n\n---\n\n".join(context_pieces)
        unique_sources = list(set(sources))
        context_parts.append(
            f"Relevant excerpts from Aaron's documents:\n\n{context_str}\n\nSources: {', '.join(unique_sources)}"
        )

    context_block = "\n\n====\n\n".join(context_parts) + "\n\n---\n\n" if context_parts else ""
    full_message = context_block + user_message
    messages = history + [{"role": "user", "content": full_message}]

    tools = [{"type": "web_search_20250305", "name": "web_search"}] if settings.get("web_search", True) else []

    while True:
        kwargs = {
            "model": "claude-sonnet-4-6",
            "max_tokens": 2048,
            "system": SYSTEM_PROMPT,
            "messages": messages
        }
        if tools:
            kwargs["tools"] = tools

        response = anthropic_client.messages.create(**kwargs)

        if response.stop_reason == "tool_use":
            # Echo the assistant turn back with a placeholder result for each
            # tool_use block, then ask the model to continue.
            messages.append({"role": "assistant", "content": response.content})
            tool_results = [
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": "Search completed"
                }
                for block in response.content
                if block.type == "tool_use"
            ]
            messages.append({"role": "user", "content": tool_results})
        else:
            assistant_message = "".join(
                block.text for block in response.content if hasattr(block, "text")
            )
            return assistant_message, list(set(sources))


app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])


@app.post("/auth/login")
async def login(request: Request, response: Response):
    """Check the posted password and, on success, set the session cookie.

    Raises HTTP 401 when the password is missing, not a string, or wrong.
    """
    data = await request.json()
    password = data.get("password", "") if isinstance(data, dict) else ""
    # Constant-time comparison of the digests avoids a timing side channel.
    if not isinstance(password, str) or not secrets.compare_digest(
        hash_password(password), hash_password(SESSION_PASSWORD)
    ):
        raise HTTPException(status_code=401, detail="Invalid password")
    token = make_session_token()
    save_session(token)
    # Build the response explicitly so Content-Type/Content-Length are set;
    # mutating the injected Response's body after construction does not
    # update its headers.
    reply = JSONResponse({"ok": True})
    reply.set_cookie(
        key="aaronai_session",
        value=token,
        httponly=True,
        secure=True,
        samesite="lax",
        max_age=60 * 60 * 24 * 30
    )
    return reply


@app.post("/auth/logout")
async def logout(request: Request, response: Response):
    """Delete the server-side session and clear the session cookie."""
    token = get_session(request)
    if token:
        delete_session(token)
    # The cookie must be cleared on the response actually returned: headers
    # set on the injected Response are not merged into a directly returned one.
    reply = JSONResponse({"ok": True})
    reply.delete_cookie("aaronai_session")
    return reply


@app.get("/auth/check")
async def check_auth(request: Request):
    """Report whether the request carries a valid session cookie."""
    token = get_session(request)
    if not token or not session_exists(token):
        return JSONResponse({"authenticated": False})
    return JSONResponse({"authenticated": True})


@app.get("/", response_class=FileResponse)
async def index():
    """Serve the single-page frontend."""
    return FileResponse("/home/aaron/aaronai/static/index.html")


@app.get("/api/settings")
async def get_settings(auth: str = Depends(require_auth)):
    """Return the current settings."""
    return JSONResponse(load_settings())


@app.post("/api/settings")
async def update_settings(request: Request, auth: str = Depends(require_auth)):
    """Merge the posted JSON object into the stored settings and return the result."""
    data = await request.json()
    settings = load_settings()
    settings.update(data)
    save_settings(settings)
    return JSONResponse(settings)


@app.get("/api/conversations")
async def list_conversations(auth: str = Depends(require_auth)):
    """Return up to 100 conversations, most recently updated first."""
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        rows = conn.execute('''SELECT id, title, created_at, updated_at, message_count
                     FROM conversations ORDER BY updated_at DESC LIMIT 100''').fetchall()
    return JSONResponse([{
        "id": r[0], "title": r[1], "created_at": r[2],
        "updated_at": r[3], "message_count": r[4]
    } for r in rows])


@app.post("/api/conversations")
async def new_conversation(request: Request, auth: str = Depends(require_auth)):
    """Create a conversation with the posted title (default "New conversation")."""
    data = await request.json()
    title = data.get("title", "New conversation")
    conv_id = create_conversation(title)
    return JSONResponse({"id": conv_id, "title": title})


@app.get("/api/conversations/{conv_id}/messages")
async def get_messages(conv_id: str, auth: str = Depends(require_auth)):
    """Return all messages of a conversation in chronological order."""
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        rows = conn.execute('''SELECT role, content, sources, timestamp FROM messages
                     WHERE conversation_id = ? ORDER BY timestamp ASC''', (conv_id,)).fetchall()
    return JSONResponse([{
        "role": r[0], "content": r[1],
        "sources": json.loads(r[2]), "timestamp": r[3]
    } for r in rows])


@app.patch("/api/conversations/{conv_id}")
async def rename_conversation(conv_id: str, request: Request, auth: str = Depends(require_auth)):
    """Set a conversation's title; responds 400 if the title is empty."""
    data = await request.json()
    title = data.get("title", "")
    if not title:
        return JSONResponse({"error": "Title required"}, status_code=400)
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        conn.execute("UPDATE conversations SET title = ? WHERE id = ?", (title, conv_id))
        conn.commit()
    return JSONResponse({"id": conv_id, "title": title})


@app.delete("/api/conversations/{conv_id}")
async def delete_conversation(conv_id: str, auth: str = Depends(require_auth)):
    """Delete a conversation and all of its messages."""
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        conn.execute("DELETE FROM messages WHERE conversation_id = ?", (conv_id,))
        conn.execute("DELETE FROM conversations WHERE id = ?", (conv_id,))
        conn.commit()
    return JSONResponse({"deleted": conv_id})


def _auto_title_conversation(conversation_id, user_message):
    """Title a still-untitled conversation from the first 60 chars of its first message."""
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        row = conn.execute("SELECT message_count, title FROM conversations WHERE id = ?",
                           (conversation_id,)).fetchone()
        if row and row[0] <= 1 and row[1] == "New conversation":
            auto_title = user_message[:60] + ("..." if len(user_message) > 60 else "")
            conn.execute("UPDATE conversations SET title = ? WHERE id = ?", (auto_title, conversation_id))
            conn.commit()


@app.post("/api/chat")
async def chat_endpoint(request: Request, auth: str = Depends(require_auth)):
    """Handle a chat turn.

    Recognises three in-band commands before calling the model:
    ``show memory``, ``remember: <item>`` and ``forget: <item>``.
    A conversation is created when no ``conversation_id`` is supplied.
    """
    data = await request.json()
    user_message = data.get("message", "").strip()
    conversation_id = data.get("conversation_id", "")
    settings = load_settings()

    if not user_message:
        return JSONResponse({"error": "Empty message"})

    if not conversation_id:
        conversation_id = create_conversation("New conversation")

    stripped = user_message.strip().lower()

    if stripped == "show memory":
        return JSONResponse({"response": load_memory(), "sources": [], "conversation_id": conversation_id})

    if stripped.startswith("remember:"):
        item = user_message[9:].strip()
        add_to_memory(item)
        save_message(conversation_id, "user", user_message)
        save_message(conversation_id, "assistant", f"Saved to memory: '{item}'")
        return JSONResponse({"response": f"Saved to memory: '{item}'", "sources": [], "conversation_id": conversation_id})

    if stripped.startswith("forget:"):
        item = user_message[7:].strip()
        removed = remove_from_memory(item)
        msg = f"Removed {removed} line(s) containing '{item}'" if removed else f"Nothing found containing '{item}'"
        save_message(conversation_id, "user", user_message)
        save_message(conversation_id, "assistant", msg)
        return JSONResponse({"response": msg, "sources": [], "conversation_id": conversation_id})

    save_message(conversation_id, "user", user_message)

    # Auto-title conversation from first message
    _auto_title_conversation(conversation_id, user_message)

    # NOTE(review): chat() is a blocking call made on the event loop.
    response, sources = chat(user_message, conversation_id, settings)
    shown_sources = sources if settings.get("show_sources") else []
    save_message(conversation_id, "assistant", response, shown_sources)

    return JSONResponse({
        "response": response,
        "sources": shown_sources,
        "conversation_id": conversation_id
    })


@app.get("/api/memory")
async def get_memory(auth: str = Depends(require_auth)):
    """Return the memory file contents."""
    return JSONResponse({"content": load_memory()})


@app.post("/api/memory")
async def update_memory(request: Request, auth: str = Depends(require_auth)):
    """Replace the memory file with the posted ``content``."""
    data = await request.json()
    content = data.get("content", "")
    save_memory(content)
    return JSONResponse({"saved": True})


@app.get("/api/status")
async def get_status(auth: str = Depends(require_auth)):
    """Return a best-effort system status summary.

    Each probe (embedding count, watcher heartbeat, last ingestion time,
    file count) falls back to a default if it fails.
    """
    try:
        with closing(get_pg()) as pg:
            cur = pg.cursor()
            cur.execute("SELECT COUNT(*) FROM embeddings")
            chunk_count = cur.fetchone()[0]
    except Exception:
        chunk_count = 0

    # Watcher status: "running" only if the status file says so and its
    # timestamp is less than 30 seconds old.
    watcher_running = False
    last_indexed = "Unknown"
    try:
        status_path = Path("/home/aaron/aaronai/watcher_status.json")
        if status_path.exists():
            status = json.loads(status_path.read_text())
            age = time.time() - status.get("timestamp", 0)
            watcher_running = status.get("running", False) and age < 30
    except Exception:
        pass

    try:
        log_path = Path(WATCHER_LOG)
        if log_path.exists():
            lines = log_path.read_text().strip().split("\n")
            for line in reversed(lines):
                if "Ingestion complete" in line:
                    last_indexed = line.split(" - ")[0].strip()
                    break
    except Exception:
        pass

    # File count from watcher state
    file_count = 0
    try:
        state_path = Path(WATCHER_STATE)
        if state_path.exists():
            state = json.loads(state_path.read_text())
            file_count = len(state)
        else:
            # Count files in Nextcloud directly
            nc_path = Path(NEXTCLOUD_PATH)
            if nc_path.exists():
                file_count = sum(1 for f in nc_path.rglob("*")
                                 if f.is_file() and f.suffix.lower() in
                                 {'.pdf', '.docx', '.pptx', '.txt', '.md'})
    except Exception:
        pass

    # Conversation count
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        conv_count = conn.execute("SELECT COUNT(*) FROM conversations").fetchone()[0]

    return JSONResponse({
        "aaron_ai": "running",
        "watcher": "running" if watcher_running else "stopped",
        "chunk_count": chunk_count,
        "file_count": file_count,
        "last_indexed": last_indexed,
        "conversation_count": conv_count,
        "model": "claude-sonnet-4-6",
        "nextcloud_path": NEXTCLOUD_PATH
    })


def _audio_suffix(content_type):
    """Map an upload's content type to a temp-file suffix (default ``.webm``)."""
    if content_type and "mp4" in content_type:
        return ".mp4"
    if content_type and "ogg" in content_type:
        return ".ogg"
    return ".webm"


async def _transcribe_upload(audio: UploadFile):
    """Write ``audio`` to a temp file, transcribe it, and return (transcript, info).

    The temp file is removed whether or not transcription succeeds.
    """
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=_audio_suffix(audio.content_type), delete=False) as tmp:
            # Record the path before writing so cleanup also covers a failed read/write.
            tmp_path = tmp.name
            tmp.write(await audio.read())
        segments, info = whisper_model.transcribe(
            tmp_path,
            language="en",
            vad_filter=True,
            initial_prompt=WHISPER_PROMPT
        )
        # Join inside the try so the segments are consumed before the file is deleted.
        transcript = " ".join(s.text.strip() for s in segments)
        return transcript, info
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)


@app.post("/api/transcribe")
async def transcribe_audio(request: Request, audio: UploadFile = File(...), auth: str = Depends(require_auth)):
    """Transcribe an uploaded audio clip and return its text.

    Raises HTTP 503 if Whisper is unavailable and HTTP 500 on any failure.
    """
    if not whisper_model:
        raise HTTPException(status_code=503, detail="Whisper not available")
    try:
        transcript, info = await _transcribe_upload(audio)
        return JSONResponse({"text": transcript, "language": info.language})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/api/dreamer/status")
async def dreamer_status(auth: str = Depends(require_auth)):
    """Return the last dream's time, mode, and file from dreamer_state.json."""
    try:
        state_path = Path.home() / "aaronai" / "dreamer_state.json"
        state = json.loads(state_path.read_text()) if state_path.exists() else {}
        last_ts = state.get("last_dream_timestamp", 0)
        last_dt = datetime.fromtimestamp(last_ts).strftime("%Y-%m-%d %H:%M") if last_ts else "never"
        return JSONResponse({
            "last_dream": last_dt,
            "last_mode": state.get("last_dream_mode", "none"),
            "last_file": state.get("last_dream_file", ""),
        })
    except Exception as e:
        return JSONResponse({"last_dream": "unknown", "last_mode": "none", "last_file": "", "error": str(e)})


@app.post("/api/dreamer/run")
async def run_dreamer(request: Request, auth: str = Depends(require_auth)):
    """Start dream.py in the background with the posted ``mode`` and optional ``task``."""
    try:
        body = await request.json()
        mode = body.get("mode", "nrem")
        task = body.get("task", None)
        dream_script = str(Path.home() / "aaronai" / "scripts" / "dream.py")
        # Arguments are passed as a list (no shell), so they are not shell-interpreted.
        cmd = [PYTHON, dream_script, "--mode", mode]
        if task:
            cmd += ["--task", task]
        subprocess.Popen(cmd, cwd=str(Path.home() / "aaronai"))
        return JSONResponse({"started": True, "mode": mode})
    except Exception as e:
        return JSONResponse({"started": False, "error": str(e)})


@app.post("/api/capture")
async def capture_audio(audio: UploadFile = File(...)):
    """Auth-free capture endpoint — saves transcribed audio to Nextcloud Journal/Captures/"""
    if not whisper_model:
        raise HTTPException(status_code=503, detail="Whisper not available")
    try:
        transcript, _info = await _transcribe_upload(audio)
        transcript = transcript.strip()

        if not transcript:
            return JSONResponse({"ok": False, "error": "No speech detected"})

        # Save to Nextcloud Journal/Captures/ via WebDAV
        import requests as req_lib
        timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
        filename = f"{timestamp}-voice.md"
        content_md = f"# Capture — {timestamp}\n\n**type:** voice\n**modality:** audio\n**status:** unprocessed\n\n---\n\n{transcript}\n"

        nextcloud_url = os.getenv("NEXTCLOUD_URL", "")
        nextcloud_user = os.getenv("NEXTCLOUD_USER", "aaron")
        nextcloud_password = os.getenv("NEXTCLOUD_PASSWORD", "")

        captures_dir = f"{nextcloud_url}/remote.php/dav/files/{nextcloud_user}/Journal/Captures"
        auth = (nextcloud_user, nextcloud_password)
        # Create the folder; the response is deliberately not checked.
        req_lib.request("MKCOL", captures_dir, auth=auth, timeout=10)

        url = f"{captures_dir}/{filename}"
        response = req_lib.put(url, data=content_md.encode("utf-8"), auth=auth, timeout=30)
        response.raise_for_status()

        return JSONResponse({"ok": True, "filename": filename, "transcript": transcript})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/api/captures")
async def list_captures():
    """Returns recent captures from Nextcloud Journal/Captures/ — auth-free"""
    try:
        import requests as req_lib
        nextcloud_url = os.getenv("NEXTCLOUD_URL", "")
        nextcloud_user = os.getenv("NEXTCLOUD_USER", "aaron")
        nextcloud_password = os.getenv("NEXTCLOUD_PASSWORD", "")
        captures_dir = f"{nextcloud_url}/remote.php/dav/files/{nextcloud_user}/Journal/Captures"
        auth = (nextcloud_user, nextcloud_password)

        propfind = req_lib.request("PROPFIND", captures_dir, auth=auth, timeout=10,
                                   headers={"Depth": "1"})
        if propfind.status_code == 404:
            return JSONResponse({"captures": []})

        root = ET.fromstring(propfind.text)
        ns = {"d": "DAV:"}
        captures = []
        for resp in root.findall("d:response", ns):
            href = resp.findtext("d:href", namespaces=ns) or ""
            if href.endswith("/"):
                continue  # collection entry, not a file
            name = href.split("/")[-1]
            if not name.endswith(".md"):
                continue
            captures.append({"name": name.replace(".md", ""), "duration": ""})

        # Names begin with a timestamp, so reverse name order is newest first.
        captures.sort(key=lambda x: x["name"], reverse=True)
        return JSONResponse({"captures": captures[:10]})
    except Exception:
        return JSONResponse({"captures": []})


@app.post("/api/reindex")
async def trigger_reindex(auth: str = Depends(require_auth)):
    """Start the ingest script over NEXTCLOUD_PATH in the background."""
    try:
        subprocess.Popen([PYTHON, INGEST_SCRIPT, NEXTCLOUD_PATH])
        return JSONResponse({"started": True, "message": "Re-indexing started in background"})
    except Exception as e:
        return JSONResponse({"started": False, "error": str(e)})


@app.delete("/api/conversations")
async def clear_all_conversations(auth: str = Depends(require_auth)):
    """Delete every conversation and message."""
    with closing(sqlite3.connect(CONVERSATIONS_DB)) as conn:
        conn.execute("DELETE FROM messages")
        conn.execute("DELETE FROM conversations")
        conn.commit()
    return JSONResponse({"cleared": True})


# SSE client registry
sse_clients: list[asyncio.Queue] = []


async def sse_generator(queue: asyncio.Queue):
    """Yield SSE frames for one client: queued events, or a heartbeat every 30 s idle.

    The client's queue is removed from ``sse_clients`` when the stream ends.
    """
    try:
        yield 'data: {"type": "connected"}\n\n'
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=30.0)
                yield 'data: ' + json.dumps(event) + '\n\n'
            except asyncio.TimeoutError:
                yield 'data: {"type": "heartbeat"}\n\n'
    except asyncio.CancelledError:
        pass
    finally:
        if queue in sse_clients:
            sse_clients.remove(queue)


@app.get("/api/events")
async def sse_endpoint(request: Request, auth: str = Depends(require_auth)):
    """Open a server-sent-event stream for the authenticated client."""
    queue: asyncio.Queue = asyncio.Queue()
    sse_clients.append(queue)
    return StreamingResponse(
        sse_generator(queue),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        }
    )


@app.post("/api/events/notify")
async def notify_clients(request: Request):
    """Internal endpoint — called by dream.py when a dream is delivered"""
    # Only allow from localhost
    # NOTE(review): if a reverse proxy on this host forwards external traffic,
    # every request will appear to come from localhost — confirm deployment.
    client_host = request.client.host if request.client else ""
    if client_host not in ("127.0.0.1", "::1", "localhost"):
        raise HTTPException(status_code=403, detail="Internal only")
    data = await request.json()
    # Iterate over a snapshot so a client disconnecting mid-loop cannot
    # mutate the list being iterated.
    for queue in list(sse_clients):
        await queue.put(data)
    return JSONResponse({"notified": len(sse_clients)})


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)