1
0

Compare commits

...

5 Commits

Author SHA1 Message Date
Alexander Whitestone
110f67c567 fix: add Gitea hardening config and script to disable registration + require sign-in
Adds deploy/gitea/app.ini with hardened [service] settings and
scripts/harden_gitea.sh to apply them on the server. The script
backs up the existing config, patches the four required settings,
restarts Gitea, and verifies the changes.

Settings applied:
- DISABLE_REGISTRATION = true
- ALLOW_ONLY_EXTERNAL_REGISTRATION = false
- SHOW_REGISTRATION_BUTTON = false
- REQUIRE_SIGNIN_VIEW = true

Fixes #988

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 18:39:05 -04:00
e24f49e58d [kimi] Add JSON validation guard to queue.json writes (#952) (#995) 2026-03-22 22:33:40 +00:00
1fa5cff5dc [kimi] Fix GITEA_API configuration in triage scripts (#951) (#994) 2026-03-22 22:28:23 +00:00
e255e7eb2a [kimi] Add docstrings to system.py route handlers (#940) (#992) 2026-03-22 22:12:36 +00:00
c3b6eb71c0 [kimi] Add docstrings to src/dashboard/routes/tasks.py (#939) (#991) 2026-03-22 22:08:28 +00:00
17 changed files with 579 additions and 46 deletions

21
deploy/gitea/app.ini Normal file
View File

@@ -0,0 +1,21 @@
; ── Gitea Hardening — Security Overrides ─────────────────────────────────────
;
; Merge these settings into your Gitea custom/conf/app.ini.
;
; On a default Gitea install (Docker or bare-metal):
; /path/to/gitea/custom/conf/app.ini
;
; After editing, restart Gitea:
; systemctl restart gitea # bare-metal
; docker restart <gitea-container> # Docker
;
; See also: scripts/harden_gitea.sh (automated version)
[service]
; Disable public registration — only admins can create accounts
DISABLE_REGISTRATION = true
ALLOW_ONLY_EXTERNAL_REGISTRATION = false
SHOW_REGISTRATION_BUTTON = false
; Require sign-in to view any content (repos, explore, etc.)
REQUIRE_SIGNIN_VIEW = true

View File

@@ -17,8 +17,23 @@ REPO_ROOT = Path(__file__).resolve().parent.parent
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
GITEA_API = "http://localhost:3000/api/v1"
REPO_SLUG = "rockachopa/Timmy-time-dashboard"
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
TAG_RE = re.compile(r"\[([^\]]+)\]")

169
scripts/harden_gitea.sh Executable file
View File

@@ -0,0 +1,169 @@
#!/usr/bin/env bash
set -euo pipefail
# ── Gitea Hardening Script ──────────────────────────────────────────────────
#
# Disables public registration and requires sign-in to view content.
# Refs: Issue #988
#
# Usage (on the Gitea server):
# sudo bash scripts/harden_gitea.sh
# sudo bash scripts/harden_gitea.sh --config /path/to/custom/conf/app.ini
# sudo bash scripts/harden_gitea.sh --docker gitea # restart via docker
#
# What it does:
# 1. Patches [service] section in app.ini
# 2. Restarts Gitea so changes take effect
# 3. Verifies the changes are active
BOLD='\033[1m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m'
info() { echo -e "${GREEN}[+]${NC} $1"; }
warn() { echo -e "${YELLOW}[!]${NC} $1"; }
error() { echo -e "${RED}[x]${NC} $1"; }
# ── Defaults ────────────────────────────────────────────────────────────────
# Common Gitea config paths (checked in order)
SEARCH_PATHS=(
"/etc/gitea/app.ini"
"/opt/gitea/custom/conf/app.ini"
"/data/gitea/conf/app.ini"
"/app/gitea/conf/app.ini"
)
CONFIG_PATH=""
DOCKER_CONTAINER=""
SYSTEMD_SERVICE="gitea"
# ── Parse arguments ─────────────────────────────────────────────────────────
while [[ $# -gt 0 ]]; do
case $1 in
--config) CONFIG_PATH="$2"; shift 2 ;;
--docker) DOCKER_CONTAINER="$2"; shift 2 ;;
--service) SYSTEMD_SERVICE="$2"; shift 2 ;;
-h|--help)
echo "Usage: $0 [--config /path/to/app.ini] [--docker container] [--service name]"
exit 0
;;
*) error "Unknown option: $1"; exit 1 ;;
esac
done
# ── Find config ─────────────────────────────────────────────────────────────
if [ -z "$CONFIG_PATH" ]; then
for path in "${SEARCH_PATHS[@]}"; do
if [ -f "$path" ]; then
CONFIG_PATH="$path"
break
fi
done
fi
# If using Docker, try to find config inside the container
if [ -z "$CONFIG_PATH" ] && [ -n "$DOCKER_CONTAINER" ]; then
for path in "${SEARCH_PATHS[@]}"; do
if docker exec "$DOCKER_CONTAINER" test -f "$path" 2>/dev/null; then
CONFIG_PATH="$path"
info "Found config inside container at $path"
break
fi
done
fi
if [ -z "$CONFIG_PATH" ]; then
error "Could not find Gitea app.ini. Use --config to specify the path."
exit 1
fi
info "Using config: $CONFIG_PATH"
# ── Backup ──────────────────────────────────────────────────────────────────
BACKUP="${CONFIG_PATH}.bak.$(date +%Y%m%d%H%M%S)"
if [ -n "$DOCKER_CONTAINER" ]; then
docker exec "$DOCKER_CONTAINER" cp "$CONFIG_PATH" "$BACKUP"
else
cp "$CONFIG_PATH" "$BACKUP"
fi
info "Backup saved to $BACKUP"
# ── Apply settings ──────────────────────────────────────────────────────────
apply_setting() {
local key="$1"
local value="$2"
local file="$3"
if [ -n "$DOCKER_CONTAINER" ]; then
# Check if key exists (commented or not) and update, otherwise append to [service]
if docker exec "$DOCKER_CONTAINER" grep -qE "^;?\s*${key}\s*=" "$file" 2>/dev/null; then
docker exec "$DOCKER_CONTAINER" sed -i "s|^;*\s*${key}\s*=.*|${key} = ${value}|" "$file"
else
# Append after [service] section header
docker exec "$DOCKER_CONTAINER" sed -i "/^\[service\]/a ${key} = ${value}" "$file"
fi
else
if grep -qE "^;?\s*${key}\s*=" "$file" 2>/dev/null; then
sed -i "s|^;*\s*${key}\s*=.*|${key} = ${value}|" "$file"
else
# Ensure [service] section exists, then append
if ! grep -q '^\[service\]' "$file"; then
printf '\n[service]\n' >> "$file"
fi
sed -i "/^\[service\]/a ${key} = ${value}" "$file"
fi
fi
}
info "Applying hardening settings..."
apply_setting "DISABLE_REGISTRATION" "true" "$CONFIG_PATH"
apply_setting "ALLOW_ONLY_EXTERNAL_REGISTRATION" "false" "$CONFIG_PATH"
apply_setting "SHOW_REGISTRATION_BUTTON" "false" "$CONFIG_PATH"
apply_setting "REQUIRE_SIGNIN_VIEW" "true" "$CONFIG_PATH"
info "Settings applied:"
info " DISABLE_REGISTRATION = true"
info " ALLOW_ONLY_EXTERNAL_REGISTRATION = false"
info " SHOW_REGISTRATION_BUTTON = false"
info " REQUIRE_SIGNIN_VIEW = true"
# ── Restart Gitea ───────────────────────────────────────────────────────────
echo ""
if [ -n "$DOCKER_CONTAINER" ]; then
info "Restarting Gitea container: $DOCKER_CONTAINER"
docker restart "$DOCKER_CONTAINER"
elif systemctl is-active --quiet "$SYSTEMD_SERVICE" 2>/dev/null; then
info "Restarting Gitea via systemd: $SYSTEMD_SERVICE"
systemctl restart "$SYSTEMD_SERVICE"
else
warn "Could not detect Gitea service. Restart Gitea manually to apply changes."
fi
# ── Verify ──────────────────────────────────────────────────────────────────
echo ""
info "Verification — current [service] settings:"
if [ -n "$DOCKER_CONTAINER" ]; then
docker exec "$DOCKER_CONTAINER" grep -A 20 '^\[service\]' "$CONFIG_PATH" | head -25
else
grep -A 20 '^\[service\]' "$CONFIG_PATH" | head -25
fi
echo ""
echo -e "${GREEN}${BOLD} Gitea hardening complete.${NC}"
echo ""
echo " Registration: DISABLED"
echo " Sign-in required to view: YES"
echo ""
echo " Backup: $BACKUP"
echo ""

View File

@@ -30,7 +30,22 @@ IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Default cycle duration in seconds (5 min); stale threshold = 2× this
@@ -187,7 +202,11 @@ def load_queue() -> list[dict]:
# Persist the cleaned queue so stale entries don't recur
_save_cleaned_queue(data, open_numbers)
return ready
except (json.JSONDecodeError, OSError):
except json.JSONDecodeError as exc:
print(f"[loop-guard] WARNING: Corrupt queue.json ({exc}) — returning empty queue")
return []
except OSError as exc:
print(f"[loop-guard] WARNING: Cannot read queue.json ({exc}) — returning empty queue")
return []

View File

@@ -20,11 +20,28 @@ from datetime import datetime, timezone
from pathlib import Path
# ── Config ──────────────────────────────────────────────────────────────
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
QUEUE_BACKUP_FILE = REPO_ROOT / ".loop" / "queue.json.bak"
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
CYCLE_RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
@@ -326,9 +343,38 @@ def run_triage() -> list[dict]:
ready = [s for s in scored if s["ready"]]
not_ready = [s for s in scored if not s["ready"]]
# Save backup before writing (if current file exists and is valid)
if QUEUE_FILE.exists():
try:
json.loads(QUEUE_FILE.read_text()) # Validate current file
QUEUE_BACKUP_FILE.write_text(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass # Current file is corrupt, don't overwrite backup
# Write new queue file
QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
QUEUE_FILE.write_text(json.dumps(ready, indent=2) + "\n")
# Validate the write by re-reading and parsing
try:
json.loads(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError) as exc:
print(f"[triage] ERROR: queue.json validation failed: {exc}", file=sys.stderr)
# Restore from backup if available
if QUEUE_BACKUP_FILE.exists():
try:
backup_data = QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data) # Validate backup
QUEUE_FILE.write_text(backup_data)
print(f"[triage] Restored queue.json from backup")
except (json.JSONDecodeError, OSError) as restore_exc:
print(f"[triage] ERROR: Backup restore failed: {restore_exc}", file=sys.stderr)
# Write empty list as last resort
QUEUE_FILE.write_text("[]\n")
else:
# No backup, write empty list
QUEUE_FILE.write_text("[]\n")
# Write retro entry
retro_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),

View File

@@ -56,11 +56,13 @@ async def self_modify_queue(request: Request):
@router.get("/swarm/mission-control", response_class=HTMLResponse)
async def mission_control(request: Request):
"""Render the swarm mission control dashboard page."""
return templates.TemplateResponse(request, "mission_control.html", {})
@router.get("/bugs", response_class=HTMLResponse)
async def bugs_page(request: Request):
"""Render the bug tracking page."""
return templates.TemplateResponse(
request,
"bugs.html",
@@ -75,16 +77,19 @@ async def bugs_page(request: Request):
@router.get("/self-coding", response_class=HTMLResponse)
async def self_coding(request: Request):
"""Render the self-coding automation status page."""
return templates.TemplateResponse(request, "self_coding.html", {"stats": {}})
@router.get("/hands", response_class=HTMLResponse)
async def hands_page(request: Request):
"""Render the hands (automation executions) page."""
return templates.TemplateResponse(request, "hands.html", {"executions": []})
@router.get("/creative/ui", response_class=HTMLResponse)
async def creative_ui(request: Request):
"""Render the creative UI playground page."""
return templates.TemplateResponse(request, "creative.html", {})

View File

@@ -145,6 +145,7 @@ async def tasks_page(request: Request):
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
"""Return HTMX partial for pending approval tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
@@ -164,6 +165,7 @@ async def tasks_pending(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request):
"""Return HTMX partial for active (approved/running/paused) tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
@@ -183,6 +185,7 @@ async def tasks_active(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request):
"""Return HTMX partial for completed/vetoed/failed tasks (last 50)."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
@@ -241,26 +244,31 @@ async def create_task_form(
@router.post("/tasks/{task_id}/approve", response_class=HTMLResponse)
async def approve_task(request: Request, task_id: str):
"""Approve a pending task and move it to active queue."""
return await _set_status(request, task_id, "approved")
@router.post("/tasks/{task_id}/veto", response_class=HTMLResponse)
async def veto_task(request: Request, task_id: str):
"""Veto a task, marking it as rejected."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/pause", response_class=HTMLResponse)
async def pause_task(request: Request, task_id: str):
"""Pause a running or approved task."""
return await _set_status(request, task_id, "paused")
@router.post("/tasks/{task_id}/cancel", response_class=HTMLResponse)
async def cancel_task(request: Request, task_id: str):
"""Cancel a task (marks as vetoed)."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/retry", response_class=HTMLResponse)
async def retry_task(request: Request, task_id: str):
"""Retry a failed/vetoed task by moving it back to approved."""
return await _set_status(request, task_id, "approved")
@@ -271,6 +279,7 @@ async def modify_task(
title: str = Form(...),
description: str = Form(""),
):
"""Update task title and description."""
with _get_db() as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",

View File

@@ -7,7 +7,7 @@ without a running game server.
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import UTC, datetime
from infrastructure.world.interface import WorldInterface
@@ -81,9 +81,7 @@ class MockWorldAdapter(WorldInterface):
def act(self, command: CommandInput) -> ActionResult:
logger.debug("MockWorldAdapter.act(%s)", command.action)
self.action_log.append(
_ActionLog(command=command, timestamp=datetime.now(UTC))
)
self.action_log.append(_ActionLog(command=command, timestamp=datetime.now(UTC)))
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Mock executed: {command.action}",
@@ -92,8 +90,10 @@ class MockWorldAdapter(WorldInterface):
def speak(self, message: str, target: str | None = None) -> None:
logger.debug("MockWorldAdapter.speak(%r, target=%r)", message, target)
self.speech_log.append({
"message": message,
"target": target,
"timestamp": datetime.now(UTC).isoformat(),
})
self.speech_log.append(
{
"message": message,
"target": target,
"timestamp": datetime.now(UTC).isoformat(),
}
)

View File

@@ -35,14 +35,10 @@ class TES3MPWorldAdapter(WorldInterface):
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
raise NotImplementedError(
"TES3MPWorldAdapter.connect() — wire up TES3MP server socket"
)
raise NotImplementedError("TES3MPWorldAdapter.connect() — wire up TES3MP server socket")
def disconnect(self) -> None:
raise NotImplementedError(
"TES3MPWorldAdapter.disconnect() — close TES3MP server socket"
)
raise NotImplementedError("TES3MPWorldAdapter.disconnect() — close TES3MP server socket")
@property
def is_connected(self) -> bool:
@@ -51,9 +47,7 @@ class TES3MPWorldAdapter(WorldInterface):
# -- core contract (stubs) ---------------------------------------------
def observe(self) -> PerceptionOutput:
raise NotImplementedError(
"TES3MPWorldAdapter.observe() — poll TES3MP for player/NPC state"
)
raise NotImplementedError("TES3MPWorldAdapter.observe() — poll TES3MP for player/NPC state")
def act(self, command: CommandInput) -> ActionResult:
raise NotImplementedError(
@@ -61,6 +55,4 @@ class TES3MPWorldAdapter(WorldInterface):
)
def speak(self, message: str, target: str | None = None) -> None:
raise NotImplementedError(
"TES3MPWorldAdapter.speak() — send chat message via TES3MP"
)
raise NotImplementedError("TES3MPWorldAdapter.speak() — send chat message via TES3MP")

View File

@@ -27,14 +27,14 @@ class WorldInterface(ABC):
# -- lifecycle (optional overrides) ------------------------------------
def connect(self) -> None:
def connect(self) -> None: # noqa: B027
"""Establish connection to the game world.
Default implementation is a no-op. Override to open sockets,
authenticate, etc.
"""
def disconnect(self) -> None:
def disconnect(self) -> None: # noqa: B027
"""Tear down the connection.
Default implementation is a no-op.

View File

@@ -10,10 +10,10 @@ from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from enum import StrEnum
class ActionStatus(str, Enum):
class ActionStatus(StrEnum):
"""Outcome of an action dispatched to the world."""
SUCCESS = "success"

View File

@@ -17,7 +17,7 @@ from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import asdict, dataclass, field
from dataclasses import dataclass, field
from datetime import UTC, datetime
from loop.phase1_gather import gather
@@ -32,6 +32,7 @@ logger = logging.getLogger(__name__)
# Cycle log entry
# ---------------------------------------------------------------------------
@dataclass
class CycleRecord:
"""One observe → reason → act → reflect cycle."""
@@ -50,6 +51,7 @@ class CycleRecord:
# Heartbeat
# ---------------------------------------------------------------------------
class Heartbeat:
"""Manages the recurring cognitive loop with optional world adapter.
@@ -268,14 +270,17 @@ class Heartbeat:
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast("heartbeat.cycle", {
"cycle_id": record.cycle_id,
"timestamp": record.timestamp,
"action": record.action_taken,
"action_status": record.action_status,
"reasoning_summary": record.reasoning_summary[:300],
"observation": record.observation,
"duration_ms": record.duration_ms,
})
await ws_manager.broadcast(
"heartbeat.cycle",
{
"cycle_id": record.cycle_id,
"timestamp": record.timestamp,
"action": record.action_taken,
"action_status": record.action_status,
"reasoning_summary": record.reasoning_summary[:300],
"observation": record.observation,
"duration_ms": record.duration_ms,
},
)
except (ImportError, AttributeError, ConnectionError, RuntimeError) as exc:
logger.debug("Heartbeat broadcast skipped: %s", exc)

View File

@@ -10,7 +10,6 @@ from infrastructure.world.types import (
PerceptionOutput,
)
# ---------------------------------------------------------------------------
# Type construction
# ---------------------------------------------------------------------------

View File

@@ -3,7 +3,6 @@
import pytest
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.interface import WorldInterface
from infrastructure.world.registry import AdapterRegistry

View File

@@ -6,7 +6,6 @@ Acceptance criteria:
- WebSocket broadcasts include current action and reasoning summary
"""
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
@@ -81,6 +80,7 @@ class TestHeartbeatWithAdapter:
@pytest.mark.asyncio
async def test_on_cycle_callback(self, mock_adapter):
received = []
async def callback(record):
received.append(record)
@@ -145,9 +145,7 @@ class TestHeartbeatBroadcast:
) as mock_ws:
mock_ws.broadcast = AsyncMock()
# Patch the import inside heartbeat
with patch(
"infrastructure.ws_manager.handler.ws_manager"
) as ws_mod:
with patch("infrastructure.ws_manager.handler.ws_manager") as ws_mod:
ws_mod.broadcast = AsyncMock()
hb = Heartbeat(world=mock_adapter)
await hb.run_once()

View File

@@ -0,0 +1,97 @@
"""Tests for load_queue corrupt JSON handling in loop_guard.py."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
import scripts.loop_guard as lg
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
"""Redirect loop_guard paths to tmp_path for isolation."""
monkeypatch.setattr(lg, "QUEUE_FILE", tmp_path / "queue.json")
monkeypatch.setattr(lg, "IDLE_STATE_FILE", tmp_path / "idle_state.json")
monkeypatch.setattr(lg, "CYCLE_RESULT_FILE", tmp_path / "cycle_result.json")
monkeypatch.setattr(lg, "GITEA_API", "http://test:3000/api/v1")
monkeypatch.setattr(lg, "REPO_SLUG", "owner/repo")
def test_load_queue_missing_file(tmp_path):
"""Missing queue file returns empty list."""
result = lg.load_queue()
assert result == []
def test_load_queue_valid_data(tmp_path):
"""Valid queue.json returns ready items."""
data = [
{"issue": 1, "title": "Ready issue", "ready": True},
{"issue": 2, "title": "Not ready", "ready": False},
]
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps(data, indent=2))
result = lg.load_queue()
assert len(result) == 1
assert result[0]["issue"] == 1
def test_load_queue_corrupt_json_logs_warning(tmp_path, capsys):
"""Corrupt queue.json returns empty list and logs warning."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text("not valid json {{{")
result = lg.load_queue()
assert result == []
captured = capsys.readouterr()
assert "WARNING" in captured.out
assert "Corrupt queue.json" in captured.out
def test_load_queue_not_a_list(tmp_path):
"""Queue.json that is not a list returns empty list."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps({"not": "a list"}))
result = lg.load_queue()
assert result == []
def test_load_queue_no_ready_items(tmp_path):
"""Queue with no ready items returns empty list."""
data = [
{"issue": 1, "title": "Not ready 1", "ready": False},
{"issue": 2, "title": "Not ready 2", "ready": False},
]
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps(data, indent=2))
result = lg.load_queue()
assert result == []
def test_load_queue_oserror_logs_warning(tmp_path, monkeypatch, capsys):
"""OSError when reading queue.json returns empty list and logs warning."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text("[]")
# Mock Path.read_text to raise OSError
original_read_text = Path.read_text
def mock_read_text(self, *args, **kwargs):
if self.name == "queue.json":
raise OSError("Permission denied")
return original_read_text(self, *args, **kwargs)
monkeypatch.setattr(Path, "read_text", mock_read_text)
result = lg.load_queue()
assert result == []
captured = capsys.readouterr()
assert "WARNING" in captured.out
assert "Cannot read queue.json" in captured.out

View File

@@ -0,0 +1,159 @@
"""Tests for queue.json validation and backup in triage_score.py."""
from __future__ import annotations
import json
import pytest
import scripts.triage_score as ts
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
"""Redirect triage_score paths to tmp_path for isolation."""
monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json")
monkeypatch.setattr(ts, "QUEUE_BACKUP_FILE", tmp_path / "queue.json.bak")
monkeypatch.setattr(ts, "RETRO_FILE", tmp_path / "retro" / "triage.jsonl")
monkeypatch.setattr(ts, "QUARANTINE_FILE", tmp_path / "quarantine.json")
monkeypatch.setattr(ts, "CYCLE_RETRO_FILE", tmp_path / "retro" / "cycles.jsonl")
def test_backup_created_on_write(tmp_path):
"""When writing queue.json, a backup should be created from previous valid file."""
# Create initial valid queue file
initial_data = [{"issue": 1, "title": "Test", "ready": True}]
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text(json.dumps(initial_data))
# Write new data
new_data = [{"issue": 2, "title": "New", "ready": True}]
ts.QUEUE_FILE.write_text(json.dumps(new_data, indent=2) + "\n")
# Manually run the backup logic as run_triage would
if ts.QUEUE_FILE.exists():
try:
json.loads(ts.QUEUE_FILE.read_text())
ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass
# Both files should exist with same content
assert ts.QUEUE_BACKUP_FILE.exists()
assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == new_data
def test_corrupt_queue_restored_from_backup(tmp_path, capsys):
"""If queue.json is corrupt, it should be restored from backup."""
# Create a valid backup
valid_data = [{"issue": 1, "title": "Backup", "ready": True}]
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_data, indent=2) + "\n")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data) # Validate backup
ts.QUEUE_FILE.write_text(backup_data)
print("[triage] Restored queue.json from backup")
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Queue should be restored from backup
assert json.loads(ts.QUEUE_FILE.read_text()) == valid_data
captured = capsys.readouterr()
assert "Restored queue.json from backup" in captured.out
def test_corrupt_queue_no_backup_writes_empty_list(tmp_path):
"""If queue.json is corrupt and no backup exists, write empty list."""
# Ensure no backup exists
assert not ts.QUEUE_BACKUP_FILE.exists()
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data)
ts.QUEUE_FILE.write_text(backup_data)
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Should have empty list
assert json.loads(ts.QUEUE_FILE.read_text()) == []
def test_corrupt_backup_writes_empty_list(tmp_path):
"""If both queue.json and backup are corrupt, write empty list."""
# Create a corrupt backup
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text("also corrupt backup")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data)
ts.QUEUE_FILE.write_text(backup_data)
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Should have empty list
assert json.loads(ts.QUEUE_FILE.read_text()) == []
def test_valid_queue_not_corrupt_no_backup_overwrite(tmp_path):
"""Don't overwrite backup if current queue.json is corrupt."""
# Create a valid backup
valid_backup = [{"issue": 99, "title": "Old Backup", "ready": True}]
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_backup, indent=2) + "\n")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("corrupt data")
# Try to save backup (should skip because current is corrupt)
if ts.QUEUE_FILE.exists():
try:
json.loads(ts.QUEUE_FILE.read_text()) # This will fail
ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass # Should hit this branch
# Backup should still have original valid data
assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == valid_backup
def test_backup_path_configuration():
"""Ensure backup file path is properly configured relative to queue file."""
assert ts.QUEUE_BACKUP_FILE.parent == ts.QUEUE_FILE.parent
assert ts.QUEUE_BACKUP_FILE.name == "queue.json.bak"
assert ts.QUEUE_FILE.name == "queue.json"