fix(api-server): harden jobs API — input limits, field whitelist, startup check, tests (#2456)

fix(api-server): harden jobs API — input limits, field whitelist, startup check, tests
This commit is contained in:
Teknium
2026-03-22 04:18:45 -07:00
committed by GitHub
2 changed files with 710 additions and 48 deletions

View File

@@ -691,23 +691,57 @@ class APIServerAdapter(BasePlatformAdapter):
# Cron jobs API
# ------------------------------------------------------------------
@staticmethod
def _validate_job_id(job_id: str) -> Optional[str]:
"""Return error message if job_id is invalid, else None."""
import re as _re
if not _re.fullmatch(r"[a-f0-9]{12}", job_id):
return "Invalid job ID format"
# Check cron module availability once (not per-request)
_CRON_AVAILABLE = False
try:
from cron.jobs import (
list_jobs as _cron_list,
get_job as _cron_get,
create_job as _cron_create,
update_job as _cron_update,
remove_job as _cron_remove,
pause_job as _cron_pause,
resume_job as _cron_resume,
trigger_job as _cron_trigger,
)
_CRON_AVAILABLE = True
except ImportError:
pass
_JOB_ID_RE = __import__("re").compile(r"[a-f0-9]{12}")
# Allowed fields for update — prevents clients injecting arbitrary keys
_UPDATE_ALLOWED_FIELDS = {"name", "schedule", "prompt", "deliver", "skills", "skill", "repeat", "enabled"}
_MAX_NAME_LENGTH = 200
_MAX_PROMPT_LENGTH = 5000
def _check_jobs_available(self) -> Optional["web.Response"]:
"""Return error response if cron module isn't available."""
if not self._CRON_AVAILABLE:
return web.json_response(
{"error": "Cron module not available"}, status=501,
)
return None
def _check_job_id(self, request: "web.Request") -> tuple:
"""Validate and extract job_id. Returns (job_id, error_response)."""
job_id = request.match_info["job_id"]
if not self._JOB_ID_RE.fullmatch(job_id):
return job_id, web.json_response(
{"error": "Invalid job ID format"}, status=400,
)
return job_id, None
async def _handle_list_jobs(self, request: "web.Request") -> "web.Response":
"""GET /api/jobs — list all cron jobs."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
cron_err = self._check_jobs_available()
if cron_err:
return cron_err
try:
from cron.jobs import list_jobs
include_disabled = request.query.get("include_disabled", "").lower() in ("true", "1")
jobs = list_jobs(include_disabled=include_disabled)
jobs = self._cron_list(include_disabled=include_disabled)
return web.json_response({"jobs": jobs})
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
@@ -717,11 +751,13 @@ class APIServerAdapter(BasePlatformAdapter):
auth_err = self._check_auth(request)
if auth_err:
return auth_err
cron_err = self._check_jobs_available()
if cron_err:
return cron_err
try:
from cron.jobs import create_job
body = await request.json()
name = body.get("name", "").strip()
schedule = body.get("schedule", "").strip()
name = (body.get("name") or "").strip()
schedule = (body.get("schedule") or "").strip()
prompt = body.get("prompt", "")
deliver = body.get("deliver", "local")
skills = body.get("skills")
@@ -729,8 +765,18 @@ class APIServerAdapter(BasePlatformAdapter):
if not name:
return web.json_response({"error": "Name is required"}, status=400)
if len(name) > self._MAX_NAME_LENGTH:
return web.json_response(
{"error": f"Name must be ≤ {self._MAX_NAME_LENGTH} characters"}, status=400,
)
if not schedule:
return web.json_response({"error": "Schedule is required"}, status=400)
if len(prompt) > self._MAX_PROMPT_LENGTH:
return web.json_response(
{"error": f"Prompt must be ≤ {self._MAX_PROMPT_LENGTH} characters"}, status=400,
)
if repeat is not None and (not isinstance(repeat, int) or repeat < 1):
return web.json_response({"error": "Repeat must be a positive integer"}, status=400)
kwargs = {
"prompt": prompt,
@@ -743,7 +789,7 @@ class APIServerAdapter(BasePlatformAdapter):
if repeat is not None:
kwargs["repeat"] = repeat
job = create_job(**kwargs)
job = self._cron_create(**kwargs)
return web.json_response({"job": job})
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
@@ -753,13 +799,14 @@ class APIServerAdapter(BasePlatformAdapter):
auth_err = self._check_auth(request)
if auth_err:
return auth_err
job_id = request.match_info["job_id"]
err = self._validate_job_id(job_id)
if err:
return web.json_response({"error": err}, status=400)
cron_err = self._check_jobs_available()
if cron_err:
return cron_err
job_id, id_err = self._check_job_id(request)
if id_err:
return id_err
try:
from cron.jobs import get_job
job = get_job(job_id)
job = self._cron_get(job_id)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response({"job": job})
@@ -771,14 +818,28 @@ class APIServerAdapter(BasePlatformAdapter):
auth_err = self._check_auth(request)
if auth_err:
return auth_err
job_id = request.match_info["job_id"]
err = self._validate_job_id(job_id)
if err:
return web.json_response({"error": err}, status=400)
cron_err = self._check_jobs_available()
if cron_err:
return cron_err
job_id, id_err = self._check_job_id(request)
if id_err:
return id_err
try:
from cron.jobs import update_job
body = await request.json()
job = update_job(job_id, body)
# Whitelist allowed fields to prevent arbitrary key injection
sanitized = {k: v for k, v in body.items() if k in self._UPDATE_ALLOWED_FIELDS}
if not sanitized:
return web.json_response({"error": "No valid fields to update"}, status=400)
# Validate lengths if present
if "name" in sanitized and len(sanitized["name"]) > self._MAX_NAME_LENGTH:
return web.json_response(
{"error": f"Name must be ≤ {self._MAX_NAME_LENGTH} characters"}, status=400,
)
if "prompt" in sanitized and len(sanitized["prompt"]) > self._MAX_PROMPT_LENGTH:
return web.json_response(
{"error": f"Prompt must be ≤ {self._MAX_PROMPT_LENGTH} characters"}, status=400,
)
job = self._cron_update(job_id, sanitized)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response({"job": job})
@@ -790,13 +851,14 @@ class APIServerAdapter(BasePlatformAdapter):
auth_err = self._check_auth(request)
if auth_err:
return auth_err
job_id = request.match_info["job_id"]
err = self._validate_job_id(job_id)
if err:
return web.json_response({"error": err}, status=400)
cron_err = self._check_jobs_available()
if cron_err:
return cron_err
job_id, id_err = self._check_job_id(request)
if id_err:
return id_err
try:
from cron.jobs import remove_job
success = remove_job(job_id)
success = self._cron_remove(job_id)
if not success:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response({"ok": True})
@@ -808,13 +870,14 @@ class APIServerAdapter(BasePlatformAdapter):
auth_err = self._check_auth(request)
if auth_err:
return auth_err
job_id = request.match_info["job_id"]
err = self._validate_job_id(job_id)
if err:
return web.json_response({"error": err}, status=400)
cron_err = self._check_jobs_available()
if cron_err:
return cron_err
job_id, id_err = self._check_job_id(request)
if id_err:
return id_err
try:
from cron.jobs import pause_job
job = pause_job(job_id)
job = self._cron_pause(job_id)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response({"job": job})
@@ -826,13 +889,14 @@ class APIServerAdapter(BasePlatformAdapter):
auth_err = self._check_auth(request)
if auth_err:
return auth_err
job_id = request.match_info["job_id"]
err = self._validate_job_id(job_id)
if err:
return web.json_response({"error": err}, status=400)
cron_err = self._check_jobs_available()
if cron_err:
return cron_err
job_id, id_err = self._check_job_id(request)
if id_err:
return id_err
try:
from cron.jobs import resume_job
job = resume_job(job_id)
job = self._cron_resume(job_id)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response({"job": job})
@@ -844,13 +908,14 @@ class APIServerAdapter(BasePlatformAdapter):
auth_err = self._check_auth(request)
if auth_err:
return auth_err
job_id = request.match_info["job_id"]
err = self._validate_job_id(job_id)
if err:
return web.json_response({"error": err}, status=400)
cron_err = self._check_jobs_available()
if cron_err:
return cron_err
job_id, id_err = self._check_job_id(request)
if id_err:
return id_err
try:
from cron.jobs import trigger_job
job = trigger_job(job_id)
job = self._cron_trigger(job_id)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response({"job": job})

View File

@@ -0,0 +1,597 @@
"""
Tests for the Cron Jobs API endpoints on the API server adapter.
Covers:
- CRUD operations for cron jobs (list, create, get, update, delete)
- Pause / resume / run (trigger) actions
- Input validation (missing name, name too long, prompt too long, invalid repeat)
- Job ID validation (invalid hex)
- Auth enforcement (401 when API_SERVER_KEY is set)
- Cron module unavailability (501 when _CRON_AVAILABLE is False)
"""
import json
from unittest.mock import MagicMock, patch
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from gateway.config import PlatformConfig
from gateway.platforms.api_server import APIServerAdapter, cors_middleware
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
SAMPLE_JOB = {
"id": "aabbccddeeff",
"name": "test-job",
"schedule": "*/5 * * * *",
"prompt": "do something",
"deliver": "local",
"enabled": True,
}
VALID_JOB_ID = "aabbccddeeff"
def _make_adapter(api_key: str = "") -> APIServerAdapter:
"""Create an adapter with optional API key."""
extra = {}
if api_key:
extra["key"] = api_key
config = PlatformConfig(enabled=True, extra=extra)
return APIServerAdapter(config)
def _create_app(adapter: APIServerAdapter) -> web.Application:
"""Create the aiohttp app with jobs routes registered."""
app = web.Application(middlewares=[cors_middleware])
app["api_server_adapter"] = adapter
# Register only job routes (plus health for sanity)
app.router.add_get("/health", adapter._handle_health)
app.router.add_get("/api/jobs", adapter._handle_list_jobs)
app.router.add_post("/api/jobs", adapter._handle_create_job)
app.router.add_get("/api/jobs/{job_id}", adapter._handle_get_job)
app.router.add_patch("/api/jobs/{job_id}", adapter._handle_update_job)
app.router.add_delete("/api/jobs/{job_id}", adapter._handle_delete_job)
app.router.add_post("/api/jobs/{job_id}/pause", adapter._handle_pause_job)
app.router.add_post("/api/jobs/{job_id}/resume", adapter._handle_resume_job)
app.router.add_post("/api/jobs/{job_id}/run", adapter._handle_run_job)
return app
@pytest.fixture
def adapter():
return _make_adapter()
@pytest.fixture
def auth_adapter():
return _make_adapter(api_key="sk-secret")
# ---------------------------------------------------------------------------
# 1. test_list_jobs
# ---------------------------------------------------------------------------
class TestListJobs:
@pytest.mark.asyncio
async def test_list_jobs(self, adapter):
"""GET /api/jobs returns job list."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_list", return_value=[SAMPLE_JOB]
):
resp = await cli.get("/api/jobs")
assert resp.status == 200
data = await resp.json()
assert "jobs" in data
assert data["jobs"] == [SAMPLE_JOB]
# -------------------------------------------------------------------
# 2. test_list_jobs_include_disabled
# -------------------------------------------------------------------
@pytest.mark.asyncio
async def test_list_jobs_include_disabled(self, adapter):
"""GET /api/jobs?include_disabled=true passes the flag."""
app = _create_app(adapter)
mock_list = MagicMock(return_value=[SAMPLE_JOB])
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_list", mock_list
):
resp = await cli.get("/api/jobs?include_disabled=true")
assert resp.status == 200
mock_list.assert_called_once_with(include_disabled=True)
@pytest.mark.asyncio
async def test_list_jobs_default_excludes_disabled(self, adapter):
"""GET /api/jobs without flag passes include_disabled=False."""
app = _create_app(adapter)
mock_list = MagicMock(return_value=[])
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_list", mock_list
):
resp = await cli.get("/api/jobs")
assert resp.status == 200
mock_list.assert_called_once_with(include_disabled=False)
# ---------------------------------------------------------------------------
# 3-7. test_create_job and validation
# ---------------------------------------------------------------------------
class TestCreateJob:
@pytest.mark.asyncio
async def test_create_job(self, adapter):
"""POST /api/jobs with valid body returns created job."""
app = _create_app(adapter)
mock_create = MagicMock(return_value=SAMPLE_JOB)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_create", mock_create
):
resp = await cli.post("/api/jobs", json={
"name": "test-job",
"schedule": "*/5 * * * *",
"prompt": "do something",
})
assert resp.status == 200
data = await resp.json()
assert data["job"] == SAMPLE_JOB
mock_create.assert_called_once()
call_kwargs = mock_create.call_args[1]
assert call_kwargs["name"] == "test-job"
assert call_kwargs["schedule"] == "*/5 * * * *"
assert call_kwargs["prompt"] == "do something"
@pytest.mark.asyncio
async def test_create_job_missing_name(self, adapter):
"""POST /api/jobs without name returns 400."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.post("/api/jobs", json={
"schedule": "*/5 * * * *",
"prompt": "do something",
})
assert resp.status == 400
data = await resp.json()
assert "name" in data["error"].lower() or "Name" in data["error"]
@pytest.mark.asyncio
async def test_create_job_name_too_long(self, adapter):
"""POST /api/jobs with name > 200 chars returns 400."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.post("/api/jobs", json={
"name": "x" * 201,
"schedule": "*/5 * * * *",
})
assert resp.status == 400
data = await resp.json()
assert "200" in data["error"] or "Name" in data["error"]
@pytest.mark.asyncio
async def test_create_job_prompt_too_long(self, adapter):
"""POST /api/jobs with prompt > 5000 chars returns 400."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.post("/api/jobs", json={
"name": "test-job",
"schedule": "*/5 * * * *",
"prompt": "x" * 5001,
})
assert resp.status == 400
data = await resp.json()
assert "5000" in data["error"] or "Prompt" in data["error"]
@pytest.mark.asyncio
async def test_create_job_invalid_repeat(self, adapter):
"""POST /api/jobs with repeat=0 returns 400."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.post("/api/jobs", json={
"name": "test-job",
"schedule": "*/5 * * * *",
"repeat": 0,
})
assert resp.status == 400
data = await resp.json()
assert "repeat" in data["error"].lower() or "Repeat" in data["error"]
@pytest.mark.asyncio
async def test_create_job_missing_schedule(self, adapter):
"""POST /api/jobs without schedule returns 400."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.post("/api/jobs", json={
"name": "test-job",
})
assert resp.status == 400
data = await resp.json()
assert "schedule" in data["error"].lower() or "Schedule" in data["error"]
# ---------------------------------------------------------------------------
# 8-10. test_get_job
# ---------------------------------------------------------------------------
class TestGetJob:
@pytest.mark.asyncio
async def test_get_job(self, adapter):
"""GET /api/jobs/{id} returns job."""
app = _create_app(adapter)
mock_get = MagicMock(return_value=SAMPLE_JOB)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_get", mock_get
):
resp = await cli.get(f"/api/jobs/{VALID_JOB_ID}")
assert resp.status == 200
data = await resp.json()
assert data["job"] == SAMPLE_JOB
mock_get.assert_called_once_with(VALID_JOB_ID)
@pytest.mark.asyncio
async def test_get_job_not_found(self, adapter):
"""GET /api/jobs/{id} returns 404 when job doesn't exist."""
app = _create_app(adapter)
mock_get = MagicMock(return_value=None)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_get", mock_get
):
resp = await cli.get(f"/api/jobs/{VALID_JOB_ID}")
assert resp.status == 404
@pytest.mark.asyncio
async def test_get_job_invalid_id(self, adapter):
"""GET /api/jobs/{id} with non-hex id returns 400."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.get("/api/jobs/not-a-valid-hex!")
assert resp.status == 400
data = await resp.json()
assert "Invalid" in data["error"]
# ---------------------------------------------------------------------------
# 11-12. test_update_job
# ---------------------------------------------------------------------------
class TestUpdateJob:
@pytest.mark.asyncio
async def test_update_job(self, adapter):
"""PATCH /api/jobs/{id} updates with whitelisted fields."""
app = _create_app(adapter)
updated_job = {**SAMPLE_JOB, "name": "updated-name"}
mock_update = MagicMock(return_value=updated_job)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_update", mock_update
):
resp = await cli.patch(
f"/api/jobs/{VALID_JOB_ID}",
json={"name": "updated-name", "schedule": "0 * * * *"},
)
assert resp.status == 200
data = await resp.json()
assert data["job"] == updated_job
mock_update.assert_called_once()
call_args = mock_update.call_args
assert call_args[0][0] == VALID_JOB_ID
sanitized = call_args[0][1]
assert "name" in sanitized
assert "schedule" in sanitized
@pytest.mark.asyncio
async def test_update_job_rejects_unknown_fields(self, adapter):
"""PATCH /api/jobs/{id} — only allowed fields pass through."""
app = _create_app(adapter)
updated_job = {**SAMPLE_JOB, "name": "new-name"}
mock_update = MagicMock(return_value=updated_job)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_update", mock_update
):
resp = await cli.patch(
f"/api/jobs/{VALID_JOB_ID}",
json={
"name": "new-name",
"evil_field": "malicious",
"__proto__": "hack",
},
)
assert resp.status == 200
call_args = mock_update.call_args
sanitized = call_args[0][1]
assert "name" in sanitized
assert "evil_field" not in sanitized
assert "__proto__" not in sanitized
@pytest.mark.asyncio
async def test_update_job_no_valid_fields(self, adapter):
"""PATCH /api/jobs/{id} with only unknown fields returns 400."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.patch(
f"/api/jobs/{VALID_JOB_ID}",
json={"evil_field": "malicious"},
)
assert resp.status == 400
data = await resp.json()
assert "No valid fields" in data["error"]
# ---------------------------------------------------------------------------
# 13. test_delete_job
# ---------------------------------------------------------------------------
class TestDeleteJob:
@pytest.mark.asyncio
async def test_delete_job(self, adapter):
"""DELETE /api/jobs/{id} returns ok."""
app = _create_app(adapter)
mock_remove = MagicMock(return_value=True)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_remove", mock_remove
):
resp = await cli.delete(f"/api/jobs/{VALID_JOB_ID}")
assert resp.status == 200
data = await resp.json()
assert data["ok"] is True
mock_remove.assert_called_once_with(VALID_JOB_ID)
@pytest.mark.asyncio
async def test_delete_job_not_found(self, adapter):
"""DELETE /api/jobs/{id} returns 404 when job doesn't exist."""
app = _create_app(adapter)
mock_remove = MagicMock(return_value=False)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_remove", mock_remove
):
resp = await cli.delete(f"/api/jobs/{VALID_JOB_ID}")
assert resp.status == 404
# ---------------------------------------------------------------------------
# 14. test_pause_job
# ---------------------------------------------------------------------------
class TestPauseJob:
@pytest.mark.asyncio
async def test_pause_job(self, adapter):
"""POST /api/jobs/{id}/pause returns updated job."""
app = _create_app(adapter)
paused_job = {**SAMPLE_JOB, "enabled": False}
mock_pause = MagicMock(return_value=paused_job)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_pause", mock_pause
):
resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/pause")
assert resp.status == 200
data = await resp.json()
assert data["job"] == paused_job
assert data["job"]["enabled"] is False
mock_pause.assert_called_once_with(VALID_JOB_ID)
# ---------------------------------------------------------------------------
# 15. test_resume_job
# ---------------------------------------------------------------------------
class TestResumeJob:
@pytest.mark.asyncio
async def test_resume_job(self, adapter):
"""POST /api/jobs/{id}/resume returns updated job."""
app = _create_app(adapter)
resumed_job = {**SAMPLE_JOB, "enabled": True}
mock_resume = MagicMock(return_value=resumed_job)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_resume", mock_resume
):
resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/resume")
assert resp.status == 200
data = await resp.json()
assert data["job"] == resumed_job
assert data["job"]["enabled"] is True
mock_resume.assert_called_once_with(VALID_JOB_ID)
# ---------------------------------------------------------------------------
# 16. test_run_job
# ---------------------------------------------------------------------------
class TestRunJob:
@pytest.mark.asyncio
async def test_run_job(self, adapter):
"""POST /api/jobs/{id}/run returns triggered job."""
app = _create_app(adapter)
triggered_job = {**SAMPLE_JOB, "last_run": "2025-01-01T00:00:00Z"}
mock_trigger = MagicMock(return_value=triggered_job)
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_trigger", mock_trigger
):
resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/run")
assert resp.status == 200
data = await resp.json()
assert data["job"] == triggered_job
mock_trigger.assert_called_once_with(VALID_JOB_ID)
# ---------------------------------------------------------------------------
# 17. test_auth_required
# ---------------------------------------------------------------------------
class TestAuthRequired:
@pytest.mark.asyncio
async def test_auth_required_list_jobs(self, auth_adapter):
"""GET /api/jobs without API key returns 401 when key is set."""
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.get("/api/jobs")
assert resp.status == 401
@pytest.mark.asyncio
async def test_auth_required_create_job(self, auth_adapter):
"""POST /api/jobs without API key returns 401 when key is set."""
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.post("/api/jobs", json={
"name": "test", "schedule": "* * * * *",
})
assert resp.status == 401
@pytest.mark.asyncio
async def test_auth_required_get_job(self, auth_adapter):
"""GET /api/jobs/{id} without API key returns 401 when key is set."""
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.get(f"/api/jobs/{VALID_JOB_ID}")
assert resp.status == 401
@pytest.mark.asyncio
async def test_auth_required_delete_job(self, auth_adapter):
"""DELETE /api/jobs/{id} without API key returns 401."""
app = _create_app(auth_adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", True):
resp = await cli.delete(f"/api/jobs/{VALID_JOB_ID}")
assert resp.status == 401
@pytest.mark.asyncio
async def test_auth_passes_with_valid_key(self, auth_adapter):
"""GET /api/jobs with correct API key succeeds."""
app = _create_app(auth_adapter)
mock_list = MagicMock(return_value=[])
async with TestClient(TestServer(app)) as cli:
with patch.object(
APIServerAdapter, "_CRON_AVAILABLE", True
), patch.object(
APIServerAdapter, "_cron_list", mock_list
):
resp = await cli.get(
"/api/jobs",
headers={"Authorization": "Bearer sk-secret"},
)
assert resp.status == 200
# ---------------------------------------------------------------------------
# 18. test_cron_unavailable
# ---------------------------------------------------------------------------
class TestCronUnavailable:
@pytest.mark.asyncio
async def test_cron_unavailable_list(self, adapter):
"""GET /api/jobs returns 501 when _CRON_AVAILABLE is False."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False):
resp = await cli.get("/api/jobs")
assert resp.status == 501
data = await resp.json()
assert "not available" in data["error"].lower()
@pytest.mark.asyncio
async def test_cron_unavailable_create(self, adapter):
"""POST /api/jobs returns 501 when _CRON_AVAILABLE is False."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False):
resp = await cli.post("/api/jobs", json={
"name": "test", "schedule": "* * * * *",
})
assert resp.status == 501
@pytest.mark.asyncio
async def test_cron_unavailable_get(self, adapter):
"""GET /api/jobs/{id} returns 501 when _CRON_AVAILABLE is False."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False):
resp = await cli.get(f"/api/jobs/{VALID_JOB_ID}")
assert resp.status == 501
@pytest.mark.asyncio
async def test_cron_unavailable_delete(self, adapter):
"""DELETE /api/jobs/{id} returns 501 when _CRON_AVAILABLE is False."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False):
resp = await cli.delete(f"/api/jobs/{VALID_JOB_ID}")
assert resp.status == 501
@pytest.mark.asyncio
async def test_cron_unavailable_pause(self, adapter):
"""POST /api/jobs/{id}/pause returns 501 when _CRON_AVAILABLE is False."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False):
resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/pause")
assert resp.status == 501
@pytest.mark.asyncio
async def test_cron_unavailable_resume(self, adapter):
"""POST /api/jobs/{id}/resume returns 501 when _CRON_AVAILABLE is False."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False):
resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/resume")
assert resp.status == 501
@pytest.mark.asyncio
async def test_cron_unavailable_run(self, adapter):
"""POST /api/jobs/{id}/run returns 501 when _CRON_AVAILABLE is False."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
with patch.object(APIServerAdapter, "_CRON_AVAILABLE", False):
resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/run")
assert resp.status == 501