capture: public SSE endpoint for transcription completion events
This commit is contained in:
@@ -724,6 +724,11 @@ def transcribe_and_save(tmp_path, timestamp, nextcloud_url, nextcloud_user, next
|
|||||||
"filename": filename,
|
"filename": filename,
|
||||||
"timestamp": timestamp,
|
"timestamp": timestamp,
|
||||||
}, timeout=3)
|
}, timeout=3)
|
||||||
|
_req.post("http://localhost:8000/api/captures/events/notify", json={
|
||||||
|
"type": "capture_saved",
|
||||||
|
"filename": filename,
|
||||||
|
"timestamp": timestamp,
|
||||||
|
}, timeout=3)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -1018,6 +1023,7 @@ def reschedule_jobs():
|
|||||||
|
|
||||||
# SSE client registry
|
# SSE client registry
|
||||||
sse_clients: list[asyncio.Queue] = []
|
sse_clients: list[asyncio.Queue] = []
|
||||||
|
capture_sse_clients: list[asyncio.Queue] = []
|
||||||
|
|
||||||
async def sse_generator(queue: asyncio.Queue):
|
async def sse_generator(queue: asyncio.Queue):
|
||||||
try:
|
try:
|
||||||
@@ -1036,6 +1042,48 @@ async def sse_generator(queue: asyncio.Queue):
|
|||||||
if queue in sse_clients:
|
if queue in sse_clients:
|
||||||
sse_clients.remove(queue)
|
sse_clients.remove(queue)
|
||||||
|
|
||||||
|
async def capture_sse_generator(queue: asyncio.Queue):
|
||||||
|
try:
|
||||||
|
yield 'data: {"type": "connected"}\n\n'
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
event = await asyncio.wait_for(queue.get(), timeout=30.0)
|
||||||
|
import json as _json
|
||||||
|
yield 'data: ' + _json.dumps(event) + '\n\n'
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
yield 'data: {"type": "heartbeat"}\n\n'
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
if queue in capture_sse_clients:
|
||||||
|
capture_sse_clients.remove(queue)
|
||||||
|
|
||||||
|
@app.get("/api/captures/events")
|
||||||
|
async def capture_sse_endpoint(request: Request):
|
||||||
|
"""Public SSE endpoint for capture page — no auth required."""
|
||||||
|
queue: asyncio.Queue = asyncio.Queue()
|
||||||
|
capture_sse_clients.append(queue)
|
||||||
|
return StreamingResponse(
|
||||||
|
capture_sse_generator(queue),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"X-Accel-Buffering": "no",
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
@app.post("/api/captures/events/notify")
|
||||||
|
async def notify_capture_clients(request: Request):
|
||||||
|
"""Internal endpoint — called when transcription completes."""
|
||||||
|
client_host = request.client.host if request.client else ""
|
||||||
|
if client_host not in ("127.0.0.1", "::1", "localhost"):
|
||||||
|
raise HTTPException(status_code=403, detail="Internal only")
|
||||||
|
data = await request.json()
|
||||||
|
for queue in capture_sse_clients:
|
||||||
|
await queue.put(data)
|
||||||
|
return JSONResponse({"notified": len(capture_sse_clients)})
|
||||||
|
|
||||||
@app.get("/api/events")
|
@app.get("/api/events")
|
||||||
async def sse_endpoint(request: Request, auth: str = Depends(require_auth)):
|
async def sse_endpoint(request: Request, auth: str = Depends(require_auth)):
|
||||||
queue: asyncio.Queue = asyncio.Queue()
|
queue: asyncio.Queue = asyncio.Queue()
|
||||||
|
|||||||
Reference in New Issue
Block a user