APScheduler — replace systemd timers, in-process dream and ingest scheduling
This commit is contained in:
+89
-1
@@ -28,6 +28,8 @@ from fastapi.middleware.cors import CORSMiddleware
|
|||||||
import uvicorn
|
import uvicorn
|
||||||
import asyncio
|
import asyncio
|
||||||
from fastapi.responses import StreamingResponse
|
from fastapi.responses import StreamingResponse
|
||||||
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
|
from apscheduler.triggers.cron import CronTrigger
|
||||||
|
|
||||||
load_dotenv(Path.home() / "aaronai" / ".env")
|
load_dotenv(Path.home() / "aaronai" / ".env")
|
||||||
|
|
||||||
@@ -46,6 +48,11 @@ DEFAULT_SETTINGS = {
|
|||||||
"font_size": "medium",
|
"font_size": "medium",
|
||||||
"web_search": True,
|
"web_search": True,
|
||||||
"show_sources": True,
|
"show_sources": True,
|
||||||
|
"dream_hour_utc": 8,
|
||||||
|
"dream_minute_utc": 0,
|
||||||
|
"dream_mode": "nrem",
|
||||||
|
"ingest_hour_utc": 2,
|
||||||
|
"ingest_minute_utc": 30,
|
||||||
}
|
}
|
||||||
|
|
||||||
print("Loading Aaron AI...")
|
print("Loading Aaron AI...")
|
||||||
@@ -350,7 +357,18 @@ def chat(user_message, conversation_id, settings):
|
|||||||
assistant_message += block.text
|
assistant_message += block.text
|
||||||
return assistant_message, list(set(sources))
|
return assistant_message, list(set(sources))
|
||||||
|
|
||||||
app = FastAPI()
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def lifespan(app: FastAPI):
|
||||||
|
reschedule_jobs()
|
||||||
|
scheduler.start()
|
||||||
|
print("Scheduler started")
|
||||||
|
yield
|
||||||
|
scheduler.shutdown()
|
||||||
|
print("Scheduler stopped")
|
||||||
|
|
||||||
|
app = FastAPI(lifespan=lifespan)
|
||||||
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
|
||||||
|
|
||||||
@app.post("/auth/login")
|
@app.post("/auth/login")
|
||||||
@@ -403,6 +421,10 @@ async def update_settings(request: Request, auth: str = Depends(require_auth)):
|
|||||||
settings = load_settings()
|
settings = load_settings()
|
||||||
settings.update(data)
|
settings.update(data)
|
||||||
save_settings(settings)
|
save_settings(settings)
|
||||||
|
# Reschedule if schedule settings changed
|
||||||
|
schedule_keys = {"dream_hour_utc","dream_minute_utc","dream_mode","ingest_hour_utc","ingest_minute_utc"}
|
||||||
|
if any(k in data for k in schedule_keys):
|
||||||
|
reschedule_jobs()
|
||||||
return JSONResponse(settings)
|
return JSONResponse(settings)
|
||||||
|
|
||||||
@app.get("/api/conversations")
|
@app.get("/api/conversations")
|
||||||
@@ -769,6 +791,72 @@ async def clear_all_conversations(auth: str = Depends(require_auth)):
|
|||||||
return JSONResponse({"cleared": True})
|
return JSONResponse({"cleared": True})
|
||||||
|
|
||||||
|
|
||||||
|
# ─── Scheduler ──────────────────────────────────────────────────────────────
|
||||||
|
scheduler = BackgroundScheduler()
|
||||||
|
|
||||||
|
def run_dream_job():
|
||||||
|
"""Runs nightly dreamer — reuses loaded embedder, no subprocess overhead."""
|
||||||
|
try:
|
||||||
|
import subprocess
|
||||||
|
settings = load_settings()
|
||||||
|
mode = settings.get("dream_mode", "nrem")
|
||||||
|
dream_script = str(Path.home() / "aaronai" / "scripts" / "dream.py")
|
||||||
|
result = subprocess.run(
|
||||||
|
[PYTHON, dream_script, "--mode", mode],
|
||||||
|
cwd=str(Path.home() / "aaronai"),
|
||||||
|
capture_output=True, text=True, timeout=600
|
||||||
|
)
|
||||||
|
print(f"Dreamer completed: {result.stdout[-200:] if result.stdout else 'no output'}")
|
||||||
|
if result.returncode != 0:
|
||||||
|
print(f"Dreamer error: {result.stderr[-200:] if result.stderr else 'unknown'}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Dreamer job failed: {e}")
|
||||||
|
|
||||||
|
def run_ingest_job():
|
||||||
|
"""Runs nightly conversation indexing."""
|
||||||
|
try:
|
||||||
|
import subprocess
|
||||||
|
ingest_script = str(Path.home() / "aaronai" / "scripts" / "ingest_conversations.py")
|
||||||
|
result = subprocess.run(
|
||||||
|
[PYTHON, ingest_script],
|
||||||
|
cwd=str(Path.home() / "aaronai"),
|
||||||
|
capture_output=True, text=True, timeout=300
|
||||||
|
)
|
||||||
|
print(f"Ingest completed: {result.stdout[-200:] if result.stdout else 'no output'}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Ingest job failed: {e}")
|
||||||
|
|
||||||
|
def reschedule_jobs():
|
||||||
|
"""Update scheduler from current settings."""
|
||||||
|
settings = load_settings()
|
||||||
|
# Remove existing jobs
|
||||||
|
for job_id in ("dream_job", "ingest_job"):
|
||||||
|
try:
|
||||||
|
scheduler.remove_job(job_id)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
# Add dream job
|
||||||
|
scheduler.add_job(
|
||||||
|
run_dream_job,
|
||||||
|
CronTrigger(hour=settings.get("dream_hour_utc", 8),
|
||||||
|
minute=settings.get("dream_minute_utc", 0),
|
||||||
|
timezone="UTC"),
|
||||||
|
id="dream_job",
|
||||||
|
max_instances=1,
|
||||||
|
replace_existing=True
|
||||||
|
)
|
||||||
|
# Add ingest job
|
||||||
|
scheduler.add_job(
|
||||||
|
run_ingest_job,
|
||||||
|
CronTrigger(hour=settings.get("ingest_hour_utc", 2),
|
||||||
|
minute=settings.get("ingest_minute_utc", 30),
|
||||||
|
timezone="UTC"),
|
||||||
|
id="ingest_job",
|
||||||
|
max_instances=1,
|
||||||
|
replace_existing=True
|
||||||
|
)
|
||||||
|
print(f"Scheduled: dream at {settings.get('dream_hour_utc',8):02d}:{settings.get('dream_minute_utc',0):02d} UTC, ingest at {settings.get('ingest_hour_utc',2):02d}:{settings.get('ingest_minute_utc',30):02d} UTC")
|
||||||
|
|
||||||
# SSE client registry
|
# SSE client registry
|
||||||
sse_clients: list[asyncio.Queue] = []
|
sse_clients: list[asyncio.Queue] = []
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user