1
0

Compare commits

..

1 Commits

Author SHA1 Message Date
hermes
7f13f540b3 fix: loop_guard GITEA_API default + queue.json validation guard (#951, #952) 2026-03-22 13:51:59 -04:00
9 changed files with 24 additions and 540 deletions

View File

@@ -1,21 +0,0 @@
; ── Gitea Hardening — Security Overrides ─────────────────────────────────────
;
; Merge these settings into your Gitea custom/conf/app.ini.
;
; On a default Gitea install (Docker or bare-metal):
; /path/to/gitea/custom/conf/app.ini
;
; After editing, restart Gitea:
; systemctl restart gitea # bare-metal
; docker restart <gitea-container> # Docker
;
; See also: scripts/harden_gitea.sh (automated version)
[service]
; Disable public registration — only admins can create accounts
DISABLE_REGISTRATION = true
ALLOW_ONLY_EXTERNAL_REGISTRATION = false
SHOW_REGISTRATION_BUTTON = false
; Require sign-in to view any content (repos, explore, etc.)
REQUIRE_SIGNIN_VIEW = true

View File

@@ -17,23 +17,8 @@ REPO_ROOT = Path(__file__).resolve().parent.parent
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
GITEA_API = "http://localhost:3000/api/v1"
REPO_SLUG = "rockachopa/Timmy-time-dashboard"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
TAG_RE = re.compile(r"\[([^\]]+)\]")

View File

@@ -1,169 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# ── Gitea Hardening Script ──────────────────────────────────────────────────
#
# Disables public registration and requires sign-in to view content.
# Refs: Issue #988
#
# Usage (on the Gitea server):
# sudo bash scripts/harden_gitea.sh
# sudo bash scripts/harden_gitea.sh --config /path/to/custom/conf/app.ini
# sudo bash scripts/harden_gitea.sh --docker gitea # restart via docker
#
# What it does:
# 1. Patches [service] section in app.ini
# 2. Restarts Gitea so changes take effect
# 3. Verifies the changes are active
BOLD='\033[1m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m'
info() { echo -e "${GREEN}[+]${NC} $1"; }
warn() { echo -e "${YELLOW}[!]${NC} $1"; }
error() { echo -e "${RED}[x]${NC} $1"; }
# ── Defaults ────────────────────────────────────────────────────────────────
# Common Gitea config paths (checked in order)
SEARCH_PATHS=(
"/etc/gitea/app.ini"
"/opt/gitea/custom/conf/app.ini"
"/data/gitea/conf/app.ini"
"/app/gitea/conf/app.ini"
)
CONFIG_PATH=""
DOCKER_CONTAINER=""
SYSTEMD_SERVICE="gitea"
# ── Parse arguments ─────────────────────────────────────────────────────────
while [[ $# -gt 0 ]]; do
case $1 in
--config) CONFIG_PATH="$2"; shift 2 ;;
--docker) DOCKER_CONTAINER="$2"; shift 2 ;;
--service) SYSTEMD_SERVICE="$2"; shift 2 ;;
-h|--help)
echo "Usage: $0 [--config /path/to/app.ini] [--docker container] [--service name]"
exit 0
;;
*) error "Unknown option: $1"; exit 1 ;;
esac
done
# ── Find config ─────────────────────────────────────────────────────────────
if [ -z "$CONFIG_PATH" ]; then
for path in "${SEARCH_PATHS[@]}"; do
if [ -f "$path" ]; then
CONFIG_PATH="$path"
break
fi
done
fi
# If using Docker, try to find config inside the container
if [ -z "$CONFIG_PATH" ] && [ -n "$DOCKER_CONTAINER" ]; then
for path in "${SEARCH_PATHS[@]}"; do
if docker exec "$DOCKER_CONTAINER" test -f "$path" 2>/dev/null; then
CONFIG_PATH="$path"
info "Found config inside container at $path"
break
fi
done
fi
if [ -z "$CONFIG_PATH" ]; then
error "Could not find Gitea app.ini. Use --config to specify the path."
exit 1
fi
info "Using config: $CONFIG_PATH"
# ── Backup ──────────────────────────────────────────────────────────────────
BACKUP="${CONFIG_PATH}.bak.$(date +%Y%m%d%H%M%S)"
if [ -n "$DOCKER_CONTAINER" ]; then
docker exec "$DOCKER_CONTAINER" cp "$CONFIG_PATH" "$BACKUP"
else
cp "$CONFIG_PATH" "$BACKUP"
fi
info "Backup saved to $BACKUP"
# ── Apply settings ──────────────────────────────────────────────────────────
apply_setting() {
local key="$1"
local value="$2"
local file="$3"
if [ -n "$DOCKER_CONTAINER" ]; then
# Check if key exists (commented or not) and update, otherwise append to [service]
if docker exec "$DOCKER_CONTAINER" grep -qE "^;?\s*${key}\s*=" "$file" 2>/dev/null; then
docker exec "$DOCKER_CONTAINER" sed -i "s|^;*\s*${key}\s*=.*|${key} = ${value}|" "$file"
else
# Append after [service] section header
docker exec "$DOCKER_CONTAINER" sed -i "/^\[service\]/a ${key} = ${value}" "$file"
fi
else
if grep -qE "^;?\s*${key}\s*=" "$file" 2>/dev/null; then
sed -i "s|^;*\s*${key}\s*=.*|${key} = ${value}|" "$file"
else
# Ensure [service] section exists, then append
if ! grep -q '^\[service\]' "$file"; then
printf '\n[service]\n' >> "$file"
fi
sed -i "/^\[service\]/a ${key} = ${value}" "$file"
fi
fi
}
info "Applying hardening settings..."
apply_setting "DISABLE_REGISTRATION" "true" "$CONFIG_PATH"
apply_setting "ALLOW_ONLY_EXTERNAL_REGISTRATION" "false" "$CONFIG_PATH"
apply_setting "SHOW_REGISTRATION_BUTTON" "false" "$CONFIG_PATH"
apply_setting "REQUIRE_SIGNIN_VIEW" "true" "$CONFIG_PATH"
info "Settings applied:"
info " DISABLE_REGISTRATION = true"
info " ALLOW_ONLY_EXTERNAL_REGISTRATION = false"
info " SHOW_REGISTRATION_BUTTON = false"
info " REQUIRE_SIGNIN_VIEW = true"
# ── Restart Gitea ───────────────────────────────────────────────────────────
echo ""
if [ -n "$DOCKER_CONTAINER" ]; then
info "Restarting Gitea container: $DOCKER_CONTAINER"
docker restart "$DOCKER_CONTAINER"
elif systemctl is-active --quiet "$SYSTEMD_SERVICE" 2>/dev/null; then
info "Restarting Gitea via systemd: $SYSTEMD_SERVICE"
systemctl restart "$SYSTEMD_SERVICE"
else
warn "Could not detect Gitea service. Restart Gitea manually to apply changes."
fi
# ── Verify ──────────────────────────────────────────────────────────────────
echo ""
info "Verification — current [service] settings:"
if [ -n "$DOCKER_CONTAINER" ]; then
docker exec "$DOCKER_CONTAINER" grep -A 20 '^\[service\]' "$CONFIG_PATH" | head -25
else
grep -A 20 '^\[service\]' "$CONFIG_PATH" | head -25
fi
echo ""
echo -e "${GREEN}${BOLD} Gitea hardening complete.${NC}"
echo ""
echo " Registration: DISABLED"
echo " Sign-in required to view: YES"
echo ""
echo " Backup: $BACKUP"
echo ""

View File

@@ -30,22 +30,7 @@ IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
GITEA_API = os.environ.get("GITEA_API", "http://143.198.27.163:3000/api/v1")
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Default cycle duration in seconds (5 min); stale threshold = 2× this
@@ -202,11 +187,7 @@ def load_queue() -> list[dict]:
# Persist the cleaned queue so stale entries don't recur
_save_cleaned_queue(data, open_numbers)
return ready
except json.JSONDecodeError as exc:
print(f"[loop-guard] WARNING: Corrupt queue.json ({exc}) — returning empty queue")
return []
except OSError as exc:
print(f"[loop-guard] WARNING: Cannot read queue.json ({exc}) — returning empty queue")
except (json.JSONDecodeError, OSError):
return []

View File

@@ -20,28 +20,11 @@ from datetime import datetime, timezone
from pathlib import Path
# ── Config ──────────────────────────────────────────────────────────────
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
GITEA_API = os.environ.get("GITEA_API", "http://143.198.27.163:3000/api/v1")
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
QUEUE_BACKUP_FILE = REPO_ROOT / ".loop" / "queue.json.bak"
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
CYCLE_RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
@@ -343,36 +326,31 @@ def run_triage() -> list[dict]:
ready = [s for s in scored if s["ready"]]
not_ready = [s for s in scored if not s["ready"]]
# Save backup before writing (if current file exists and is valid)
QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
backup_file = QUEUE_FILE.with_suffix(".json.bak")
# Backup existing queue before overwriting
if QUEUE_FILE.exists():
try:
json.loads(QUEUE_FILE.read_text()) # Validate current file
QUEUE_BACKUP_FILE.write_text(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass # Current file is corrupt, don't overwrite backup
backup_file.write_text(QUEUE_FILE.read_text())
except OSError:
pass
# Write new queue file
QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
QUEUE_FILE.write_text(json.dumps(ready, indent=2) + "\n")
# Write and validate
queue_json = json.dumps(ready, indent=2) + "\n"
QUEUE_FILE.write_text(queue_json)
# Validate the write by re-reading and parsing
# Validate by re-reading — restore backup on corruption
try:
json.loads(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError) as exc:
print(f"[triage] ERROR: queue.json validation failed: {exc}", file=sys.stderr)
# Restore from backup if available
if QUEUE_BACKUP_FILE.exists():
try:
backup_data = QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data) # Validate backup
QUEUE_FILE.write_text(backup_data)
print(f"[triage] Restored queue.json from backup")
except (json.JSONDecodeError, OSError) as restore_exc:
print(f"[triage] ERROR: Backup restore failed: {restore_exc}", file=sys.stderr)
# Write empty list as last resort
QUEUE_FILE.write_text("[]\n")
validated = json.loads(QUEUE_FILE.read_text())
if not isinstance(validated, list):
raise ValueError("queue.json is not a list")
except (json.JSONDecodeError, ValueError) as e:
print(f"[triage] ERROR: queue.json validation failed: {e}", file=sys.stderr)
if backup_file.exists():
print("[triage] Restoring from backup", file=sys.stderr)
QUEUE_FILE.write_text(backup_file.read_text())
else:
# No backup, write empty list
QUEUE_FILE.write_text("[]\n")
# Write retro entry

View File

@@ -56,13 +56,11 @@ async def self_modify_queue(request: Request):
@router.get("/swarm/mission-control", response_class=HTMLResponse)
async def mission_control(request: Request):
"""Render the swarm mission control dashboard page."""
return templates.TemplateResponse(request, "mission_control.html", {})
@router.get("/bugs", response_class=HTMLResponse)
async def bugs_page(request: Request):
"""Render the bug tracking page."""
return templates.TemplateResponse(
request,
"bugs.html",
@@ -77,19 +75,16 @@ async def bugs_page(request: Request):
@router.get("/self-coding", response_class=HTMLResponse)
async def self_coding(request: Request):
"""Render the self-coding automation status page."""
return templates.TemplateResponse(request, "self_coding.html", {"stats": {}})
@router.get("/hands", response_class=HTMLResponse)
async def hands_page(request: Request):
"""Render the hands (automation executions) page."""
return templates.TemplateResponse(request, "hands.html", {"executions": []})
@router.get("/creative/ui", response_class=HTMLResponse)
async def creative_ui(request: Request):
"""Render the creative UI playground page."""
return templates.TemplateResponse(request, "creative.html", {})

View File

@@ -145,7 +145,6 @@ async def tasks_page(request: Request):
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
"""Return HTMX partial for pending approval tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
@@ -165,7 +164,6 @@ async def tasks_pending(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request):
"""Return HTMX partial for active (approved/running/paused) tasks."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
@@ -185,7 +183,6 @@ async def tasks_active(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request):
"""Return HTMX partial for completed/vetoed/failed tasks (last 50)."""
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
@@ -244,31 +241,26 @@ async def create_task_form(
@router.post("/tasks/{task_id}/approve", response_class=HTMLResponse)
async def approve_task(request: Request, task_id: str):
"""Approve a pending task and move it to active queue."""
return await _set_status(request, task_id, "approved")
@router.post("/tasks/{task_id}/veto", response_class=HTMLResponse)
async def veto_task(request: Request, task_id: str):
"""Veto a task, marking it as rejected."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/pause", response_class=HTMLResponse)
async def pause_task(request: Request, task_id: str):
"""Pause a running or approved task."""
return await _set_status(request, task_id, "paused")
@router.post("/tasks/{task_id}/cancel", response_class=HTMLResponse)
async def cancel_task(request: Request, task_id: str):
"""Cancel a task (marks as vetoed)."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/retry", response_class=HTMLResponse)
async def retry_task(request: Request, task_id: str):
"""Retry a failed/vetoed task by moving it back to approved."""
return await _set_status(request, task_id, "approved")
@@ -279,7 +271,6 @@ async def modify_task(
title: str = Form(...),
description: str = Form(""),
):
"""Update task title and description."""
with _get_db() as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",

View File

@@ -1,97 +0,0 @@
"""Tests for load_queue corrupt JSON handling in loop_guard.py."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
import scripts.loop_guard as lg
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
"""Redirect loop_guard paths to tmp_path for isolation."""
monkeypatch.setattr(lg, "QUEUE_FILE", tmp_path / "queue.json")
monkeypatch.setattr(lg, "IDLE_STATE_FILE", tmp_path / "idle_state.json")
monkeypatch.setattr(lg, "CYCLE_RESULT_FILE", tmp_path / "cycle_result.json")
monkeypatch.setattr(lg, "GITEA_API", "http://test:3000/api/v1")
monkeypatch.setattr(lg, "REPO_SLUG", "owner/repo")
def test_load_queue_missing_file(tmp_path):
"""Missing queue file returns empty list."""
result = lg.load_queue()
assert result == []
def test_load_queue_valid_data(tmp_path):
"""Valid queue.json returns ready items."""
data = [
{"issue": 1, "title": "Ready issue", "ready": True},
{"issue": 2, "title": "Not ready", "ready": False},
]
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps(data, indent=2))
result = lg.load_queue()
assert len(result) == 1
assert result[0]["issue"] == 1
def test_load_queue_corrupt_json_logs_warning(tmp_path, capsys):
"""Corrupt queue.json returns empty list and logs warning."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text("not valid json {{{")
result = lg.load_queue()
assert result == []
captured = capsys.readouterr()
assert "WARNING" in captured.out
assert "Corrupt queue.json" in captured.out
def test_load_queue_not_a_list(tmp_path):
"""Queue.json that is not a list returns empty list."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps({"not": "a list"}))
result = lg.load_queue()
assert result == []
def test_load_queue_no_ready_items(tmp_path):
"""Queue with no ready items returns empty list."""
data = [
{"issue": 1, "title": "Not ready 1", "ready": False},
{"issue": 2, "title": "Not ready 2", "ready": False},
]
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps(data, indent=2))
result = lg.load_queue()
assert result == []
def test_load_queue_oserror_logs_warning(tmp_path, monkeypatch, capsys):
"""OSError when reading queue.json returns empty list and logs warning."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text("[]")
# Mock Path.read_text to raise OSError
original_read_text = Path.read_text
def mock_read_text(self, *args, **kwargs):
if self.name == "queue.json":
raise OSError("Permission denied")
return original_read_text(self, *args, **kwargs)
monkeypatch.setattr(Path, "read_text", mock_read_text)
result = lg.load_queue()
assert result == []
captured = capsys.readouterr()
assert "WARNING" in captured.out
assert "Cannot read queue.json" in captured.out

View File

@@ -1,159 +0,0 @@
"""Tests for queue.json validation and backup in triage_score.py."""
from __future__ import annotations
import json
import pytest
import scripts.triage_score as ts
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
"""Redirect triage_score paths to tmp_path for isolation."""
monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json")
monkeypatch.setattr(ts, "QUEUE_BACKUP_FILE", tmp_path / "queue.json.bak")
monkeypatch.setattr(ts, "RETRO_FILE", tmp_path / "retro" / "triage.jsonl")
monkeypatch.setattr(ts, "QUARANTINE_FILE", tmp_path / "quarantine.json")
monkeypatch.setattr(ts, "CYCLE_RETRO_FILE", tmp_path / "retro" / "cycles.jsonl")
def test_backup_created_on_write(tmp_path):
"""When writing queue.json, a backup should be created from previous valid file."""
# Create initial valid queue file
initial_data = [{"issue": 1, "title": "Test", "ready": True}]
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text(json.dumps(initial_data))
# Write new data
new_data = [{"issue": 2, "title": "New", "ready": True}]
ts.QUEUE_FILE.write_text(json.dumps(new_data, indent=2) + "\n")
# Manually run the backup logic as run_triage would
if ts.QUEUE_FILE.exists():
try:
json.loads(ts.QUEUE_FILE.read_text())
ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass
# Both files should exist with same content
assert ts.QUEUE_BACKUP_FILE.exists()
assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == new_data
def test_corrupt_queue_restored_from_backup(tmp_path, capsys):
"""If queue.json is corrupt, it should be restored from backup."""
# Create a valid backup
valid_data = [{"issue": 1, "title": "Backup", "ready": True}]
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_data, indent=2) + "\n")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data) # Validate backup
ts.QUEUE_FILE.write_text(backup_data)
print("[triage] Restored queue.json from backup")
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Queue should be restored from backup
assert json.loads(ts.QUEUE_FILE.read_text()) == valid_data
captured = capsys.readouterr()
assert "Restored queue.json from backup" in captured.out
def test_corrupt_queue_no_backup_writes_empty_list(tmp_path):
"""If queue.json is corrupt and no backup exists, write empty list."""
# Ensure no backup exists
assert not ts.QUEUE_BACKUP_FILE.exists()
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data)
ts.QUEUE_FILE.write_text(backup_data)
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Should have empty list
assert json.loads(ts.QUEUE_FILE.read_text()) == []
def test_corrupt_backup_writes_empty_list(tmp_path):
"""If both queue.json and backup are corrupt, write empty list."""
# Create a corrupt backup
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text("also corrupt backup")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data)
ts.QUEUE_FILE.write_text(backup_data)
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Should have empty list
assert json.loads(ts.QUEUE_FILE.read_text()) == []
def test_valid_queue_not_corrupt_no_backup_overwrite(tmp_path):
"""Don't overwrite backup if current queue.json is corrupt."""
# Create a valid backup
valid_backup = [{"issue": 99, "title": "Old Backup", "ready": True}]
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_backup, indent=2) + "\n")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("corrupt data")
# Try to save backup (should skip because current is corrupt)
if ts.QUEUE_FILE.exists():
try:
json.loads(ts.QUEUE_FILE.read_text()) # This will fail
ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass # Should hit this branch
# Backup should still have original valid data
assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == valid_backup
def test_backup_path_configuration():
"""Ensure backup file path is properly configured relative to queue file."""
assert ts.QUEUE_BACKUP_FILE.parent == ts.QUEUE_FILE.parent
assert ts.QUEUE_BACKUP_FILE.name == "queue.json.bak"
assert ts.QUEUE_FILE.name == "queue.json"