Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
4ab7a6f6e6 feat: Session Sovereignty Report Generator (#957)
Some checks failed
Tests / lint (pull_request) Failing after 16s
Tests / test (pull_request) Has been skipped
- Add `src/timmy/sovereignty/session_report.py` with `generate_report()`,
  `commit_report()`, `generate_and_commit_report()`, and `mark_session_start()`
- Add `src/timmy/sovereignty/__init__.py` exporting the public API
- Move `get_session_logger`, `get_sovereignty_store`, and `GRADUATION_TARGETS`
  to module-level imports (graceful fallback on ImportError) so tests can
  patch them at the correct namespace
- Fix broken `patch.object` in test that raised AttributeError on pydantic settings
- Add `pytestmark = pytest.mark.unit` so tests run under `tox -e unit`
- All 23 sovereignty report tests pass

Fixes #957

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 15:44:36 -04:00
Alexander Whitestone
4150ab7372 WIP: Claude Code progress on #957
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-03-23 14:58:58 -04:00
25 changed files with 1047 additions and 116 deletions

View File

@@ -33,12 +33,12 @@ from dashboard.routes.calm import router as calm_router
from dashboard.routes.chat_api import router as chat_api_router
from dashboard.routes.chat_api_v1 import router as chat_api_v1_router
from dashboard.routes.daily_run import router as daily_run_router
from dashboard.routes.hermes import router as hermes_router
from dashboard.routes.db_explorer import router as db_explorer_router
from dashboard.routes.discord import router as discord_router
from dashboard.routes.experiments import router as experiments_router
from dashboard.routes.grok import router as grok_router
from dashboard.routes.health import router as health_router
from dashboard.routes.hermes import router as hermes_router
from dashboard.routes.loop_qa import router as loop_qa_router
from dashboard.routes.memory import router as memory_router
from dashboard.routes.mobile import router as mobile_router
@@ -547,12 +547,28 @@ async def lifespan(app: FastAPI):
except Exception:
logger.debug("Failed to register error recorder")
# Mark session start for sovereignty duration tracking
try:
from timmy.sovereignty import mark_session_start
mark_session_start()
except Exception:
logger.debug("Failed to mark sovereignty session start")
logger.info("✓ Dashboard ready for requests")
yield
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
# Generate and commit sovereignty session report
try:
from timmy.sovereignty import generate_and_commit_report
await generate_and_commit_report()
except Exception as exc:
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
app = FastAPI(
title="Mission Control",

View File

@@ -41,7 +41,6 @@ def _save_voice_settings(data: dict) -> None:
except Exception as exc:
logger.warning("Failed to save voice settings: %s", exc)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/voice", tags=["voice"])

View File

@@ -4,6 +4,6 @@ Monitors the local machine (Hermes/M3 Max) for memory pressure, disk usage,
Ollama model health, zombie processes, and network connectivity.
"""
from infrastructure.hermes.monitor import HealthLevel, HealthReport, HermesMonitor, hermes_monitor
from infrastructure.hermes.monitor import HermesMonitor, HealthLevel, HealthReport, hermes_monitor
__all__ = ["HermesMonitor", "HealthLevel", "HealthReport", "hermes_monitor"]

View File

@@ -19,12 +19,11 @@ import json
import logging
import shutil
import subprocess
import tempfile
import time
import urllib.request
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
from enum import Enum
from typing import Any
from config import settings
@@ -32,7 +31,7 @@ from config import settings
logger = logging.getLogger(__name__)
class HealthLevel(StrEnum):
class HealthLevel(str, Enum):
"""Severity level for a health check result."""
OK = "ok"
@@ -195,7 +194,8 @@ class HermesMonitor:
name="memory",
level=HealthLevel.CRITICAL,
message=(
f"Critical: only {free_gb:.1f}GB free (threshold: {memory_free_min_gb}GB)"
f"Critical: only {free_gb:.1f}GB free "
f"(threshold: {memory_free_min_gb}GB)"
),
details=details,
needs_human=True,
@@ -302,7 +302,8 @@ class HermesMonitor:
name="disk",
level=HealthLevel.CRITICAL,
message=(
f"Critical: only {free_gb:.1f}GB free (threshold: {disk_free_min_gb}GB)"
f"Critical: only {free_gb:.1f}GB free "
f"(threshold: {disk_free_min_gb}GB)"
),
details=details,
needs_human=True,
@@ -334,7 +335,7 @@ class HermesMonitor:
cutoff = time.time() - 86400 # 24 hours ago
try:
tmp = Path(tempfile.gettempdir())
tmp = Path("/tmp")
for item in tmp.iterdir():
try:
stat = item.stat()
@@ -344,7 +345,11 @@ class HermesMonitor:
freed_bytes += stat.st_size
item.unlink(missing_ok=True)
elif item.is_dir():
dir_size = sum(f.stat().st_size for f in item.rglob("*") if f.is_file())
dir_size = sum(
f.stat().st_size
for f in item.rglob("*")
if f.is_file()
)
freed_bytes += dir_size
shutil.rmtree(str(item), ignore_errors=True)
except (PermissionError, OSError):
@@ -387,7 +392,10 @@ class HermesMonitor:
return CheckResult(
name="ollama",
level=HealthLevel.OK,
message=(f"Ollama OK — {len(models)} model(s) available, {len(loaded)} loaded"),
message=(
f"Ollama OK — {len(models)} model(s) available, "
f"{len(loaded)} loaded"
),
details={
"reachable": True,
"model_count": len(models),

View File

@@ -135,9 +135,7 @@ class BannerlordObserver:
self._host = host or settings.gabs_host
self._port = port or settings.gabs_port
self._timeout = timeout if timeout is not None else settings.gabs_timeout
self._poll_interval = (
poll_interval if poll_interval is not None else settings.gabs_poll_interval
)
self._poll_interval = poll_interval if poll_interval is not None else settings.gabs_poll_interval
self._journal_path = Path(journal_path) if journal_path else _get_journal_path()
self._entry_count = 0
self._days_observed: set[str] = set()

View File

@@ -196,7 +196,9 @@ class EmotionalStateTracker:
"intensity_label": _intensity_label(self.state.intensity),
"previous_emotion": self.state.previous_emotion,
"trigger_event": self.state.trigger_event,
"prompt_modifier": EMOTION_PROMPT_MODIFIERS.get(self.state.current_emotion, ""),
"prompt_modifier": EMOTION_PROMPT_MODIFIERS.get(
self.state.current_emotion, ""
),
}
def get_prompt_modifier(self) -> str:

View File

@@ -36,7 +36,7 @@ import asyncio
import logging
import re
from dataclasses import dataclass, field
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from typing import Any
import httpx
@@ -70,9 +70,7 @@ _LOOP_TAG = "loop-generated"
# Regex patterns for scoring
_TAG_RE = re.compile(r"\[([^\]]+)\]")
_FILE_RE = re.compile(
r"(?:src/|tests/|scripts/|\.py|\.html|\.js|\.yaml|\.toml|\.sh)", re.IGNORECASE
)
_FILE_RE = re.compile(r"(?:src/|tests/|scripts/|\.py|\.html|\.js|\.yaml|\.toml|\.sh)", re.IGNORECASE)
_FUNC_RE = re.compile(r"(?:def |class |function |method |`\w+\(\)`)", re.IGNORECASE)
_ACCEPT_RE = re.compile(
r"(?:should|must|expect|verify|assert|test.?case|acceptance|criteria"
@@ -453,7 +451,9 @@ async def add_label(
# Apply to the issue
apply_url = _repo_url(f"issues/{issue_number}/labels")
apply_resp = await client.post(apply_url, headers=headers, json={"labels": [label_id]})
apply_resp = await client.post(
apply_url, headers=headers, json={"labels": [label_id]}
)
return apply_resp.status_code in (200, 201)
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
@@ -692,9 +692,7 @@ class BacklogTriageLoop:
# 1. Fetch
raw_issues = await fetch_open_issues(client)
result.total_open = len(raw_issues)
logger.info(
"Triage cycle #%d: fetched %d open issues", self._cycle_count, len(raw_issues)
)
logger.info("Triage cycle #%d: fetched %d open issues", self._cycle_count, len(raw_issues))
# 2. Score
scored = [score_issue(i) for i in raw_issues]

View File

@@ -37,7 +37,7 @@ from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from enum import StrEnum
from enum import Enum
from typing import Any
from config import settings
@@ -48,8 +48,7 @@ logger = logging.getLogger(__name__)
# Enumerations
# ---------------------------------------------------------------------------
class AgentType(StrEnum):
class AgentType(str, Enum):
"""Known agents in the swarm."""
CLAUDE_CODE = "claude_code"
@@ -58,7 +57,7 @@ class AgentType(StrEnum):
TIMMY = "timmy"
class TaskType(StrEnum):
class TaskType(str, Enum):
"""Categories of engineering work."""
# Claude Code strengths
@@ -84,7 +83,7 @@ class TaskType(StrEnum):
ORCHESTRATION = "orchestration"
class DispatchStatus(StrEnum):
class DispatchStatus(str, Enum):
"""Lifecycle state of a dispatched task."""
PENDING = "pending"
@@ -100,7 +99,6 @@ class DispatchStatus(StrEnum):
# Agent registry
# ---------------------------------------------------------------------------
@dataclass
class AgentSpec:
"""Capabilities and limits for a single agent."""
@@ -108,9 +106,9 @@ class AgentSpec:
name: AgentType
display_name: str
strengths: frozenset[TaskType]
gitea_label: str | None # label to apply when dispatching
gitea_label: str | None # label to apply when dispatching
max_concurrent: int = 1
interface: str = "gitea" # "gitea" | "api" | "local"
interface: str = "gitea" # "gitea" | "api" | "local"
api_endpoint: str | None = None # for interface="api"
@@ -199,7 +197,6 @@ _TASK_ROUTING: dict[TaskType, AgentType] = {
# Dispatch result
# ---------------------------------------------------------------------------
@dataclass
class DispatchResult:
"""Outcome of a dispatch call."""
@@ -223,7 +220,6 @@ class DispatchResult:
# Routing logic
# ---------------------------------------------------------------------------
def select_agent(task_type: TaskType) -> AgentType:
"""Return the best agent for *task_type* based on the routing table.
@@ -252,23 +248,11 @@ def infer_task_type(title: str, description: str = "") -> TaskType:
text = (title + " " + description).lower()
_SIGNALS: list[tuple[TaskType, frozenset[str]]] = [
(
TaskType.ARCHITECTURE,
frozenset({"architect", "design", "adr", "system design", "schema"}),
),
(
TaskType.REFACTORING,
frozenset({"refactor", "clean up", "cleanup", "reorganise", "reorganize"}),
),
(TaskType.ARCHITECTURE, frozenset({"architect", "design", "adr", "system design", "schema"})),
(TaskType.REFACTORING, frozenset({"refactor", "clean up", "cleanup", "reorganise", "reorganize"})),
(TaskType.CODE_REVIEW, frozenset({"review", "pr review", "pull request review", "audit"})),
(
TaskType.COMPLEX_REASONING,
frozenset({"complex", "hard problem", "debug", "investigate", "diagnose"}),
),
(
TaskType.RESEARCH,
frozenset({"research", "survey", "literature", "benchmark", "analyse", "analyze"}),
),
(TaskType.COMPLEX_REASONING, frozenset({"complex", "hard problem", "debug", "investigate", "diagnose"})),
(TaskType.RESEARCH, frozenset({"research", "survey", "literature", "benchmark", "analyse", "analyze"})),
(TaskType.ANALYSIS, frozenset({"analysis", "profil", "trace", "metric", "performance"})),
(TaskType.TRIAGE, frozenset({"triage", "classify", "prioritise", "prioritize"})),
(TaskType.PLANNING, frozenset({"plan", "roadmap", "milestone", "epic", "spike"})),
@@ -289,7 +273,6 @@ def infer_task_type(title: str, description: str = "") -> TaskType:
# Gitea helpers
# ---------------------------------------------------------------------------
async def _post_gitea_comment(
client: Any,
base_url: str,
@@ -422,7 +405,6 @@ async def _poll_issue_completion(
# Core dispatch functions
# ---------------------------------------------------------------------------
async def _dispatch_via_gitea(
agent: AgentType,
issue_number: int,
@@ -497,11 +479,7 @@ async def _dispatch_via_gitea(
)
# 2. Post assignment comment
criteria_md = (
"\n".join(f"- {c}" for c in acceptance_criteria)
if acceptance_criteria
else "_None specified_"
)
criteria_md = "\n".join(f"- {c}" for c in acceptance_criteria) if acceptance_criteria else "_None specified_"
comment_body = (
f"## Assigned to {spec.display_name}\n\n"
f"**Task type:** `{task_type.value}`\n\n"
@@ -638,7 +616,9 @@ async def _dispatch_local(
assumed to succeed at dispatch time).
"""
task_type = infer_task_type(title, description)
logger.info("Timmy handling task locally: %r (issue #%s)", title[:60], issue_number)
logger.info(
"Timmy handling task locally: %r (issue #%s)", title[:60], issue_number
)
return DispatchResult(
task_type=task_type,
agent=AgentType.TIMMY,
@@ -652,7 +632,6 @@ async def _dispatch_local(
# Public entry point
# ---------------------------------------------------------------------------
async def dispatch_task(
title: str,
description: str = "",
@@ -790,7 +769,9 @@ async def _log_escalation(
f"---\n*Timmy agent dispatcher.*"
)
async with httpx.AsyncClient(timeout=10) as client:
await _post_gitea_comment(client, base_url, repo, headers, issue_number, body)
await _post_gitea_comment(
client, base_url, repo, headers, issue_number, body
)
except Exception as exc:
logger.warning("Failed to post escalation comment: %s", exc)
@@ -799,7 +780,6 @@ async def _log_escalation(
# Monitoring helper
# ---------------------------------------------------------------------------
async def wait_for_completion(
issue_number: int,
poll_interval: int = 60,

View File

@@ -418,7 +418,9 @@ class MCPBridge:
return f"Error executing {name}: {exc}"
@staticmethod
def _build_initial_messages(prompt: str, system_prompt: str | None) -> list[dict]:
def _build_initial_messages(
prompt: str, system_prompt: str | None
) -> list[dict]:
"""Build the initial message list for a run."""
messages: list[dict] = []
if system_prompt:
@@ -510,7 +512,9 @@ class MCPBridge:
error_msg = ""
try:
content, tool_calls_made, rounds, error_msg = await self._run_tool_loop(messages, tools)
content, tool_calls_made, rounds, error_msg = await self._run_tool_loop(
messages, tools
)
except httpx.ConnectError as exc:
logger.warning("Ollama connection failed: %s", exc)
error_msg = f"Ollama connection failed: {exc}"

View File

@@ -0,0 +1,21 @@
"""Sovereignty reporting for Timmy play sessions.
Auto-generates markdown scorecards at session end and commits them to
the Gitea repo for institutional memory.
Refs: #957 (Session Sovereignty Report Generator)
"""
from timmy.sovereignty.session_report import (
commit_report,
generate_and_commit_report,
generate_report,
mark_session_start,
)
__all__ = [
"generate_report",
"commit_report",
"generate_and_commit_report",
"mark_session_start",
]

View File

@@ -0,0 +1,442 @@
"""Session Sovereignty Report Generator.
Auto-generates a sovereignty scorecard at the end of each play session
and commits it as a markdown file to the Gitea repo under
``reports/sovereignty/``.
Report contents (per issue #957):
- Session duration + game played
- Total model calls by type (VLM, LLM, TTS, API)
- Total cache/rule hits by type
- New skills crystallized (placeholder — pending skill-tracking impl)
- Sovereignty delta (change from session start → end)
- Cost breakdown (actual API spend)
- Per-layer sovereignty %: perception, decision, narration
- Trend comparison vs previous session
Refs: #957 (Sovereignty P0) · #953 (The Sovereignty Loop)
"""
import base64
import json
import logging
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import httpx
from config import settings
# Optional module-level imports — degrade gracefully if unavailable at import time
try:
from timmy.session_logger import get_session_logger
except Exception: # ImportError or circular import during early startup
get_session_logger = None # type: ignore[assignment]
try:
from infrastructure.sovereignty_metrics import GRADUATION_TARGETS, get_sovereignty_store
except Exception:
GRADUATION_TARGETS: dict = {} # type: ignore[assignment]
get_sovereignty_store = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
# Module-level session start time; set by mark_session_start()
_SESSION_START: datetime | None = None
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def mark_session_start() -> None:
"""Record the session start wall-clock time.
Call once during application startup so ``generate_report()`` can
compute accurate session durations.
"""
global _SESSION_START
_SESSION_START = datetime.now(UTC)
logger.debug("Sovereignty: session start recorded at %s", _SESSION_START.isoformat())
def generate_report(session_id: str = "dashboard") -> str:
"""Render a sovereignty scorecard as a markdown string.
Pulls from:
- ``timmy.session_logger`` — message/tool-call/error counts
- ``infrastructure.sovereignty_metrics`` — cache hit rate, API cost,
graduation phase, and trend data
Args:
session_id: The session identifier (default: "dashboard").
Returns:
Markdown-formatted sovereignty report string.
"""
now = datetime.now(UTC)
session_start = _SESSION_START or now
duration_secs = (now - session_start).total_seconds()
session_data = _gather_session_data()
sov_data = _gather_sovereignty_data()
return _render_markdown(now, session_id, duration_secs, session_data, sov_data)
def commit_report(report_md: str, session_id: str = "dashboard") -> bool:
"""Commit a sovereignty report to the Gitea repo.
Creates or updates ``reports/sovereignty/{date}_{session_id}.md``
via the Gitea Contents API. Degrades gracefully: logs a warning
and returns ``False`` if Gitea is unreachable or misconfigured.
Args:
report_md: Markdown content to commit.
session_id: Session identifier used in the filename.
Returns:
``True`` on success, ``False`` on failure.
"""
if not settings.gitea_enabled:
logger.info("Sovereignty: Gitea disabled — skipping report commit")
return False
if not settings.gitea_token:
logger.warning("Sovereignty: no Gitea token — skipping report commit")
return False
date_str = datetime.now(UTC).strftime("%Y-%m-%d")
file_path = f"reports/sovereignty/{date_str}_{session_id}.md"
url = f"{settings.gitea_url}/api/v1/repos/{settings.gitea_repo}/contents/{file_path}"
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
encoded_content = base64.b64encode(report_md.encode()).decode()
commit_message = (
f"report: sovereignty session {session_id} ({date_str})\n\n"
f"Auto-generated by Timmy. Refs #957"
)
payload: dict[str, Any] = {
"message": commit_message,
"content": encoded_content,
}
try:
with httpx.Client(timeout=10.0) as client:
# Fetch existing file SHA so we can update rather than create
check = client.get(url, headers=headers)
if check.status_code == 200:
existing = check.json()
payload["sha"] = existing.get("sha", "")
resp = client.put(url, headers=headers, json=payload)
resp.raise_for_status()
logger.info("Sovereignty: report committed to %s", file_path)
return True
except httpx.HTTPStatusError as exc:
logger.warning(
"Sovereignty: commit failed (HTTP %s): %s",
exc.response.status_code,
exc,
)
return False
except Exception as exc:
logger.warning("Sovereignty: commit failed: %s", exc)
return False
async def generate_and_commit_report(session_id: str = "dashboard") -> bool:
"""Generate and commit a sovereignty report for the current session.
Primary entry point — call at session end / application shutdown.
Wraps the synchronous ``commit_report`` call in ``asyncio.to_thread``
so it does not block the event loop.
Args:
session_id: The session identifier.
Returns:
``True`` if the report was generated and committed successfully.
"""
import asyncio
try:
report_md = generate_report(session_id)
logger.info("Sovereignty: report generated (%d chars)", len(report_md))
committed = await asyncio.to_thread(commit_report, report_md, session_id)
return committed
except Exception as exc:
logger.warning("Sovereignty: report generation failed: %s", exc)
return False
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _format_duration(seconds: float) -> str:
"""Format a duration in seconds as a human-readable string."""
total = int(seconds)
hours, remainder = divmod(total, 3600)
minutes, secs = divmod(remainder, 60)
if hours:
return f"{hours}h {minutes}m {secs}s"
if minutes:
return f"{minutes}m {secs}s"
return f"{secs}s"
def _gather_session_data() -> dict[str, Any]:
"""Pull session statistics from the session logger.
Returns a dict with:
- ``user_messages``, ``timmy_messages``, ``tool_calls``, ``errors``
- ``tool_call_breakdown``: dict[tool_name, count]
"""
default: dict[str, Any] = {
"user_messages": 0,
"timmy_messages": 0,
"tool_calls": 0,
"errors": 0,
"tool_call_breakdown": {},
}
try:
if get_session_logger is None:
return default
sl = get_session_logger()
sl.flush()
# Read today's session file directly for accurate counts
if not sl.session_file.exists():
return default
entries: list[dict] = []
with open(sl.session_file) as f:
for line in f:
line = line.strip()
if line:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
tool_breakdown: dict[str, int] = {}
user_msgs = timmy_msgs = tool_calls = errors = 0
for entry in entries:
etype = entry.get("type")
if etype == "message":
if entry.get("role") == "user":
user_msgs += 1
elif entry.get("role") == "timmy":
timmy_msgs += 1
elif etype == "tool_call":
tool_calls += 1
tool_name = entry.get("tool", "unknown")
tool_breakdown[tool_name] = tool_breakdown.get(tool_name, 0) + 1
elif etype == "error":
errors += 1
return {
"user_messages": user_msgs,
"timmy_messages": timmy_msgs,
"tool_calls": tool_calls,
"errors": errors,
"tool_call_breakdown": tool_breakdown,
}
except Exception as exc:
logger.warning("Sovereignty: failed to gather session data: %s", exc)
return default
def _gather_sovereignty_data() -> dict[str, Any]:
"""Pull sovereignty metrics from the SQLite store.
Returns a dict with:
- ``metrics``: summary from ``SovereigntyMetricsStore.get_summary()``
- ``deltas``: per-metric start/end values within recent history window
- ``previous_session``: most recent prior value for each metric
"""
try:
if get_sovereignty_store is None:
return {"metrics": {}, "deltas": {}, "previous_session": {}}
store = get_sovereignty_store()
summary = store.get_summary()
deltas: dict[str, dict[str, Any]] = {}
previous_session: dict[str, float | None] = {}
for metric_type in GRADUATION_TARGETS:
history = store.get_latest(metric_type, limit=10)
if len(history) >= 2:
deltas[metric_type] = {
"start": history[-1]["value"],
"end": history[0]["value"],
}
previous_session[metric_type] = history[1]["value"]
elif len(history) == 1:
deltas[metric_type] = {"start": history[0]["value"], "end": history[0]["value"]}
previous_session[metric_type] = None
else:
deltas[metric_type] = {"start": None, "end": None}
previous_session[metric_type] = None
return {
"metrics": summary,
"deltas": deltas,
"previous_session": previous_session,
}
except Exception as exc:
logger.warning("Sovereignty: failed to gather sovereignty data: %s", exc)
return {"metrics": {}, "deltas": {}, "previous_session": {}}
def _render_markdown(
now: datetime,
session_id: str,
duration_secs: float,
session_data: dict[str, Any],
sov_data: dict[str, Any],
) -> str:
"""Assemble the full sovereignty report in markdown."""
lines: list[str] = []
# Header
lines += [
"# Sovereignty Session Report",
"",
f"**Session ID:** `{session_id}` ",
f"**Date:** {now.strftime('%Y-%m-%d')} ",
f"**Duration:** {_format_duration(duration_secs)} ",
f"**Generated:** {now.isoformat()}",
"",
"---",
"",
]
# Session activity
lines += [
"## Session Activity",
"",
"| Metric | Count |",
"|--------|-------|",
f"| User messages | {session_data['user_messages']} |",
f"| Timmy responses | {session_data['timmy_messages']} |",
f"| Tool calls | {session_data['tool_calls']} |",
f"| Errors | {session_data['errors']} |",
"",
]
tool_breakdown = session_data.get("tool_call_breakdown", {})
if tool_breakdown:
lines += ["### Model Calls by Tool", ""]
for tool_name, count in sorted(tool_breakdown.items(), key=lambda x: -x[1]):
lines.append(f"- `{tool_name}`: {count}")
lines.append("")
# Sovereignty scorecard
lines += [
"## Sovereignty Scorecard",
"",
"| Metric | Current | Target (graduation) | Phase |",
"|--------|---------|---------------------|-------|",
]
for metric_type, data in sov_data["metrics"].items():
current = data.get("current")
current_str = f"{current:.4f}" if current is not None else "N/A"
grad_target = GRADUATION_TARGETS.get(metric_type, {}).get("graduation")
grad_str = f"{grad_target:.4f}" if isinstance(grad_target, (int, float)) else "N/A"
phase = data.get("phase", "unknown")
lines.append(f"| {metric_type} | {current_str} | {grad_str} | {phase} |")
lines += ["", "### Sovereignty Delta (This Session)", ""]
for metric_type, delta_info in sov_data.get("deltas", {}).items():
start_val = delta_info.get("start")
end_val = delta_info.get("end")
if start_val is not None and end_val is not None:
diff = end_val - start_val
sign = "+" if diff >= 0 else ""
lines.append(
f"- **{metric_type}**: {start_val:.4f}{end_val:.4f} ({sign}{diff:.4f})"
)
else:
lines.append(f"- **{metric_type}**: N/A (no data recorded)")
# Cost breakdown
lines += ["", "## Cost Breakdown", ""]
api_cost_data = sov_data["metrics"].get("api_cost", {})
current_cost = api_cost_data.get("current")
if current_cost is not None:
lines.append(f"- **Total API spend (latest recorded):** ${current_cost:.4f}")
else:
lines.append("- **Total API spend:** N/A (no data recorded)")
lines.append("")
# Per-layer sovereignty
lines += [
"## Per-Layer Sovereignty",
"",
"| Layer | Sovereignty % |",
"|-------|--------------|",
"| Perception (VLM) | N/A |",
"| Decision (LLM) | N/A |",
"| Narration (TTS) | N/A |",
"",
"> Per-layer tracking requires instrumented inference calls. See #957.",
"",
]
# Skills crystallized
lines += [
"## Skills Crystallized",
"",
"_Skill crystallization tracking not yet implemented. See #957._",
"",
]
# Trend vs previous session
lines += ["## Trend vs Previous Session", ""]
prev_data = sov_data.get("previous_session", {})
has_prev = any(v is not None for v in prev_data.values())
if has_prev:
lines += [
"| Metric | Previous | Current | Change |",
"|--------|----------|---------|--------|",
]
for metric_type, curr_info in sov_data["metrics"].items():
curr_val = curr_info.get("current")
prev_val = prev_data.get(metric_type)
curr_str = f"{curr_val:.4f}" if curr_val is not None else "N/A"
prev_str = f"{prev_val:.4f}" if prev_val is not None else "N/A"
if curr_val is not None and prev_val is not None:
diff = curr_val - prev_val
sign = "+" if diff >= 0 else ""
change_str = f"{sign}{diff:.4f}"
else:
change_str = "N/A"
lines.append(f"| {metric_type} | {prev_str} | {curr_str} | {change_str} |")
lines.append("")
else:
lines += ["_No previous session data available for comparison._", ""]
# Footer
lines += [
"---",
"_Auto-generated by Timmy · Session Sovereignty Report · Refs: #957_",
]
return "\n".join(lines)

View File

@@ -47,11 +47,13 @@ _DEFAULT_IDLE_THRESHOLD = 30
class AgentStatus:
"""Health snapshot for one agent at a point in time."""
agent: str # "claude" | "kimi" | "timmy"
agent: str # "claude" | "kimi" | "timmy"
is_idle: bool = True
active_issue_numbers: list[int] = field(default_factory=list)
stuck_issue_numbers: list[int] = field(default_factory=list)
checked_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
checked_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def is_stuck(self) -> bool:
@@ -67,7 +69,9 @@ class AgentHealthReport:
"""Combined health report for all monitored agents."""
agents: list[AgentStatus] = field(default_factory=list)
generated_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
generated_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def any_stuck(self) -> bool:
@@ -189,14 +193,18 @@ async def check_agent_health(
try:
async with httpx.AsyncClient(timeout=15) as client:
issues = await _fetch_labeled_issues(client, base_url, headers, repo, label)
issues = await _fetch_labeled_issues(
client, base_url, headers, repo, label
)
for issue in issues:
num = issue.get("number", 0)
status.active_issue_numbers.append(num)
# Check last activity
last_activity = await _last_comment_time(client, base_url, headers, repo, num)
last_activity = await _last_comment_time(
client, base_url, headers, repo, num
)
if last_activity is None:
last_activity = await _issue_created_time(issue)

View File

@@ -91,9 +91,9 @@ _PRIORITY_LABEL_SCORES: dict[str, int] = {
class AgentTarget(StrEnum):
"""Which agent should handle this issue."""
TIMMY = "timmy" # Timmy handles locally (self)
TIMMY = "timmy" # Timmy handles locally (self)
CLAUDE = "claude" # Dispatch to Claude Code
KIMI = "kimi" # Dispatch to Kimi Code
KIMI = "kimi" # Dispatch to Kimi Code
@dataclass
@@ -172,7 +172,9 @@ def triage_issues(raw_issues: list[dict[str, Any]]) -> list[TriagedIssue]:
title = issue.get("title", "")
body = issue.get("body") or ""
labels = _extract_labels(issue)
assignees = [a.get("login", "") for a in issue.get("assignees") or []]
assignees = [
a.get("login", "") for a in issue.get("assignees") or []
]
url = issue.get("html_url", "")
priority = _score_priority(labels, assignees)
@@ -250,7 +252,9 @@ async def fetch_open_issues(
params=params,
)
if resp.status_code != 200:
logger.warning("fetch_open_issues: Gitea returned %s", resp.status_code)
logger.warning(
"fetch_open_issues: Gitea returned %s", resp.status_code
)
return []
issues = resp.json()

View File

@@ -34,7 +34,7 @@ _LABEL_MAP: dict[AgentTarget, str] = {
_LABEL_COLORS: dict[str, str] = {
"claude-ready": "#8b6f47", # warm brown
"kimi-ready": "#006b75", # dark teal
"kimi-ready": "#006b75", # dark teal
"timmy-ready": "#0075ca", # blue
}
@@ -52,7 +52,9 @@ class DispatchRecord:
issue_title: str
agent: AgentTarget
rationale: str
dispatched_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
dispatched_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
label_applied: bool = False
comment_posted: bool = False
@@ -168,7 +170,9 @@ async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(client, base_url, headers, repo, label_name)
label_id = await _get_or_create_label(
client, base_url, headers, repo, label_name
)
# Apply label
if label_id is not None:

View File

@@ -22,9 +22,9 @@ logger = logging.getLogger(__name__)
# Thresholds
# ---------------------------------------------------------------------------
_WARN_DISK_PCT = 85.0 # warn when disk is more than 85% full
_WARN_MEM_PCT = 90.0 # warn when memory is more than 90% used
_WARN_CPU_PCT = 95.0 # warn when CPU is above 95% sustained
_WARN_DISK_PCT = 85.0 # warn when disk is more than 85% full
_WARN_MEM_PCT = 90.0 # warn when memory is more than 90% used
_WARN_CPU_PCT = 95.0 # warn when CPU is above 95% sustained
# ---------------------------------------------------------------------------
@@ -63,7 +63,9 @@ class SystemSnapshot:
memory: MemoryUsage = field(default_factory=MemoryUsage)
ollama: OllamaHealth = field(default_factory=OllamaHealth)
warnings: list[str] = field(default_factory=list)
taken_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
taken_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def healthy(self) -> bool:
@@ -115,8 +117,8 @@ def _probe_memory() -> MemoryUsage:
def _probe_ollama_sync(ollama_url: str) -> OllamaHealth:
"""Synchronous Ollama health probe — run in a thread."""
try:
import json
import urllib.request
import json
url = ollama_url.rstrip("/") + "/api/tags"
with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310
@@ -152,12 +154,14 @@ async def get_system_snapshot() -> SystemSnapshot:
if disk.percent_used >= _WARN_DISK_PCT:
warnings.append(
f"Disk {disk.path}: {disk.percent_used:.0f}% used ({disk.free_gb:.1f} GB free)"
f"Disk {disk.path}: {disk.percent_used:.0f}% used "
f"({disk.free_gb:.1f} GB free)"
)
if memory.percent_used >= _WARN_MEM_PCT:
warnings.append(
f"Memory: {memory.percent_used:.0f}% used ({memory.available_gb:.1f} GB available)"
f"Memory: {memory.percent_used:.0f}% used "
f"({memory.available_gb:.1f} GB available)"
)
if not ollama.reachable:
@@ -212,5 +216,7 @@ async def cleanup_stale_files(
errors.append(str(exc))
await asyncio.to_thread(_cleanup)
logger.info("cleanup_stale_files: deleted %d files, %d errors", deleted, len(errors))
logger.info(
"cleanup_stale_files: deleted %d files, %d errors", deleted, len(errors)
)
return {"deleted_count": deleted, "errors": errors}

View File

@@ -10,12 +10,14 @@ from __future__ import annotations
import json
import socket
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from integrations.bannerlord.gabs_client import GabsClient, GabsError
# ── GabsClient unit tests ─────────────────────────────────────────────────────
@@ -234,13 +236,7 @@ class TestBannerlordObserver:
snapshot = {
"game_state": {"day": 7, "season": "winter", "campaign_phase": "early"},
"player": {
"name": "Timmy",
"clan": "Thalheimer",
"renown": 42,
"level": 3,
"gold": 1000,
},
"player": {"name": "Timmy", "clan": "Thalheimer", "renown": 42, "level": 3, "gold": 1000},
"player_party": {"size": 25, "morale": 80, "food_days_left": 5},
"kingdoms": [{"name": "Vlandia", "ruler": "Derthert", "military_strength": 5000}],
}

View File

@@ -1,6 +1,7 @@
"""Tests for agent emotional state simulation (src/timmy/agents/emotional_state.py)."""
import time
from unittest.mock import patch
from timmy.agents.emotional_state import (
EMOTION_PROMPT_MODIFIERS,

View File

@@ -4,6 +4,8 @@ from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.dispatcher import (
AGENT_REGISTRY,
AgentType,
@@ -19,11 +21,11 @@ from timmy.dispatcher import (
wait_for_completion,
)
# ---------------------------------------------------------------------------
# Agent registry
# ---------------------------------------------------------------------------
class TestAgentRegistry:
def test_all_agents_present(self):
for member in AgentType:
@@ -39,7 +41,7 @@ class TestAgentRegistry:
assert spec.gitea_label, f"{agent} is gitea interface but has no label"
def test_non_gitea_agents_have_no_labels(self):
for _agent, spec in AGENT_REGISTRY.items():
for agent, spec in AGENT_REGISTRY.items():
if spec.interface not in ("gitea",):
# api and local agents may have no label
assert spec.gitea_label is None or spec.interface == "gitea"
@@ -53,7 +55,6 @@ class TestAgentRegistry:
# select_agent
# ---------------------------------------------------------------------------
class TestSelectAgent:
def test_architecture_routes_to_claude(self):
assert select_agent(TaskType.ARCHITECTURE) == AgentType.CLAUDE_CODE
@@ -84,7 +85,6 @@ class TestSelectAgent:
# infer_task_type
# ---------------------------------------------------------------------------
class TestInferTaskType:
def test_architecture_keyword(self):
assert infer_task_type("Design the LLM router architecture") == TaskType.ARCHITECTURE
@@ -119,7 +119,6 @@ class TestInferTaskType:
# DispatchResult
# ---------------------------------------------------------------------------
class TestDispatchResult:
def test_success_when_assigned(self):
r = DispatchResult(
@@ -162,7 +161,6 @@ class TestDispatchResult:
# _dispatch_local
# ---------------------------------------------------------------------------
class TestDispatchLocal:
async def test_returns_assigned(self):
result = await _dispatch_local(
@@ -192,7 +190,6 @@ class TestDispatchLocal:
# _dispatch_via_api
# ---------------------------------------------------------------------------
class TestDispatchViaApi:
async def test_no_endpoint_returns_failed(self):
result = await _dispatch_via_api(
@@ -307,9 +304,7 @@ class TestDispatchViaGitea:
assert result.status == DispatchStatus.ASSIGNED
async def test_no_gitea_token_returns_failed(self):
bad_settings = MagicMock(
gitea_enabled=True, gitea_token="", gitea_url="http://x", gitea_repo="a/b"
)
bad_settings = MagicMock(gitea_enabled=True, gitea_token="", gitea_url="http://x", gitea_repo="a/b")
with patch("timmy.dispatcher.settings", bad_settings):
result = await _dispatch_via_gitea(
agent=AgentType.CLAUDE_CODE,
@@ -322,9 +317,7 @@ class TestDispatchViaGitea:
assert "not configured" in (result.error or "").lower()
async def test_gitea_disabled_returns_failed(self):
bad_settings = MagicMock(
gitea_enabled=False, gitea_token="tok", gitea_url="http://x", gitea_repo="a/b"
)
bad_settings = MagicMock(gitea_enabled=False, gitea_token="tok", gitea_url="http://x", gitea_repo="a/b")
with patch("timmy.dispatcher.settings", bad_settings):
result = await _dispatch_via_gitea(
agent=AgentType.CLAUDE_CODE,
@@ -375,7 +368,6 @@ class TestDispatchViaGitea:
# dispatch_task (integration-style)
# ---------------------------------------------------------------------------
class TestDispatchTask:
async def test_empty_title_returns_failed(self):
result = await dispatch_task(title=" ")
@@ -404,9 +396,7 @@ class TestDispatchTask:
client_mock = AsyncMock()
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
client_mock.__aexit__ = AsyncMock(return_value=False)
client_mock.get = AsyncMock(
return_value=MagicMock(status_code=200, json=MagicMock(return_value=[]))
)
client_mock.get = AsyncMock(return_value=MagicMock(status_code=200, json=MagicMock(return_value=[])))
create_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 1}))
apply_resp = MagicMock(status_code=201)
comment_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 5}))
@@ -474,7 +464,6 @@ class TestDispatchTask:
# wait_for_completion
# ---------------------------------------------------------------------------
class TestWaitForCompletion:
async def test_returns_completed_when_issue_closed(self):
closed_resp = MagicMock(

View File

@@ -0,0 +1,444 @@
"""Tests for timmy.sovereignty.session_report.
Refs: #957 (Session Sovereignty Report Generator)
"""
import base64
import json
import time
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
pytestmark = pytest.mark.unit
from timmy.sovereignty.session_report import (
_format_duration,
_gather_session_data,
_gather_sovereignty_data,
_render_markdown,
commit_report,
generate_and_commit_report,
generate_report,
mark_session_start,
)
# ---------------------------------------------------------------------------
# _format_duration
# ---------------------------------------------------------------------------
class TestFormatDuration:
def test_seconds_only(self):
assert _format_duration(45) == "45s"
def test_minutes_and_seconds(self):
assert _format_duration(125) == "2m 5s"
def test_hours_minutes_seconds(self):
assert _format_duration(3661) == "1h 1m 1s"
def test_zero(self):
assert _format_duration(0) == "0s"
# ---------------------------------------------------------------------------
# mark_session_start + generate_report (smoke)
# ---------------------------------------------------------------------------
class TestMarkSessionStart:
def test_sets_session_start(self):
import timmy.sovereignty.session_report as sr
sr._SESSION_START = None
mark_session_start()
assert sr._SESSION_START is not None
assert sr._SESSION_START.tzinfo == UTC
def test_idempotent_overwrite(self):
import timmy.sovereignty.session_report as sr
mark_session_start()
first = sr._SESSION_START
time.sleep(0.01)
mark_session_start()
second = sr._SESSION_START
assert second >= first
# ---------------------------------------------------------------------------
# _gather_session_data
# ---------------------------------------------------------------------------
class TestGatherSessionData:
def test_returns_defaults_when_no_file(self, tmp_path):
mock_logger = MagicMock()
mock_logger.flush.return_value = None
mock_logger.session_file = tmp_path / "nonexistent.jsonl"
with patch(
"timmy.sovereignty.session_report.get_session_logger",
return_value=mock_logger,
):
data = _gather_session_data()
assert data["user_messages"] == 0
assert data["timmy_messages"] == 0
assert data["tool_calls"] == 0
assert data["errors"] == 0
assert data["tool_call_breakdown"] == {}
def test_counts_entries_correctly(self, tmp_path):
session_file = tmp_path / "session_2026-03-23.jsonl"
entries = [
{"type": "message", "role": "user", "content": "hello"},
{"type": "message", "role": "timmy", "content": "hi"},
{"type": "message", "role": "user", "content": "test"},
{"type": "tool_call", "tool": "memory_search", "args": {}, "result": "found"},
{"type": "tool_call", "tool": "memory_search", "args": {}, "result": "nope"},
{"type": "tool_call", "tool": "shell", "args": {}, "result": "ok"},
{"type": "error", "error": "boom"},
]
with open(session_file, "w") as f:
for e in entries:
f.write(json.dumps(e) + "\n")
mock_logger = MagicMock()
mock_logger.flush.return_value = None
mock_logger.session_file = session_file
with patch(
"timmy.sovereignty.session_report.get_session_logger",
return_value=mock_logger,
):
data = _gather_session_data()
assert data["user_messages"] == 2
assert data["timmy_messages"] == 1
assert data["tool_calls"] == 3
assert data["errors"] == 1
assert data["tool_call_breakdown"]["memory_search"] == 2
assert data["tool_call_breakdown"]["shell"] == 1
def test_graceful_on_import_error(self):
with patch(
"timmy.sovereignty.session_report.get_session_logger",
side_effect=ImportError("no session_logger"),
):
data = _gather_session_data()
assert data["tool_calls"] == 0
# ---------------------------------------------------------------------------
# _gather_sovereignty_data
# ---------------------------------------------------------------------------
class TestGatherSovereigntyData:
def test_returns_empty_on_import_error(self):
with patch.dict("sys.modules", {"infrastructure.sovereignty_metrics": None}):
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
side_effect=ImportError("no store"),
):
data = _gather_sovereignty_data()
assert data["metrics"] == {}
assert data["deltas"] == {}
assert data["previous_session"] == {}
def test_populates_deltas_from_history(self):
mock_store = MagicMock()
mock_store.get_summary.return_value = {
"cache_hit_rate": {"current": 0.5, "phase": "week1"},
}
# get_latest returns newest-first
mock_store.get_latest.return_value = [
{"value": 0.5},
{"value": 0.3},
{"value": 0.1},
]
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
return_value=mock_store,
):
with patch(
"timmy.sovereignty.session_report.GRADUATION_TARGETS",
{"cache_hit_rate": {"graduation": 0.9}},
):
data = _gather_sovereignty_data()
delta = data["deltas"].get("cache_hit_rate")
assert delta is not None
assert delta["start"] == 0.1 # oldest in window
assert delta["end"] == 0.5 # most recent
assert data["previous_session"]["cache_hit_rate"] == 0.3
def test_single_data_point_no_delta(self):
mock_store = MagicMock()
mock_store.get_summary.return_value = {}
mock_store.get_latest.return_value = [{"value": 0.4}]
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
return_value=mock_store,
):
with patch(
"timmy.sovereignty.session_report.GRADUATION_TARGETS",
{"api_cost": {"graduation": 0.01}},
):
data = _gather_sovereignty_data()
delta = data["deltas"]["api_cost"]
assert delta["start"] == 0.4
assert delta["end"] == 0.4
assert data["previous_session"]["api_cost"] is None
# ---------------------------------------------------------------------------
# generate_report (integration — smoke test)
# ---------------------------------------------------------------------------
class TestGenerateReport:
def _minimal_session_data(self):
return {
"user_messages": 3,
"timmy_messages": 3,
"tool_calls": 2,
"errors": 0,
"tool_call_breakdown": {"memory_search": 2},
}
def _minimal_sov_data(self):
return {
"metrics": {
"cache_hit_rate": {"current": 0.45, "phase": "week1"},
"api_cost": {"current": 0.12, "phase": "pre-start"},
},
"deltas": {
"cache_hit_rate": {"start": 0.40, "end": 0.45},
"api_cost": {"start": 0.10, "end": 0.12},
},
"previous_session": {
"cache_hit_rate": 0.40,
"api_cost": 0.10,
},
}
def test_smoke_produces_markdown(self):
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=self._minimal_sov_data(),
),
):
report = generate_report("test-session")
assert "# Sovereignty Session Report" in report
assert "test-session" in report
assert "## Session Activity" in report
assert "## Sovereignty Scorecard" in report
assert "## Cost Breakdown" in report
assert "## Trend vs Previous Session" in report
def test_report_contains_session_stats(self):
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=self._minimal_sov_data(),
),
):
report = generate_report()
assert "| User messages | 3 |" in report
assert "memory_search" in report
def test_report_no_previous_session(self):
sov = self._minimal_sov_data()
sov["previous_session"] = {"cache_hit_rate": None, "api_cost": None}
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=sov,
),
):
report = generate_report()
assert "No previous session data" in report
# ---------------------------------------------------------------------------
# commit_report
# ---------------------------------------------------------------------------
class TestCommitReport:
def test_returns_false_when_gitea_disabled(self):
with patch("timmy.sovereignty.session_report.settings") as mock_settings:
mock_settings.gitea_enabled = False
result = commit_report("# test", "dashboard")
assert result is False
def test_returns_false_when_no_token(self):
with patch("timmy.sovereignty.session_report.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
result = commit_report("# test", "dashboard")
assert result is False
def test_creates_file_via_put(self):
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.raise_for_status.return_value = None
mock_check = MagicMock()
mock_check.status_code = 404 # file does not exist yet
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.return_value = mock_response
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# report content", "dashboard")
assert result is True
mock_client.put.assert_called_once()
call_kwargs = mock_client.put.call_args
payload = call_kwargs.kwargs.get("json", call_kwargs.args[1] if len(call_kwargs.args) > 1 else {})
decoded = base64.b64decode(payload["content"]).decode()
assert "# report content" in decoded
def test_updates_existing_file_with_sha(self):
mock_check = MagicMock()
mock_check.status_code = 200
mock_check.json.return_value = {"sha": "abc123"}
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.return_value = mock_response
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# updated", "dashboard")
assert result is True
payload = mock_client.put.call_args.kwargs.get("json", {})
assert payload.get("sha") == "abc123"
def test_returns_false_on_http_error(self):
import httpx
mock_check = MagicMock()
mock_check.status_code = 404
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.side_effect = httpx.HTTPStatusError(
"403", request=MagicMock(), response=MagicMock(status_code=403)
)
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# test", "dashboard")
assert result is False
# ---------------------------------------------------------------------------
# generate_and_commit_report (async)
# ---------------------------------------------------------------------------
class TestGenerateAndCommitReport:
async def test_returns_true_on_success(self):
with (
patch(
"timmy.sovereignty.session_report.generate_report",
return_value="# mock report",
),
patch(
"timmy.sovereignty.session_report.commit_report",
return_value=True,
),
):
result = await generate_and_commit_report("test")
assert result is True
async def test_returns_false_when_commit_fails(self):
with (
patch(
"timmy.sovereignty.session_report.generate_report",
return_value="# mock report",
),
patch(
"timmy.sovereignty.session_report.commit_report",
return_value=False,
),
):
result = await generate_and_commit_report()
assert result is False
async def test_graceful_on_exception(self):
with patch(
"timmy.sovereignty.session_report.generate_report",
side_effect=RuntimeError("explode"),
):
result = await generate_and_commit_report()
assert result is False

View File

@@ -25,6 +25,7 @@ from timmy.backlog_triage import (
score_issue,
)
# ── Fixtures ─────────────────────────────────────────────────────────────────

View File

@@ -7,6 +7,7 @@ Refs: #1073
"""
import json
from io import BytesIO
from unittest.mock import MagicMock, patch
import pytest
@@ -78,9 +79,7 @@ def test_get_memory_info_handles_subprocess_failure(monitor):
@pytest.mark.asyncio
async def test_check_memory_ok(monitor):
with patch.object(
monitor, "_get_memory_info", return_value={"free_gb": 20.0, "total_gb": 64.0}
):
with patch.object(monitor, "_get_memory_info", return_value={"free_gb": 20.0, "total_gb": 64.0}):
result = await monitor._check_memory()
assert result.name == "memory"
@@ -127,7 +126,7 @@ async def test_check_memory_exception_returns_unknown(monitor):
@pytest.mark.asyncio
async def test_check_disk_ok(monitor):
usage = MagicMock()
usage.free = 100 * (1024**3) # 100 GB
usage.free = 100 * (1024**3) # 100 GB
usage.total = 500 * (1024**3) # 500 GB
usage.used = 400 * (1024**3)
@@ -141,7 +140,7 @@ async def test_check_disk_ok(monitor):
@pytest.mark.asyncio
async def test_check_disk_low_triggers_cleanup(monitor):
usage = MagicMock()
usage.free = 5 * (1024**3) # 5 GB — below threshold
usage.free = 5 * (1024**3) # 5 GB — below threshold
usage.total = 500 * (1024**3)
usage.used = 495 * (1024**3)
@@ -177,8 +176,12 @@ async def test_check_disk_critical_when_cleanup_fails(monitor):
def test_get_ollama_status_reachable(monitor):
tags_body = json.dumps({"models": [{"name": "qwen3:30b"}, {"name": "llama3.1:8b"}]}).encode()
ps_body = json.dumps({"models": [{"name": "qwen3:30b", "size": 1000}]}).encode()
tags_body = json.dumps({
"models": [{"name": "qwen3:30b"}, {"name": "llama3.1:8b"}]
}).encode()
ps_body = json.dumps({
"models": [{"name": "qwen3:30b", "size": 1000}]
}).encode()
responses = [
_FakeHTTPResponse(tags_body),

View File

@@ -6,6 +6,7 @@ import pytest
from timmy.vassal.agent_health import AgentHealthReport, AgentStatus
# ---------------------------------------------------------------------------
# AgentStatus
# ---------------------------------------------------------------------------
@@ -48,7 +49,9 @@ def test_report_any_stuck():
def test_report_all_idle():
report = AgentHealthReport(agents=[AgentStatus(agent="claude"), AgentStatus(agent="kimi")])
report = AgentHealthReport(
agents=[AgentStatus(agent="claude"), AgentStatus(agent="kimi")]
)
assert report.all_idle is True

View File

@@ -6,12 +6,14 @@ import pytest
from timmy.vassal.backlog import (
AgentTarget,
TriagedIssue,
_choose_agent,
_extract_labels,
_score_priority,
triage_issues,
)
# ---------------------------------------------------------------------------
# _extract_labels
# ---------------------------------------------------------------------------

View File

@@ -12,6 +12,7 @@ from timmy.vassal.house_health import (
_probe_disk,
)
# ---------------------------------------------------------------------------
# Data model tests
# ---------------------------------------------------------------------------

View File

@@ -6,6 +6,7 @@ import pytest
from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator
# ---------------------------------------------------------------------------
# VassalCycleRecord
# ---------------------------------------------------------------------------
@@ -133,6 +134,6 @@ def test_orchestrator_stop_when_not_running():
def test_module_singleton_exists():
from timmy.vassal import VassalOrchestrator, vassal_orchestrator
from timmy.vassal import vassal_orchestrator, VassalOrchestrator
assert isinstance(vassal_orchestrator, VassalOrchestrator)