1
0

Compare commits

...

5 Commits

Author SHA1 Message Date
Alexander Whitestone
110f67c567 fix: add Gitea hardening config and script to disable registration + require sign-in
Adds deploy/gitea/app.ini with hardened [service] settings and
scripts/harden_gitea.sh to apply them on the server. The script
backs up the existing config, patches the four required settings,
restarts Gitea, and verifies the changes.

Settings applied:
- DISABLE_REGISTRATION = true
- ALLOW_ONLY_EXTERNAL_REGISTRATION = false
- SHOW_REGISTRATION_BUTTON = false
- REQUIRE_SIGNIN_VIEW = true

Fixes #988

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 18:39:05 -04:00
e24f49e58d [kimi] Add JSON validation guard to queue.json writes (#952) (#995) 2026-03-22 22:33:40 +00:00
1fa5cff5dc [kimi] Fix GITEA_API configuration in triage scripts (#951) (#994) 2026-03-22 22:28:23 +00:00
e255e7eb2a [kimi] Add docstrings to system.py route handlers (#940) (#992) 2026-03-22 22:12:36 +00:00
c3b6eb71c0 [kimi] Add docstrings to src/dashboard/routes/tasks.py (#939) (#991) 2026-03-22 22:08:28 +00:00
17 changed files with 579 additions and 46 deletions

21
deploy/gitea/app.ini Normal file
View File

@@ -0,0 +1,21 @@
; ── Gitea Hardening — Security Overrides ─────────────────────────────────────
;
; Merge these settings into your Gitea custom/conf/app.ini.
;
; On a default Gitea install (Docker or bare-metal):
; /path/to/gitea/custom/conf/app.ini
;
; After editing, restart Gitea:
; systemctl restart gitea # bare-metal
; docker restart <gitea-container> # Docker
;
; See also: scripts/harden_gitea.sh (automated version)
[service]
; Disable public registration — only admins can create accounts
DISABLE_REGISTRATION = true
ALLOW_ONLY_EXTERNAL_REGISTRATION = false
SHOW_REGISTRATION_BUTTON = false
; Require sign-in to view any content (repos, explore, etc.)
REQUIRE_SIGNIN_VIEW = true

View File

@@ -17,8 +17,23 @@ REPO_ROOT = Path(__file__).resolve().parent.parent
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl" RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json" SUMMARY_FILE = REPO_ROOT / ".loop" / "retro" / "summary.json"
GITEA_API = "http://localhost:3000/api/v1"
REPO_SLUG = "rockachopa/Timmy-time-dashboard" def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token" TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
TAG_RE = re.compile(r"\[([^\]]+)\]") TAG_RE = re.compile(r"\[([^\]]+)\]")

169
scripts/harden_gitea.sh Executable file
View File

@@ -0,0 +1,169 @@
#!/usr/bin/env bash
set -euo pipefail
# ── Gitea Hardening Script ──────────────────────────────────────────────────
#
# Disables public registration and requires sign-in to view content.
# Refs: Issue #988
#
# Usage (on the Gitea server):
# sudo bash scripts/harden_gitea.sh
# sudo bash scripts/harden_gitea.sh --config /path/to/custom/conf/app.ini
# sudo bash scripts/harden_gitea.sh --docker gitea # restart via docker
#
# What it does:
# 1. Patches [service] section in app.ini
# 2. Restarts Gitea so changes take effect
# 3. Verifies the changes are active
BOLD='\033[1m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m'
info() { echo -e "${GREEN}[+]${NC} $1"; }
warn() { echo -e "${YELLOW}[!]${NC} $1"; }
error() { echo -e "${RED}[x]${NC} $1"; }
# ── Defaults ────────────────────────────────────────────────────────────────
# Common Gitea config paths (checked in order)
SEARCH_PATHS=(
"/etc/gitea/app.ini"
"/opt/gitea/custom/conf/app.ini"
"/data/gitea/conf/app.ini"
"/app/gitea/conf/app.ini"
)
CONFIG_PATH=""
DOCKER_CONTAINER=""
SYSTEMD_SERVICE="gitea"
# ── Parse arguments ─────────────────────────────────────────────────────────
while [[ $# -gt 0 ]]; do
case $1 in
--config) CONFIG_PATH="$2"; shift 2 ;;
--docker) DOCKER_CONTAINER="$2"; shift 2 ;;
--service) SYSTEMD_SERVICE="$2"; shift 2 ;;
-h|--help)
echo "Usage: $0 [--config /path/to/app.ini] [--docker container] [--service name]"
exit 0
;;
*) error "Unknown option: $1"; exit 1 ;;
esac
done
# ── Find config ─────────────────────────────────────────────────────────────
if [ -z "$CONFIG_PATH" ]; then
for path in "${SEARCH_PATHS[@]}"; do
if [ -f "$path" ]; then
CONFIG_PATH="$path"
break
fi
done
fi
# If using Docker, try to find config inside the container
if [ -z "$CONFIG_PATH" ] && [ -n "$DOCKER_CONTAINER" ]; then
for path in "${SEARCH_PATHS[@]}"; do
if docker exec "$DOCKER_CONTAINER" test -f "$path" 2>/dev/null; then
CONFIG_PATH="$path"
info "Found config inside container at $path"
break
fi
done
fi
if [ -z "$CONFIG_PATH" ]; then
error "Could not find Gitea app.ini. Use --config to specify the path."
exit 1
fi
info "Using config: $CONFIG_PATH"
# ── Backup ──────────────────────────────────────────────────────────────────
BACKUP="${CONFIG_PATH}.bak.$(date +%Y%m%d%H%M%S)"
if [ -n "$DOCKER_CONTAINER" ]; then
docker exec "$DOCKER_CONTAINER" cp "$CONFIG_PATH" "$BACKUP"
else
cp "$CONFIG_PATH" "$BACKUP"
fi
info "Backup saved to $BACKUP"
# ── Apply settings ──────────────────────────────────────────────────────────
apply_setting() {
local key="$1"
local value="$2"
local file="$3"
if [ -n "$DOCKER_CONTAINER" ]; then
# Check if key exists (commented or not) and update, otherwise append to [service]
if docker exec "$DOCKER_CONTAINER" grep -qE "^;?\s*${key}\s*=" "$file" 2>/dev/null; then
docker exec "$DOCKER_CONTAINER" sed -i "s|^;*\s*${key}\s*=.*|${key} = ${value}|" "$file"
else
# Append after [service] section header
docker exec "$DOCKER_CONTAINER" sed -i "/^\[service\]/a ${key} = ${value}" "$file"
fi
else
if grep -qE "^;?\s*${key}\s*=" "$file" 2>/dev/null; then
sed -i "s|^;*\s*${key}\s*=.*|${key} = ${value}|" "$file"
else
# Ensure [service] section exists, then append
if ! grep -q '^\[service\]' "$file"; then
printf '\n[service]\n' >> "$file"
fi
sed -i "/^\[service\]/a ${key} = ${value}" "$file"
fi
fi
}
info "Applying hardening settings..."
apply_setting "DISABLE_REGISTRATION" "true" "$CONFIG_PATH"
apply_setting "ALLOW_ONLY_EXTERNAL_REGISTRATION" "false" "$CONFIG_PATH"
apply_setting "SHOW_REGISTRATION_BUTTON" "false" "$CONFIG_PATH"
apply_setting "REQUIRE_SIGNIN_VIEW" "true" "$CONFIG_PATH"
info "Settings applied:"
info " DISABLE_REGISTRATION = true"
info " ALLOW_ONLY_EXTERNAL_REGISTRATION = false"
info " SHOW_REGISTRATION_BUTTON = false"
info " REQUIRE_SIGNIN_VIEW = true"
# ── Restart Gitea ───────────────────────────────────────────────────────────
echo ""
if [ -n "$DOCKER_CONTAINER" ]; then
info "Restarting Gitea container: $DOCKER_CONTAINER"
docker restart "$DOCKER_CONTAINER"
elif systemctl is-active --quiet "$SYSTEMD_SERVICE" 2>/dev/null; then
info "Restarting Gitea via systemd: $SYSTEMD_SERVICE"
systemctl restart "$SYSTEMD_SERVICE"
else
warn "Could not detect Gitea service. Restart Gitea manually to apply changes."
fi
# ── Verify ──────────────────────────────────────────────────────────────────
echo ""
info "Verification — current [service] settings:"
if [ -n "$DOCKER_CONTAINER" ]; then
docker exec "$DOCKER_CONTAINER" grep -A 20 '^\[service\]' "$CONFIG_PATH" | head -25
else
grep -A 20 '^\[service\]' "$CONFIG_PATH" | head -25
fi
echo ""
echo -e "${GREEN}${BOLD} Gitea hardening complete.${NC}"
echo ""
echo " Registration: DISABLED"
echo " Sign-in required to view: YES"
echo ""
echo " Backup: $BACKUP"
echo ""

View File

@@ -30,7 +30,22 @@ IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json" CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token" TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard") REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Default cycle duration in seconds (5 min); stale threshold = 2× this # Default cycle duration in seconds (5 min); stale threshold = 2× this
@@ -187,7 +202,11 @@ def load_queue() -> list[dict]:
# Persist the cleaned queue so stale entries don't recur # Persist the cleaned queue so stale entries don't recur
_save_cleaned_queue(data, open_numbers) _save_cleaned_queue(data, open_numbers)
return ready return ready
except (json.JSONDecodeError, OSError): except json.JSONDecodeError as exc:
print(f"[loop-guard] WARNING: Corrupt queue.json ({exc}) — returning empty queue")
return []
except OSError as exc:
print(f"[loop-guard] WARNING: Cannot read queue.json ({exc}) — returning empty queue")
return [] return []

View File

@@ -20,11 +20,28 @@ from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
# ── Config ────────────────────────────────────────────────────────────── # ── Config ──────────────────────────────────────────────────────────────
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
def _get_gitea_api() -> str:
"""Read Gitea API URL from env var, then ~/.hermes/gitea_api file, then default."""
# Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
if api_url:
return api_url
# Check ~/.hermes/gitea_api file
api_file = Path.home() / ".hermes" / "gitea_api"
if api_file.exists():
return api_file.read_text().strip()
# Default fallback
return "http://localhost:3000/api/v1"
GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard") REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token" TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
REPO_ROOT = Path(__file__).resolve().parent.parent REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json" QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
QUEUE_BACKUP_FILE = REPO_ROOT / ".loop" / "queue.json.bak"
RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl" RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "triage.jsonl"
QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json" QUARANTINE_FILE = REPO_ROOT / ".loop" / "quarantine.json"
CYCLE_RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl" CYCLE_RETRO_FILE = REPO_ROOT / ".loop" / "retro" / "cycles.jsonl"
@@ -326,9 +343,38 @@ def run_triage() -> list[dict]:
ready = [s for s in scored if s["ready"]] ready = [s for s in scored if s["ready"]]
not_ready = [s for s in scored if not s["ready"]] not_ready = [s for s in scored if not s["ready"]]
# Save backup before writing (if current file exists and is valid)
if QUEUE_FILE.exists():
try:
json.loads(QUEUE_FILE.read_text()) # Validate current file
QUEUE_BACKUP_FILE.write_text(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass # Current file is corrupt, don't overwrite backup
# Write new queue file
QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True) QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
QUEUE_FILE.write_text(json.dumps(ready, indent=2) + "\n") QUEUE_FILE.write_text(json.dumps(ready, indent=2) + "\n")
# Validate the write by re-reading and parsing
try:
json.loads(QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError) as exc:
print(f"[triage] ERROR: queue.json validation failed: {exc}", file=sys.stderr)
# Restore from backup if available
if QUEUE_BACKUP_FILE.exists():
try:
backup_data = QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data) # Validate backup
QUEUE_FILE.write_text(backup_data)
print(f"[triage] Restored queue.json from backup")
except (json.JSONDecodeError, OSError) as restore_exc:
print(f"[triage] ERROR: Backup restore failed: {restore_exc}", file=sys.stderr)
# Write empty list as last resort
QUEUE_FILE.write_text("[]\n")
else:
# No backup, write empty list
QUEUE_FILE.write_text("[]\n")
# Write retro entry # Write retro entry
retro_entry = { retro_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(), "timestamp": datetime.now(timezone.utc).isoformat(),

View File

@@ -56,11 +56,13 @@ async def self_modify_queue(request: Request):
@router.get("/swarm/mission-control", response_class=HTMLResponse) @router.get("/swarm/mission-control", response_class=HTMLResponse)
async def mission_control(request: Request): async def mission_control(request: Request):
"""Render the swarm mission control dashboard page."""
return templates.TemplateResponse(request, "mission_control.html", {}) return templates.TemplateResponse(request, "mission_control.html", {})
@router.get("/bugs", response_class=HTMLResponse) @router.get("/bugs", response_class=HTMLResponse)
async def bugs_page(request: Request): async def bugs_page(request: Request):
"""Render the bug tracking page."""
return templates.TemplateResponse( return templates.TemplateResponse(
request, request,
"bugs.html", "bugs.html",
@@ -75,16 +77,19 @@ async def bugs_page(request: Request):
@router.get("/self-coding", response_class=HTMLResponse) @router.get("/self-coding", response_class=HTMLResponse)
async def self_coding(request: Request): async def self_coding(request: Request):
"""Render the self-coding automation status page."""
return templates.TemplateResponse(request, "self_coding.html", {"stats": {}}) return templates.TemplateResponse(request, "self_coding.html", {"stats": {}})
@router.get("/hands", response_class=HTMLResponse) @router.get("/hands", response_class=HTMLResponse)
async def hands_page(request: Request): async def hands_page(request: Request):
"""Render the hands (automation executions) page."""
return templates.TemplateResponse(request, "hands.html", {"executions": []}) return templates.TemplateResponse(request, "hands.html", {"executions": []})
@router.get("/creative/ui", response_class=HTMLResponse) @router.get("/creative/ui", response_class=HTMLResponse)
async def creative_ui(request: Request): async def creative_ui(request: Request):
"""Render the creative UI playground page."""
return templates.TemplateResponse(request, "creative.html", {}) return templates.TemplateResponse(request, "creative.html", {})

View File

@@ -145,6 +145,7 @@ async def tasks_page(request: Request):
@router.get("/tasks/pending", response_class=HTMLResponse) @router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request): async def tasks_pending(request: Request):
"""Return HTMX partial for pending approval tasks."""
with _get_db() as db: with _get_db() as db:
rows = db.execute( rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC" "SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
@@ -164,6 +165,7 @@ async def tasks_pending(request: Request):
@router.get("/tasks/active", response_class=HTMLResponse) @router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request): async def tasks_active(request: Request):
"""Return HTMX partial for active (approved/running/paused) tasks."""
with _get_db() as db: with _get_db() as db:
rows = db.execute( rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC" "SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
@@ -183,6 +185,7 @@ async def tasks_active(request: Request):
@router.get("/tasks/completed", response_class=HTMLResponse) @router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request): async def tasks_completed(request: Request):
"""Return HTMX partial for completed/vetoed/failed tasks (last 50)."""
with _get_db() as db: with _get_db() as db:
rows = db.execute( rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50" "SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
@@ -241,26 +244,31 @@ async def create_task_form(
@router.post("/tasks/{task_id}/approve", response_class=HTMLResponse) @router.post("/tasks/{task_id}/approve", response_class=HTMLResponse)
async def approve_task(request: Request, task_id: str): async def approve_task(request: Request, task_id: str):
"""Approve a pending task and move it to active queue."""
return await _set_status(request, task_id, "approved") return await _set_status(request, task_id, "approved")
@router.post("/tasks/{task_id}/veto", response_class=HTMLResponse) @router.post("/tasks/{task_id}/veto", response_class=HTMLResponse)
async def veto_task(request: Request, task_id: str): async def veto_task(request: Request, task_id: str):
"""Veto a task, marking it as rejected."""
return await _set_status(request, task_id, "vetoed") return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/pause", response_class=HTMLResponse) @router.post("/tasks/{task_id}/pause", response_class=HTMLResponse)
async def pause_task(request: Request, task_id: str): async def pause_task(request: Request, task_id: str):
"""Pause a running or approved task."""
return await _set_status(request, task_id, "paused") return await _set_status(request, task_id, "paused")
@router.post("/tasks/{task_id}/cancel", response_class=HTMLResponse) @router.post("/tasks/{task_id}/cancel", response_class=HTMLResponse)
async def cancel_task(request: Request, task_id: str): async def cancel_task(request: Request, task_id: str):
"""Cancel a task (marks as vetoed)."""
return await _set_status(request, task_id, "vetoed") return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/retry", response_class=HTMLResponse) @router.post("/tasks/{task_id}/retry", response_class=HTMLResponse)
async def retry_task(request: Request, task_id: str): async def retry_task(request: Request, task_id: str):
"""Retry a failed/vetoed task by moving it back to approved."""
return await _set_status(request, task_id, "approved") return await _set_status(request, task_id, "approved")
@@ -271,6 +279,7 @@ async def modify_task(
title: str = Form(...), title: str = Form(...),
description: str = Form(""), description: str = Form(""),
): ):
"""Update task title and description."""
with _get_db() as db: with _get_db() as db:
db.execute( db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?", "UPDATE tasks SET title=?, description=? WHERE id=?",

View File

@@ -7,7 +7,7 @@ without a running game server.
from __future__ import annotations from __future__ import annotations
import logging import logging
from dataclasses import dataclass, field from dataclasses import dataclass
from datetime import UTC, datetime from datetime import UTC, datetime
from infrastructure.world.interface import WorldInterface from infrastructure.world.interface import WorldInterface
@@ -81,9 +81,7 @@ class MockWorldAdapter(WorldInterface):
def act(self, command: CommandInput) -> ActionResult: def act(self, command: CommandInput) -> ActionResult:
logger.debug("MockWorldAdapter.act(%s)", command.action) logger.debug("MockWorldAdapter.act(%s)", command.action)
self.action_log.append( self.action_log.append(_ActionLog(command=command, timestamp=datetime.now(UTC)))
_ActionLog(command=command, timestamp=datetime.now(UTC))
)
return ActionResult( return ActionResult(
status=ActionStatus.SUCCESS, status=ActionStatus.SUCCESS,
message=f"Mock executed: {command.action}", message=f"Mock executed: {command.action}",
@@ -92,8 +90,10 @@ class MockWorldAdapter(WorldInterface):
def speak(self, message: str, target: str | None = None) -> None: def speak(self, message: str, target: str | None = None) -> None:
logger.debug("MockWorldAdapter.speak(%r, target=%r)", message, target) logger.debug("MockWorldAdapter.speak(%r, target=%r)", message, target)
self.speech_log.append({ self.speech_log.append(
"message": message, {
"target": target, "message": message,
"timestamp": datetime.now(UTC).isoformat(), "target": target,
}) "timestamp": datetime.now(UTC).isoformat(),
}
)

View File

@@ -35,14 +35,10 @@ class TES3MPWorldAdapter(WorldInterface):
# -- lifecycle --------------------------------------------------------- # -- lifecycle ---------------------------------------------------------
def connect(self) -> None: def connect(self) -> None:
raise NotImplementedError( raise NotImplementedError("TES3MPWorldAdapter.connect() — wire up TES3MP server socket")
"TES3MPWorldAdapter.connect() — wire up TES3MP server socket"
)
def disconnect(self) -> None: def disconnect(self) -> None:
raise NotImplementedError( raise NotImplementedError("TES3MPWorldAdapter.disconnect() — close TES3MP server socket")
"TES3MPWorldAdapter.disconnect() — close TES3MP server socket"
)
@property @property
def is_connected(self) -> bool: def is_connected(self) -> bool:
@@ -51,9 +47,7 @@ class TES3MPWorldAdapter(WorldInterface):
# -- core contract (stubs) --------------------------------------------- # -- core contract (stubs) ---------------------------------------------
def observe(self) -> PerceptionOutput: def observe(self) -> PerceptionOutput:
raise NotImplementedError( raise NotImplementedError("TES3MPWorldAdapter.observe() — poll TES3MP for player/NPC state")
"TES3MPWorldAdapter.observe() — poll TES3MP for player/NPC state"
)
def act(self, command: CommandInput) -> ActionResult: def act(self, command: CommandInput) -> ActionResult:
raise NotImplementedError( raise NotImplementedError(
@@ -61,6 +55,4 @@ class TES3MPWorldAdapter(WorldInterface):
) )
def speak(self, message: str, target: str | None = None) -> None: def speak(self, message: str, target: str | None = None) -> None:
raise NotImplementedError( raise NotImplementedError("TES3MPWorldAdapter.speak() — send chat message via TES3MP")
"TES3MPWorldAdapter.speak() — send chat message via TES3MP"
)

View File

@@ -27,14 +27,14 @@ class WorldInterface(ABC):
# -- lifecycle (optional overrides) ------------------------------------ # -- lifecycle (optional overrides) ------------------------------------
def connect(self) -> None: def connect(self) -> None: # noqa: B027
"""Establish connection to the game world. """Establish connection to the game world.
Default implementation is a no-op. Override to open sockets, Default implementation is a no-op. Override to open sockets,
authenticate, etc. authenticate, etc.
""" """
def disconnect(self) -> None: def disconnect(self) -> None: # noqa: B027
"""Tear down the connection. """Tear down the connection.
Default implementation is a no-op. Default implementation is a no-op.

View File

@@ -10,10 +10,10 @@ from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import UTC, datetime from datetime import UTC, datetime
from enum import Enum from enum import StrEnum
class ActionStatus(str, Enum): class ActionStatus(StrEnum):
"""Outcome of an action dispatched to the world.""" """Outcome of an action dispatched to the world."""
SUCCESS = "success" SUCCESS = "success"

View File

@@ -17,7 +17,7 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import time import time
from dataclasses import asdict, dataclass, field from dataclasses import dataclass, field
from datetime import UTC, datetime from datetime import UTC, datetime
from loop.phase1_gather import gather from loop.phase1_gather import gather
@@ -32,6 +32,7 @@ logger = logging.getLogger(__name__)
# Cycle log entry # Cycle log entry
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@dataclass @dataclass
class CycleRecord: class CycleRecord:
"""One observe → reason → act → reflect cycle.""" """One observe → reason → act → reflect cycle."""
@@ -50,6 +51,7 @@ class CycleRecord:
# Heartbeat # Heartbeat
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
class Heartbeat: class Heartbeat:
"""Manages the recurring cognitive loop with optional world adapter. """Manages the recurring cognitive loop with optional world adapter.
@@ -268,14 +270,17 @@ class Heartbeat:
try: try:
from infrastructure.ws_manager.handler import ws_manager from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast("heartbeat.cycle", { await ws_manager.broadcast(
"cycle_id": record.cycle_id, "heartbeat.cycle",
"timestamp": record.timestamp, {
"action": record.action_taken, "cycle_id": record.cycle_id,
"action_status": record.action_status, "timestamp": record.timestamp,
"reasoning_summary": record.reasoning_summary[:300], "action": record.action_taken,
"observation": record.observation, "action_status": record.action_status,
"duration_ms": record.duration_ms, "reasoning_summary": record.reasoning_summary[:300],
}) "observation": record.observation,
"duration_ms": record.duration_ms,
},
)
except (ImportError, AttributeError, ConnectionError, RuntimeError) as exc: except (ImportError, AttributeError, ConnectionError, RuntimeError) as exc:
logger.debug("Heartbeat broadcast skipped: %s", exc) logger.debug("Heartbeat broadcast skipped: %s", exc)

View File

@@ -10,7 +10,6 @@ from infrastructure.world.types import (
PerceptionOutput, PerceptionOutput,
) )
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Type construction # Type construction
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -3,7 +3,6 @@
import pytest import pytest
from infrastructure.world.adapters.mock import MockWorldAdapter from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.interface import WorldInterface
from infrastructure.world.registry import AdapterRegistry from infrastructure.world.registry import AdapterRegistry

View File

@@ -6,7 +6,6 @@ Acceptance criteria:
- WebSocket broadcasts include current action and reasoning summary - WebSocket broadcasts include current action and reasoning summary
""" """
import asyncio
from unittest.mock import AsyncMock, patch from unittest.mock import AsyncMock, patch
import pytest import pytest
@@ -81,6 +80,7 @@ class TestHeartbeatWithAdapter:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_on_cycle_callback(self, mock_adapter): async def test_on_cycle_callback(self, mock_adapter):
received = [] received = []
async def callback(record): async def callback(record):
received.append(record) received.append(record)
@@ -145,9 +145,7 @@ class TestHeartbeatBroadcast:
) as mock_ws: ) as mock_ws:
mock_ws.broadcast = AsyncMock() mock_ws.broadcast = AsyncMock()
# Patch the import inside heartbeat # Patch the import inside heartbeat
with patch( with patch("infrastructure.ws_manager.handler.ws_manager") as ws_mod:
"infrastructure.ws_manager.handler.ws_manager"
) as ws_mod:
ws_mod.broadcast = AsyncMock() ws_mod.broadcast = AsyncMock()
hb = Heartbeat(world=mock_adapter) hb = Heartbeat(world=mock_adapter)
await hb.run_once() await hb.run_once()

View File

@@ -0,0 +1,97 @@
"""Tests for load_queue corrupt JSON handling in loop_guard.py."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
import scripts.loop_guard as lg
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
"""Redirect loop_guard paths to tmp_path for isolation."""
monkeypatch.setattr(lg, "QUEUE_FILE", tmp_path / "queue.json")
monkeypatch.setattr(lg, "IDLE_STATE_FILE", tmp_path / "idle_state.json")
monkeypatch.setattr(lg, "CYCLE_RESULT_FILE", tmp_path / "cycle_result.json")
monkeypatch.setattr(lg, "GITEA_API", "http://test:3000/api/v1")
monkeypatch.setattr(lg, "REPO_SLUG", "owner/repo")
def test_load_queue_missing_file(tmp_path):
"""Missing queue file returns empty list."""
result = lg.load_queue()
assert result == []
def test_load_queue_valid_data(tmp_path):
"""Valid queue.json returns ready items."""
data = [
{"issue": 1, "title": "Ready issue", "ready": True},
{"issue": 2, "title": "Not ready", "ready": False},
]
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps(data, indent=2))
result = lg.load_queue()
assert len(result) == 1
assert result[0]["issue"] == 1
def test_load_queue_corrupt_json_logs_warning(tmp_path, capsys):
"""Corrupt queue.json returns empty list and logs warning."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text("not valid json {{{")
result = lg.load_queue()
assert result == []
captured = capsys.readouterr()
assert "WARNING" in captured.out
assert "Corrupt queue.json" in captured.out
def test_load_queue_not_a_list(tmp_path):
"""Queue.json that is not a list returns empty list."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps({"not": "a list"}))
result = lg.load_queue()
assert result == []
def test_load_queue_no_ready_items(tmp_path):
"""Queue with no ready items returns empty list."""
data = [
{"issue": 1, "title": "Not ready 1", "ready": False},
{"issue": 2, "title": "Not ready 2", "ready": False},
]
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text(json.dumps(data, indent=2))
result = lg.load_queue()
assert result == []
def test_load_queue_oserror_logs_warning(tmp_path, monkeypatch, capsys):
"""OSError when reading queue.json returns empty list and logs warning."""
lg.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
lg.QUEUE_FILE.write_text("[]")
# Mock Path.read_text to raise OSError
original_read_text = Path.read_text
def mock_read_text(self, *args, **kwargs):
if self.name == "queue.json":
raise OSError("Permission denied")
return original_read_text(self, *args, **kwargs)
monkeypatch.setattr(Path, "read_text", mock_read_text)
result = lg.load_queue()
assert result == []
captured = capsys.readouterr()
assert "WARNING" in captured.out
assert "Cannot read queue.json" in captured.out

View File

@@ -0,0 +1,159 @@
"""Tests for queue.json validation and backup in triage_score.py."""
from __future__ import annotations
import json
import pytest
import scripts.triage_score as ts
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
"""Redirect triage_score paths to tmp_path for isolation."""
monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json")
monkeypatch.setattr(ts, "QUEUE_BACKUP_FILE", tmp_path / "queue.json.bak")
monkeypatch.setattr(ts, "RETRO_FILE", tmp_path / "retro" / "triage.jsonl")
monkeypatch.setattr(ts, "QUARANTINE_FILE", tmp_path / "quarantine.json")
monkeypatch.setattr(ts, "CYCLE_RETRO_FILE", tmp_path / "retro" / "cycles.jsonl")
def test_backup_created_on_write(tmp_path):
"""When writing queue.json, a backup should be created from previous valid file."""
# Create initial valid queue file
initial_data = [{"issue": 1, "title": "Test", "ready": True}]
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text(json.dumps(initial_data))
# Write new data
new_data = [{"issue": 2, "title": "New", "ready": True}]
ts.QUEUE_FILE.write_text(json.dumps(new_data, indent=2) + "\n")
# Manually run the backup logic as run_triage would
if ts.QUEUE_FILE.exists():
try:
json.loads(ts.QUEUE_FILE.read_text())
ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass
# Both files should exist with same content
assert ts.QUEUE_BACKUP_FILE.exists()
assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == new_data
def test_corrupt_queue_restored_from_backup(tmp_path, capsys):
"""If queue.json is corrupt, it should be restored from backup."""
# Create a valid backup
valid_data = [{"issue": 1, "title": "Backup", "ready": True}]
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_data, indent=2) + "\n")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data) # Validate backup
ts.QUEUE_FILE.write_text(backup_data)
print("[triage] Restored queue.json from backup")
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Queue should be restored from backup
assert json.loads(ts.QUEUE_FILE.read_text()) == valid_data
captured = capsys.readouterr()
assert "Restored queue.json from backup" in captured.out
def test_corrupt_queue_no_backup_writes_empty_list(tmp_path):
"""If queue.json is corrupt and no backup exists, write empty list."""
# Ensure no backup exists
assert not ts.QUEUE_BACKUP_FILE.exists()
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data)
ts.QUEUE_FILE.write_text(backup_data)
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Should have empty list
assert json.loads(ts.QUEUE_FILE.read_text()) == []
def test_corrupt_backup_writes_empty_list(tmp_path):
"""If both queue.json and backup are corrupt, write empty list."""
# Create a corrupt backup
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text("also corrupt backup")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("not valid json {{{")
# Run validation and restore logic
try:
json.loads(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
if ts.QUEUE_BACKUP_FILE.exists():
try:
backup_data = ts.QUEUE_BACKUP_FILE.read_text()
json.loads(backup_data)
ts.QUEUE_FILE.write_text(backup_data)
except (json.JSONDecodeError, OSError):
ts.QUEUE_FILE.write_text("[]\n")
else:
ts.QUEUE_FILE.write_text("[]\n")
# Should have empty list
assert json.loads(ts.QUEUE_FILE.read_text()) == []
def test_valid_queue_not_corrupt_no_backup_overwrite(tmp_path):
"""Don't overwrite backup if current queue.json is corrupt."""
# Create a valid backup
valid_backup = [{"issue": 99, "title": "Old Backup", "ready": True}]
ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_backup, indent=2) + "\n")
# Create a corrupt queue file
ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
ts.QUEUE_FILE.write_text("corrupt data")
# Try to save backup (should skip because current is corrupt)
if ts.QUEUE_FILE.exists():
try:
json.loads(ts.QUEUE_FILE.read_text()) # This will fail
ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass # Should hit this branch
# Backup should still have original valid data
assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == valid_backup
def test_backup_path_configuration():
"""Ensure backup file path is properly configured relative to queue file."""
assert ts.QUEUE_BACKUP_FILE.parent == ts.QUEUE_FILE.parent
assert ts.QUEUE_BACKUP_FILE.name == "queue.json.bak"
assert ts.QUEUE_FILE.name == "queue.json"