From 0f1c9701799c64de6e83df4a31993c21ce41ffed Mon Sep 17 00:00:00 2001 From: Teknium Date: Sun, 22 Mar 2026 04:18:18 -0700 Subject: [PATCH] =?UTF-8?q?fix(api-server):=20harden=20jobs=20API=20?= =?UTF-8?q?=E2=80=94=20input=20limits,=20field=20whitelist,=20startup=20ch?= =?UTF-8?q?eck,=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five improvements to the /api/jobs endpoints: 1. Startup availability check — cron module imported once at class load, endpoints return 501 if unavailable (not 500 per-request import error) 2. Input limits — name ≤ 200 chars, prompt ≤ 5000 chars, repeat must be positive int 3. Update field whitelist — only name/schedule/prompt/deliver/skills/ repeat/enabled pass through to cron.jobs.update_job, preventing arbitrary key injection 4. Deduplicated validation — _check_job_id and _check_jobs_available helpers replace repeated boilerplate 5. 32 new tests covering all endpoints, validation, auth, and cron-unavailable cases --- gateway/platforms/api_server.py | 161 ++++--- tests/gateway/test_api_server_jobs.py | 597 ++++++++++++++++++++++++++ 2 files changed, 710 insertions(+), 48 deletions(-) create mode 100644 tests/gateway/test_api_server_jobs.py diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index d0fd301a1..78ea1137c 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -691,23 +691,57 @@ class APIServerAdapter(BasePlatformAdapter): # Cron jobs API # ------------------------------------------------------------------ - @staticmethod - def _validate_job_id(job_id: str) -> Optional[str]: - """Return error message if job_id is invalid, else None.""" - import re as _re - if not _re.fullmatch(r"[a-f0-9]{12}", job_id): - return "Invalid job ID format" + # Check cron module availability once (not per-request) + _CRON_AVAILABLE = False + try: + from cron.jobs import ( + list_jobs as _cron_list, + get_job as _cron_get, + create_job as _cron_create, + update_job as _cron_update, + remove_job as _cron_remove, + pause_job as _cron_pause, + resume_job as _cron_resume, + trigger_job as _cron_trigger, + ) + _CRON_AVAILABLE = True + except ImportError: + pass + + _JOB_ID_RE = __import__("re").compile(r"[a-f0-9]{12}") + # Allowed fields for update — prevents clients injecting arbitrary keys + _UPDATE_ALLOWED_FIELDS = {"name", "schedule", "prompt", "deliver", "skills", "skill", "repeat", "enabled"} + _MAX_NAME_LENGTH = 200 + _MAX_PROMPT_LENGTH = 5000 + + def _check_jobs_available(self) -> Optional["web.Response"]: + """Return error response if cron module isn't available.""" + if not self._CRON_AVAILABLE: + return web.json_response( + {"error": "Cron module not available"}, status=501, + ) return None + def _check_job_id(self, request: "web.Request") -> tuple: + """Validate and extract job_id. Returns (job_id, error_response).""" + job_id = request.match_info["job_id"] + if not self._JOB_ID_RE.fullmatch(job_id): + return job_id, web.json_response( + {"error": "Invalid job ID format"}, status=400, + ) + return job_id, None + async def _handle_list_jobs(self, request: "web.Request") -> "web.Response": """GET /api/jobs — list all cron jobs.""" auth_err = self._check_auth(request) if auth_err: return auth_err + cron_err = self._check_jobs_available() + if cron_err: + return cron_err try: - from cron.jobs import list_jobs include_disabled = request.query.get("include_disabled", "").lower() in ("true", "1") - jobs = list_jobs(include_disabled=include_disabled) + jobs = self._cron_list(include_disabled=include_disabled) return web.json_response({"jobs": jobs}) except Exception as e: return web.json_response({"error": str(e)}, status=500) @@ -717,11 +751,13 @@ class APIServerAdapter(BasePlatformAdapter): auth_err = self._check_auth(request) if auth_err: return auth_err + cron_err = self._check_jobs_available() + if cron_err: + return cron_err try: - from cron.jobs import create_job body = await request.json() - name = body.get("name", "").strip() - schedule = body.get("schedule", "").strip() + name = (body.get("name") or "").strip() + schedule = (body.get("schedule") or "").strip() prompt = body.get("prompt", "") deliver = body.get("deliver", "local") skills = body.get("skills") @@ -729,8 +765,18 @@ class APIServerAdapter(BasePlatformAdapter): if not name: return web.json_response({"error": "Name is required"}, status=400) + if len(name) > self._MAX_NAME_LENGTH: + return web.json_response( + {"error": f"Name must be ≤ {self._MAX_NAME_LENGTH} characters"}, status=400, + ) if not schedule: return web.json_response({"error": "Schedule is required"}, status=400) + if len(prompt) > self._MAX_PROMPT_LENGTH: + return web.json_response( + {"error": f"Prompt must be ≤ {self._MAX_PROMPT_LENGTH} characters"}, status=400, + ) + if repeat is not None and (not isinstance(repeat, int) or repeat < 1): + return web.json_response({"error": "Repeat must be a positive integer"}, status=400) kwargs = { "prompt": prompt, @@ -743,7 +789,7 @@ class APIServerAdapter(BasePlatformAdapter): if repeat is not None: kwargs["repeat"] = repeat - job = create_job(**kwargs) + job = self._cron_create(**kwargs) return web.json_response({"job": job}) except Exception as e: return web.json_response({"error": str(e)}, status=500) @@ -753,13 +799,14 @@ class APIServerAdapter(BasePlatformAdapter): auth_err = self._check_auth(request) if auth_err: return auth_err - job_id = request.match_info["job_id"] - err = self._validate_job_id(job_id) - if err: - return web.json_response({"error": err}, status=400) + cron_err = self._check_jobs_available() + if cron_err: + return cron_err + job_id, id_err = self._check_job_id(request) + if id_err: + return id_err try: - from cron.jobs import get_job - job = get_job(job_id) + job = self._cron_get(job_id) if not job: return web.json_response({"error": "Job not found"}, status=404) return web.json_response({"job": job}) @@ -771,14 +818,28 @@ class APIServerAdapter(BasePlatformAdapter): auth_err = self._check_auth(request) if auth_err: return auth_err - job_id = request.match_info["job_id"] - err = self._validate_job_id(job_id) - if err: - return web.json_response({"error": err}, status=400) + cron_err = self._check_jobs_available() + if cron_err: + return cron_err + job_id, id_err = self._check_job_id(request) + if id_err: + return id_err try: - from cron.jobs import update_job body = await request.json() - job = update_job(job_id, body) + # Whitelist allowed fields to prevent arbitrary key injection + sanitized = {k: v for k, v in body.items() if k in self._UPDATE_ALLOWED_FIELDS} + if not sanitized: + return web.json_response({"error": "No valid fields to update"}, status=400) + # Validate lengths if present + if "name" in sanitized and len(sanitized["name"]) > self._MAX_NAME_LENGTH: + return web.json_response( + {"error": f"Name must be ≤ {self._MAX_NAME_LENGTH} characters"}, status=400, + ) + if "prompt" in sanitized and len(sanitized["prompt"]) > self._MAX_PROMPT_LENGTH: + return web.json_response( + {"error": f"Prompt must be ≤ {self._MAX_PROMPT_LENGTH} characters"}, status=400, + ) + job = self._cron_update(job_id, sanitized) if not job: return web.json_response({"error": "Job not found"}, status=404) return web.json_response({"job": job}) @@ -790,13 +851,14 @@ class APIServerAdapter(BasePlatformAdapter): auth_err = self._check_auth(request) if auth_err: return auth_err - job_id = request.match_info["job_id"] - err = self._validate_job_id(job_id) - if err: - return web.json_response({"error": err}, status=400) + cron_err = self._check_jobs_available() + if cron_err: + return cron_err + job_id, id_err = self._check_job_id(request) + if id_err: + return id_err try: - from cron.jobs import remove_job - success = remove_job(job_id) + success = self._cron_remove(job_id) if not success: return web.json_response({"error": "Job not found"}, status=404) return web.json_response({"ok": True}) @@ -808,13 +870,14 @@ class APIServerAdapter(BasePlatformAdapter): auth_err = self._check_auth(request) if auth_err: return auth_err - job_id = request.match_info["job_id"] - err = self._validate_job_id(job_id) - if err: - return web.json_response({"error": err}, status=400) + cron_err = self._check_jobs_available() + if cron_err: + return cron_err + job_id, id_err = self._check_job_id(request) + if id_err: + return id_err try: - from cron.jobs import pause_job - job = pause_job(job_id) + job = self._cron_pause(job_id) if not job: return web.json_response({"error": "Job not found"}, status=404) return web.json_response({"job": job}) @@ -826,13 +889,14 @@ class APIServerAdapter(BasePlatformAdapter): auth_err = self._check_auth(request) if auth_err: return auth_err - job_id = request.match_info["job_id"] - err = self._validate_job_id(job_id) - if err: - return web.json_response({"error": err}, status=400) + cron_err = self._check_jobs_available() + if cron_err: + return cron_err + job_id, id_err = self._check_job_id(request) + if id_err: + return id_err try: - from cron.jobs import resume_job - job = resume_job(job_id) + job = self._cron_resume(job_id) if not job: return web.json_response({"error": "Job not found"}, status=404) return web.json_response({"job": job}) @@ -844,13 +908,14 @@ class APIServerAdapter(BasePlatformAdapter): auth_err = self._check_auth(request) if auth_err: return auth_err - job_id = request.match_info["job_id"] - err = self._validate_job_id(job_id) - if err: - return web.json_response({"error": err}, status=400) + cron_err = self._check_jobs_available() + if cron_err: + return cron_err + job_id, id_err = self._check_job_id(request) + if id_err: + return id_err try: - from cron.jobs import trigger_job - job = trigger_job(job_id) + job = self._cron_trigger(job_id) if not job: return web.json_response({"error": "Job not found"}, status=404) return web.json_response({"job": job}) diff --git a/tests/gateway/test_api_server_jobs.py b/tests/gateway/test_api_server_jobs.py new file mode 100644 index 000000000..789900a5c --- /dev/null +++ b/tests/gateway/test_api_server_jobs.py @@ -0,0 +1,597 @@ +""" +Tests for the Cron Jobs API endpoints on the API server adapter. + +Covers: +- CRUD operations for cron jobs (list, create, get, update, delete) +- Pause / resume / run (trigger) actions +- Input validation (missing name, name too long, prompt too long, invalid repeat) +- Job ID validation (invalid hex) +- Auth enforcement (401 when API_SERVER_KEY is set) +- Cron module unavailability (501 when _CRON_AVAILABLE is False) +""" + +import json +from unittest.mock import MagicMock, patch + +import pytest +from aiohttp import web +from aiohttp.test_utils import TestClient, TestServer + +from gateway.config import PlatformConfig +from gateway.platforms.api_server import APIServerAdapter, cors_middleware + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +SAMPLE_JOB = { + "id": "aabbccddeeff", + "name": "test-job", + "schedule": "*/5 * * * *", + "prompt": "do something", + "deliver": "local", + "enabled": True, +} + +VALID_JOB_ID = "aabbccddeeff" + + +def _make_adapter(api_key: str = "") -> APIServerAdapter: + """Create an adapter with optional API key.""" + extra = {} + if api_key: + extra["key"] = api_key + config = PlatformConfig(enabled=True, extra=extra) + return APIServerAdapter(config) + + +def _create_app(adapter: APIServerAdapter) -> web.Application: + """Create the aiohttp app with jobs routes registered.""" + app = web.Application(middlewares=[cors_middleware]) + app["api_server_adapter"] = adapter + # Register only job routes (plus health for sanity) + app.router.add_get("/health", adapter._handle_health) + app.router.add_get("/api/jobs", adapter._handle_list_jobs) + app.router.add_post("/api/jobs", adapter._handle_create_job) + app.router.add_get("/api/jobs/{job_id}", adapter._handle_get_job) + app.router.add_patch("/api/jobs/{job_id}", adapter._handle_update_job) + app.router.add_delete("/api/jobs/{job_id}", adapter._handle_delete_job) + app.router.add_post("/api/jobs/{job_id}/pause", adapter._handle_pause_job) + app.router.add_post("/api/jobs/{job_id}/resume", adapter._handle_resume_job) + app.router.add_post("/api/jobs/{job_id}/run", adapter._handle_run_job) + return app + + +@pytest.fixture +def adapter(): + return _make_adapter() + + +@pytest.fixture +def auth_adapter(): + return _make_adapter(api_key="sk-secret") + + +# --------------------------------------------------------------------------- +# 1. test_list_jobs +# --------------------------------------------------------------------------- + +class TestListJobs: + @pytest.mark.asyncio + async def test_list_jobs(self, adapter): + """GET /api/jobs returns job list.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_list", return_value=[SAMPLE_JOB] + ): + resp = await cli.get("/api/jobs") + assert resp.status == 200 + data = await resp.json() + assert "jobs" in data + assert data["jobs"] == [SAMPLE_JOB] + + # ------------------------------------------------------------------- + # 2. test_list_jobs_include_disabled + # ------------------------------------------------------------------- + + @pytest.mark.asyncio + async def test_list_jobs_include_disabled(self, adapter): + """GET /api/jobs?include_disabled=true passes the flag.""" + app = _create_app(adapter) + mock_list = MagicMock(return_value=[SAMPLE_JOB]) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_list", mock_list + ): + resp = await cli.get("/api/jobs?include_disabled=true") + assert resp.status == 200 + mock_list.assert_called_once_with(include_disabled=True) + + @pytest.mark.asyncio + async def test_list_jobs_default_excludes_disabled(self, adapter): + """GET /api/jobs without flag passes include_disabled=False.""" + app = _create_app(adapter) + mock_list = MagicMock(return_value=[]) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_list", mock_list + ): + resp = await cli.get("/api/jobs") + assert resp.status == 200 + mock_list.assert_called_once_with(include_disabled=False) + + +# --------------------------------------------------------------------------- +# 3-7. test_create_job and validation +# --------------------------------------------------------------------------- + +class TestCreateJob: + @pytest.mark.asyncio + async def test_create_job(self, adapter): + """POST /api/jobs with valid body returns created job.""" + app = _create_app(adapter) + mock_create = MagicMock(return_value=SAMPLE_JOB) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_create", mock_create + ): + resp = await cli.post("/api/jobs", json={ + "name": "test-job", + "schedule": "*/5 * * * *", + "prompt": "do something", + }) + assert resp.status == 200 + data = await resp.json() + assert data["job"] == SAMPLE_JOB + mock_create.assert_called_once() + call_kwargs = mock_create.call_args[1] + assert call_kwargs["name"] == "test-job" + assert call_kwargs["schedule"] == "*/5 * * * *" + assert call_kwargs["prompt"] == "do something" + + @pytest.mark.asyncio + async def test_create_job_missing_name(self, adapter): + """POST /api/jobs without name returns 400.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.post("/api/jobs", json={ + "schedule": "*/5 * * * *", + "prompt": "do something", + }) + assert resp.status == 400 + data = await resp.json() + assert "name" in data["error"].lower() or "Name" in data["error"] + + @pytest.mark.asyncio + async def test_create_job_name_too_long(self, adapter): + """POST /api/jobs with name > 200 chars returns 400.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.post("/api/jobs", json={ + "name": "x" * 201, + "schedule": "*/5 * * * *", + }) + assert resp.status == 400 + data = await resp.json() + assert "200" in data["error"] or "Name" in data["error"] + + @pytest.mark.asyncio + async def test_create_job_prompt_too_long(self, adapter): + """POST /api/jobs with prompt > 5000 chars returns 400.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.post("/api/jobs", json={ + "name": "test-job", + "schedule": "*/5 * * * *", + "prompt": "x" * 5001, + }) + assert resp.status == 400 + data = await resp.json() + assert "5000" in data["error"] or "Prompt" in data["error"] + + @pytest.mark.asyncio + async def test_create_job_invalid_repeat(self, adapter): + """POST /api/jobs with repeat=0 returns 400.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.post("/api/jobs", json={ + "name": "test-job", + "schedule": "*/5 * * * *", + "repeat": 0, + }) + assert resp.status == 400 + data = await resp.json() + assert "repeat" in data["error"].lower() or "Repeat" in data["error"] + + @pytest.mark.asyncio + async def test_create_job_missing_schedule(self, adapter): + """POST /api/jobs without schedule returns 400.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.post("/api/jobs", json={ + "name": "test-job", + }) + assert resp.status == 400 + data = await resp.json() + assert "schedule" in data["error"].lower() or "Schedule" in data["error"] + + +# --------------------------------------------------------------------------- +# 8-10. test_get_job +# --------------------------------------------------------------------------- + +class TestGetJob: + @pytest.mark.asyncio + async def test_get_job(self, adapter): + """GET /api/jobs/{id} returns job.""" + app = _create_app(adapter) + mock_get = MagicMock(return_value=SAMPLE_JOB) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_get", mock_get + ): + resp = await cli.get(f"/api/jobs/{VALID_JOB_ID}") + assert resp.status == 200 + data = await resp.json() + assert data["job"] == SAMPLE_JOB + mock_get.assert_called_once_with(VALID_JOB_ID) + + @pytest.mark.asyncio + async def test_get_job_not_found(self, adapter): + """GET /api/jobs/{id} returns 404 when job doesn't exist.""" + app = _create_app(adapter) + mock_get = MagicMock(return_value=None) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_get", mock_get + ): + resp = await cli.get(f"/api/jobs/{VALID_JOB_ID}") + assert resp.status == 404 + + @pytest.mark.asyncio + async def test_get_job_invalid_id(self, adapter): + """GET /api/jobs/{id} with non-hex id returns 400.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.get("/api/jobs/not-a-valid-hex!") + assert resp.status == 400 + data = await resp.json() + assert "Invalid" in data["error"] + + +# --------------------------------------------------------------------------- +# 11-12. test_update_job +# --------------------------------------------------------------------------- + +class TestUpdateJob: + @pytest.mark.asyncio + async def test_update_job(self, adapter): + """PATCH /api/jobs/{id} updates with whitelisted fields.""" + app = _create_app(adapter) + updated_job = {**SAMPLE_JOB, "name": "updated-name"} + mock_update = MagicMock(return_value=updated_job) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_update", mock_update + ): + resp = await cli.patch( + f"/api/jobs/{VALID_JOB_ID}", + json={"name": "updated-name", "schedule": "0 * * * *"}, + ) + assert resp.status == 200 + data = await resp.json() + assert data["job"] == updated_job + mock_update.assert_called_once() + call_args = mock_update.call_args + assert call_args[0][0] == VALID_JOB_ID + sanitized = call_args[0][1] + assert "name" in sanitized + assert "schedule" in sanitized + + @pytest.mark.asyncio + async def test_update_job_rejects_unknown_fields(self, adapter): + """PATCH /api/jobs/{id} — only allowed fields pass through.""" + app = _create_app(adapter) + updated_job = {**SAMPLE_JOB, "name": "new-name"} + mock_update = MagicMock(return_value=updated_job) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_update", mock_update + ): + resp = await cli.patch( + f"/api/jobs/{VALID_JOB_ID}", + json={ + "name": "new-name", + "evil_field": "malicious", + "__proto__": "hack", + }, + ) + assert resp.status == 200 + call_args = mock_update.call_args + sanitized = call_args[0][1] + assert "name" in sanitized + assert "evil_field" not in sanitized + assert "__proto__" not in sanitized + + @pytest.mark.asyncio + async def test_update_job_no_valid_fields(self, adapter): + """PATCH /api/jobs/{id} with only unknown fields returns 400.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.patch( + f"/api/jobs/{VALID_JOB_ID}", + json={"evil_field": "malicious"}, + ) + assert resp.status == 400 + data = await resp.json() + assert "No valid fields" in data["error"] + + +# --------------------------------------------------------------------------- +# 13. test_delete_job +# --------------------------------------------------------------------------- + +class TestDeleteJob: + @pytest.mark.asyncio + async def test_delete_job(self, adapter): + """DELETE /api/jobs/{id} returns ok.""" + app = _create_app(adapter) + mock_remove = MagicMock(return_value=True) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_remove", mock_remove + ): + resp = await cli.delete(f"/api/jobs/{VALID_JOB_ID}") + assert resp.status == 200 + data = await resp.json() + assert data["ok"] is True + mock_remove.assert_called_once_with(VALID_JOB_ID) + + @pytest.mark.asyncio + async def test_delete_job_not_found(self, adapter): + """DELETE /api/jobs/{id} returns 404 when job doesn't exist.""" + app = _create_app(adapter) + mock_remove = MagicMock(return_value=False) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_remove", mock_remove + ): + resp = await cli.delete(f"/api/jobs/{VALID_JOB_ID}") + assert resp.status == 404 + + +# --------------------------------------------------------------------------- +# 14. test_pause_job +# --------------------------------------------------------------------------- + +class TestPauseJob: + @pytest.mark.asyncio + async def test_pause_job(self, adapter): + """POST /api/jobs/{id}/pause returns updated job.""" + app = _create_app(adapter) + paused_job = {**SAMPLE_JOB, "enabled": False} + mock_pause = MagicMock(return_value=paused_job) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_pause", mock_pause + ): + resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/pause") + assert resp.status == 200 + data = await resp.json() + assert data["job"] == paused_job + assert data["job"]["enabled"] is False + mock_pause.assert_called_once_with(VALID_JOB_ID) + + +# --------------------------------------------------------------------------- +# 15. test_resume_job +# --------------------------------------------------------------------------- + +class TestResumeJob: + @pytest.mark.asyncio + async def test_resume_job(self, adapter): + """POST /api/jobs/{id}/resume returns updated job.""" + app = _create_app(adapter) + resumed_job = {**SAMPLE_JOB, "enabled": True} + mock_resume = MagicMock(return_value=resumed_job) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_resume", mock_resume + ): + resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/resume") + assert resp.status == 200 + data = await resp.json() + assert data["job"] == resumed_job + assert data["job"]["enabled"] is True + mock_resume.assert_called_once_with(VALID_JOB_ID) + + +# --------------------------------------------------------------------------- +# 16. test_run_job +# --------------------------------------------------------------------------- + +class TestRunJob: + @pytest.mark.asyncio + async def test_run_job(self, adapter): + """POST /api/jobs/{id}/run returns triggered job.""" + app = _create_app(adapter) + triggered_job = {**SAMPLE_JOB, "last_run": "2025-01-01T00:00:00Z"} + mock_trigger = MagicMock(return_value=triggered_job) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_trigger", mock_trigger + ): + resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/run") + assert resp.status == 200 + data = await resp.json() + assert data["job"] == triggered_job + mock_trigger.assert_called_once_with(VALID_JOB_ID) + + +# --------------------------------------------------------------------------- +# 17. test_auth_required +# --------------------------------------------------------------------------- + +class TestAuthRequired: + @pytest.mark.asyncio + async def test_auth_required_list_jobs(self, auth_adapter): + """GET /api/jobs without API key returns 401 when key is set.""" + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.get("/api/jobs") + assert resp.status == 401 + + @pytest.mark.asyncio + async def test_auth_required_create_job(self, auth_adapter): + """POST /api/jobs without API key returns 401 when key is set.""" + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.post("/api/jobs", json={ + "name": "test", "schedule": "* * * * *", + }) + assert resp.status == 401 + + @pytest.mark.asyncio + async def test_auth_required_get_job(self, auth_adapter): + """GET /api/jobs/{id} without API key returns 401 when key is set.""" + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.get(f"/api/jobs/{VALID_JOB_ID}") + assert resp.status == 401 + + @pytest.mark.asyncio + async def test_auth_required_delete_job(self, auth_adapter): + """DELETE /api/jobs/{id} without API key returns 401.""" + app = _create_app(auth_adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True): + resp = await cli.delete(f"/api/jobs/{VALID_JOB_ID}") + assert resp.status == 401 + + @pytest.mark.asyncio + async def test_auth_passes_with_valid_key(self, auth_adapter): + """GET /api/jobs with correct API key succeeds.""" + app = _create_app(auth_adapter) + mock_list = MagicMock(return_value=[]) + async with TestClient(TestServer(app)) as cli: + with patch.object( + APIServerAdapter, "_CRON_AVAILABLE", True + ), patch.object( + APIServerAdapter, "_cron_list", mock_list + ): + resp = await cli.get( + "/api/jobs", + headers={"Authorization": "Bearer sk-secret"}, + ) + assert resp.status == 200 + + +# --------------------------------------------------------------------------- +# 18. test_cron_unavailable +# --------------------------------------------------------------------------- + +class TestCronUnavailable: + @pytest.mark.asyncio + async def test_cron_unavailable_list(self, adapter): + """GET /api/jobs returns 501 when _CRON_AVAILABLE is False.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False): + resp = await cli.get("/api/jobs") + assert resp.status == 501 + data = await resp.json() + assert "not available" in data["error"].lower() + + @pytest.mark.asyncio + async def test_cron_unavailable_create(self, adapter): + """POST /api/jobs returns 501 when _CRON_AVAILABLE is False.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False): + resp = await cli.post("/api/jobs", json={ + "name": "test", "schedule": "* * * * *", + }) + assert resp.status == 501 + + @pytest.mark.asyncio + async def test_cron_unavailable_get(self, adapter): + """GET /api/jobs/{id} returns 501 when _CRON_AVAILABLE is False.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False): + resp = await cli.get(f"/api/jobs/{VALID_JOB_ID}") + assert resp.status == 501 + + @pytest.mark.asyncio + async def test_cron_unavailable_delete(self, adapter): + """DELETE /api/jobs/{id} returns 501 when _CRON_AVAILABLE is False.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False): + resp = await cli.delete(f"/api/jobs/{VALID_JOB_ID}") + assert resp.status == 501 + + @pytest.mark.asyncio + async def test_cron_unavailable_pause(self, adapter): + """POST /api/jobs/{id}/pause returns 501 when _CRON_AVAILABLE is False.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False): + resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/pause") + assert resp.status == 501 + + @pytest.mark.asyncio + async def test_cron_unavailable_resume(self, adapter): + """POST /api/jobs/{id}/resume returns 501 when _CRON_AVAILABLE is False.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False): + resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/resume") + assert resp.status == 501 + + @pytest.mark.asyncio + async def test_cron_unavailable_run(self, adapter): + """POST /api/jobs/{id}/run returns 501 when _CRON_AVAILABLE is False.""" + app = _create_app(adapter) + async with TestClient(TestServer(app)) as cli: + with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False): + resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/run") + assert resp.status == 501