diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 8dddeb39..c6ff8c63 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -549,12 +549,28 @@ async def lifespan(app: FastAPI): except Exception: logger.debug("Failed to register error recorder") + # Mark session start for sovereignty duration tracking + try: + from timmy.sovereignty import mark_session_start + + mark_session_start() + except Exception: + logger.debug("Failed to mark sovereignty session start") + logger.info("✓ Dashboard ready for requests") yield await _shutdown_cleanup(bg_tasks, workshop_heartbeat) + # Generate and commit sovereignty session report + try: + from timmy.sovereignty import generate_and_commit_report + + await generate_and_commit_report() + except Exception as exc: + logger.warning("Sovereignty report generation failed at shutdown: %s", exc) + app = FastAPI( title="Mission Control", diff --git a/src/timmy/sovereignty/__init__.py b/src/timmy/sovereignty/__init__.py index 44ca4a45..4fae61db 100644 --- a/src/timmy/sovereignty/__init__.py +++ b/src/timmy/sovereignty/__init__.py @@ -4,4 +4,23 @@ Tracks how much of each AI layer (perception, decision, narration) runs locally vs. calls out to an LLM. Feeds the sovereignty dashboard. Refs: #954, #953 + +Session reporting: auto-generates markdown scorecards at session end +and commits them to the Gitea repo for institutional memory. + +Refs: #957 (Session Sovereignty Report Generator) """ + +from timmy.sovereignty.session_report import ( + commit_report, + generate_and_commit_report, + generate_report, + mark_session_start, +) + +__all__ = [ + "generate_report", + "commit_report", + "generate_and_commit_report", + "mark_session_start", +] diff --git a/src/timmy/sovereignty/session_report.py b/src/timmy/sovereignty/session_report.py new file mode 100644 index 00000000..d034e48c --- /dev/null +++ b/src/timmy/sovereignty/session_report.py @@ -0,0 +1,442 @@ +"""Session Sovereignty Report Generator. + +Auto-generates a sovereignty scorecard at the end of each play session +and commits it as a markdown file to the Gitea repo under +``reports/sovereignty/``. + +Report contents (per issue #957): +- Session duration + game played +- Total model calls by type (VLM, LLM, TTS, API) +- Total cache/rule hits by type +- New skills crystallized (placeholder — pending skill-tracking impl) +- Sovereignty delta (change from session start → end) +- Cost breakdown (actual API spend) +- Per-layer sovereignty %: perception, decision, narration +- Trend comparison vs previous session + +Refs: #957 (Sovereignty P0) · #953 (The Sovereignty Loop) +""" + +import base64 +import json +import logging +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +import httpx + +from config import settings + +# Optional module-level imports — degrade gracefully if unavailable at import time +try: + from timmy.session_logger import get_session_logger +except Exception: # ImportError or circular import during early startup + get_session_logger = None # type: ignore[assignment] + +try: + from infrastructure.sovereignty_metrics import GRADUATION_TARGETS, get_sovereignty_store +except Exception: + GRADUATION_TARGETS: dict = {} # type: ignore[assignment] + get_sovereignty_store = None # type: ignore[assignment] + +logger = logging.getLogger(__name__) + +# Module-level session start time; set by mark_session_start() +_SESSION_START: datetime | None = None + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def mark_session_start() -> None: + """Record the session start wall-clock time. + + Call once during application startup so ``generate_report()`` can + compute accurate session durations. + """ + global _SESSION_START + _SESSION_START = datetime.now(UTC) + logger.debug("Sovereignty: session start recorded at %s", _SESSION_START.isoformat()) + + +def generate_report(session_id: str = "dashboard") -> str: + """Render a sovereignty scorecard as a markdown string. + + Pulls from: + - ``timmy.session_logger`` — message/tool-call/error counts + - ``infrastructure.sovereignty_metrics`` — cache hit rate, API cost, + graduation phase, and trend data + + Args: + session_id: The session identifier (default: "dashboard"). + + Returns: + Markdown-formatted sovereignty report string. + """ + now = datetime.now(UTC) + session_start = _SESSION_START or now + duration_secs = (now - session_start).total_seconds() + + session_data = _gather_session_data() + sov_data = _gather_sovereignty_data() + + return _render_markdown(now, session_id, duration_secs, session_data, sov_data) + + +def commit_report(report_md: str, session_id: str = "dashboard") -> bool: + """Commit a sovereignty report to the Gitea repo. + + Creates or updates ``reports/sovereignty/{date}_{session_id}.md`` + via the Gitea Contents API. Degrades gracefully: logs a warning + and returns ``False`` if Gitea is unreachable or misconfigured. + + Args: + report_md: Markdown content to commit. + session_id: Session identifier used in the filename. + + Returns: + ``True`` on success, ``False`` on failure. + """ + if not settings.gitea_enabled: + logger.info("Sovereignty: Gitea disabled — skipping report commit") + return False + + if not settings.gitea_token: + logger.warning("Sovereignty: no Gitea token — skipping report commit") + return False + + date_str = datetime.now(UTC).strftime("%Y-%m-%d") + file_path = f"reports/sovereignty/{date_str}_{session_id}.md" + url = f"{settings.gitea_url}/api/v1/repos/{settings.gitea_repo}/contents/{file_path}" + headers = { + "Authorization": f"token {settings.gitea_token}", + "Content-Type": "application/json", + } + encoded_content = base64.b64encode(report_md.encode()).decode() + commit_message = ( + f"report: sovereignty session {session_id} ({date_str})\n\n" + f"Auto-generated by Timmy. Refs #957" + ) + payload: dict[str, Any] = { + "message": commit_message, + "content": encoded_content, + } + + try: + with httpx.Client(timeout=10.0) as client: + # Fetch existing file SHA so we can update rather than create + check = client.get(url, headers=headers) + if check.status_code == 200: + existing = check.json() + payload["sha"] = existing.get("sha", "") + + resp = client.put(url, headers=headers, json=payload) + resp.raise_for_status() + + logger.info("Sovereignty: report committed to %s", file_path) + return True + + except httpx.HTTPStatusError as exc: + logger.warning( + "Sovereignty: commit failed (HTTP %s): %s", + exc.response.status_code, + exc, + ) + return False + except Exception as exc: + logger.warning("Sovereignty: commit failed: %s", exc) + return False + + +async def generate_and_commit_report(session_id: str = "dashboard") -> bool: + """Generate and commit a sovereignty report for the current session. + + Primary entry point — call at session end / application shutdown. + Wraps the synchronous ``commit_report`` call in ``asyncio.to_thread`` + so it does not block the event loop. + + Args: + session_id: The session identifier. + + Returns: + ``True`` if the report was generated and committed successfully. + """ + import asyncio + + try: + report_md = generate_report(session_id) + logger.info("Sovereignty: report generated (%d chars)", len(report_md)) + committed = await asyncio.to_thread(commit_report, report_md, session_id) + return committed + except Exception as exc: + logger.warning("Sovereignty: report generation failed: %s", exc) + return False + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + + +def _format_duration(seconds: float) -> str: + """Format a duration in seconds as a human-readable string.""" + total = int(seconds) + hours, remainder = divmod(total, 3600) + minutes, secs = divmod(remainder, 60) + if hours: + return f"{hours}h {minutes}m {secs}s" + if minutes: + return f"{minutes}m {secs}s" + return f"{secs}s" + + +def _gather_session_data() -> dict[str, Any]: + """Pull session statistics from the session logger. + + Returns a dict with: + - ``user_messages``, ``timmy_messages``, ``tool_calls``, ``errors`` + - ``tool_call_breakdown``: dict[tool_name, count] + """ + default: dict[str, Any] = { + "user_messages": 0, + "timmy_messages": 0, + "tool_calls": 0, + "errors": 0, + "tool_call_breakdown": {}, + } + + try: + if get_session_logger is None: + return default + sl = get_session_logger() + sl.flush() + + # Read today's session file directly for accurate counts + if not sl.session_file.exists(): + return default + + entries: list[dict] = [] + with open(sl.session_file) as f: + for line in f: + line = line.strip() + if line: + try: + entries.append(json.loads(line)) + except json.JSONDecodeError: + continue + + tool_breakdown: dict[str, int] = {} + user_msgs = timmy_msgs = tool_calls = errors = 0 + + for entry in entries: + etype = entry.get("type") + if etype == "message": + if entry.get("role") == "user": + user_msgs += 1 + elif entry.get("role") == "timmy": + timmy_msgs += 1 + elif etype == "tool_call": + tool_calls += 1 + tool_name = entry.get("tool", "unknown") + tool_breakdown[tool_name] = tool_breakdown.get(tool_name, 0) + 1 + elif etype == "error": + errors += 1 + + return { + "user_messages": user_msgs, + "timmy_messages": timmy_msgs, + "tool_calls": tool_calls, + "errors": errors, + "tool_call_breakdown": tool_breakdown, + } + + except Exception as exc: + logger.warning("Sovereignty: failed to gather session data: %s", exc) + return default + + +def _gather_sovereignty_data() -> dict[str, Any]: + """Pull sovereignty metrics from the SQLite store. + + Returns a dict with: + - ``metrics``: summary from ``SovereigntyMetricsStore.get_summary()`` + - ``deltas``: per-metric start/end values within recent history window + - ``previous_session``: most recent prior value for each metric + """ + try: + if get_sovereignty_store is None: + return {"metrics": {}, "deltas": {}, "previous_session": {}} + store = get_sovereignty_store() + summary = store.get_summary() + + deltas: dict[str, dict[str, Any]] = {} + previous_session: dict[str, float | None] = {} + + for metric_type in GRADUATION_TARGETS: + history = store.get_latest(metric_type, limit=10) + if len(history) >= 2: + deltas[metric_type] = { + "start": history[-1]["value"], + "end": history[0]["value"], + } + previous_session[metric_type] = history[1]["value"] + elif len(history) == 1: + deltas[metric_type] = {"start": history[0]["value"], "end": history[0]["value"]} + previous_session[metric_type] = None + else: + deltas[metric_type] = {"start": None, "end": None} + previous_session[metric_type] = None + + return { + "metrics": summary, + "deltas": deltas, + "previous_session": previous_session, + } + + except Exception as exc: + logger.warning("Sovereignty: failed to gather sovereignty data: %s", exc) + return {"metrics": {}, "deltas": {}, "previous_session": {}} + + +def _render_markdown( + now: datetime, + session_id: str, + duration_secs: float, + session_data: dict[str, Any], + sov_data: dict[str, Any], +) -> str: + """Assemble the full sovereignty report in markdown.""" + lines: list[str] = [] + + # Header + lines += [ + "# Sovereignty Session Report", + "", + f"**Session ID:** `{session_id}` ", + f"**Date:** {now.strftime('%Y-%m-%d')} ", + f"**Duration:** {_format_duration(duration_secs)} ", + f"**Generated:** {now.isoformat()}", + "", + "---", + "", + ] + + # Session activity + lines += [ + "## Session Activity", + "", + "| Metric | Count |", + "|--------|-------|", + f"| User messages | {session_data['user_messages']} |", + f"| Timmy responses | {session_data['timmy_messages']} |", + f"| Tool calls | {session_data['tool_calls']} |", + f"| Errors | {session_data['errors']} |", + "", + ] + + tool_breakdown = session_data.get("tool_call_breakdown", {}) + if tool_breakdown: + lines += ["### Model Calls by Tool", ""] + for tool_name, count in sorted(tool_breakdown.items(), key=lambda x: -x[1]): + lines.append(f"- `{tool_name}`: {count}") + lines.append("") + + # Sovereignty scorecard + + lines += [ + "## Sovereignty Scorecard", + "", + "| Metric | Current | Target (graduation) | Phase |", + "|--------|---------|---------------------|-------|", + ] + + for metric_type, data in sov_data["metrics"].items(): + current = data.get("current") + current_str = f"{current:.4f}" if current is not None else "N/A" + grad_target = GRADUATION_TARGETS.get(metric_type, {}).get("graduation") + grad_str = f"{grad_target:.4f}" if isinstance(grad_target, (int, float)) else "N/A" + phase = data.get("phase", "unknown") + lines.append(f"| {metric_type} | {current_str} | {grad_str} | {phase} |") + + lines += ["", "### Sovereignty Delta (This Session)", ""] + + for metric_type, delta_info in sov_data.get("deltas", {}).items(): + start_val = delta_info.get("start") + end_val = delta_info.get("end") + if start_val is not None and end_val is not None: + diff = end_val - start_val + sign = "+" if diff >= 0 else "" + lines.append( + f"- **{metric_type}**: {start_val:.4f} → {end_val:.4f} ({sign}{diff:.4f})" + ) + else: + lines.append(f"- **{metric_type}**: N/A (no data recorded)") + + # Cost breakdown + lines += ["", "## Cost Breakdown", ""] + api_cost_data = sov_data["metrics"].get("api_cost", {}) + current_cost = api_cost_data.get("current") + if current_cost is not None: + lines.append(f"- **Total API spend (latest recorded):** ${current_cost:.4f}") + else: + lines.append("- **Total API spend:** N/A (no data recorded)") + lines.append("") + + # Per-layer sovereignty + lines += [ + "## Per-Layer Sovereignty", + "", + "| Layer | Sovereignty % |", + "|-------|--------------|", + "| Perception (VLM) | N/A |", + "| Decision (LLM) | N/A |", + "| Narration (TTS) | N/A |", + "", + "> Per-layer tracking requires instrumented inference calls. See #957.", + "", + ] + + # Skills crystallized + lines += [ + "## Skills Crystallized", + "", + "_Skill crystallization tracking not yet implemented. See #957._", + "", + ] + + # Trend vs previous session + lines += ["## Trend vs Previous Session", ""] + prev_data = sov_data.get("previous_session", {}) + has_prev = any(v is not None for v in prev_data.values()) + + if has_prev: + lines += [ + "| Metric | Previous | Current | Change |", + "|--------|----------|---------|--------|", + ] + for metric_type, curr_info in sov_data["metrics"].items(): + curr_val = curr_info.get("current") + prev_val = prev_data.get(metric_type) + curr_str = f"{curr_val:.4f}" if curr_val is not None else "N/A" + prev_str = f"{prev_val:.4f}" if prev_val is not None else "N/A" + if curr_val is not None and prev_val is not None: + diff = curr_val - prev_val + sign = "+" if diff >= 0 else "" + change_str = f"{sign}{diff:.4f}" + else: + change_str = "N/A" + lines.append(f"| {metric_type} | {prev_str} | {curr_str} | {change_str} |") + lines.append("") + else: + lines += ["_No previous session data available for comparison._", ""] + + # Footer + lines += [ + "---", + "_Auto-generated by Timmy · Session Sovereignty Report · Refs: #957_", + ] + + return "\n".join(lines) diff --git a/tests/timmy/test_session_report.py b/tests/timmy/test_session_report.py new file mode 100644 index 00000000..54f2b736 --- /dev/null +++ b/tests/timmy/test_session_report.py @@ -0,0 +1,444 @@ +"""Tests for timmy.sovereignty.session_report. + +Refs: #957 (Session Sovereignty Report Generator) +""" + +import base64 +import json +import time +from datetime import UTC, datetime +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +pytestmark = pytest.mark.unit + +from timmy.sovereignty.session_report import ( + _format_duration, + _gather_session_data, + _gather_sovereignty_data, + _render_markdown, + commit_report, + generate_and_commit_report, + generate_report, + mark_session_start, +) + + +# --------------------------------------------------------------------------- +# _format_duration +# --------------------------------------------------------------------------- + + +class TestFormatDuration: + def test_seconds_only(self): + assert _format_duration(45) == "45s" + + def test_minutes_and_seconds(self): + assert _format_duration(125) == "2m 5s" + + def test_hours_minutes_seconds(self): + assert _format_duration(3661) == "1h 1m 1s" + + def test_zero(self): + assert _format_duration(0) == "0s" + + +# --------------------------------------------------------------------------- +# mark_session_start + generate_report (smoke) +# --------------------------------------------------------------------------- + + +class TestMarkSessionStart: + def test_sets_session_start(self): + import timmy.sovereignty.session_report as sr + + sr._SESSION_START = None + mark_session_start() + assert sr._SESSION_START is not None + assert sr._SESSION_START.tzinfo == UTC + + def test_idempotent_overwrite(self): + import timmy.sovereignty.session_report as sr + + mark_session_start() + first = sr._SESSION_START + time.sleep(0.01) + mark_session_start() + second = sr._SESSION_START + assert second >= first + + +# --------------------------------------------------------------------------- +# _gather_session_data +# --------------------------------------------------------------------------- + + +class TestGatherSessionData: + def test_returns_defaults_when_no_file(self, tmp_path): + mock_logger = MagicMock() + mock_logger.flush.return_value = None + mock_logger.session_file = tmp_path / "nonexistent.jsonl" + + with patch( + "timmy.sovereignty.session_report.get_session_logger", + return_value=mock_logger, + ): + data = _gather_session_data() + + assert data["user_messages"] == 0 + assert data["timmy_messages"] == 0 + assert data["tool_calls"] == 0 + assert data["errors"] == 0 + assert data["tool_call_breakdown"] == {} + + def test_counts_entries_correctly(self, tmp_path): + session_file = tmp_path / "session_2026-03-23.jsonl" + entries = [ + {"type": "message", "role": "user", "content": "hello"}, + {"type": "message", "role": "timmy", "content": "hi"}, + {"type": "message", "role": "user", "content": "test"}, + {"type": "tool_call", "tool": "memory_search", "args": {}, "result": "found"}, + {"type": "tool_call", "tool": "memory_search", "args": {}, "result": "nope"}, + {"type": "tool_call", "tool": "shell", "args": {}, "result": "ok"}, + {"type": "error", "error": "boom"}, + ] + with open(session_file, "w") as f: + for e in entries: + f.write(json.dumps(e) + "\n") + + mock_logger = MagicMock() + mock_logger.flush.return_value = None + mock_logger.session_file = session_file + + with patch( + "timmy.sovereignty.session_report.get_session_logger", + return_value=mock_logger, + ): + data = _gather_session_data() + + assert data["user_messages"] == 2 + assert data["timmy_messages"] == 1 + assert data["tool_calls"] == 3 + assert data["errors"] == 1 + assert data["tool_call_breakdown"]["memory_search"] == 2 + assert data["tool_call_breakdown"]["shell"] == 1 + + def test_graceful_on_import_error(self): + with patch( + "timmy.sovereignty.session_report.get_session_logger", + side_effect=ImportError("no session_logger"), + ): + data = _gather_session_data() + + assert data["tool_calls"] == 0 + + +# --------------------------------------------------------------------------- +# _gather_sovereignty_data +# --------------------------------------------------------------------------- + + +class TestGatherSovereigntyData: + def test_returns_empty_on_import_error(self): + with patch.dict("sys.modules", {"infrastructure.sovereignty_metrics": None}): + with patch( + "timmy.sovereignty.session_report.get_sovereignty_store", + side_effect=ImportError("no store"), + ): + data = _gather_sovereignty_data() + + assert data["metrics"] == {} + assert data["deltas"] == {} + assert data["previous_session"] == {} + + def test_populates_deltas_from_history(self): + mock_store = MagicMock() + mock_store.get_summary.return_value = { + "cache_hit_rate": {"current": 0.5, "phase": "week1"}, + } + # get_latest returns newest-first + mock_store.get_latest.return_value = [ + {"value": 0.5}, + {"value": 0.3}, + {"value": 0.1}, + ] + + with patch( + "timmy.sovereignty.session_report.get_sovereignty_store", + return_value=mock_store, + ): + with patch( + "timmy.sovereignty.session_report.GRADUATION_TARGETS", + {"cache_hit_rate": {"graduation": 0.9}}, + ): + data = _gather_sovereignty_data() + + delta = data["deltas"].get("cache_hit_rate") + assert delta is not None + assert delta["start"] == 0.1 # oldest in window + assert delta["end"] == 0.5 # most recent + assert data["previous_session"]["cache_hit_rate"] == 0.3 + + def test_single_data_point_no_delta(self): + mock_store = MagicMock() + mock_store.get_summary.return_value = {} + mock_store.get_latest.return_value = [{"value": 0.4}] + + with patch( + "timmy.sovereignty.session_report.get_sovereignty_store", + return_value=mock_store, + ): + with patch( + "timmy.sovereignty.session_report.GRADUATION_TARGETS", + {"api_cost": {"graduation": 0.01}}, + ): + data = _gather_sovereignty_data() + + delta = data["deltas"]["api_cost"] + assert delta["start"] == 0.4 + assert delta["end"] == 0.4 + assert data["previous_session"]["api_cost"] is None + + +# --------------------------------------------------------------------------- +# generate_report (integration — smoke test) +# --------------------------------------------------------------------------- + + +class TestGenerateReport: + def _minimal_session_data(self): + return { + "user_messages": 3, + "timmy_messages": 3, + "tool_calls": 2, + "errors": 0, + "tool_call_breakdown": {"memory_search": 2}, + } + + def _minimal_sov_data(self): + return { + "metrics": { + "cache_hit_rate": {"current": 0.45, "phase": "week1"}, + "api_cost": {"current": 0.12, "phase": "pre-start"}, + }, + "deltas": { + "cache_hit_rate": {"start": 0.40, "end": 0.45}, + "api_cost": {"start": 0.10, "end": 0.12}, + }, + "previous_session": { + "cache_hit_rate": 0.40, + "api_cost": 0.10, + }, + } + + def test_smoke_produces_markdown(self): + with ( + patch( + "timmy.sovereignty.session_report._gather_session_data", + return_value=self._minimal_session_data(), + ), + patch( + "timmy.sovereignty.session_report._gather_sovereignty_data", + return_value=self._minimal_sov_data(), + ), + ): + report = generate_report("test-session") + + assert "# Sovereignty Session Report" in report + assert "test-session" in report + assert "## Session Activity" in report + assert "## Sovereignty Scorecard" in report + assert "## Cost Breakdown" in report + assert "## Trend vs Previous Session" in report + + def test_report_contains_session_stats(self): + with ( + patch( + "timmy.sovereignty.session_report._gather_session_data", + return_value=self._minimal_session_data(), + ), + patch( + "timmy.sovereignty.session_report._gather_sovereignty_data", + return_value=self._minimal_sov_data(), + ), + ): + report = generate_report() + + assert "| User messages | 3 |" in report + assert "memory_search" in report + + def test_report_no_previous_session(self): + sov = self._minimal_sov_data() + sov["previous_session"] = {"cache_hit_rate": None, "api_cost": None} + + with ( + patch( + "timmy.sovereignty.session_report._gather_session_data", + return_value=self._minimal_session_data(), + ), + patch( + "timmy.sovereignty.session_report._gather_sovereignty_data", + return_value=sov, + ), + ): + report = generate_report() + + assert "No previous session data" in report + + +# --------------------------------------------------------------------------- +# commit_report +# --------------------------------------------------------------------------- + + +class TestCommitReport: + def test_returns_false_when_gitea_disabled(self): + with patch("timmy.sovereignty.session_report.settings") as mock_settings: + mock_settings.gitea_enabled = False + result = commit_report("# test", "dashboard") + + assert result is False + + def test_returns_false_when_no_token(self): + with patch("timmy.sovereignty.session_report.settings") as mock_settings: + mock_settings.gitea_enabled = True + mock_settings.gitea_token = "" + result = commit_report("# test", "dashboard") + + assert result is False + + def test_creates_file_via_put(self): + mock_response = MagicMock() + mock_response.status_code = 201 + mock_response.raise_for_status.return_value = None + + mock_check = MagicMock() + mock_check.status_code = 404 # file does not exist yet + + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.get.return_value = mock_check + mock_client.put.return_value = mock_response + + with ( + patch("timmy.sovereignty.session_report.settings") as mock_settings, + patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client), + ): + mock_settings.gitea_enabled = True + mock_settings.gitea_token = "fake-token" + mock_settings.gitea_url = "http://localhost:3000" + mock_settings.gitea_repo = "owner/repo" + + result = commit_report("# report content", "dashboard") + + assert result is True + mock_client.put.assert_called_once() + call_kwargs = mock_client.put.call_args + payload = call_kwargs.kwargs.get("json", call_kwargs.args[1] if len(call_kwargs.args) > 1 else {}) + decoded = base64.b64decode(payload["content"]).decode() + assert "# report content" in decoded + + def test_updates_existing_file_with_sha(self): + mock_check = MagicMock() + mock_check.status_code = 200 + mock_check.json.return_value = {"sha": "abc123"} + + mock_response = MagicMock() + mock_response.raise_for_status.return_value = None + + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.get.return_value = mock_check + mock_client.put.return_value = mock_response + + with ( + patch("timmy.sovereignty.session_report.settings") as mock_settings, + patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client), + ): + mock_settings.gitea_enabled = True + mock_settings.gitea_token = "fake-token" + mock_settings.gitea_url = "http://localhost:3000" + mock_settings.gitea_repo = "owner/repo" + + result = commit_report("# updated", "dashboard") + + assert result is True + payload = mock_client.put.call_args.kwargs.get("json", {}) + assert payload.get("sha") == "abc123" + + def test_returns_false_on_http_error(self): + import httpx + + mock_check = MagicMock() + mock_check.status_code = 404 + + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.get.return_value = mock_check + mock_client.put.side_effect = httpx.HTTPStatusError( + "403", request=MagicMock(), response=MagicMock(status_code=403) + ) + + with ( + patch("timmy.sovereignty.session_report.settings") as mock_settings, + patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client), + ): + mock_settings.gitea_enabled = True + mock_settings.gitea_token = "fake-token" + mock_settings.gitea_url = "http://localhost:3000" + mock_settings.gitea_repo = "owner/repo" + + result = commit_report("# test", "dashboard") + + assert result is False + + +# --------------------------------------------------------------------------- +# generate_and_commit_report (async) +# --------------------------------------------------------------------------- + + +class TestGenerateAndCommitReport: + async def test_returns_true_on_success(self): + with ( + patch( + "timmy.sovereignty.session_report.generate_report", + return_value="# mock report", + ), + patch( + "timmy.sovereignty.session_report.commit_report", + return_value=True, + ), + ): + result = await generate_and_commit_report("test") + + assert result is True + + async def test_returns_false_when_commit_fails(self): + with ( + patch( + "timmy.sovereignty.session_report.generate_report", + return_value="# mock report", + ), + patch( + "timmy.sovereignty.session_report.commit_report", + return_value=False, + ), + ): + result = await generate_and_commit_report() + + assert result is False + + async def test_graceful_on_exception(self): + with patch( + "timmy.sovereignty.session_report.generate_report", + side_effect=RuntimeError("explode"), + ): + result = await generate_and_commit_report() + + assert result is False