From 7cd9f9ed48b022efc4875666f4b80efff566b2eb Mon Sep 17 00:00:00 2001 From: Teknium Date: Sun, 22 Mar 2026 04:06:57 -0700 Subject: [PATCH] feat(api-server): add /api/jobs endpoints for cron job management MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CRUD + actions for cron jobs on the existing API server (port 8642): GET /api/jobs — list jobs POST /api/jobs — create job GET /api/jobs/{id} — get job PATCH /api/jobs/{id} — update job DELETE /api/jobs/{id} — delete job POST /api/jobs/{id}/pause — pause job POST /api/jobs/{id}/resume — resume job POST /api/jobs/{id}/run — trigger immediate run All endpoints use existing API_SERVER_KEY auth. Job ID format validated (12 hex chars). Logic ported from PR #2111 by nock4, adapted from FastAPI to aiohttp on the existing API server. --- gateway/platforms/api_server.py | 179 ++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index bbe9f77f4..6da3e8104 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -630,6 +630,176 @@ class APIServerAdapter(BasePlatformAdapter): "deleted": True, }) + # ------------------------------------------------------------------ + # Cron jobs API + # ------------------------------------------------------------------ + + @staticmethod + def _validate_job_id(job_id: str) -> Optional[str]: + """Return error message if job_id is invalid, else None.""" + import re as _re + if not _re.fullmatch(r"[a-f0-9]{12}", job_id): + return "Invalid job ID format" + return None + + async def _handle_list_jobs(self, request: "web.Request") -> "web.Response": + """GET /api/jobs — list all cron jobs.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + try: + from cron.jobs import list_jobs + include_disabled = request.query.get("include_disabled", "").lower() in ("true", "1") + jobs = list_jobs(include_disabled=include_disabled) + return web.json_response({"jobs": jobs}) + except Exception as e: + return web.json_response({"error": str(e)}, status=500) + + async def _handle_create_job(self, request: "web.Request") -> "web.Response": + """POST /api/jobs — create a new cron job.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + try: + from cron.jobs import create_job + body = await request.json() + name = body.get("name", "").strip() + schedule = body.get("schedule", "").strip() + prompt = body.get("prompt", "") + deliver = body.get("deliver", "local") + skills = body.get("skills") + repeat = body.get("repeat") + + if not name: + return web.json_response({"error": "Name is required"}, status=400) + if not schedule: + return web.json_response({"error": "Schedule is required"}, status=400) + + kwargs = { + "prompt": prompt, + "schedule": schedule, + "name": name, + "deliver": deliver, + } + if skills: + kwargs["skills"] = skills + if repeat is not None: + kwargs["repeat"] = repeat + + job = create_job(**kwargs) + return web.json_response({"job": job}) + except Exception as e: + return web.json_response({"error": str(e)}, status=500) + + async def _handle_get_job(self, request: "web.Request") -> "web.Response": + """GET /api/jobs/{job_id} — get a single cron job.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + job_id = request.match_info["job_id"] + err = self._validate_job_id(job_id) + if err: + return web.json_response({"error": err}, status=400) + try: + from cron.jobs import get_job + job = get_job(job_id) + if not job: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response({"job": job}) + except Exception as e: + return web.json_response({"error": str(e)}, status=500) + + async def _handle_update_job(self, request: "web.Request") -> "web.Response": + """PATCH /api/jobs/{job_id} — update a cron job.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + job_id = request.match_info["job_id"] + err = self._validate_job_id(job_id) + if err: + return web.json_response({"error": err}, status=400) + try: + from cron.jobs import update_job + body = await request.json() + job = update_job(job_id, body) + if not job: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response({"job": job}) + except Exception as e: + return web.json_response({"error": str(e)}, status=500) + + async def _handle_delete_job(self, request: "web.Request") -> "web.Response": + """DELETE /api/jobs/{job_id} — delete a cron job.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + job_id = request.match_info["job_id"] + err = self._validate_job_id(job_id) + if err: + return web.json_response({"error": err}, status=400) + try: + from cron.jobs import remove_job + success = remove_job(job_id) + if not success: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response({"ok": True}) + except Exception as e: + return web.json_response({"error": str(e)}, status=500) + + async def _handle_pause_job(self, request: "web.Request") -> "web.Response": + """POST /api/jobs/{job_id}/pause — pause a cron job.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + job_id = request.match_info["job_id"] + err = self._validate_job_id(job_id) + if err: + return web.json_response({"error": err}, status=400) + try: + from cron.jobs import pause_job + job = pause_job(job_id) + if not job: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response({"job": job}) + except Exception as e: + return web.json_response({"error": str(e)}, status=500) + + async def _handle_resume_job(self, request: "web.Request") -> "web.Response": + """POST /api/jobs/{job_id}/resume — resume a paused cron job.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + job_id = request.match_info["job_id"] + err = self._validate_job_id(job_id) + if err: + return web.json_response({"error": err}, status=400) + try: + from cron.jobs import resume_job + job = resume_job(job_id) + if not job: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response({"job": job}) + except Exception as e: + return web.json_response({"error": str(e)}, status=500) + + async def _handle_run_job(self, request: "web.Request") -> "web.Response": + """POST /api/jobs/{job_id}/run — trigger immediate execution.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + job_id = request.match_info["job_id"] + err = self._validate_job_id(job_id) + if err: + return web.json_response({"error": err}, status=400) + try: + from cron.jobs import trigger_job + job = trigger_job(job_id) + if not job: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response({"job": job}) + except Exception as e: + return web.json_response({"error": str(e)}, status=500) + # ------------------------------------------------------------------ # Output extraction helper # ------------------------------------------------------------------ @@ -739,6 +909,15 @@ class APIServerAdapter(BasePlatformAdapter): self._app.router.add_post("/v1/responses", self._handle_responses) self._app.router.add_get("/v1/responses/{response_id}", self._handle_get_response) self._app.router.add_delete("/v1/responses/{response_id}", self._handle_delete_response) + # Cron jobs management API + self._app.router.add_get("/api/jobs", self._handle_list_jobs) + self._app.router.add_post("/api/jobs", self._handle_create_job) + self._app.router.add_get("/api/jobs/{job_id}", self._handle_get_job) + self._app.router.add_patch("/api/jobs/{job_id}", self._handle_update_job) + self._app.router.add_delete("/api/jobs/{job_id}", self._handle_delete_job) + self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job) + self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job) + self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job) self._runner = web.AppRunner(self._app) await self._runner.setup()