Image capture — extend /api/capture for image+voice, Claude vision description, Media/ WebDAV, watcher excludes Media/

This commit is contained in:
2026-04-27 04:28:31 +00:00
parent ef2fddc47f
commit d3239aba17
2 changed files with 174 additions and 55 deletions
+172 -55
View File
@@ -11,7 +11,7 @@ import anthropic
from fastapi import FastAPI, Request, Response, Depends, HTTPException from fastapi import FastAPI, Request, Response, Depends, HTTPException
import psycopg2 import psycopg2
import psycopg2.extras import psycopg2.extras
from fastapi import UploadFile, File from fastapi import UploadFile, File, Form
import tempfile import tempfile
import os import os
try: try:
@@ -683,59 +683,178 @@ async def run_dreamer(request: Request, auth: str = Depends(require_auth)):
return JSONResponse({"started": False, "error": str(e)}) return JSONResponse({"started": False, "error": str(e)})
# Image media types this endpoint accepts, mapped to the extension used for the
# stored file. Restricted to the formats the Claude vision API documents as
# supported, so an upload is rejected before anything is written to Nextcloud.
_CAPTURE_IMAGE_EXTENSIONS = {
    "image/jpeg": "jpg",
    "image/png": "png",
    "image/webp": "webp",
    "image/gif": "gif",
}


def _capture_audio_suffix(content_type):
    """Return the temp-file suffix for an uploaded audio content type."""
    if content_type and "mp4" in content_type:
        return ".mp4"
    if content_type and "ogg" in content_type:
        return ".ogg"
    return ".webm"


def _transcribe_capture_audio(audio_bytes, content_type):
    """Write audio to a temp file, transcribe it with Whisper, return the text.

    The temp file is removed in ``finally`` so it cannot leak on any path.
    """
    tmp_path = None
    try:
        suffix = _capture_audio_suffix(content_type)
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp.write(audio_bytes)
            tmp_path = tmp.name
        segments, _ = whisper_model.transcribe(
            tmp_path, language="en", vad_filter=True, initial_prompt=WHISPER_PROMPT
        )
        # Join while the temp file still exists (segments may be produced
        # lazily — TODO confirm against the Whisper backend in use).
        return " ".join(s.text.strip() for s in segments).strip()
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)


def _capture_webdav_mkcol(url, auth):
    """Best-effort MKCOL; the status is ignored because an existing
    collection answers with an error status (405 per RFC 4918)."""
    import requests as req_lib

    req_lib.request("MKCOL", url, auth=auth, timeout=10)


def _capture_webdav_put(url, data, auth, timeout, headers=None):
    """PUT ``data`` to ``url`` and raise if the server rejects the write."""
    import requests as req_lib

    response = req_lib.put(url, data=data, auth=auth, headers=headers, timeout=timeout)
    # Without this a failed upload would still be reported to the client as ok.
    response.raise_for_status()


def _describe_capture_image(image_bytes, media_type, voice_annotation):
    """Ask Claude for a memory-indexing description of the image."""
    import base64

    image_b64 = base64.standard_b64encode(image_bytes).decode("utf-8")
    annotation_line = (
        f'Aaron said about this image: "{voice_annotation}"' if voice_annotation else ""
    )
    vision_prompt = "\n".join([
        "You are generating a memory description for an AI corpus belonging to Aaron Nelson — computational designer, fabrication researcher, and visual artist working in the Hudson Valley.",
        "Describe this image for long-term memory indexing.",
        "PERCEPTUAL: Composition, materials, light, color, texture, scale, spatial relationships. Be specific enough that this image could be distinguished from visually similar images.",
        "CONTENT: What is this? What domain does it belong to? What is it an instance of?",
        annotation_line,
        "End your response with a single line in this exact format:",
        "ENTITIES: [comma-separated list of key entities — people, objects, materials, places, projects, tools]",
        "Keep the full description to 150-250 words. Do not speculate beyond what is visible or stated. Write as continuous prose followed by the ENTITIES line.",
    ])
    vision_response = anthropic_client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=800,
        messages=[{
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": media_type,
                        "data": image_b64,
                    },
                },
                {"type": "text", "text": vision_prompt},
            ],
        }],
    )
    return vision_response.content[0].text.strip()


def _image_capture_markdown(timestamp, media_path, project, description, voice_annotation):
    """Build the markdown episode stored in Journal/Captures/ for an image."""
    capture_type = "image+voice" if voice_annotation else "image"
    modality = "visual+audio" if voice_annotation else "visual"
    # NOTE(review): line layout reproduced from the reviewed diff view, which
    # showed no blank lines between fields — confirm against the real file.
    return "\n".join([
        f"# Capture — Image — {timestamp}",
        f"**type:** {capture_type}",
        f"**modality:** {modality}",
        "**status:** unprocessed",
        f"**media:** {media_path}",
        f"**project:** {project}" if project else "",
        "---",
        "**Visual description:**",
        description,
        "**Voice annotation:**",
        voice_annotation if voice_annotation else "none recorded",
        "---",
    ]) + "\n"


@app.post("/api/capture")
async def capture_endpoint(
    audio: UploadFile = File(None),
    image: UploadFile = File(None),
    project: str = Form(None),
):
    """Auth-free capture endpoint — handles voice, image, or image+voice.

    Image (optionally with a voice annotation): the raw image is stored under
    Journal/Media/YYYY-MM/ via WebDAV, described by Claude, and a markdown
    episode is written to Journal/Captures/.
    Voice only: the audio is transcribed and the transcript is written to
    Journal/Captures/.

    Args:
        audio: Optional uploaded audio recording.
        image: Optional uploaded image.
        project: Optional project label recorded in the image episode.

    Returns:
        JSONResponse with ``ok`` and the stored filename; ``ok`` is False with
        an ``error`` when a voice-only capture contains no speech.

    Raises:
        HTTPException: 400 when neither file is provided, 415 for an image
            type that cannot be described, 503 when a voice-only capture is
            sent but Whisper is unavailable, 500 for any other failure
            (including a rejected WebDAV write).
    """
    # Local import so the handler does not depend on how the module imports datetime.
    from datetime import datetime

    if image is None and audio is None:
        raise HTTPException(status_code=400, detail="No audio or image provided")
    if image is None and not whisper_model:
        raise HTTPException(status_code=503, detail="Whisper not available")

    nextcloud_url = os.getenv("NEXTCLOUD_URL", "")
    nextcloud_user = os.getenv("NEXTCLOUD_USER", "aaron")
    nextcloud_password = os.getenv("NEXTCLOUD_PASSWORD", "")
    nc_auth = (nextcloud_user, nextcloud_password)
    journal_url = f"{nextcloud_url}/remote.php/dav/files/{nextcloud_user}/Journal"
    captures_dir = f"{journal_url}/Captures"

    # One clock reading so the filename and month folder cannot disagree.
    now = datetime.now()
    timestamp = now.strftime("%Y-%m-%d-%H-%M")
    month_dir = now.strftime("%Y-%m")

    # NOTE(review): the WebDAV, Whisper and Claude calls below are blocking and
    # run on the event loop; consider moving them to a worker thread.
    try:
        # ── Image + optional voice ───────────────────────────────────────
        if image is not None:
            image_content_type = (
                (image.content_type or "image/jpeg").split(";")[0].strip().lower()
            )
            img_ext = _CAPTURE_IMAGE_EXTENSIONS.get(image_content_type)
            if img_ext is None:
                # Fail before uploading so no undescribed, mislabelled file is
                # left behind in Media/.
                raise HTTPException(
                    status_code=415,
                    detail=f"Unsupported image type: {image_content_type}",
                )
            image_bytes = await image.read()
            img_filename = f"{timestamp}-image.{img_ext}"

            # Save raw image to Media/YYYY-MM/ via WebDAV
            media_dir = f"{journal_url}/Media/{month_dir}"
            _capture_webdav_mkcol(f"{journal_url}/Media", nc_auth)
            _capture_webdav_mkcol(media_dir, nc_auth)
            _capture_webdav_put(
                f"{media_dir}/{img_filename}",
                image_bytes,
                nc_auth,
                timeout=60,
                headers={"Content-Type": image_content_type},
            )

            # Transcribe voice annotation if present; skipped when Whisper is
            # unavailable so the image capture still succeeds.
            voice_annotation = None
            if audio is not None and whisper_model:
                audio_bytes = await audio.read()
                voice_annotation = (
                    _transcribe_capture_audio(audio_bytes, audio.content_type) or None
                )

            description = _describe_capture_image(
                image_bytes, image_content_type, voice_annotation
            )
            media_path = f"Journal/Media/{month_dir}/{img_filename}"
            content_md = _image_capture_markdown(
                timestamp, media_path, project, description, voice_annotation
            )

            # Save description to Journal/Captures/ via WebDAV
            _capture_webdav_mkcol(captures_dir, nc_auth)
            cap_filename = f"{timestamp}-image.md"
            _capture_webdav_put(
                f"{captures_dir}/{cap_filename}",
                content_md.encode("utf-8"),
                nc_auth,
                timeout=30,
            )
            return JSONResponse({
                "ok": True,
                "filename": cap_filename,
                "media": media_path,
                "has_voice": voice_annotation is not None,
            })

        # ── Voice only ───────────────────────────────────────────────────
        content_bytes = await audio.read()
        transcript = _transcribe_capture_audio(content_bytes, audio.content_type)
        if not transcript:
            return JSONResponse({"ok": False, "error": "No speech detected"})

        filename = f"{timestamp}-voice.md"
        content_md = f"# Capture — {timestamp}\n\n**type:** voice\n**modality:** audio\n**status:** unprocessed\n\n---\n\n{transcript}\n"
        _capture_webdav_mkcol(captures_dir, nc_auth)
        _capture_webdav_put(
            f"{captures_dir}/{filename}",
            content_md.encode("utf-8"),
            nc_auth,
            timeout=30,
        )
        return JSONResponse({"ok": True, "filename": filename, "transcript": transcript})
    except HTTPException:
        # Deliberate status codes (e.g. 415) must not be rewrapped as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/captures") @app.get("/api/captures")
async def list_captures(): async def list_captures():
@@ -795,14 +914,12 @@ async def clear_all_conversations(auth: str = Depends(require_auth)):
scheduler = BackgroundScheduler() scheduler = BackgroundScheduler()
def run_dream_job(): def run_dream_job():
"""Runs nightly dreamer — reuses loaded embedder, no subprocess overhead.""" """Runs nightly dreamer — full interdependent pipeline, no mode flag."""
try: try:
import subprocess import subprocess
settings = load_settings()
mode = settings.get("dream_mode", "nrem")
dream_script = str(Path.home() / "aaronai" / "scripts" / "dream.py") dream_script = str(Path.home() / "aaronai" / "scripts" / "dream.py")
result = subprocess.run( result = subprocess.run(
[PYTHON, dream_script, "--mode", mode], [PYTHON, dream_script],
cwd=str(Path.home() / "aaronai"), cwd=str(Path.home() / "aaronai"),
capture_output=True, text=True, timeout=600 capture_output=True, text=True, timeout=600
) )
+2
View File
@@ -96,6 +96,8 @@ class IngestHandler(FileSystemEventHandler):
return return
if 'Admin/Backups' in str(path) or 'Backups' in path.parts: if 'Admin/Backups' in str(path) or 'Backups' in path.parts:
return return
if 'Journal/Media' in str(path):
return
self.pending = True self.pending = True
self.last_event = time.time() self.last_event = time.time()