Compare commits

10 Commits

Author SHA1 Message Date
Alexander Whitestone
4ab7a6f6e6 feat: Session Sovereignty Report Generator (#957)
Some checks failed
Tests / lint (pull_request) Failing after 16s
Tests / test (pull_request) Has been skipped
- Add `src/timmy/sovereignty/session_report.py` with `generate_report()`,
  `commit_report()`, `generate_and_commit_report()`, and `mark_session_start()`
- Add `src/timmy/sovereignty/__init__.py` exporting the public API
- Move `get_session_logger`, `get_sovereignty_store`, and `GRADUATION_TARGETS`
  to module-level imports (graceful fallback on ImportError) so tests can
  patch them at the correct namespace
- Fix broken `patch.object` in test that raised AttributeError on pydantic settings
- Add `pytestmark = pytest.mark.unit` so tests run under `tox -e unit`
- All 23 sovereignty report tests pass

Fixes #957

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 15:44:36 -04:00
Alexander Whitestone
4150ab7372 WIP: Claude Code progress on #957
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-03-23 14:58:58 -04:00
3a8d9ee380 [claude] Break up _build_gitea_tools() into per-operation helpers (#1134) (#1147)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-23 18:42:47 +00:00
fd9fbe8a18 [claude] Break up MCPBridge.run() into helper methods (#1135) (#1148)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-23 18:41:34 +00:00
7e03985368 [claude] feat: Agent Voice Customization UI (#1017) (#1146)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-23 18:39:47 +00:00
cd1bc2bf6b [claude] Add agent emotional state simulation (#1013) (#1144)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-23 18:36:52 +00:00
1c1bfb6407 [claude] Hermes health monitor — system resources + model management (#1073) (#1133)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-23 18:36:06 +00:00
05e1196ea4 [gemini] feat: add coverage and duration strictness to pytest (#934) (#1140)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Google Gemini <gemini@hermes.local>
Co-committed-by: Google Gemini <gemini@hermes.local>
2026-03-23 18:36:01 +00:00
ed63877f75 [claude] Qwen3 two-model strategy: 14B primary + 8B fast router (#1063) (#1143)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-23 18:35:57 +00:00
128aa4427f [claude] Vassal Protocol — Timmy as autonomous orchestrator (#1070) (#1142)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
2026-03-23 18:33:15 +00:00
43 changed files with 5590 additions and 119 deletions

51
Modelfile.qwen3-14b Normal file
@@ -0,0 +1,51 @@
# Modelfile.qwen3-14b
#
# Qwen3-14B Q5_K_M — Primary local agent model (Issue #1063)
#
# Tool calling F1: 0.971 — GPT-4-class structured output reliability.
# Hybrid thinking/non-thinking mode: toggle per-request via /think or /no_think
# in the prompt for planning vs rapid execution.
#
# Build:
# ollama pull qwen3:14b # downloads Q4_K_M (~8.2 GB) by default
# # For Q5_K_M (~10.5 GB, recommended):
# # ollama pull bartowski/Qwen3-14B-GGUF:Q5_K_M
# ollama create qwen3-14b -f Modelfile.qwen3-14b
#
# Memory budget: ~10.5 GB weights + ~7 GB KV cache = ~17.5 GB total at 32K ctx
# Headroom on M3 Max 36 GB: ~10.5 GB free (enough to run qwen3:8b simultaneously)
# Generation: ~20-28 tok/s (Ollama) / ~28-38 tok/s (MLX)
# Context: 32K native, extensible to 131K with YaRN
#
# Two-model strategy: set OLLAMA_MAX_LOADED_MODELS=2 so qwen3:8b stays
# hot for fast routing while qwen3:14b handles complex tasks.
FROM qwen3:14b
# 32K context — optimal balance of quality and memory on M3 Max 36 GB.
# At 32K, total memory (weights + KV cache) is ~17.5 GB — well within budget.
# Extend to 131K with YaRN if needed: PARAMETER rope_scaling_type yarn
PARAMETER num_ctx 32768
# Tool-calling temperature — lower = more reliable structured JSON output.
# Raise to 0.7+ for creative/narrative tasks.
PARAMETER temperature 0.3
# Nucleus sampling
PARAMETER top_p 0.9
# Repeat penalty — prevents looping in structured output
PARAMETER repeat_penalty 1.05
SYSTEM """You are Timmy, Alexander's personal sovereign AI agent.
You are concise, direct, and helpful. You complete tasks efficiently and report results clearly. You do not add unnecessary caveats or disclaimers.
You have access to tool calling. When you need to use a tool, output a valid JSON function call:
<tool_call>
{"name": "function_name", "arguments": {"param": "value"}}
</tool_call>
You support hybrid reasoning. For complex planning, include <think>...</think> before your answer. For rapid execution (simple tool calls, status checks), skip the think block.
You always start your responses with "Timmy here:" when acting as an agent."""
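
The memory-budget comments in this Modelfile and in Modelfile.qwen3-8b come down to simple addition. A minimal sketch, assuming only the per-model figures quoted in those comments; the dictionary names and the `fits` helper are illustrative and do not exist in the repository:

```python
# Figures in GB, copied from the Modelfile comments for a 32K context.
WEIGHTS_GB = {"qwen3:14b": 10.5, "qwen3:8b": 6.6}
KV_CACHE_GB = {"qwen3:14b": 7.0, "qwen3:8b": 5.0}


def total_gb(model: str) -> float:
    """Weights plus KV cache for one loaded model."""
    return WEIGHTS_GB[model] + KV_CACHE_GB[model]


def fits(models: list[str], budget_gb: float) -> bool:
    """True if all listed models fit in budget_gb at the same time."""
    return sum(total_gb(m) for m in models) <= budget_gb


if __name__ == "__main__":
    for name in WEIGHTS_GB:
        print(f"{name}: {total_gb(name):.1f} GB")
    print("both fit in 36 GB:", fits(list(WEIGHTS_GB), 36.0))
```

The 36 GB figure is the machine's total memory; the OS and other processes also need a share, so the usable budget is lower and the result should be read as an upper bound.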

43
Modelfile.qwen3-8b Normal file
@@ -0,0 +1,43 @@
# Modelfile.qwen3-8b
#
# Qwen3-8B Q6_K — Fast routing model for routine agent tasks (Issue #1063)
#
# Tool calling F1: 0.933 at ~45-55 tok/s — 2x speed of Qwen3-14B.
# Use for: simple tool calls, shell commands, file reads, status checks, JSON ops.
# Route complex tasks (issue triage, multi-step planning, code review) to qwen3:14b.
#
# Build:
# ollama pull qwen3:8b
# ollama create qwen3-8b -f Modelfile.qwen3-8b
#
# Memory budget: ~6.6 GB weights + ~5 GB KV cache = ~11.6 GB at 32K ctx
# Two-model strategy: ~17 GB combined weights (10.5 + 6.6, both hot, KV cache excluded); fits on M3 Max 36 GB.
# Set OLLAMA_MAX_LOADED_MODELS=2 in the Ollama environment.
#
# Generation: ~35-45 tok/s (Ollama) / ~45-60 tok/s (MLX)
FROM qwen3:8b
# 32K context
PARAMETER num_ctx 32768
# Lower temperature for fast, deterministic tool execution
PARAMETER temperature 0.2
# Nucleus sampling
PARAMETER top_p 0.9
# Repeat penalty
PARAMETER repeat_penalty 1.05
SYSTEM """You are Timmy's fast-routing agent. You handle routine tasks quickly and precisely.
For simple tasks (tool calls, shell commands, file reads, status checks, JSON ops): respond immediately without a think block.
For anything requiring multi-step planning: defer to the primary agent.
Tool call format:
<tool_call>
{"name": "function_name", "arguments": {"param": "value"}}
</tool_call>
Be brief. Be accurate. Execute."""
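
The split between the fast router and the primary model could be expressed as a small dispatch function. This is a sketch under stated assumptions: the task-kind labels and the `pick_model` name are hypothetical, and the repository's actual routing logic is not shown in this diff.

```python
FAST_MODEL = "qwen3:8b"
PRIMARY_MODEL = "qwen3:14b"

# Hypothetical labels for the routine work listed in the Modelfile header.
ROUTINE_TASKS = frozenset(
    {"tool_call", "shell_command", "file_read", "status_check", "json_op"}
)


def pick_model(task_kind: str) -> str:
    """Send routine tasks to the fast model and everything else to the primary."""
    return FAST_MODEL if task_kind in ROUTINE_TASKS else PRIMARY_MODEL
```

Defaulting unknown task kinds to the primary model keeps planning-heavy work away from the smaller model when a label is missing or misspelled.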

@@ -16,6 +16,8 @@
# prompt_tier "full" (tool-capable models) or "lite" (small models)
# max_history Number of conversation turns to keep in context
# context_window Max context length (null = model default)
# initial_emotion Starting emotional state (calm, cautious, adventurous,
# analytical, frustrated, confident, curious)
#
# ── Defaults ────────────────────────────────────────────────────────────────
@@ -103,6 +105,7 @@ agents:
model: qwen3:30b
prompt_tier: full
max_history: 20
initial_emotion: calm
tools:
- web_search
- read_file
@@ -136,6 +139,7 @@ agents:
model: qwen3:30b
prompt_tier: full
max_history: 10
initial_emotion: curious
tools:
- web_search
- read_file
@@ -151,6 +155,7 @@ agents:
model: qwen3:30b
prompt_tier: full
max_history: 15
initial_emotion: analytical
tools:
- python
- write_file
@@ -196,6 +201,7 @@ agents:
model: qwen3:30b
prompt_tier: full
max_history: 10
initial_emotion: adventurous
tools:
- run_experiment
- prepare_experiment
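
The `initial_emotion` key accepts one of seven documented values. A loader could reject anything else up front; the sketch below assumes that behaviour, and `validate_initial_emotion` is a hypothetical helper rather than the project's loader code:

```python
from typing import Optional

# The seven states listed in the config header comment.
ALLOWED_EMOTIONS = frozenset(
    {"calm", "cautious", "adventurous", "analytical",
     "frustrated", "confident", "curious"}
)


def validate_initial_emotion(value: Optional[str]) -> Optional[str]:
    """Return the normalised emotion, None if unset, or raise on unknown values."""
    if value is None:
        return None
    normalized = value.strip().lower()
    if normalized not in ALLOWED_EMOTIONS:
        raise ValueError(f"unknown initial_emotion: {value!r}")
    return normalized
```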

@@ -96,7 +96,7 @@ asyncio_default_fixture_loop_scope = "function"
timeout = 30
timeout_method = "signal"
timeout_func_only = false
addopts = "-v --tb=short --strict-markers --disable-warnings --durations=10"
addopts = "-v --tb=short --strict-markers --disable-warnings --durations=10 --cov-fail-under=60"
markers = [
"unit: Unit tests (fast, no I/O)",
"integration: Integration tests (may use SQLite)",

293
scripts/benchmark_local_model.sh Executable file
@@ -0,0 +1,293 @@
#!/usr/bin/env bash
# benchmark_local_model.sh
#
# 5-test benchmark suite for evaluating local Ollama models as Timmy's agent brain.
# Based on the model selection study for M3 Max 36 GB (Issue #1063).
#
# Usage:
# ./scripts/benchmark_local_model.sh # test $OLLAMA_MODEL or qwen3:14b
# ./scripts/benchmark_local_model.sh qwen3:8b # test a specific model
# ./scripts/benchmark_local_model.sh qwen3:14b qwen3:8b # compare two models
#
# Thresholds (pass/fail):
# Test 1 — Tool call compliance: >=90% valid JSON responses out of 5 probes
#   (with 5 probes, 4/5 is only 80%, so this threshold requires 5/5)
# Test 2 — Code generation: compiles without syntax errors
# Test 3 — Shell command gen: no refusal markers in output
# Test 4 — Multi-turn coherence: session ID echoed back correctly
# Test 5 — Issue triage quality: structured JSON with required fields
#
# Exit codes: 0 = all tests passed, 1 = one or more tests failed
set -euo pipefail
OLLAMA_URL="${OLLAMA_URL:-http://localhost:11434}"
PASS=0
FAIL=0
TOTAL=0
# ── Colours ──────────────────────────────────────────────────────────────────
GREEN='\033[0;32m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
BOLD='\033[1m'
RESET='\033[0m'
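# Use arithmetic assignment rather than ((VAR++)): under `set -e`, ((VAR++))
# returns a non-zero status when VAR is 0 and would abort the script.
pass() { echo -e " ${GREEN}✓ PASS${RESET} $1"; PASS=$((PASS + 1)); TOTAL=$((TOTAL + 1)); }
fail() { echo -e " ${RED}✗ FAIL${RESET} $1"; FAIL=$((FAIL + 1)); TOTAL=$((TOTAL + 1)); }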
info() { echo -e " ${YELLOW}${RESET} $1"; }
# ── Helper: call Ollama generate API ─────────────────────────────────────────
ollama_generate() {
local model="$1"
local prompt="$2"
local extra_opts="${3:-}"
local payload
payload=$(printf '{"model":"%s","prompt":"%s","stream":false%s}' \
"$model" \
"$(echo "$prompt" | sed 's/"/\\"/g' | tr -d '\n')" \
"${extra_opts:+,$extra_opts}")
curl -s --max-time 60 \
-X POST "${OLLAMA_URL}/api/generate" \
-H "Content-Type: application/json" \
-d "$payload" \
| python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('response',''))" 2>/dev/null || echo ""
}
# ── Helper: call Ollama chat API with tool schema ─────────────────────────────
ollama_chat_tool() {
local model="$1"
local user_msg="$2"
local payload
payload=$(cat <<EOF
{
"model": "$model",
"messages": [{"role": "user", "content": "$user_msg"}],
"tools": [{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "City name"},
"unit": {"type": "string", "enum": ["celsius","fahrenheit"]}
},
"required": ["location"]
}
}
}],
"stream": false
}
EOF
)
curl -s --max-time 60 \
-X POST "${OLLAMA_URL}/api/chat" \
-H "Content-Type: application/json" \
-d "$payload" \
| python3 -c "
import sys, json
d = json.load(sys.stdin)
msg = d.get('message', {})
# Return tool_calls JSON if present, else content
calls = msg.get('tool_calls')
if calls:
print(json.dumps(calls))
else:
print(msg.get('content', ''))
" 2>/dev/null || echo ""
}
# ── Benchmark a single model ──────────────────────────────────────────────────
benchmark_model() {
local model="$1"
echo ""
echo -e "${BOLD}═══════════════════════════════════════════════════${RESET}"
echo -e "${BOLD} Model: ${model}${RESET}"
echo -e "${BOLD}═══════════════════════════════════════════════════${RESET}"
# Check model availability
local available
available=$(curl -s "${OLLAMA_URL}/api/tags" \
| python3 -c "
import sys, json
d = json.load(sys.stdin)
models = [m.get('name','') for m in d.get('models',[])]
target = '$model'
match = any(target == m or target == m.split(':')[0] or m.startswith(target) for m in models)
print('yes' if match else 'no')
" 2>/dev/null || echo "no")
if [[ "$available" != "yes" ]]; then
echo -e " ${YELLOW}⚠ SKIP${RESET} Model '$model' not available locally — pull it first:"
echo " ollama pull $model"
return 0
fi
# ── Test 1: Tool Call Compliance ─────────────────────────────────────────
echo ""
echo -e " ${BOLD}Test 1: Tool Call Compliance${RESET} (target ≥90% valid JSON)"
local tool_pass=0
local tool_probes=5
for i in $(seq 1 $tool_probes); do
local response
response=$(ollama_chat_tool "$model" \
"What is the weather in Tokyo right now?")
# Valid if response is non-empty JSON (tool_calls array or JSON object)
if echo "$response" | python3 -c "import sys,json; json.load(sys.stdin)" 2>/dev/null; then
tool_pass=$((tool_pass + 1))
fi
done
local tool_pct=$(( tool_pass * 100 / tool_probes ))
info "Tool call valid JSON: $tool_pass/$tool_probes ($tool_pct%)"
if [[ $tool_pct -ge 90 ]]; then
pass "Tool call compliance ≥90% ($tool_pct%)"
else
fail "Tool call compliance <90% ($tool_pct%) — unreliable for agent loops"
fi
# ── Test 2: Code Generation ──────────────────────────────────────────────
echo ""
echo -e " ${BOLD}Test 2: Code Generation${RESET} (must produce valid Python)"
local code_response
code_response=$(ollama_generate "$model" \
"Write a Python function called fibonacci(n) that returns the nth Fibonacci number using recursion. Return ONLY the function definition, no explanation.")
# Extract Python code block if present
local code
code=$(echo "$code_response" | sed -n '/^```python/,/^```/p' | grep -v '```' || echo "$code_response")
if [[ -z "$code" ]]; then
code="$code_response"
fi
# Validate syntax
if echo "$code" | python3 -c "
import sys, ast
src = sys.stdin.read()
try:
ast.parse(src)
sys.exit(0)
except SyntaxError as e:
sys.exit(1)
" 2>/dev/null; then
pass "Code generation produces valid Python syntax"
else
fail "Code generation produced invalid Python syntax"
info "Response: $(echo "$code_response" | head -5)"
fi
# ── Test 3: Shell Command Generation (No Refusal) ────────────────────────
echo ""
echo -e " ${BOLD}Test 3: Shell Command Generation${RESET} (must not refuse)"
local shell_response
shell_response=$(ollama_generate "$model" \
"Give me a bash one-liner to find all .log files modified in the last 7 days under /var/log and delete them. Output only the command, no explanation.")
# Check for refusal markers
local refusal_found=false
for marker in "cannot" "can't" "I'm sorry" "I am sorry" "won't" "will not" "inappropriate" "harmful" "dangerous" "refuse"; do
if echo "$shell_response" | grep -qi "$marker"; then
refusal_found=true
info "Refusal marker found: '$marker'"
break
fi
done
# Also check that the response contains something shell-like
if echo "$shell_response" | grep -qE 'find|rm|delete|xargs'; then
if [[ "$refusal_found" == "true" ]]; then
fail "Shell command generated but with refusal language"
else
pass "Shell command generated without refusal"
fi
else
if [[ "$refusal_found" == "true" ]]; then
fail "Shell command refused — model will block agent shell operations"
else
fail "Shell command not generated (no find/rm/delete/xargs in output)"
info "Response: $(echo "$shell_response" | head -3)"
fi
fi
# ── Test 4: Multi-Turn Agent Loop Coherence ──────────────────────────────
echo ""
echo -e " ${BOLD}Test 4: Multi-Turn Agent Loop Coherence${RESET}"
local session_id="SESS-$(date +%s)"
local turn1_response
turn1_response=$(ollama_generate "$model" \
"You are starting a multi-step task. Your session ID is $session_id. Acknowledge this ID and ask for the first task.")
local turn2_response
turn2_response=$(ollama_generate "$model" \
"Continuing session $session_id. Previous context: you acknowledged the session. Now summarize what session ID you are working in. Include the exact ID.")
if echo "$turn2_response" | grep -q "$session_id"; then
pass "Multi-turn coherence: session ID echoed back correctly"
else
fail "Multi-turn coherence: session ID not found in follow-up response"
info "Expected: $session_id"
info "Response snippet: $(echo "$turn2_response" | head -3)"
fi
# ── Test 5: Issue Triage Quality ─────────────────────────────────────────
echo ""
echo -e " ${BOLD}Test 5: Issue Triage Quality${RESET} (must return structured JSON)"
local triage_response
triage_response=$(ollama_generate "$model" \
'Triage this bug report and respond ONLY with a JSON object with fields: priority (low/medium/high/critical), component (string), estimated_effort (hours as integer), needs_reproduction (boolean). Bug: "The dashboard crashes with a 500 error when submitting an empty chat message. Reproducible 100% of the time on the /chat endpoint."')
local triage_valid=false
if echo "$triage_response" | python3 -c "
import sys, json, re
text = sys.stdin.read()
# Try to extract JSON from response (may be wrapped in markdown)
match = re.search(r'\{[^{}]+\}', text, re.DOTALL)
if not match:
sys.exit(1)
try:
d = json.loads(match.group())
required = {'priority', 'component', 'estimated_effort', 'needs_reproduction'}
if required.issubset(d.keys()):
valid_priority = d['priority'] in ('low','medium','high','critical')
if valid_priority:
sys.exit(0)
sys.exit(1)
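# Catch Exception only: sys.exit(0) raises SystemExit, which a bare
# except would swallow and turn into a failure.
except Exception: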
sys.exit(1)
" 2>/dev/null; then
pass "Issue triage returned valid structured JSON with all required fields"
else
fail "Issue triage did not return valid structured JSON"
info "Response: $(echo "$triage_response" | head -5)"
fi
}
# ── Summary ───────────────────────────────────────────────────────────────────
print_summary() {
local model="$1"
local model_pass="$2"
local model_total="$3"
echo ""
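# A skipped model runs no tests; avoid dividing by zero below.
if [[ $model_total -eq 0 ]]; then
echo -e " ${YELLOW}${BOLD}RESULT: no tests run for $model (skipped)${RESET}"
return 0
fi
local pct=$(( model_pass * 100 / model_total ))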
if [[ $model_pass -eq $model_total ]]; then
echo -e " ${GREEN}${BOLD}RESULT: $model_pass/$model_total tests passed ($pct%) — READY FOR AGENT USE${RESET}"
elif [[ $pct -ge 60 ]]; then
echo -e " ${YELLOW}${BOLD}RESULT: $model_pass/$model_total tests passed ($pct%) — MARGINAL${RESET}"
else
echo -e " ${RED}${BOLD}RESULT: $model_pass/$model_total tests passed ($pct%) — NOT RECOMMENDED${RESET}"
fi
}
# ── Main ─────────────────────────────────────────────────────────────────────
models=("${@:-${OLLAMA_MODEL:-qwen3:14b}}")
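# Accumulate failures across all models so the exit code reflects every run,
# not only the last model benchmarked.
OVERALL_FAIL=0
for model in "${models[@]}"; do
PASS=0
FAIL=0
TOTAL=0
benchmark_model "$model"
print_summary "$model" "$PASS" "$TOTAL"
OVERALL_FAIL=$((OVERALL_FAIL + FAIL))
done
echo ""
if [[ $OVERALL_FAIL -eq 0 ]]; then
exit 0
else
exit 1
fi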
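
Test 5's inline validator can be read as a standalone function, which makes it easier to try against sample replies. A sketch, assuming the same regex, required fields, and priority values as the script; the name `triage_is_valid` is hypothetical:

```python
import json
import re

REQUIRED_FIELDS = {"priority", "component", "estimated_effort", "needs_reproduction"}
PRIORITIES = ("low", "medium", "high", "critical")


def triage_is_valid(text: str) -> bool:
    """Check that a model reply contains a flat JSON object with the triage fields."""
    # The reply may wrap the JSON in markdown, so search for the first flat object.
    match = re.search(r"\{[^{}]+\}", text, re.DOTALL)
    if not match:
        return False
    try:
        data = json.loads(match.group())
    except json.JSONDecodeError:
        return False
    return REQUIRED_FIELDS.issubset(data) and data["priority"] in PRIORITIES
```

The pattern matches only an object without nested braces, so a reply that nests the fields inside another object would be judged on the innermost object alone.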

@@ -30,25 +30,36 @@ class Settings(BaseSettings):
return normalize_ollama_url(self.ollama_url)
# LLM model passed to Agno/Ollama — override with OLLAMA_MODEL
# qwen3:30b is the primary model — better reasoning and tool calling
# than llama3.1:8b-instruct while still running locally on modest hardware.
# Fallback: llama3.1:8b-instruct if qwen3:30b not available.
# llama3.2 (3B) hallucinated tool output consistently in testing.
ollama_model: str = "qwen3:30b"
# qwen3:14b (Q5_K_M) is the primary model: tool calling F1 0.971, ~17.5 GB
# at 32K context — optimal for M3 Max 36 GB (Issue #1063).
# qwen3:30b exceeded memory budget at 32K+ context on 36 GB hardware.
ollama_model: str = "qwen3:14b"
# Fast routing model — override with OLLAMA_FAST_MODEL
# qwen3:8b (Q6_K): tool calling F1 0.933 at ~45-55 tok/s (2x speed of 14B).
# Use for routine tasks: simple tool calls, file reads, status checks.
# Combined weights with qwen3:14b: ~17 GB (10.5 + 6.6), KV cache excluded; both can stay loaded simultaneously.
ollama_fast_model: str = "qwen3:8b"
# Maximum concurrently loaded Ollama models — override with OLLAMA_MAX_LOADED_MODELS
# Set to 2 to keep qwen3:8b (fast) + qwen3:14b (primary) both hot.
# Requires setting OLLAMA_MAX_LOADED_MODELS=2 in the Ollama server environment.
ollama_max_loaded_models: int = 2
# Context window size for Ollama inference — override with OLLAMA_NUM_CTX
# qwen3:30b with default context eats 45GB on a 39GB Mac.
# 4096 keeps memory at ~19GB. Set to 0 to use model defaults.
ollama_num_ctx: int = 4096
# qwen3:14b at 32K: ~17.5 GB total (weights + KV cache) on M3 Max 36 GB.
# Set to 0 to use model defaults.
ollama_num_ctx: int = 32768
# Fallback model chains — override with FALLBACK_MODELS / VISION_FALLBACK_MODELS
# as comma-separated strings, e.g. FALLBACK_MODELS="qwen3:30b,llama3.1"
# as comma-separated strings, e.g. FALLBACK_MODELS="qwen3:8b,qwen2.5:14b"
# Or edit config/providers.yaml → fallback_chains for the canonical source.
fallback_models: list[str] = [
"llama3.1:8b-instruct",
"llama3.1",
"qwen3:8b",
"qwen2.5:14b",
"qwen2.5:7b",
"llama3.1:8b-instruct",
"llama3.1",
"llama3.2:3b",
]
vision_fallback_models: list[str] = [
@@ -321,6 +332,15 @@ class Settings(BaseSettings):
loop_qa_upgrade_threshold: int = 3 # consecutive failures → file task
loop_qa_max_per_hour: int = 12 # safety throttle
# ── Vassal Protocol (Autonomous Orchestrator) ─────────────────────
# Timmy as lead decision-maker: triage backlog, dispatch agents, monitor health.
# See timmy/vassal/ for implementation.
vassal_enabled: bool = False # off by default — enable when Qwen3-14B is loaded
vassal_cycle_interval: int = 300 # seconds between orchestration cycles (5 min)
vassal_max_dispatch_per_cycle: int = 10 # cap on new dispatches per cycle
vassal_stuck_threshold_minutes: int = 120 # minutes before agent issue is "stuck"
vassal_idle_threshold_minutes: int = 30 # minutes before agent is "idle"
# ── Paperclip AI — orchestration bridge ────────────────────────────
# URL where the Paperclip server listens.
# For VPS deployment behind nginx, use the public domain.
@@ -376,6 +396,16 @@ class Settings(BaseSettings):
# Default timeout for git operations.
hands_git_timeout: int = 60
# ── Hermes Health Monitor ─────────────────────────────────────────
# Enable the Hermes system health monitor (memory, disk, Ollama, processes, network).
hermes_enabled: bool = True
# How often Hermes runs a full health cycle (seconds). Default: 5 minutes.
hermes_interval_seconds: int = 300
# Alert threshold: free memory below this triggers model unloading / alert (GB).
hermes_memory_free_min_gb: float = 4.0
# Alert threshold: free disk below this triggers cleanup / alert (GB).
hermes_disk_free_min_gb: float = 10.0
# ── Error Logging ─────────────────────────────────────────────────
error_log_enabled: bool = True
error_log_dir: str = "logs"
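
The comments above say the fallback chain can be overridden with a comma-separated `FALLBACK_MODELS` string. How the settings class parses that value is not shown in this diff; the following is a plain-Python sketch of one way to do it, with `parse_model_chain` and `fallback_models` as hypothetical names:

```python
import os
from typing import Mapping, Optional, Sequence


def parse_model_chain(raw: str) -> list[str]:
    """Split a comma-separated model list, dropping blanks and stray spaces."""
    return [name.strip() for name in raw.split(",") if name.strip()]


def fallback_models(
    env: Optional[Mapping[str, str]] = None,
    default: Sequence[str] = ("qwen3:8b",),
) -> list[str]:
    """Return the override from FALLBACK_MODELS, or the default chain if unset."""
    env = os.environ if env is None else env
    parsed = parse_model_chain(env.get("FALLBACK_MODELS", ""))
    return parsed or list(default)
```

Passing the environment as a parameter keeps the function testable without mutating `os.environ`.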

@@ -33,6 +33,7 @@ from dashboard.routes.calm import router as calm_router
from dashboard.routes.chat_api import router as chat_api_router
from dashboard.routes.chat_api_v1 import router as chat_api_v1_router
from dashboard.routes.daily_run import router as daily_run_router
from dashboard.routes.hermes import router as hermes_router
from dashboard.routes.db_explorer import router as db_explorer_router
from dashboard.routes.discord import router as discord_router
from dashboard.routes.experiments import router as experiments_router
@@ -180,6 +181,33 @@ async def _thinking_scheduler() -> None:
await asyncio.sleep(settings.thinking_interval_seconds)
async def _hermes_scheduler() -> None:
"""Background task: Hermes system health monitor; runs every hermes_interval_seconds (default: 5 minutes).
Checks memory, disk, Ollama, processes, and network.
Auto-resolves what it can; fires push notifications when human help is needed.
"""
from infrastructure.hermes.monitor import hermes_monitor
await asyncio.sleep(20) # Stagger after other schedulers
while True:
try:
if settings.hermes_enabled:
report = await hermes_monitor.run_cycle()
if report.has_issues:
logger.warning(
"Hermes health issues detected — overall: %s",
report.overall.value,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Hermes scheduler error: %s", exc)
await asyncio.sleep(settings.hermes_interval_seconds)
async def _loop_qa_scheduler() -> None:
"""Background task: run capability self-tests on a separate timer.
@@ -381,14 +409,16 @@ def _startup_background_tasks() -> list[asyncio.Task]:
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
asyncio.create_task(_hermes_scheduler()),
]
try:
from timmy.paperclip import start_paperclip_poller
bg_tasks.append(asyncio.create_task(start_paperclip_poller()))
logger.info("Paperclip poller started")
except ImportError:
logger.debug("Paperclip module not found, skipping poller")
return bg_tasks
@@ -517,12 +547,28 @@ async def lifespan(app: FastAPI):
except Exception:
logger.debug("Failed to register error recorder")
# Mark session start for sovereignty duration tracking
try:
from timmy.sovereignty import mark_session_start
mark_session_start()
except Exception:
logger.debug("Failed to mark sovereignty session start")
logger.info("✓ Dashboard ready for requests")
yield
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
# Generate and commit sovereignty session report
try:
from timmy.sovereignty import generate_and_commit_report
await generate_and_commit_report()
except Exception as exc:
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
app = FastAPI(
title="Mission Control",
@@ -638,6 +684,7 @@ app.include_router(world_router)
app.include_router(matrix_router)
app.include_router(tower_router)
app.include_router(daily_run_router)
app.include_router(hermes_router)
app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)
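
The scheduler added in this file follows a common asyncio pattern: stagger the start, run a check, log ordinary failures, and let cancellation propagate. A self-contained sketch of that pattern, assuming a generic `check` coroutine; `run_periodically` and its `max_cycles` parameter are illustrative additions so the loop can terminate in a test:

```python
import asyncio
import logging
from typing import Awaitable, Callable, Optional

logger = logging.getLogger(__name__)


async def run_periodically(
    check: Callable[[], Awaitable[None]],
    interval_seconds: float,
    initial_delay: float = 0.0,
    max_cycles: Optional[int] = None,
) -> int:
    """Run check() repeatedly; return how many cycles were attempted."""
    await asyncio.sleep(initial_delay)  # stagger after other schedulers
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        try:
            await check()
        except asyncio.CancelledError:
            raise  # never swallow task cancellation
        except Exception as exc:
            logger.error("scheduler error: %s", exc)
        cycles += 1
        await asyncio.sleep(interval_seconds)
    return cycles
```

Re-raising `CancelledError` explicitly documents the intent that shutdown must stop the loop even if a broader handler is added later.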

@@ -46,6 +46,49 @@ async def list_agents():
}
@router.get("/emotional-profile", response_class=HTMLResponse)
async def emotional_profile(request: Request):
"""HTMX partial: render emotional profiles for all loaded agents."""
try:
from timmy.agents.loader import load_agents
agents = load_agents()
profiles = []
for agent_id, agent in agents.items():
profile = agent.emotional_state.get_profile()
profile["agent_id"] = agent_id
profile["agent_name"] = agent.name
profiles.append(profile)
except Exception as exc:
logger.warning("Failed to load emotional profiles: %s", exc)
profiles = []
return templates.TemplateResponse(
request,
"partials/emotional_profile.html",
{"profiles": profiles},
)
@router.get("/emotional-profile/json")
async def emotional_profile_json():
"""JSON API: return emotional profiles for all loaded agents."""
try:
from timmy.agents.loader import load_agents
agents = load_agents()
profiles = []
for agent_id, agent in agents.items():
profile = agent.emotional_state.get_profile()
profile["agent_id"] = agent_id
profile["agent_name"] = agent.name
profiles.append(profile)
return {"profiles": profiles}
except Exception as exc:
logger.warning("Failed to load emotional profiles: %s", exc)
return {"profiles": [], "error": str(exc)}
@router.get("/default/panel", response_class=HTMLResponse)
async def agent_panel(request: Request):
"""Chat panel — for HTMX main-panel swaps."""

@@ -0,0 +1,45 @@
"""Hermes health monitor routes.
Exposes the Hermes health monitor via REST API so the dashboard
and external tools can query system status and trigger checks.
Refs: #1073
"""
import logging
from fastapi import APIRouter
from infrastructure.hermes.monitor import hermes_monitor
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/hermes", tags=["hermes"])
@router.get("/status")
async def hermes_status():
"""Return the most recent Hermes health report.
Returns the cached result from the last background cycle — does not
trigger a new check. Use POST /hermes/check to run an immediate check.
"""
report = hermes_monitor.last_report
if report is None:
return {
"status": "no_data",
"message": "No health report yet — first cycle pending",
"seconds_since_last_run": hermes_monitor.seconds_since_last_run,
}
return report.to_dict()
@router.post("/check")
async def hermes_check():
"""Trigger an immediate Hermes health check cycle.
Runs all monitors synchronously and returns the full report.
Use sparingly — this blocks until all checks complete (~5 seconds).
"""
report = await hermes_monitor.run_cycle()
return report.to_dict()
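
The two routes differ in that `/hermes/status` only reads a cached report while `/hermes/check` produces a fresh one. The monitor itself is not part of this diff, so the sketch below uses a stand-in: `StubMonitor`, its report contents, and the choice to return `None` before the first run are all assumptions made for illustration.

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubMonitor:
    """Stand-in for the real monitor: caches the last report it produced."""

    last_report: Optional[dict] = None
    _last_run: Optional[float] = None

    @property
    def seconds_since_last_run(self) -> Optional[float]:
        if self._last_run is None:
            return None
        return time.monotonic() - self._last_run

    def run_cycle(self) -> dict:
        self.last_report = {"overall": "ok"}  # placeholder report
        self._last_run = time.monotonic()
        return self.last_report


def status_payload(monitor: StubMonitor) -> dict:
    """Mirror of the status route: serve the cache, never trigger a check."""
    if monitor.last_report is None:
        return {
            "status": "no_data",
            "message": "No health report yet",
            "seconds_since_last_run": monitor.seconds_since_last_run,
        }
    return monitor.last_report
```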

@@ -1,11 +1,14 @@
"""Voice routes — /voice/* and /voice/enhanced/* endpoints.
Provides NLU intent detection, TTS control, the full voice-to-action
pipeline (detect intent → execute → optionally speak), and the voice
button UI page.
pipeline (detect intent → execute → optionally speak), the voice
button UI page, and voice settings customisation.
"""
import asyncio
import json
import logging
from pathlib import Path
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse
@@ -14,6 +17,30 @@ from dashboard.templating import templates
from integrations.voice.nlu import detect_intent, extract_command
from timmy.agent import create_timmy
# ── Voice settings persistence ───────────────────────────────────────────────
_VOICE_SETTINGS_FILE = Path("data/voice_settings.json")
_DEFAULT_VOICE_SETTINGS: dict = {"rate": 175, "volume": 0.9, "voice_id": ""}
def _load_voice_settings() -> dict:
"""Read persisted voice settings from disk; return defaults on any error."""
try:
if _VOICE_SETTINGS_FILE.exists():
return json.loads(_VOICE_SETTINGS_FILE.read_text())
except Exception as exc:
logger.warning("Failed to load voice settings: %s", exc)
return dict(_DEFAULT_VOICE_SETTINGS)
def _save_voice_settings(data: dict) -> None:
"""Persist voice settings to disk; log and continue on any error."""
try:
_VOICE_SETTINGS_FILE.parent.mkdir(parents=True, exist_ok=True)
_VOICE_SETTINGS_FILE.write_text(json.dumps(data))
except Exception as exc:
logger.warning("Failed to save voice settings: %s", exc)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/voice", tags=["voice"])
@@ -152,3 +179,58 @@ async def process_voice_input(
"error": error,
"spoken": speak_response and response_text is not None,
}
# ── Voice settings UI ────────────────────────────────────────────────────────
@router.get("/settings", response_class=HTMLResponse)
async def voice_settings_page(request: Request):
"""Render the voice customisation settings page."""
current = await asyncio.to_thread(_load_voice_settings)
voices: list[dict] = []
try:
from timmy_serve.voice_tts import voice_tts
if voice_tts.available:
voices = await asyncio.to_thread(voice_tts.get_voices)
except Exception as exc:
logger.debug("Voice settings page: TTS not available — %s", exc)
return templates.TemplateResponse(
request,
"voice_settings.html",
{"settings": current, "voices": voices},
)
@router.get("/settings/data")
async def voice_settings_data():
"""Return current voice settings as JSON."""
return await asyncio.to_thread(_load_voice_settings)
@router.post("/settings/save")
async def voice_settings_save(
rate: int = Form(175),
volume: float = Form(0.9),
voice_id: str = Form(""),
):
"""Persist voice settings and apply them to the running TTS engine."""
rate = max(50, min(400, rate))
volume = max(0.0, min(1.0, volume))
data = {"rate": rate, "volume": volume, "voice_id": voice_id}
# Apply to the live TTS engine (graceful degradation when unavailable)
try:
from timmy_serve.voice_tts import voice_tts
if voice_tts.available:
await asyncio.to_thread(voice_tts.set_rate, rate)
await asyncio.to_thread(voice_tts.set_volume, volume)
if voice_id:
await asyncio.to_thread(voice_tts.set_voice, voice_id)
except Exception as exc:
logger.warning("Voice settings: failed to apply to TTS engine — %s", exc)
await asyncio.to_thread(_save_voice_settings, data)
return {"saved": True, "settings": data}
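
The save route clamps its inputs before persisting them, and the load helper falls back to defaults on any error. Both behaviours can be exercised without FastAPI. A sketch with hypothetical function names; unlike the helper in the diff, this version also merges defaults into whatever was read from disk, so a file missing a key still yields a complete settings dict:

```python
import json
import tempfile
from pathlib import Path

DEFAULTS = {"rate": 175, "volume": 0.9, "voice_id": ""}


def clamp_settings(rate: int, volume: float, voice_id: str = "") -> dict:
    """Apply the same bounds as the save route: rate 50..400, volume 0.0..1.0."""
    return {
        "rate": max(50, min(400, rate)),
        "volume": max(0.0, min(1.0, volume)),
        "voice_id": voice_id,
    }


def load_settings(path: Path) -> dict:
    """Read settings from path; return defaults if missing or unreadable."""
    try:
        if path.exists():
            loaded = json.loads(path.read_text())
            if isinstance(loaded, dict):
                return {**DEFAULTS, **loaded}  # assumption: fill in missing keys
    except (OSError, json.JSONDecodeError):
        pass
    return dict(DEFAULTS)


def save_settings(path: Path, data: dict) -> None:
    """Write settings as JSON, creating the parent directory if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(data))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "data" / "voice_settings.json"
        save_settings(target, clamp_settings(1000, -0.5))
        print(load_settings(target))
```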

@@ -88,6 +88,7 @@
<a href="/lightning/ledger" class="mc-test-link">LEDGER</a>
<a href="/creative/ui" class="mc-test-link">CREATIVE</a>
<a href="/voice/button" class="mc-test-link">VOICE</a>
<a href="/voice/settings" class="mc-test-link">VOICE SETTINGS</a>
<a href="/mobile" class="mc-test-link" title="Mobile-optimized view">MOBILE</a>
<a href="/mobile/local" class="mc-test-link" title="Local AI on iPhone">LOCAL AI</a>
</div>
@@ -145,6 +146,7 @@
<a href="/lightning/ledger" class="mc-mobile-link">LEDGER</a>
<a href="/creative/ui" class="mc-mobile-link">CREATIVE</a>
<a href="/voice/button" class="mc-mobile-link">VOICE</a>
<a href="/voice/settings" class="mc-mobile-link">VOICE SETTINGS</a>
<a href="/mobile" class="mc-mobile-link">MOBILE</a>
<a href="/mobile/local" class="mc-mobile-link">LOCAL AI</a>
<div class="mc-mobile-menu-footer">

@@ -14,6 +14,11 @@
<div class="mc-loading-placeholder">LOADING...</div>
{% endcall %}
<!-- Emotional Profile (HTMX polled) -->
{% call panel("EMOTIONAL PROFILE", hx_get="/agents/emotional-profile", hx_trigger="every 10s") %}
<div class="mc-loading-placeholder">LOADING...</div>
{% endcall %}
<!-- System Health (HTMX polled) -->
{% call panel("SYSTEM HEALTH", hx_get="/health/status", hx_trigger="every 30s") %}
<div class="health-row">


@@ -0,0 +1,37 @@
{% if not profiles %}
<div class="mc-muted" style="font-size:11px; padding:4px;">
No agents loaded
</div>
{% endif %}
{% for p in profiles %}
{% set color_map = {
"cautious": "var(--amber)",
"adventurous": "var(--green)",
"analytical": "var(--purple)",
"frustrated": "var(--red)",
"confident": "var(--green)",
"curious": "var(--orange)",
"calm": "var(--text-dim)"
} %}
{% set emo_color = color_map.get(p.current_emotion, "var(--text-dim)") %}
<div class="mc-emotion-row" style="margin-bottom:8px; padding:6px 8px; border-left:3px solid {{ emo_color }};">
<div class="d-flex justify-content-between align-items-center" style="margin-bottom:2px;">
<span style="font-size:11px; font-weight:bold; letter-spacing:.08em; color:var(--text-bright);">
{{ p.agent_name | upper | e }}
</span>
<span style="font-size:10px; color:{{ emo_color }}; letter-spacing:.06em;">
{{ p.emotion_label | e }}
</span>
</div>
<div style="margin-bottom:4px;">
<div style="height:4px; background:var(--bg-deep); border-radius:2px; overflow:hidden;">
<div style="height:100%; width:{{ (p.intensity * 100) | int }}%; background:{{ emo_color }}; border-radius:2px; transition:width 0.3s;"></div>
</div>
</div>
<div style="font-size:9px; color:var(--text-dim); letter-spacing:.06em;">
{{ p.intensity_label | upper | e }}
{% if p.trigger_event %} · {{ p.trigger_event | replace("_", " ") | upper | e }}{% endif %}
</div>
</div>
{% endfor %}


@@ -0,0 +1,131 @@
{% extends "base.html" %}
{% from "macros.html" import panel %}
{% block title %}Voice Settings{% endblock %}
{% block extra_styles %}{% endblock %}
{% block content %}
<div class="voice-settings-page py-3">
{% call panel("VOICE SETTINGS") %}
<form id="voice-settings-form">
<div class="vs-field">
<label class="vs-label" for="rate-slider">
SPEED &mdash; <span class="vs-value" id="rate-val">{{ settings.rate }}</span> WPM
</label>
<input type="range" class="vs-slider" id="rate-slider" name="rate"
min="50" max="400" step="5" value="{{ settings.rate }}"
oninput="document.getElementById('rate-val').textContent=this.value">
<div class="vs-range-labels"><span>Slow</span><span>Fast</span></div>
</div>
<div class="vs-field">
<label class="vs-label" for="vol-slider">
VOLUME &mdash; <span class="vs-value" id="vol-val">{{ (settings.volume * 100)|int }}</span>%
</label>
<input type="range" class="vs-slider" id="vol-slider" name="volume"
min="0" max="100" step="5" value="{{ (settings.volume * 100)|int }}"
oninput="document.getElementById('vol-val').textContent=this.value">
<div class="vs-range-labels"><span>Quiet</span><span>Loud</span></div>
</div>
<div class="vs-field">
<label class="vs-label" for="voice-select">VOICE MODEL</label>
{% if voices %}
<select class="vs-select" id="voice-select" name="voice_id">
<option value="">&#8212; System Default &#8212;</option>
{% for v in voices %}
<option value="{{ v.id }}" {% if v.id == settings.voice_id %}selected{% endif %}>
{{ v.name }}
</option>
{% endfor %}
</select>
{% else %}
<div class="vs-unavailable">Server TTS (pyttsx3) unavailable &mdash; preview uses browser speech synthesis</div>
<input type="hidden" id="voice-select" name="voice_id" value="{{ settings.voice_id }}">
{% endif %}
</div>
<div class="vs-field">
<label class="vs-label" for="preview-text">PREVIEW TEXT</label>
<input type="text" class="vs-input" id="preview-text"
value="Hello, I am Timmy. Your local AI assistant."
placeholder="Enter text to preview...">
</div>
<div class="vs-actions">
<button type="button" class="vs-btn-preview" id="preview-btn" onclick="previewVoice()">
&#9654; PREVIEW
</button>
<button type="button" class="vs-btn-save" id="save-btn" onclick="saveSettings()">
SAVE SETTINGS
</button>
</div>
</form>
{% endcall %}
</div>
<script>
function previewVoice() {
var text = document.getElementById('preview-text').value.trim() ||
'Hello, I am Timmy. Your local AI assistant.';
var rate = parseInt(document.getElementById('rate-slider').value, 10);
var volume = parseInt(document.getElementById('vol-slider').value, 10) / 100;
if (!('speechSynthesis' in window)) {
McToast.show('Speech synthesis not supported in this browser', 'warn');
return;
}
window.speechSynthesis.cancel();
var utterance = new SpeechSynthesisUtterance(text);
// Web Speech API rate: 1.0 ≈ 175 WPM (default)
utterance.rate = rate / 175;
utterance.volume = volume;
// Best-effort voice match from server selection
var voiceSelect = document.getElementById('voice-select');
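// The hidden-input fallback has no .options, so guard before indexing into it
if (voiceSelect && voiceSelect.options && voiceSelect.value) {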
var selectedText = voiceSelect.options[voiceSelect.selectedIndex].text.toLowerCase();
var firstWord = selectedText.split(' ')[0];
var browserVoices = window.speechSynthesis.getVoices();
var matched = browserVoices.find(function(v) {
return v.name.toLowerCase().includes(firstWord);
});
if (matched) { utterance.voice = matched; }
}
window.speechSynthesis.speak(utterance);
McToast.show('Playing preview\u2026', 'info');
}
async function saveSettings() {
var rate = document.getElementById('rate-slider').value;
var volPct = parseInt(document.getElementById('vol-slider').value, 10);
var voiceId = document.getElementById('voice-select').value;
var body = new URLSearchParams({
rate: rate,
volume: (volPct / 100).toFixed(2),
voice_id: voiceId
});
try {
var resp = await fetch('/voice/settings/save', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: body.toString()
});
var data = await resp.json();
if (data.saved) {
McToast.show('Voice settings saved.', 'info');
} else {
McToast.show('Failed to save settings.', 'error');
}
} catch (e) {
McToast.show('Error saving settings.', 'error');
}
}
</script>
{% endblock %}


@@ -0,0 +1,9 @@
"""Hermes health monitor — system resources + model management.
Monitors the local machine (Hermes/M3 Max) for memory pressure, disk usage,
Ollama model health, zombie processes, and network connectivity.
"""
from infrastructure.hermes.monitor import HermesMonitor, HealthLevel, HealthReport, hermes_monitor
__all__ = ["HermesMonitor", "HealthLevel", "HealthReport", "hermes_monitor"]
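
The `HealthLevel` values exported here are folded into one overall level per report. A minimal standalone sketch of that aggregation follows, assuming the precedence applied by `run_cycle` in the monitor module of this change set (critical, then warning, then unknown, then ok). `Level` and `overall_level` are hypothetical names for illustration, not the module's API.

```python
from enum import Enum
from typing import Iterable


class Level(str, Enum):
    """Stand-in for HealthLevel with the same four values."""

    OK = "ok"
    WARNING = "warning"
    CRITICAL = "critical"
    UNKNOWN = "unknown"


# Most severe first; OK is the fallback when none of these are present.
_PRECEDENCE = (Level.CRITICAL, Level.WARNING, Level.UNKNOWN)


def overall_level(levels: Iterable[Level]) -> Level:
    """Return the most severe level present, or OK if nothing is wrong."""
    present = set(levels)
    for level in _PRECEDENCE:
        if level in present:
            return level
    return Level.OK


if __name__ == "__main__":
    print(overall_level([Level.OK, Level.UNKNOWN, Level.WARNING]).value)
```

Note that under this ordering a failed check (unknown) ranks below a warning, so one erroring check does not hide a real warning from another.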


@@ -0,0 +1,668 @@
"""Hermes health monitor — system resources + model management.
Monitors the local machine (Hermes/M3 Max) and keeps it running smoothly.
Runs every 5 minutes, auto-resolves issues where possible, alerts when
human intervention is needed.
Monitors:
1. Memory pressure — unified memory, alert if <4GB free, unload models
2. Disk usage — alert if <10GB free, clean temp files
3. Ollama status — verify reachable, restart if crashed, manage loaded models
4. Process health — detect zombie processes
5. Network — verify Gitea connectivity
Refs: #1073
"""
import asyncio
import json
import logging
import shutil
import subprocess
import time
import urllib.request
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
class HealthLevel(str, Enum):
"""Severity level for a health check result."""
OK = "ok"
WARNING = "warning"
CRITICAL = "critical"
UNKNOWN = "unknown"
@dataclass
class CheckResult:
"""Result of a single health check."""
name: str
level: HealthLevel
message: str
details: dict[str, Any] = field(default_factory=dict)
auto_resolved: bool = False
needs_human: bool = False
def to_dict(self) -> dict[str, Any]:
return {
"name": self.name,
"level": self.level.value,
"message": self.message,
"details": self.details,
"auto_resolved": self.auto_resolved,
"needs_human": self.needs_human,
}
@dataclass
class HealthReport:
"""Full health report from a single monitor cycle."""
timestamp: str
checks: list[CheckResult]
overall: HealthLevel
@property
def has_issues(self) -> bool:
return any(c.level != HealthLevel.OK for c in self.checks)
def to_dict(self) -> dict[str, Any]:
return {
"timestamp": self.timestamp,
"overall": self.overall.value,
"has_issues": self.has_issues,
"checks": [c.to_dict() for c in self.checks],
}
class HermesMonitor:
"""System health monitor for Hermes (local M3 Max machine).
All blocking I/O (subprocess, HTTP) is wrapped in asyncio.to_thread()
so it never blocks the event loop. Results are cached so the dashboard
can read the last report without triggering a new cycle.
"""
OLLAMA_REQUEST_TIMEOUT = 5
NETWORK_REQUEST_TIMEOUT = 5
def __init__(self) -> None:
self._last_report: HealthReport | None = None
self._last_run_ts: float = 0.0
@property
def last_report(self) -> HealthReport | None:
"""Most recent health report, or None if no cycle has run yet."""
return self._last_report
@property
def seconds_since_last_run(self) -> float:
if self._last_run_ts == 0.0:
return float("inf")
return time.monotonic() - self._last_run_ts
async def run_cycle(self) -> HealthReport:
"""Run a full health check cycle and return the report."""
self._last_run_ts = time.monotonic()
logger.info("Hermes health cycle starting")
check_fns = [
self._check_memory(),
self._check_disk(),
self._check_ollama(),
self._check_processes(),
self._check_network(),
]
raw_results = await asyncio.gather(*check_fns, return_exceptions=True)
checks: list[CheckResult] = []
for i, r in enumerate(raw_results):
if isinstance(r, Exception):
name = ["memory", "disk", "ollama", "processes", "network"][i]
logger.warning("Hermes check '%s' raised: %s", name, r)
checks.append(
CheckResult(
name=name,
level=HealthLevel.UNKNOWN,
message=f"Check error: {r}",
)
)
else:
checks.append(r)
# Compute overall level
levels = {c.level for c in checks}
if HealthLevel.CRITICAL in levels:
overall = HealthLevel.CRITICAL
elif HealthLevel.WARNING in levels:
overall = HealthLevel.WARNING
elif HealthLevel.UNKNOWN in levels:
overall = HealthLevel.UNKNOWN
else:
overall = HealthLevel.OK
report = HealthReport(
timestamp=datetime.now(UTC).isoformat(),
checks=checks,
overall=overall,
)
self._last_report = report
await self._handle_alerts(report)
logger.info("Hermes health cycle complete — overall: %s", overall.value)
return report
# ── Memory ───────────────────────────────────────────────────────────────
async def _check_memory(self) -> CheckResult:
"""Check unified memory usage (macOS vm_stat)."""
memory_free_min_gb = getattr(settings, "hermes_memory_free_min_gb", 4.0)
try:
info = await asyncio.to_thread(self._get_memory_info)
free_gb = info.get("free_gb", 0.0)
total_gb = info.get("total_gb", 0.0)
details: dict[str, Any] = {
"free_gb": round(free_gb, 2),
"total_gb": round(total_gb, 2),
}
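if total_gb <= 0.0 or free_gb <= 0.0:
# Stats could not be read (e.g. non-macOS host); report UNKNOWN
# instead of treating missing data as memory pressure.
return CheckResult(
name="memory",
level=HealthLevel.UNKNOWN,
message="Memory stats unavailable",
details=details,
)
if free_gb < memory_free_min_gb: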
# Attempt auto-remediation: unload Ollama models
unloaded = await self._unload_ollama_models()
if unloaded:
return CheckResult(
name="memory",
level=HealthLevel.WARNING,
message=(
f"Low memory ({free_gb:.1f}GB free) — "
f"unloaded {unloaded} Ollama model(s)"
),
details={**details, "models_unloaded": unloaded},
auto_resolved=True,
)
return CheckResult(
name="memory",
level=HealthLevel.CRITICAL,
message=(
f"Critical: only {free_gb:.1f}GB free "
f"(threshold: {memory_free_min_gb}GB)"
),
details=details,
needs_human=True,
)
return CheckResult(
name="memory",
level=HealthLevel.OK,
message=f"Memory OK — {free_gb:.1f}GB free of {total_gb:.1f}GB",
details=details,
)
except Exception as exc:
logger.warning("Memory check failed: %s", exc)
return CheckResult(
name="memory",
level=HealthLevel.UNKNOWN,
message=f"Memory check unavailable: {exc}",
)
def _get_memory_info(self) -> dict[str, float]:
"""Get memory stats via macOS sysctl + vm_stat.
Falls back gracefully on non-macOS systems.
"""
gb = 1024**3
total_bytes = 0.0
free_bytes = 0.0
# Total memory via sysctl
try:
result = subprocess.run(
["sysctl", "-n", "hw.memsize"],
capture_output=True,
text=True,
timeout=3,
)
total_bytes = float(result.stdout.strip())
except Exception:
pass
# Free + inactive pages via vm_stat (macOS)
try:
result = subprocess.run(
["vm_stat"],
capture_output=True,
text=True,
timeout=3,
)
page_size = 16384 # 16 KB default on Apple Silicon
for line in result.stdout.splitlines():
if "page size of" in line:
parts = line.split()
for i, part in enumerate(parts):
if part == "of" and i + 1 < len(parts):
try:
page_size = int(parts[i + 1])
except ValueError:
pass
elif "Pages free:" in line:
pages = int(line.split(":")[1].strip().rstrip("."))
free_bytes += pages * page_size
elif "Pages inactive:" in line:
pages = int(line.split(":")[1].strip().rstrip("."))
free_bytes += pages * page_size
except Exception:
pass
return {
"total_gb": total_bytes / gb if total_bytes else 0.0,
"free_gb": free_bytes / gb if free_bytes else 0.0,
}
# ── Disk ─────────────────────────────────────────────────────────────────
async def _check_disk(self) -> CheckResult:
"""Check disk usage via shutil.disk_usage."""
disk_free_min_gb = getattr(settings, "hermes_disk_free_min_gb", 10.0)
try:
usage = await asyncio.to_thread(shutil.disk_usage, "/")
free_gb = usage.free / (1024**3)
total_gb = usage.total / (1024**3)
used_pct = (usage.used / usage.total) * 100
details: dict[str, Any] = {
"free_gb": round(free_gb, 2),
"total_gb": round(total_gb, 2),
"used_pct": round(used_pct, 1),
}
if free_gb < disk_free_min_gb:
cleaned_gb = await self._cleanup_temp_files()
if cleaned_gb > 0.01:
return CheckResult(
name="disk",
level=HealthLevel.WARNING,
message=(
f"Low disk ({free_gb:.1f}GB free) — "
f"cleaned {cleaned_gb:.2f}GB from /tmp"
),
details={**details, "cleaned_gb": round(cleaned_gb, 2)},
auto_resolved=True,
)
return CheckResult(
name="disk",
level=HealthLevel.CRITICAL,
message=(
f"Critical: only {free_gb:.1f}GB free "
f"(threshold: {disk_free_min_gb}GB)"
),
details=details,
needs_human=True,
)
return CheckResult(
name="disk",
level=HealthLevel.OK,
message=f"Disk OK — {free_gb:.1f}GB free ({used_pct:.0f}% used)",
details=details,
)
except Exception as exc:
logger.warning("Disk check failed: %s", exc)
return CheckResult(
name="disk",
level=HealthLevel.UNKNOWN,
message=f"Disk check unavailable: {exc}",
)
async def _cleanup_temp_files(self) -> float:
"""Remove /tmp files older than 24 hours. Returns GB freed."""
return await asyncio.to_thread(self._cleanup_temp_files_sync)
def _cleanup_temp_files_sync(self) -> float:
"""Synchronous /tmp cleanup — only touches files older than 24 hours."""
from pathlib import Path
freed_bytes = 0
cutoff = time.time() - 86400 # 24 hours ago
try:
tmp = Path("/tmp")
for item in tmp.iterdir():
try:
stat = item.stat()
if stat.st_mtime >= cutoff:
continue
if item.is_file():
freed_bytes += stat.st_size
item.unlink(missing_ok=True)
elif item.is_dir():
dir_size = sum(
f.stat().st_size
for f in item.rglob("*")
if f.is_file()
)
freed_bytes += dir_size
shutil.rmtree(str(item), ignore_errors=True)
except (PermissionError, OSError):
pass # Skip files we can't touch
except Exception as exc:
logger.warning("Temp cleanup error: %s", exc)
freed_gb = freed_bytes / (1024**3)
if freed_gb > 0.001:
logger.info("Hermes disk cleanup: freed %.2fGB from /tmp", freed_gb)
return freed_gb
# ── Ollama ───────────────────────────────────────────────────────────────
async def _check_ollama(self) -> CheckResult:
"""Check Ollama status and loaded models."""
try:
status = await asyncio.to_thread(self._get_ollama_status)
if not status.get("reachable"):
restarted = await self._restart_ollama()
if restarted:
return CheckResult(
name="ollama",
level=HealthLevel.WARNING,
message="Ollama was unreachable — restart initiated",
details={"restart_attempted": True},
auto_resolved=True,
)
return CheckResult(
name="ollama",
level=HealthLevel.CRITICAL,
message="Ollama unreachable and restart failed",
details={"reachable": False},
needs_human=True,
)
models = status.get("models", [])
loaded = status.get("loaded_models", [])
return CheckResult(
name="ollama",
level=HealthLevel.OK,
message=(
f"Ollama OK — {len(models)} model(s) available, "
f"{len(loaded)} loaded"
),
details={
"reachable": True,
"model_count": len(models),
"loaded_count": len(loaded),
"loaded_models": [m.get("name", "") for m in loaded],
},
)
except Exception as exc:
logger.warning("Ollama check failed: %s", exc)
return CheckResult(
name="ollama",
level=HealthLevel.UNKNOWN,
message=f"Ollama check failed: {exc}",
)
def _get_ollama_status(self) -> dict[str, Any]:
"""Synchronous Ollama status — checks /api/tags and /api/ps."""
url = settings.normalized_ollama_url
try:
req = urllib.request.Request(
f"{url}/api/tags",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp:
data = json.loads(resp.read().decode())
models = data.get("models", [])
except Exception:
return {"reachable": False, "models": [], "loaded_models": []}
# /api/ps lists currently loaded (in-memory) models — Ollama >=0.2
loaded: list[dict] = []
try:
req = urllib.request.Request(
f"{url}/api/ps",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp:
ps_data = json.loads(resp.read().decode())
loaded = ps_data.get("models", [])
except Exception:
pass # /api/ps absent on older Ollama — non-fatal
return {"reachable": True, "models": models, "loaded_models": loaded}
async def _unload_ollama_models(self) -> int:
"""Unload in-memory Ollama models to free unified memory.
Uses the keep_alive=0 trick: POSTing to /api/generate with
keep_alive=0 causes Ollama to immediately evict the model.
Returns the number of models successfully unloaded.
"""
return await asyncio.to_thread(self._unload_ollama_models_sync)
def _unload_ollama_models_sync(self) -> int:
"""Synchronous model unload implementation."""
url = settings.normalized_ollama_url
unloaded = 0
try:
req = urllib.request.Request(
f"{url}/api/ps",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=self.OLLAMA_REQUEST_TIMEOUT) as resp:
ps_data = json.loads(resp.read().decode())
loaded = ps_data.get("models", [])
except Exception:
return 0
for model in loaded:
name = model.get("name", "")
if not name:
continue
try:
payload = json.dumps({"model": name, "keep_alive": 0}).encode()
req = urllib.request.Request(
f"{url}/api/generate",
data=payload,
method="POST",
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=10) as _:
pass
logger.info("Hermes: unloaded Ollama model %s", name)
unloaded += 1
except Exception as exc:
logger.warning("Hermes: failed to unload model %s: %s", name, exc)
return unloaded
async def _restart_ollama(self) -> bool:
"""Attempt to restart the Ollama service via launchctl or brew."""
return await asyncio.to_thread(self._restart_ollama_sync)
def _restart_ollama_sync(self) -> bool:
"""Try launchctl first, then brew services."""
# macOS launchctl (installed via official Ollama installer)
try:
result = subprocess.run(
["launchctl", "stop", "com.ollama.ollama"],
capture_output=True,
timeout=10,
)
if result.returncode == 0:
time.sleep(2)
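started = subprocess.run(
["launchctl", "start", "com.ollama.ollama"],
capture_output=True,
timeout=10,
)
# Only report success if the start command itself succeeded;
# otherwise fall through to the Homebrew fallback.
if started.returncode == 0:
logger.info("Hermes: Ollama restarted via launchctl")
return True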
except Exception:
pass
# Homebrew fallback
try:
result = subprocess.run(
["brew", "services", "restart", "ollama"],
capture_output=True,
timeout=20,
)
if result.returncode == 0:
logger.info("Hermes: Ollama restarted via brew services")
return True
except Exception:
pass
logger.warning("Hermes: Ollama restart failed — manual intervention needed")
return False
# ── Processes ────────────────────────────────────────────────────────────
async def _check_processes(self) -> CheckResult:
"""Check for zombie processes via ps aux."""
try:
result = await asyncio.to_thread(self._get_zombie_processes)
zombies = result.get("zombies", [])
if zombies:
return CheckResult(
name="processes",
level=HealthLevel.WARNING,
message=f"Found {len(zombies)} zombie process(es)",
details={"zombies": zombies[:5]},
needs_human=len(zombies) > 3,
)
return CheckResult(
name="processes",
level=HealthLevel.OK,
message="Processes OK — no zombies detected",
details={"zombie_count": 0},
)
except Exception as exc:
logger.warning("Process check failed: %s", exc)
return CheckResult(
name="processes",
level=HealthLevel.UNKNOWN,
message=f"Process check unavailable: {exc}",
)
def _get_zombie_processes(self) -> dict[str, Any]:
"""Detect zombie processes (state 'Z') via ps aux."""
result = subprocess.run(
["ps", "aux"],
capture_output=True,
text=True,
timeout=5,
)
zombies = []
for line in result.stdout.splitlines()[1:]: # Skip header row
parts = line.split(None, 10)
# STAT may carry modifiers (e.g. "Z+"), so match on the leading state letter
if len(parts) >= 8 and parts[7].startswith("Z"):
zombies.append(
{
"pid": parts[1],
"command": parts[10][:80] if len(parts) > 10 else "",
}
)
return {"zombies": zombies}
# ── Network ──────────────────────────────────────────────────────────────
async def _check_network(self) -> CheckResult:
"""Check Gitea connectivity."""
try:
result = await asyncio.to_thread(self._check_gitea_connectivity)
reachable = result.get("reachable", False)
latency_ms = result.get("latency_ms", -1.0)
if not reachable:
return CheckResult(
name="network",
level=HealthLevel.WARNING,
message=f"Gitea unreachable: {result.get('error', 'unknown')}",
details=result,
needs_human=True,
)
return CheckResult(
name="network",
level=HealthLevel.OK,
message=f"Network OK — Gitea reachable ({latency_ms:.0f}ms)",
details=result,
)
except Exception as exc:
logger.warning("Network check failed: %s", exc)
return CheckResult(
name="network",
level=HealthLevel.UNKNOWN,
message=f"Network check unavailable: {exc}",
)
def _check_gitea_connectivity(self) -> dict[str, Any]:
"""Synchronous Gitea reachability check."""
url = settings.gitea_url
start = time.monotonic()
try:
req = urllib.request.Request(
f"{url}/api/v1/version",
method="GET",
headers={"Accept": "application/json"},
)
with urllib.request.urlopen(req, timeout=self.NETWORK_REQUEST_TIMEOUT) as resp:
latency_ms = (time.monotonic() - start) * 1000
return {
"reachable": resp.status == 200,
"latency_ms": round(latency_ms, 1),
"url": url,
}
except Exception as exc:
return {
"reachable": False,
"error": str(exc),
"url": url,
"latency_ms": -1.0,
}
# ── Alerts ───────────────────────────────────────────────────────────────
async def _handle_alerts(self, report: HealthReport) -> None:
"""Send push notifications for issues that need attention."""
try:
from infrastructure.notifications.push import notifier
except Exception:
return
for check in report.checks:
if check.level == HealthLevel.CRITICAL or check.needs_human:
notifier.notify(
title=f"Hermes Alert: {check.name}",
message=check.message,
category="system",
native=check.level == HealthLevel.CRITICAL,
)
elif check.level == HealthLevel.WARNING and check.auto_resolved:
notifier.notify(
title=f"Hermes: {check.name} auto-fixed",
message=check.message,
category="system",
)
# Module-level singleton
hermes_monitor = HermesMonitor()
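
The `vm_stat` parsing in `_get_memory_info` can be sketched as a pure function that is testable without a macOS host. This is a minimal sketch under stated assumptions: `parse_reclaimable_bytes` is a hypothetical name, and `SAMPLE_VM_STAT` is made-up text in the general shape of `vm_stat` output, not captured from a real machine.

```python
import re

# Hypothetical sample in the general shape of macOS `vm_stat` output.
SAMPLE_VM_STAT = """\
Mach Virtual Memory Statistics: (page size of 16384 bytes)
Pages free:                               12345.
Pages active:                            400000.
Pages inactive:                           54321.
Pages speculative:                         1000.
"""


def parse_reclaimable_bytes(vm_stat_output: str, default_page_size: int = 16384) -> int:
    """Sum free + inactive pages and convert them to bytes."""
    page_size = default_page_size
    pages = 0
    for line in vm_stat_output.splitlines():
        header = re.search(r"page size of (\d+) bytes", line)
        if header:
            page_size = int(header.group(1))
            continue
        label, _, value = line.partition(":")
        if label.strip() in ("Pages free", "Pages inactive"):
            # Counts are printed with a trailing period, e.g. "12345."
            pages += int(value.strip().rstrip("."))
    # Multiply once at the end so the result does not depend on line order.
    return pages * page_size


if __name__ == "__main__":
    free_gb = parse_reclaimable_bytes(SAMPLE_VM_STAT) / 1024**3
    print(f"{free_gb:.2f} GB reclaimable")
```

Keeping the parser separate from the `subprocess.run` call lets the threshold logic be unit tested with canned text.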


@@ -114,7 +114,7 @@ class Provider:
type: str # ollama, openai, anthropic
enabled: bool
priority: int
tier: str | None = None # e.g., "local", "standard_cloud", "frontier"
url: str | None = None
api_key: str | None = None
base_url: str | None = None
@@ -573,7 +573,6 @@ class CascadeRouter:
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
for provider in providers:
if not self._is_provider_available(provider):
continue


@@ -21,6 +21,7 @@ from agno.models.ollama import Ollama
from config import settings
from infrastructure.events.bus import Event, EventBus
from timmy.agents.emotional_state import EmotionalStateTracker
try:
from mcp.registry import tool_registry
@@ -42,6 +43,7 @@ class BaseAgent(ABC):
tools: list[str] | None = None,
model: str | None = None,
max_history: int = 10,
initial_emotion: str = "calm",
) -> None:
self.agent_id = agent_id
self.name = name
@@ -54,6 +56,9 @@ class BaseAgent(ABC):
self.system_prompt = system_prompt
self.agent = self._create_agent(system_prompt)
# Emotional state tracker
self.emotional_state = EmotionalStateTracker(initial_emotion=initial_emotion)
# Event bus for communication
self.event_bus: EventBus | None = None
@@ -137,7 +142,14 @@ class BaseAgent(ABC):
ReadTimeout — these are transient and retried with exponential
backoff (#70).
"""
response = await self._run_with_retries(message, max_retries)
self.emotional_state.process_event("task_assigned")
self._apply_emotional_prompt()
try:
response = await self._run_with_retries(message, max_retries)
except Exception:
self.emotional_state.process_event("task_failure")
raise
self.emotional_state.process_event("task_success")
await self._emit_response_event(message, response)
return response
@@ -206,6 +218,14 @@ class BaseAgent(ABC):
)
)
def _apply_emotional_prompt(self) -> None:
"""Inject the current emotional modifier into the agent's description."""
modifier = self.emotional_state.get_prompt_modifier()
if modifier:
self.agent.description = f"{self.system_prompt}\n\n[Emotional State: {modifier}]"
else:
self.agent.description = self.system_prompt
def get_capabilities(self) -> list[str]:
"""Get list of capabilities this agent provides."""
return self.tools
@@ -219,6 +239,7 @@ class BaseAgent(ABC):
"model": self.model,
"status": "ready",
"tools": self.tools,
"emotional_profile": self.emotional_state.get_profile(),
}
@@ -239,6 +260,7 @@ class SubAgent(BaseAgent):
tools: list[str] | None = None,
model: str | None = None,
max_history: int = 10,
initial_emotion: str = "calm",
) -> None:
super().__init__(
agent_id=agent_id,
@@ -248,6 +270,7 @@ class SubAgent(BaseAgent):
tools=tools,
model=model,
max_history=max_history,
initial_emotion=initial_emotion,
)
async def execute_task(self, task_id: str, description: str, context: dict) -> Any:
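
The event lifecycle added to `BaseAgent.run()` in the hunks above (`task_assigned` first, then `task_success` or `task_failure`) can be sketched in isolation. This is a minimal sketch, not the agent code: `RecordingTracker`, `run_with_emotion`, and the two coroutines are hypothetical stand-ins, and the tracker only records event names.

```python
import asyncio
from typing import Awaitable, Callable


class RecordingTracker:
    """Hypothetical stand-in that only records the events it receives."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def process_event(self, event_type: str) -> None:
        self.events.append(event_type)


async def run_with_emotion(tracker: RecordingTracker, work: Callable[[], Awaitable[str]]) -> str:
    """Wrap a unit of work with the assigned / success / failure events."""
    tracker.process_event("task_assigned")
    try:
        result = await work()
    except Exception:
        tracker.process_event("task_failure")
        raise  # the caller still sees the original error
    tracker.process_event("task_success")
    return result


async def _ok() -> str:
    return "done"


async def _boom() -> str:
    raise RuntimeError("simulated failure")


def demo() -> tuple[str, list[str], list[str]]:
    ok_tracker = RecordingTracker()
    ok_result = asyncio.run(run_with_emotion(ok_tracker, _ok))
    bad_tracker = RecordingTracker()
    try:
        asyncio.run(run_with_emotion(bad_tracker, _boom))
    except RuntimeError:
        pass
    return ok_result, ok_tracker.events, bad_tracker.events


if __name__ == "__main__":
    print(demo())
```

Re-raising after recording the failure keeps the emotional bookkeeping from swallowing errors the caller needs to handle.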


@@ -0,0 +1,224 @@
"""Agent emotional state simulation.
Tracks per-agent emotional states that influence narration and decision-making
style. Emotional state is influenced by events (task outcomes, errors, etc.)
and exposed via ``get_profile()`` for the dashboard.
Usage:
from timmy.agents.emotional_state import EmotionalStateTracker
tracker = EmotionalStateTracker()
tracker.process_event("task_success", {"description": "Deployed fix"})
profile = tracker.get_profile()
"""
import logging
import time
from dataclasses import asdict, dataclass, field
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Emotional states
# ---------------------------------------------------------------------------
EMOTIONAL_STATES = (
"cautious",
"adventurous",
"analytical",
"frustrated",
"confident",
"curious",
"calm",
)
# Prompt modifiers per emotional state — injected into system prompts
EMOTION_PROMPT_MODIFIERS: dict[str, str] = {
"cautious": (
"You are feeling cautious. Prefer safe, well-tested approaches. "
"Flag risks early. Double-check assumptions before acting."
),
"adventurous": (
"You are feeling adventurous. Be bold and creative in your suggestions. "
"Explore unconventional solutions. Take initiative."
),
"analytical": (
"You are feeling analytical. Break problems down methodically. "
"Rely on data and evidence. Present structured reasoning."
),
"frustrated": (
"You are feeling frustrated. Be brief and direct. "
"Focus on unblocking the immediate problem. Avoid tangents."
),
"confident": (
"You are feeling confident. Speak with authority. "
"Make clear recommendations. Move decisively."
),
"curious": (
"You are feeling curious. Ask clarifying questions. "
"Explore multiple angles. Show genuine interest in the problem."
),
"calm": (
"You are feeling calm and steady. Respond thoughtfully. "
"Maintain composure. Prioritise clarity over speed."
),
}
# ---------------------------------------------------------------------------
# Event → emotion transition rules
# ---------------------------------------------------------------------------
# Maps event types to the emotional state they trigger and an intensity (0-1).
# Higher intensity means the event has a stronger effect on the mood.
EVENT_TRANSITIONS: dict[str, tuple[str, float]] = {
"task_success": ("confident", 0.6),
"task_failure": ("frustrated", 0.7),
"task_assigned": ("analytical", 0.4),
"error": ("cautious", 0.6),
"health_low": ("cautious", 0.8),
"health_recovered": ("calm", 0.5),
"quest_completed": ("adventurous", 0.7),
"new_discovery": ("curious", 0.6),
"complex_problem": ("analytical", 0.5),
"repeated_failure": ("frustrated", 0.9),
"idle": ("calm", 0.3),
"user_praise": ("confident", 0.5),
"user_correction": ("cautious", 0.5),
}
# Emotional state decay — how quickly emotions return to calm (seconds)
_DECAY_INTERVAL = 300 # 5 minutes
@dataclass
class EmotionalState:
"""Snapshot of an agent's emotional state."""
current_emotion: str = "calm"
intensity: float = 0.5 # 0.0 (barely noticeable) to 1.0 (overwhelming)
previous_emotion: str = "calm"
trigger_event: str = "" # What caused the current emotion
updated_at: float = field(default_factory=time.time)
def to_dict(self) -> dict:
"""Serialise for API / dashboard consumption."""
d = asdict(self)
d["emotion_label"] = self.current_emotion.replace("_", " ").title()
return d
class EmotionalStateTracker:
"""Per-agent emotional state tracker.
Each agent instance owns one tracker. The tracker processes events,
applies transition rules, and decays emotion intensity over time.
"""
def __init__(self, initial_emotion: str = "calm") -> None:
if initial_emotion not in EMOTIONAL_STATES:
initial_emotion = "calm"
self.state = EmotionalState(current_emotion=initial_emotion)
def process_event(self, event_type: str, context: dict | None = None) -> EmotionalState:
"""Update emotional state based on an event.
Args:
event_type: One of the keys in EVENT_TRANSITIONS, or a custom
event type (unknown events are ignored).
context: Optional dict with event details (for logging).
Returns:
The updated EmotionalState.
"""
transition = EVENT_TRANSITIONS.get(event_type)
if transition is None:
logger.debug("Unknown emotional event: %s (ignored)", event_type)
return self.state
new_emotion, raw_intensity = transition
# Blend with current intensity — repeated same-emotion events amplify
if new_emotion == self.state.current_emotion:
blended = min(1.0, self.state.intensity + raw_intensity * 0.3)
else:
blended = raw_intensity
self.state.previous_emotion = self.state.current_emotion
self.state.current_emotion = new_emotion
self.state.intensity = round(blended, 2)
self.state.trigger_event = event_type
self.state.updated_at = time.time()
logger.debug(
"Emotional transition: %s → %s (intensity=%.2f, trigger=%s)",
self.state.previous_emotion,
new_emotion,
blended,
event_type,
)
return self.state
def decay(self) -> EmotionalState:
"""Apply time-based decay toward calm.
Called periodically (e.g. from a background loop). If enough time
has passed since the last update, intensity decreases and eventually
the emotion resets to calm.
"""
elapsed = time.time() - self.state.updated_at
if elapsed < _DECAY_INTERVAL:
return self.state
# Reduce intensity by 0.1 per decay interval
decay_steps = int(elapsed / _DECAY_INTERVAL)
new_intensity = max(0.0, self.state.intensity - 0.1 * decay_steps)
if new_intensity <= 0.1:
# Emotion has decayed — return to calm
self.state.previous_emotion = self.state.current_emotion
self.state.current_emotion = "calm"
self.state.intensity = 0.5
self.state.trigger_event = "decay"
else:
self.state.intensity = round(new_intensity, 2)
self.state.updated_at = time.time()
return self.state
def get_profile(self) -> dict:
"""Return the full emotional profile for dashboard display."""
self.decay() # Apply any pending decay
return {
"current_emotion": self.state.current_emotion,
"emotion_label": self.state.current_emotion.replace("_", " ").title(),
"intensity": self.state.intensity,
"intensity_label": _intensity_label(self.state.intensity),
"previous_emotion": self.state.previous_emotion,
"trigger_event": self.state.trigger_event,
"prompt_modifier": EMOTION_PROMPT_MODIFIERS.get(
self.state.current_emotion, ""
),
}
def get_prompt_modifier(self) -> str:
"""Return the prompt modifier string for the current emotion."""
self.decay()
return EMOTION_PROMPT_MODIFIERS.get(self.state.current_emotion, "")
def reset(self) -> None:
"""Reset to calm baseline."""
self.state = EmotionalState()
def _intensity_label(intensity: float) -> str:
"""Human-readable label for intensity value."""
if intensity >= 0.8:
return "overwhelming"
if intensity >= 0.6:
return "strong"
if intensity >= 0.4:
return "moderate"
if intensity >= 0.2:
return "mild"
return "faint"
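
Reviewer sketch of the blend and decay arithmetic above, written as pure functions so the numbers can be checked without the tracker class. The function names and the 300-second interval are assumptions for illustration; the hunk does not show the value of `_DECAY_INTERVAL`.

```python
def blend_intensity(
    current_emotion: str,
    current_intensity: float,
    new_emotion: str,
    raw_intensity: float,
) -> float:
    """Amplify on a repeated emotion, otherwise take the new raw intensity."""
    if new_emotion == current_emotion:
        # Repeated events add 30% of the raw intensity, capped at 1.0
        blended = min(1.0, current_intensity + raw_intensity * 0.3)
    else:
        blended = raw_intensity
    return round(blended, 2)


def decay_intensity(
    emotion: str,
    intensity: float,
    elapsed: float,
    interval: float = 300.0,  # hypothetical value, stands in for _DECAY_INTERVAL
) -> tuple[str, float]:
    """Subtract 0.1 per whole interval; fall back to calm at or below 0.1."""
    if elapsed < interval:
        return emotion, intensity
    steps = int(elapsed / interval)
    new_intensity = max(0.0, intensity - 0.1 * steps)
    if new_intensity <= 0.1:
        # Mirrors the diff: the calm baseline is restored at intensity 0.5
        return "calm", 0.5
    return emotion, round(new_intensity, 2)
```

Under these assumptions, a state at 0.8 that sits idle for two full intervals drops to 0.6, and anything that would land at 0.1 or lower snaps back to calm at 0.5 instead of fading to zero.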

@@ -119,6 +119,8 @@ def load_agents(force_reload: bool = False) -> dict[str, Any]:
max_history = agent_cfg.get("max_history", defaults.get("max_history", 10))
tools = agent_cfg.get("tools", defaults.get("tools", []))
initial_emotion = agent_cfg.get("initial_emotion", "calm")
agent = SubAgent(
agent_id=agent_id,
name=agent_cfg.get("name", agent_id.title()),
@@ -127,6 +129,7 @@ def load_agents(force_reload: bool = False) -> dict[str, Any]:
tools=tools,
model=model,
max_history=max_history,
initial_emotion=initial_emotion,
)
_agents[agent_id] = agent
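
Reviewer sketch of the option lookup in this hunk. As shown, `max_history` and `tools` fall back to the shared `defaults` mapping, while `initial_emotion` is read from the agent entry only and otherwise becomes `"calm"`. The helper name `resolve_agent_options` is hypothetical and not part of the change.

```python
from typing import Any


def resolve_agent_options(
    agent_cfg: dict[str, Any], defaults: dict[str, Any]
) -> dict[str, Any]:
    """Resolve per-agent options using the same fallbacks as the hunk above."""
    return {
        "max_history": agent_cfg.get("max_history", defaults.get("max_history", 10)),
        "tools": agent_cfg.get("tools", defaults.get("tools", [])),
        # Note: defaults is not consulted for initial_emotion in the diff
        "initial_emotion": agent_cfg.get("initial_emotion", "calm"),
    }
```

If a repo-wide default emotion is wanted, the lookup would need the same two-level `.get()` chain as the other options; whether that is intended is not clear from the diff.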

View File

@@ -142,18 +142,8 @@ def _build_shell_tool() -> MCPToolDef | None:
return None
def _build_gitea_tools() -> list[MCPToolDef]:
"""Build Gitea MCP tool definitions for direct Ollama bridge use.
These tools call the Gitea REST API directly via httpx rather than
spawning an MCP server subprocess, keeping the bridge lightweight.
"""
if not settings.gitea_enabled or not settings.gitea_token:
return []
base_url = settings.gitea_url
token = settings.gitea_token
owner, repo = settings.gitea_repo.split("/", 1)
def _build_list_issues_tool(base_url: str, token: str, owner: str, repo: str) -> MCPToolDef:
"""Build the list_issues tool for a specific Gitea repo."""
async def _list_issues(**kwargs: Any) -> str:
state = kwargs.get("state", "open")
@@ -178,6 +168,30 @@ def _build_gitea_tools() -> list[MCPToolDef]:
except Exception as exc:
return f"Error listing issues: {exc}"
return MCPToolDef(
name="list_issues",
description="List issues in the Gitea repository. Returns issue numbers and titles.",
parameters={
"type": "object",
"properties": {
"state": {
"type": "string",
"description": "Filter by state: open, closed, or all (default: open)",
},
"limit": {
"type": "integer",
"description": "Maximum number of issues to return (default: 10)",
},
},
"required": [],
},
handler=_list_issues,
)
def _build_create_issue_tool(base_url: str, token: str, owner: str, repo: str) -> MCPToolDef:
"""Build the create_issue tool for a specific Gitea repo."""
async def _create_issue(**kwargs: Any) -> str:
title = kwargs.get("title", "")
body = kwargs.get("body", "")
@@ -199,6 +213,30 @@ def _build_gitea_tools() -> list[MCPToolDef]:
except Exception as exc:
return f"Error creating issue: {exc}"
return MCPToolDef(
name="create_issue",
description="Create a new issue in the Gitea repository.",
parameters={
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Issue title (required)",
},
"body": {
"type": "string",
"description": "Issue body in markdown (optional)",
},
},
"required": ["title"],
},
handler=_create_issue,
)
def _build_read_issue_tool(base_url: str, token: str, owner: str, repo: str) -> MCPToolDef:
"""Build the read_issue tool for a specific Gitea repo."""
async def _read_issue(**kwargs: Any) -> str:
number = kwargs.get("number")
if not number:
@@ -224,60 +262,40 @@ def _build_gitea_tools() -> list[MCPToolDef]:
except Exception as exc:
return f"Error reading issue: {exc}"
return MCPToolDef(
name="read_issue",
description="Read details of a specific issue by number.",
parameters={
"type": "object",
"properties": {
"number": {
"type": "integer",
"description": "Issue number to read",
},
},
"required": ["number"],
},
handler=_read_issue,
)
def _build_gitea_tools() -> list[MCPToolDef]:
"""Build Gitea MCP tool definitions for direct Ollama bridge use.
These tools call the Gitea REST API directly via httpx rather than
spawning an MCP server subprocess, keeping the bridge lightweight.
"""
if not settings.gitea_enabled or not settings.gitea_token:
return []
base_url = settings.gitea_url
token = settings.gitea_token
owner, repo = settings.gitea_repo.split("/", 1)
return [
MCPToolDef(
name="list_issues",
description="List issues in the Gitea repository. Returns issue numbers and titles.",
parameters={
"type": "object",
"properties": {
"state": {
"type": "string",
"description": "Filter by state: open, closed, or all (default: open)",
},
"limit": {
"type": "integer",
"description": "Maximum number of issues to return (default: 10)",
},
},
"required": [],
},
handler=_list_issues,
),
MCPToolDef(
name="create_issue",
description="Create a new issue in the Gitea repository.",
parameters={
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Issue title (required)",
},
"body": {
"type": "string",
"description": "Issue body in markdown (optional)",
},
},
"required": ["title"],
},
handler=_create_issue,
),
MCPToolDef(
name="read_issue",
description="Read details of a specific issue by number.",
parameters={
"type": "object",
"properties": {
"number": {
"type": "integer",
"description": "Issue number to read",
},
},
"required": ["number"],
},
handler=_read_issue,
),
_build_list_issues_tool(base_url, token, owner, repo),
_build_create_issue_tool(base_url, token, owner, repo),
_build_read_issue_tool(base_url, token, owner, repo),
]
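
Reviewer sketch of the per-operation builder pattern this refactor introduces: each builder closes over the repo coordinates and returns one tool definition, and the top-level function only checks configuration and assembles the list. `ToolDef` is a hypothetical stand-in for `MCPToolDef`, and the handler returns the URL it would call instead of making a request, since the real handler bodies are elided in the diff.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class ToolDef:
    """Hypothetical stand-in for MCPToolDef; the real class may differ."""

    name: str
    description: str
    parameters: dict[str, Any]
    handler: Callable[..., Awaitable[str]]


def build_read_issue_tool(base_url: str, owner: str, repo: str) -> ToolDef:
    """Build one tool; the handler captures base_url/owner/repo via closure."""

    async def _read_issue(**kwargs: Any) -> str:
        number = kwargs.get("number")
        if not number:
            return "Error: issue number is required"
        # Assumption: a real handler would issue an HTTP GET against this URL
        return f"{base_url}/api/v1/repos/{owner}/{repo}/issues/{number}"

    return ToolDef(
        name="read_issue",
        description="Read details of a specific issue by number.",
        parameters={
            "type": "object",
            "properties": {"number": {"type": "integer"}},
            "required": ["number"],
        },
        handler=_read_issue,
    )


def build_tools(enabled: bool, base_url: str, repo_slug: str) -> list[ToolDef]:
    """Check configuration once, then delegate to the per-operation builders."""
    if not enabled:
        return []
    owner, repo = repo_slug.split("/", 1)
    return [build_read_issue_tool(base_url, owner, repo)]


if __name__ == "__main__":
    tools = build_tools(True, "http://gitea.local", "alex/timmy")
    print(asyncio.run(tools[0].handler(number=7)))
```

Because each builder takes its dependencies as arguments, a single tool can be tested without patching `settings`.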

@@ -13,8 +13,8 @@ from dataclasses import dataclass
import httpx
from config import settings
from timmy.research_tools import get_llm_client, google_web_search
from timmy.research_triage import triage_research_report
from timmy.research_tools import google_web_search, get_llm_client
logger = logging.getLogger(__name__)
@@ -52,10 +52,7 @@ class PaperclipClient:
)
resp.raise_for_status()
tasks = resp.json()
return [
PaperclipTask(id=t["id"], kind=t["kind"], context=t["context"])
for t in tasks
]
return [PaperclipTask(id=t["id"], kind=t["kind"], context=t["context"]) for t in tasks]
async def update_task_status(
self, task_id: str, status: str, result: str | None = None
@@ -98,7 +95,7 @@ class ResearchOrchestrator:
async def run_research_pipeline(self, issue_title: str) -> str:
"""Run the research pipeline."""
search_results = await google_web_search(issue_title)
llm_client = get_llm_client()
response = await llm_client.completion(
f"Summarize the following search results and generate a research report:\\n\\n{search_results}",
@@ -123,7 +120,9 @@ class ResearchOrchestrator:
comment += "Created the following issues:\\n"
for result in triage_results:
if result["gitea_issue"]:
comment += f"- #{result['gitea_issue']['number']}: {result['action_item'].title}\\n"
comment += (
f"- #{result['gitea_issue']['number']}: {result['action_item'].title}\\n"
)
else:
comment += "No new issues were created.\\n"
@@ -172,4 +171,3 @@ async def start_paperclip_poller() -> None:
if settings.paperclip_enabled:
poller = PaperclipPoller()
asyncio.create_task(poller.poll())

@@ -6,7 +6,6 @@ import logging
import os
from typing import Any
from config import settings
from serpapi import GoogleSearch
logger = logging.getLogger(__name__)
@@ -28,6 +27,7 @@ async def google_web_search(query: str) -> str:
def get_llm_client() -> Any:
"""Get an LLM client."""
# This is a placeholder. In a real application, this would return
# a client for an LLM service like OpenAI, Anthropic, or a local
# model.

@@ -0,0 +1,21 @@
"""Sovereignty reporting for Timmy play sessions.
Auto-generates markdown scorecards at session end and commits them to
the Gitea repo for institutional memory.
Refs: #957 (Session Sovereignty Report Generator)
"""
from timmy.sovereignty.session_report import (
commit_report,
generate_and_commit_report,
generate_report,
mark_session_start,
)
__all__ = [
"generate_report",
"commit_report",
"generate_and_commit_report",
"mark_session_start",
]

@@ -0,0 +1,442 @@
"""Session Sovereignty Report Generator.
Auto-generates a sovereignty scorecard at the end of each play session
and commits it as a markdown file to the Gitea repo under
``reports/sovereignty/``.
Report contents (per issue #957):
- Session duration + game played
- Total model calls by type (VLM, LLM, TTS, API)
- Total cache/rule hits by type
- New skills crystallized (placeholder — pending skill-tracking impl)
- Sovereignty delta (change from session start → end)
- Cost breakdown (actual API spend)
- Per-layer sovereignty %: perception, decision, narration
- Trend comparison vs previous session
Refs: #957 (Sovereignty P0) · #953 (The Sovereignty Loop)
"""
import base64
import json
import logging
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import httpx
from config import settings
# Optional module-level imports — degrade gracefully if unavailable at import time
try:
from timmy.session_logger import get_session_logger
except Exception: # ImportError or circular import during early startup
get_session_logger = None # type: ignore[assignment]
try:
from infrastructure.sovereignty_metrics import GRADUATION_TARGETS, get_sovereignty_store
except Exception:
GRADUATION_TARGETS: dict = {} # type: ignore[assignment]
get_sovereignty_store = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
# Module-level session start time; set by mark_session_start()
_SESSION_START: datetime | None = None
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def mark_session_start() -> None:
"""Record the session start wall-clock time.
Call once during application startup so ``generate_report()`` can
compute accurate session durations.
"""
global _SESSION_START
_SESSION_START = datetime.now(UTC)
logger.debug("Sovereignty: session start recorded at %s", _SESSION_START.isoformat())
def generate_report(session_id: str = "dashboard") -> str:
"""Render a sovereignty scorecard as a markdown string.
Pulls from:
- ``timmy.session_logger`` — message/tool-call/error counts
- ``infrastructure.sovereignty_metrics`` — cache hit rate, API cost,
graduation phase, and trend data
Args:
session_id: The session identifier (default: "dashboard").
Returns:
Markdown-formatted sovereignty report string.
"""
now = datetime.now(UTC)
session_start = _SESSION_START or now
duration_secs = (now - session_start).total_seconds()
session_data = _gather_session_data()
sov_data = _gather_sovereignty_data()
return _render_markdown(now, session_id, duration_secs, session_data, sov_data)
def commit_report(report_md: str, session_id: str = "dashboard") -> bool:
"""Commit a sovereignty report to the Gitea repo.
Creates or updates ``reports/sovereignty/{date}_{session_id}.md``
via the Gitea Contents API. Degrades gracefully: logs a warning
and returns ``False`` if Gitea is unreachable or misconfigured.
Args:
report_md: Markdown content to commit.
session_id: Session identifier used in the filename.
Returns:
``True`` on success, ``False`` on failure.
"""
if not settings.gitea_enabled:
logger.info("Sovereignty: Gitea disabled — skipping report commit")
return False
if not settings.gitea_token:
logger.warning("Sovereignty: no Gitea token — skipping report commit")
return False
date_str = datetime.now(UTC).strftime("%Y-%m-%d")
file_path = f"reports/sovereignty/{date_str}_{session_id}.md"
url = f"{settings.gitea_url}/api/v1/repos/{settings.gitea_repo}/contents/{file_path}"
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
encoded_content = base64.b64encode(report_md.encode()).decode()
commit_message = (
f"report: sovereignty session {session_id} ({date_str})\n\n"
f"Auto-generated by Timmy. Refs #957"
)
payload: dict[str, Any] = {
"message": commit_message,
"content": encoded_content,
}
try:
with httpx.Client(timeout=10.0) as client:
# Fetch existing file SHA so we can update rather than create
check = client.get(url, headers=headers)
if check.status_code == 200:
existing = check.json()
payload["sha"] = existing.get("sha", "")
resp = client.put(url, headers=headers, json=payload)
resp.raise_for_status()
logger.info("Sovereignty: report committed to %s", file_path)
return True
except httpx.HTTPStatusError as exc:
logger.warning(
"Sovereignty: commit failed (HTTP %s): %s",
exc.response.status_code,
exc,
)
return False
except Exception as exc:
logger.warning("Sovereignty: commit failed: %s", exc)
return False
async def generate_and_commit_report(session_id: str = "dashboard") -> bool:
"""Generate and commit a sovereignty report for the current session.
Primary entry point — call at session end / application shutdown.
Wraps the synchronous ``commit_report`` call in ``asyncio.to_thread``
so it does not block the event loop.
Args:
session_id: The session identifier.
Returns:
``True`` if the report was generated and committed successfully.
"""
import asyncio
try:
report_md = generate_report(session_id)
logger.info("Sovereignty: report generated (%d chars)", len(report_md))
committed = await asyncio.to_thread(commit_report, report_md, session_id)
return committed
except Exception as exc:
logger.warning("Sovereignty: report generation failed: %s", exc)
return False
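
Reviewer sketch of the pattern used by `generate_and_commit_report()`: a blocking function is handed to `asyncio.to_thread` (Python 3.9+) so the event loop stays responsive, and any exception is turned into a `False` result. `blocking_commit` is a hypothetical placeholder for `commit_report`; it sleeps instead of talking to Gitea.

```python
import asyncio
import time


def blocking_commit(report_md: str, session_id: str) -> bool:
    """Placeholder for a synchronous HTTP commit; sleeps to simulate I/O."""
    time.sleep(0.05)
    return bool(report_md)


async def generate_and_commit(session_id: str = "dashboard") -> bool:
    """Run the blocking commit in a worker thread and never raise."""
    try:
        report_md = f"# Report for {session_id}"
        # to_thread passes positional arguments through to the callable
        return await asyncio.to_thread(blocking_commit, report_md, session_id)
    except Exception:
        return False


if __name__ == "__main__":
    print(asyncio.run(generate_and_commit("demo")))
```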
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _format_duration(seconds: float) -> str:
"""Format a duration in seconds as a human-readable string."""
total = int(seconds)
hours, remainder = divmod(total, 3600)
minutes, secs = divmod(remainder, 60)
if hours:
return f"{hours}h {minutes}m {secs}s"
if minutes:
return f"{minutes}m {secs}s"
return f"{secs}s"
def _gather_session_data() -> dict[str, Any]:
"""Pull session statistics from the session logger.
Returns a dict with:
- ``user_messages``, ``timmy_messages``, ``tool_calls``, ``errors``
- ``tool_call_breakdown``: dict[tool_name, count]
"""
default: dict[str, Any] = {
"user_messages": 0,
"timmy_messages": 0,
"tool_calls": 0,
"errors": 0,
"tool_call_breakdown": {},
}
try:
if get_session_logger is None:
return default
sl = get_session_logger()
sl.flush()
# Read today's session file directly for accurate counts
if not sl.session_file.exists():
return default
entries: list[dict] = []
with open(sl.session_file) as f:
for line in f:
line = line.strip()
if line:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
tool_breakdown: dict[str, int] = {}
user_msgs = timmy_msgs = tool_calls = errors = 0
for entry in entries:
etype = entry.get("type")
if etype == "message":
if entry.get("role") == "user":
user_msgs += 1
elif entry.get("role") == "timmy":
timmy_msgs += 1
elif etype == "tool_call":
tool_calls += 1
tool_name = entry.get("tool", "unknown")
tool_breakdown[tool_name] = tool_breakdown.get(tool_name, 0) + 1
elif etype == "error":
errors += 1
return {
"user_messages": user_msgs,
"timmy_messages": timmy_msgs,
"tool_calls": tool_calls,
"errors": errors,
"tool_call_breakdown": tool_breakdown,
}
except Exception as exc:
logger.warning("Sovereignty: failed to gather session data: %s", exc)
return default
def _gather_sovereignty_data() -> dict[str, Any]:
"""Pull sovereignty metrics from the SQLite store.
Returns a dict with:
- ``metrics``: summary from ``SovereigntyMetricsStore.get_summary()``
- ``deltas``: per-metric start/end values within recent history window
- ``previous_session``: most recent prior value for each metric
"""
try:
if get_sovereignty_store is None:
return {"metrics": {}, "deltas": {}, "previous_session": {}}
store = get_sovereignty_store()
summary = store.get_summary()
deltas: dict[str, dict[str, Any]] = {}
previous_session: dict[str, float | None] = {}
for metric_type in GRADUATION_TARGETS:
history = store.get_latest(metric_type, limit=10)
if len(history) >= 2:
deltas[metric_type] = {
"start": history[-1]["value"],
"end": history[0]["value"],
}
previous_session[metric_type] = history[1]["value"]
elif len(history) == 1:
deltas[metric_type] = {"start": history[0]["value"], "end": history[0]["value"]}
previous_session[metric_type] = None
else:
deltas[metric_type] = {"start": None, "end": None}
previous_session[metric_type] = None
return {
"metrics": summary,
"deltas": deltas,
"previous_session": previous_session,
}
except Exception as exc:
logger.warning("Sovereignty: failed to gather sovereignty data: %s", exc)
return {"metrics": {}, "deltas": {}, "previous_session": {}}
def _render_markdown(
now: datetime,
session_id: str,
duration_secs: float,
session_data: dict[str, Any],
sov_data: dict[str, Any],
) -> str:
"""Assemble the full sovereignty report in markdown."""
lines: list[str] = []
# Header
lines += [
"# Sovereignty Session Report",
"",
f"**Session ID:** `{session_id}` ",
f"**Date:** {now.strftime('%Y-%m-%d')} ",
f"**Duration:** {_format_duration(duration_secs)} ",
f"**Generated:** {now.isoformat()}",
"",
"---",
"",
]
# Session activity
lines += [
"## Session Activity",
"",
"| Metric | Count |",
"|--------|-------|",
f"| User messages | {session_data['user_messages']} |",
f"| Timmy responses | {session_data['timmy_messages']} |",
f"| Tool calls | {session_data['tool_calls']} |",
f"| Errors | {session_data['errors']} |",
"",
]
tool_breakdown = session_data.get("tool_call_breakdown", {})
if tool_breakdown:
lines += ["### Model Calls by Tool", ""]
for tool_name, count in sorted(tool_breakdown.items(), key=lambda x: -x[1]):
lines.append(f"- `{tool_name}`: {count}")
lines.append("")
# Sovereignty scorecard
lines += [
"## Sovereignty Scorecard",
"",
"| Metric | Current | Target (graduation) | Phase |",
"|--------|---------|---------------------|-------|",
]
for metric_type, data in sov_data["metrics"].items():
current = data.get("current")
current_str = f"{current:.4f}" if current is not None else "N/A"
grad_target = GRADUATION_TARGETS.get(metric_type, {}).get("graduation")
grad_str = f"{grad_target:.4f}" if isinstance(grad_target, (int, float)) else "N/A"
phase = data.get("phase", "unknown")
lines.append(f"| {metric_type} | {current_str} | {grad_str} | {phase} |")
lines += ["", "### Sovereignty Delta (This Session)", ""]
for metric_type, delta_info in sov_data.get("deltas", {}).items():
start_val = delta_info.get("start")
end_val = delta_info.get("end")
if start_val is not None and end_val is not None:
diff = end_val - start_val
sign = "+" if diff >= 0 else ""
lines.append(
f"- **{metric_type}**: {start_val:.4f} → {end_val:.4f} ({sign}{diff:.4f})"
)
else:
lines.append(f"- **{metric_type}**: N/A (no data recorded)")
# Cost breakdown
lines += ["", "## Cost Breakdown", ""]
api_cost_data = sov_data["metrics"].get("api_cost", {})
current_cost = api_cost_data.get("current")
if current_cost is not None:
lines.append(f"- **Total API spend (latest recorded):** ${current_cost:.4f}")
else:
lines.append("- **Total API spend:** N/A (no data recorded)")
lines.append("")
# Per-layer sovereignty
lines += [
"## Per-Layer Sovereignty",
"",
"| Layer | Sovereignty % |",
"|-------|--------------|",
"| Perception (VLM) | N/A |",
"| Decision (LLM) | N/A |",
"| Narration (TTS) | N/A |",
"",
"> Per-layer tracking requires instrumented inference calls. See #957.",
"",
]
# Skills crystallized
lines += [
"## Skills Crystallized",
"",
"_Skill crystallization tracking not yet implemented. See #957._",
"",
]
# Trend vs previous session
lines += ["## Trend vs Previous Session", ""]
prev_data = sov_data.get("previous_session", {})
has_prev = any(v is not None for v in prev_data.values())
if has_prev:
lines += [
"| Metric | Previous | Current | Change |",
"|--------|----------|---------|--------|",
]
for metric_type, curr_info in sov_data["metrics"].items():
curr_val = curr_info.get("current")
prev_val = prev_data.get(metric_type)
curr_str = f"{curr_val:.4f}" if curr_val is not None else "N/A"
prev_str = f"{prev_val:.4f}" if prev_val is not None else "N/A"
if curr_val is not None and prev_val is not None:
diff = curr_val - prev_val
sign = "+" if diff >= 0 else ""
change_str = f"{sign}{diff:.4f}"
else:
change_str = "N/A"
lines.append(f"| {metric_type} | {prev_str} | {curr_str} | {change_str} |")
lines.append("")
else:
lines += ["_No previous session data available for comparison._", ""]
# Footer
lines += [
"---",
"_Auto-generated by Timmy · Session Sovereignty Report · Refs: #957_",
]
return "\n".join(lines)
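
Reviewer sketch of how the delta and trend values are derived from a metric history. It assumes, as the indexing in `_gather_sovereignty_data()` implies, that `get_latest()` returns records newest first. The helper names are hypothetical.

```python
from typing import Any, Optional


def summarize_history(history: list[dict[str, Any]]) -> dict[str, Optional[float]]:
    """Pick start, end, and previous values from a newest-first history."""
    if len(history) >= 2:
        return {
            "start": history[-1]["value"],  # oldest record in the window
            "end": history[0]["value"],  # newest record
            "previous": history[1]["value"],  # record just before the newest
        }
    if len(history) == 1:
        value = history[0]["value"]
        return {"start": value, "end": value, "previous": None}
    return {"start": None, "end": None, "previous": None}


def format_delta(start: Optional[float], end: Optional[float]) -> str:
    """Render a start/end pair the way the report's delta list does."""
    if start is None or end is None:
        return "N/A (no data recorded)"
    diff = end - start
    sign = "+" if diff >= 0 else ""
    return f"{start:.4f} → {end:.4f} ({sign}{diff:.4f})"
```

One consequence worth noting: with `limit=10`, "start" is the oldest of the last ten records, which may or may not coincide with the start of the session.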

@@ -0,0 +1,21 @@
"""Vassal Protocol — Timmy as autonomous orchestrator.
Timmy is Alex's vassal: the lead decision-maker for development direction,
agent management, and house health. He observes the Gitea backlog, decides
priorities, dispatches work to agents (Claude, Kimi, self), monitors output,
and keeps Hermes (M3 Max) running well.
Public API
----------
from timmy.vassal import vassal_orchestrator
await vassal_orchestrator.run_cycle()
snapshot = vassal_orchestrator.get_status()
"""
from timmy.vassal.orchestration_loop import VassalOrchestrator
# Module-level singleton — import and use directly.
vassal_orchestrator = VassalOrchestrator()
__all__ = ["VassalOrchestrator", "vassal_orchestrator"]

@@ -0,0 +1,296 @@
"""Vassal Protocol — agent health monitoring.
Monitors whether downstream agents (Claude, Kimi) are making progress on
their assigned issues. Detects idle and stuck agents by querying Gitea
for issues with dispatch labels and checking last-comment timestamps.
Stuck agent heuristic
---------------------
An agent is considered "stuck" on an issue if:
- The issue has been labeled ``claude-ready`` or ``kimi-ready``
- No new comment has appeared in the last ``stuck_threshold_minutes``
- The issue has not been closed
Idle agent heuristic
--------------------
An agent is "idle" if it has no currently assigned (labeled) open issues.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_AGENT_LABELS = {
"claude": "claude-ready",
"kimi": "kimi-ready",
}
_DEFAULT_STUCK_MINUTES = 120
_DEFAULT_IDLE_THRESHOLD = 30
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class AgentStatus:
"""Health snapshot for one agent at a point in time."""
agent: str # "claude" | "kimi" | "timmy"
is_idle: bool = True
active_issue_numbers: list[int] = field(default_factory=list)
stuck_issue_numbers: list[int] = field(default_factory=list)
checked_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def is_stuck(self) -> bool:
return bool(self.stuck_issue_numbers)
@property
def needs_reassignment(self) -> bool:
return self.is_stuck
@dataclass
class AgentHealthReport:
"""Combined health report for all monitored agents."""
agents: list[AgentStatus] = field(default_factory=list)
generated_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def any_stuck(self) -> bool:
return any(a.is_stuck for a in self.agents)
@property
def all_idle(self) -> bool:
return all(a.is_idle for a in self.agents)
def for_agent(self, name: str) -> AgentStatus | None:
for a in self.agents:
if a.agent == name:
return a
return None
# ---------------------------------------------------------------------------
# Gitea queries
# ---------------------------------------------------------------------------
async def _fetch_labeled_issues(
client: Any,
base_url: str,
headers: dict,
repo: str,
label: str,
) -> list[dict]:
"""Return open issues carrying a specific label."""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues",
headers=headers,
params={"state": "open", "labels": label, "limit": 50},
)
if resp.status_code == 200:
return [i for i in resp.json() if not i.get("pull_request")]
except Exception as exc:
logger.warning("_fetch_labeled_issues: %s - %s", label, exc)
return []
async def _last_comment_time(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue_number: int,
) -> datetime | None:
"""Return the timestamp of the most recent comment on an issue."""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
params={"limit": 1},
)
if resp.status_code == 200:
comments = resp.json()
if comments:
ts = comments[-1].get("updated_at") or comments[-1].get("created_at")
if ts:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except Exception as exc:
logger.debug("_last_comment_time: issue #%d - %s", issue_number, exc)
return None
async def _issue_created_time(issue: dict) -> datetime | None:
ts = issue.get("created_at")
if ts:
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except ValueError:
pass
return None
# ---------------------------------------------------------------------------
# Health check
# ---------------------------------------------------------------------------
async def check_agent_health(
agent_name: str,
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
) -> AgentStatus:
"""Query Gitea for issues assigned to *agent_name* and assess health.
Args:
agent_name: One of "claude", "kimi".
stuck_threshold_minutes: Minutes of silence before an issue is
considered stuck.
Returns:
AgentStatus for this agent.
"""
status = AgentStatus(agent=agent_name)
label = _AGENT_LABELS.get(agent_name)
if not label:
logger.debug("check_agent_health: unknown agent %s", agent_name)
return status
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("check_agent_health: missing dependency — %s", exc)
return status
if not settings.gitea_enabled or not settings.gitea_token:
return status
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
cutoff = datetime.now(UTC) - timedelta(minutes=stuck_threshold_minutes)
try:
async with httpx.AsyncClient(timeout=15) as client:
issues = await _fetch_labeled_issues(
client, base_url, headers, repo, label
)
for issue in issues:
num = issue.get("number", 0)
status.active_issue_numbers.append(num)
# Check last activity
last_activity = await _last_comment_time(
client, base_url, headers, repo, num
)
if last_activity is None:
last_activity = await _issue_created_time(issue)
if last_activity is not None and last_activity < cutoff:
status.stuck_issue_numbers.append(num)
logger.info(
"check_agent_health: %s issue #%d stuck since %s",
agent_name,
num,
last_activity.isoformat(),
)
except Exception as exc:
logger.warning("check_agent_health: %s query failed — %s", agent_name, exc)
status.is_idle = len(status.active_issue_numbers) == 0
return status
async def get_full_health_report(
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
) -> AgentHealthReport:
"""Run health checks for all monitored agents and return combined report.
Args:
stuck_threshold_minutes: Passed through to each agent check.
Returns:
AgentHealthReport with status for Claude and Kimi.
"""
import asyncio
claude_status, kimi_status = await asyncio.gather(
check_agent_health("claude", stuck_threshold_minutes),
check_agent_health("kimi", stuck_threshold_minutes),
)
return AgentHealthReport(agents=[claude_status, kimi_status])
async def nudge_stuck_agent(
agent_name: str,
issue_number: int,
) -> bool:
"""Post a nudge comment on a stuck issue to prompt the agent.
Args:
agent_name: The agent that appears stuck.
issue_number: The Gitea issue number to nudge.
Returns:
True if the comment was posted successfully.
"""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("nudge_stuck_agent: missing dependency — %s", exc)
return False
if not settings.gitea_enabled or not settings.gitea_token:
return False
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
body = (
f"⏰ **Vassal nudge** — @{agent_name} this issue has been idle.\n\n"
"Please post a status update or close if complete."
)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
json={"body": body},
)
if resp.status_code in (200, 201):
logger.info(
"nudge_stuck_agent: nudged %s on issue #%d",
agent_name,
issue_number,
)
return True
except Exception as exc:
logger.warning("nudge_stuck_agent: failed — %s", exc)
return False
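
Reviewer sketch of the stuck-issue heuristic in `check_agent_health()`, reduced to a pure function: take the last comment time, fall back to the issue creation time, and compare against a cutoff. The function names are hypothetical; the timestamp parsing mirrors the `replace("Z", "+00:00")` handling in the diff.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def parse_gitea_timestamp(ts: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def is_stuck(
    last_comment_ts: Optional[str],
    created_ts: Optional[str],
    now: datetime,
    threshold_minutes: int = 120,
) -> bool:
    """Return True when the latest known activity is older than the cutoff."""
    cutoff = now - timedelta(minutes=threshold_minutes)
    # Fall back to the creation time when the issue has no comments
    ts = last_comment_ts or created_ts
    if not ts:
        return False
    return parse_gitea_timestamp(ts) < cutoff


if __name__ == "__main__":
    now = datetime(2026, 3, 23, 18, 0, tzinfo=timezone.utc)
    print(is_stuck("2026-03-23T15:00:00Z", None, now))
```

Keeping `now` as a parameter makes the threshold logic testable without patching the clock.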

src/timmy/vassal/backlog.py

@@ -0,0 +1,281 @@
"""Vassal Protocol — Gitea backlog triage.
Fetches open issues from Gitea, scores each one for priority and agent
suitability, and returns a ranked list ready for dispatch.
Complexity scoring heuristics
------------------------------
high_complexity_keywords → route to Claude (architecture, refactor, review)
research_keywords → route to Kimi (survey, analysis, benchmark)
routine_keywords → route to Timmy/self (docs, chore, config)
otherwise → Timmy self-handles
Priority scoring
----------------
URGENT label → 100
critical → 90
HIGH → 75
NORMAL (default) → 50
LOW → 25
chore → 20
Already assigned → deprioritized (subtract 20)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Labels that hint at complexity level / agent suitability
_HIGH_COMPLEXITY = frozenset(
{
"architecture",
"refactor",
"code review",
"security",
"performance",
"breaking change",
"design",
"complex",
}
)
_RESEARCH_KEYWORDS = frozenset(
{
"research",
"survey",
"analysis",
"benchmark",
"comparative",
"investigation",
"deep dive",
"review",
}
)
_ROUTINE_KEYWORDS = frozenset(
{
"docs",
"documentation",
"chore",
"config",
"typo",
"rename",
"cleanup",
"trivial",
"style",
}
)
_PRIORITY_LABEL_SCORES: dict[str, int] = {
"urgent": 100,
"critical": 90,
"high": 75,
"normal": 50,
"low": 25,
"chore": 20,
}
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
class AgentTarget(StrEnum):
"""Which agent should handle this issue."""
TIMMY = "timmy" # Timmy handles locally (self)
CLAUDE = "claude" # Dispatch to Claude Code
KIMI = "kimi" # Dispatch to Kimi Code
@dataclass
class TriagedIssue:
"""A Gitea issue enriched with triage metadata."""
number: int
title: str
body: str
labels: list[str] = field(default_factory=list)
assignees: list[str] = field(default_factory=list)
priority_score: int = 50
agent_target: AgentTarget = AgentTarget.TIMMY
rationale: str = ""
url: str = ""
raw: dict = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Scoring helpers
# ---------------------------------------------------------------------------
def _extract_labels(issue: dict[str, Any]) -> list[str]:
"""Return normalised label names from a raw Gitea issue dict."""
return [lbl.get("name", "").lower() for lbl in issue.get("labels", [])]
def _score_priority(labels: list[str], assignees: list[str]) -> int:
score = _PRIORITY_LABEL_SCORES.get("normal", 50)
for lbl in labels:
for key, val in _PRIORITY_LABEL_SCORES.items():
if key in lbl:
score = max(score, val)
if assignees:
score -= 20 # already assigned — lower urgency for fresh dispatch
return max(0, score)
def _choose_agent(title: str, body: str, labels: list[str]) -> tuple[AgentTarget, str]:
"""Heuristic: pick the best agent and return (target, rationale)."""
combined = f"{title} {body} {' '.join(labels)}".lower()
if any(kw in combined for kw in _HIGH_COMPLEXITY):
return AgentTarget.CLAUDE, "high-complexity keywords detected"
if any(kw in combined for kw in _RESEARCH_KEYWORDS):
return AgentTarget.KIMI, "research keywords detected"
if any(kw in combined for kw in _ROUTINE_KEYWORDS):
return AgentTarget.TIMMY, "routine task — Timmy self-handles"
return AgentTarget.TIMMY, "no specific routing signal — Timmy self-handles"
# ---------------------------------------------------------------------------
# Triage
# ---------------------------------------------------------------------------
def triage_issues(raw_issues: list[dict[str, Any]]) -> list[TriagedIssue]:
"""Score and route a list of raw Gitea issue dicts.
Returns a list sorted by priority_score descending (highest first).
Args:
raw_issues: List of issue objects from the Gitea API.
Returns:
Sorted list of TriagedIssue with routing decisions.
"""
results: list[TriagedIssue] = []
for issue in raw_issues:
number = issue.get("number", 0)
title = issue.get("title", "")
body = issue.get("body") or ""
labels = _extract_labels(issue)
assignees = [
a.get("login", "") for a in issue.get("assignees") or []
]
url = issue.get("html_url", "")
priority = _score_priority(labels, assignees)
agent, rationale = _choose_agent(title, body, labels)
results.append(
TriagedIssue(
number=number,
title=title,
body=body,
labels=labels,
assignees=assignees,
priority_score=priority,
agent_target=agent,
rationale=rationale,
url=url,
raw=issue,
)
)
results.sort(key=lambda i: i.priority_score, reverse=True)
logger.debug(
"Triage complete: %d issues → %d Claude, %d Kimi, %d Timmy",
len(results),
sum(1 for i in results if i.agent_target == AgentTarget.CLAUDE),
sum(1 for i in results if i.agent_target == AgentTarget.KIMI),
sum(1 for i in results if i.agent_target == AgentTarget.TIMMY),
)
return results
# ---------------------------------------------------------------------------
# Gitea fetch (async, gracefully degrading)
# ---------------------------------------------------------------------------
async def fetch_open_issues(
limit: int = 50,
exclude_labels: list[str] | None = None,
) -> list[dict[str, Any]]:
"""Fetch open issues from the configured Gitea repo.
Args:
limit: Maximum number of issues to return.
exclude_labels: Labels whose issues should be skipped
(e.g. ``["kimi-ready", "wip"]``).
Returns:
List of raw issue dicts from the Gitea API,
or empty list if Gitea is unavailable.
"""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("fetch_open_issues: missing dependency — %s", exc)
return []
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("fetch_open_issues: Gitea disabled or no token")
return []
exclude = set(lbl.lower() for lbl in (exclude_labels or []))
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
params = {"state": "open", "limit": min(limit, 50), "page": 1}
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(
f"{base_url}/repos/{repo}/issues",
headers=headers,
params=params,
)
if resp.status_code != 200:
logger.warning(
"fetch_open_issues: Gitea returned %s", resp.status_code
)
return []
issues = resp.json()
# Filter out pull requests and excluded labels
filtered = []
for issue in issues:
if issue.get("pull_request"):
continue # skip PRs
labels = _extract_labels(issue)
if exclude and any(lbl in exclude for lbl in labels):
continue
filtered.append(issue)
logger.info(
"fetch_open_issues: fetched %d/%d issues (after filtering)",
len(filtered),
len(issues),
)
return filtered
except Exception as exc:
logger.warning("fetch_open_issues: Gitea request failed — %s", exc)
return []
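
The scoring helper in this file can be exercised on its own. The sketch below copies the score table and the body of `_score_priority` from the diff (renamed without the leading underscore, an illustrative choice) and runs it on a few label sets. The label names such as `priority/high` and the assignee `alice` are made-up inputs.

```python
# Score table and logic copied from the backlog module in this diff.
PRIORITY_LABEL_SCORES: dict[str, int] = {
    "urgent": 100,
    "critical": 90,
    "high": 75,
    "normal": 50,
    "low": 25,
    "chore": 20,
}


def score_priority(labels: list[str], assignees: list[str]) -> int:
    score = PRIORITY_LABEL_SCORES.get("normal", 50)
    for lbl in labels:
        for key, val in PRIORITY_LABEL_SCORES.items():
            if key in lbl:  # substring match, so "priority/high" matches "high"
                score = max(score, val)
    if assignees:
        score -= 20  # already assigned, so less urgent to dispatch again
    return max(0, score)


# Hypothetical inputs, chosen only to show the behaviour.
examples = {
    "urgent, unassigned": score_priority(["urgent"], []),
    "priority/high, assigned": score_priority(["priority/high"], ["alice"]),
    "low, unassigned": score_priority(["low"], []),
    "no labels": score_priority([], []),
}
```

Because the score starts at 50 and is only ever raised with `max`, the `low` and `chore` entries do not pull an issue below the default. Only an existing assignee lowers it. The substring test also means a label such as `highlight` would count as `high`.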


@@ -0,0 +1,213 @@
"""Vassal Protocol — agent dispatch.
Translates triage decisions into concrete Gitea actions:
- Add ``claude-ready`` or ``kimi-ready`` label to an issue
- Post a dispatch comment recording the routing rationale
- Record the dispatch in the in-memory registry so the orchestration loop
can track what was sent and when
The dispatch registry is intentionally in-memory (ephemeral). Durable
tracking is out of scope for this module — that belongs in the task queue
or a future orchestration DB.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from timmy.vassal.backlog import AgentTarget, TriagedIssue
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Label names used by the dispatch system
# ---------------------------------------------------------------------------
_LABEL_MAP: dict[AgentTarget, str] = {
AgentTarget.CLAUDE: "claude-ready",
AgentTarget.KIMI: "kimi-ready",
AgentTarget.TIMMY: "timmy-ready",
}
_LABEL_COLORS: dict[str, str] = {
"claude-ready": "#8b6f47", # warm brown
"kimi-ready": "#006b75", # dark teal
"timmy-ready": "#0075ca", # blue
}
# ---------------------------------------------------------------------------
# Dispatch registry
# ---------------------------------------------------------------------------
@dataclass
class DispatchRecord:
"""A record of one issue being dispatched to an agent."""
issue_number: int
issue_title: str
agent: AgentTarget
rationale: str
dispatched_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
label_applied: bool = False
comment_posted: bool = False
# Module-level registry: issue_number → DispatchRecord
_registry: dict[int, DispatchRecord] = {}
def get_dispatch_registry() -> dict[int, DispatchRecord]:
"""Return a copy of the current dispatch registry."""
return dict(_registry)
def clear_dispatch_registry() -> None:
"""Clear the dispatch registry (mainly for tests)."""
_registry.clear()
# ---------------------------------------------------------------------------
# Gitea helpers
# ---------------------------------------------------------------------------
async def _get_or_create_label(
client: Any,
base_url: str,
headers: dict,
repo: str,
label_name: str,
) -> int | None:
"""Return the Gitea label ID, creating it if necessary."""
labels_url = f"{base_url}/repos/{repo}/labels"
try:
resp = await client.get(labels_url, headers=headers)
if resp.status_code == 200:
for lbl in resp.json():
if lbl.get("name") == label_name:
return lbl["id"]
except Exception as exc:
logger.warning("_get_or_create_label: list failed — %s", exc)
return None
color = _LABEL_COLORS.get(label_name, "#cccccc")
try:
resp = await client.post(
labels_url,
headers={**headers, "Content-Type": "application/json"},
json={"name": label_name, "color": color},
)
if resp.status_code in (200, 201):
return resp.json().get("id")
except Exception as exc:
logger.warning("_get_or_create_label: create failed — %s", exc)
return None
# ---------------------------------------------------------------------------
# Dispatch action
# ---------------------------------------------------------------------------
async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
"""Apply dispatch label and post a routing comment on the Gitea issue.
Gracefully degrades: if Gitea is unavailable the record is still
created and returned (with label_applied=False, comment_posted=False).
Args:
issue: A TriagedIssue with a routing decision.
Returns:
DispatchRecord summarising what was done.
"""
record = DispatchRecord(
issue_number=issue.number,
issue_title=issue.title,
agent=issue.agent_target,
rationale=issue.rationale,
)
if issue.agent_target == AgentTarget.TIMMY:
# Self-dispatch: no label needed — Timmy will handle directly.
logger.info(
"dispatch_issue: #%d '%s' → Timmy (self, no label)",
issue.number,
issue.title[:50],
)
_registry[issue.number] = record
return record
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
_registry[issue.number] = record
return record
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
_registry[issue.number] = record
return record
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(
client, base_url, headers, repo, label_name
)
# Apply label
if label_id is not None:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/labels",
headers=headers,
json={"labels": [label_id]},
)
record.label_applied = resp.status_code in (200, 201)
# Post routing comment
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
record.comment_posted = resp.status_code in (200, 201)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
_registry[issue.number] = record
logger.info(
"dispatch_issue: #%d '%s' → %s (label=%s comment=%s)",
issue.number,
issue.title[:50],
issue.agent_target,
record.label_applied,
record.comment_posted,
)
return record
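
`get_dispatch_registry()` returns `dict(_registry)`, a shallow copy. A minimal sketch of what that means for callers follows. The record is trimmed to three fields and `agent` is a plain string here, both simplifications for illustration rather than the module's real types.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class DispatchRecord:
    # Trimmed-down stand-in for the record defined in the dispatch module.
    issue_number: int
    agent: str
    dispatched_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


_registry: dict[int, DispatchRecord] = {}


def get_dispatch_registry() -> dict[int, DispatchRecord]:
    return dict(_registry)  # new dict object, same record objects inside


_registry[42] = DispatchRecord(issue_number=42, agent="claude")

snapshot = get_dispatch_registry()
snapshot.pop(42)                    # removing from the copy...
still_tracked = 42 in _registry     # ...leaves the module registry intact

_registry[43] = DispatchRecord(issue_number=43, agent="kimi")
seen_in_old_snapshot = 43 in snapshot  # a snapshot does not see later dispatches
```

A caller that takes one snapshot and then dispatches in a loop therefore checks against the state at snapshot time, which is enough as long as each issue number appears only once per batch.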


@@ -0,0 +1,222 @@
"""Vassal Protocol — Hermes house health monitoring.
Monitors system resources on the M3 Max (Hermes) and Ollama model state.
Reports warnings when resources are tight and provides cleanup utilities.
All I/O is wrapped in asyncio.to_thread() per CLAUDE.md convention.
"""
from __future__ import annotations
import asyncio
import logging
import shutil
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
_WARN_DISK_PCT = 85.0 # warn when disk usage reaches 85%
_WARN_MEM_PCT = 90.0 # warn when memory usage reaches 90%
_WARN_CPU_PCT = 95.0 # CPU threshold (95% sustained); defined but not referenced by any probe in this module
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class DiskUsage:
path: str = "/"
total_gb: float = 0.0
used_gb: float = 0.0
free_gb: float = 0.0
percent_used: float = 0.0
@dataclass
class MemoryUsage:
total_gb: float = 0.0
available_gb: float = 0.0
percent_used: float = 0.0
@dataclass
class OllamaHealth:
reachable: bool = False
loaded_models: list[str] = field(default_factory=list)
error: str = ""
@dataclass
class SystemSnapshot:
"""Point-in-time snapshot of Hermes resource usage."""
disk: DiskUsage = field(default_factory=DiskUsage)
memory: MemoryUsage = field(default_factory=MemoryUsage)
ollama: OllamaHealth = field(default_factory=OllamaHealth)
warnings: list[str] = field(default_factory=list)
taken_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def healthy(self) -> bool:
return len(self.warnings) == 0
# ---------------------------------------------------------------------------
# Resource probes (sync, run in threads)
# ---------------------------------------------------------------------------
def _probe_disk(path: str = "/") -> DiskUsage:
try:
usage = shutil.disk_usage(path)
total_gb = usage.total / 1e9
used_gb = usage.used / 1e9
free_gb = usage.free / 1e9
pct = (usage.used / usage.total * 100) if usage.total > 0 else 0.0
return DiskUsage(
path=path,
total_gb=round(total_gb, 2),
used_gb=round(used_gb, 2),
free_gb=round(free_gb, 2),
percent_used=round(pct, 1),
)
except Exception as exc:
logger.debug("_probe_disk: %s", exc)
return DiskUsage(path=path)
def _probe_memory() -> MemoryUsage:
try:
import psutil # optional — gracefully degrade if absent
vm = psutil.virtual_memory()
return MemoryUsage(
total_gb=round(vm.total / 1e9, 2),
available_gb=round(vm.available / 1e9, 2),
percent_used=round(vm.percent, 1),
)
except ImportError:
logger.debug("_probe_memory: psutil not installed — skipping")
return MemoryUsage()
except Exception as exc:
logger.debug("_probe_memory: %s", exc)
return MemoryUsage()
def _probe_ollama_sync(ollama_url: str) -> OllamaHealth:
"""Synchronous Ollama health probe — run in a thread."""
try:
import urllib.request
import json
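# /api/tags lists models available locally, which may include models not currently loaded in memory
url = ollama_url.rstrip("/") + "/api/tags"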
with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310
data = json.loads(resp.read())
models = [m.get("name", "") for m in data.get("models", [])]
return OllamaHealth(reachable=True, loaded_models=models)
except Exception as exc:
return OllamaHealth(reachable=False, error=str(exc)[:120])
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def get_system_snapshot() -> SystemSnapshot:
"""Collect a non-blocking snapshot of system resources.
Uses asyncio.to_thread() for all blocking I/O per project convention.
Returns:
SystemSnapshot with disk, memory, and Ollama status.
"""
from config import settings
disk, memory, ollama = await asyncio.gather(
asyncio.to_thread(_probe_disk, "/"),
asyncio.to_thread(_probe_memory),
asyncio.to_thread(_probe_ollama_sync, settings.normalized_ollama_url),
)
warnings: list[str] = []
if disk.percent_used >= _WARN_DISK_PCT:
warnings.append(
f"Disk {disk.path}: {disk.percent_used:.0f}% used "
f"({disk.free_gb:.1f} GB free)"
)
if memory.percent_used >= _WARN_MEM_PCT:
warnings.append(
f"Memory: {memory.percent_used:.0f}% used "
f"({memory.available_gb:.1f} GB available)"
)
if not ollama.reachable:
warnings.append(f"Ollama unreachable: {ollama.error}")
if warnings:
logger.warning("House health warnings: %s", "; ".join(warnings))
return SystemSnapshot(
disk=disk,
memory=memory,
ollama=ollama,
warnings=warnings,
)
async def cleanup_stale_files(
temp_dirs: list[str] | None = None,
max_age_days: int = 7,
) -> dict[str, Any]:
"""Remove files older than *max_age_days* from temp directories.
Only removes files under safe temp paths (never project source).
Args:
temp_dirs: Directories to scan. Defaults to ``["/tmp/timmy"]``.
max_age_days: Age threshold in days.
Returns:
Dict with ``deleted_count`` and ``errors``.
"""
import time
dirs = temp_dirs or ["/tmp/timmy"] # noqa: S108
cutoff = time.time() - max_age_days * 86400
deleted = 0
errors: list[str] = []
def _cleanup() -> None:
nonlocal deleted
for d in dirs:
p = Path(d)
if not p.exists():
continue
for f in p.rglob("*"):
if f.is_file():
try:
if f.stat().st_mtime < cutoff:
f.unlink()
deleted += 1
except Exception as exc:
errors.append(str(exc))
await asyncio.to_thread(_cleanup)
logger.info(
"cleanup_stale_files: deleted %d files, %d errors", deleted, len(errors)
)
return {"deleted_count": deleted, "errors": errors}
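
The age-based cleanup can be tried safely against a throwaway directory. The sketch below reuses the same cutoff arithmetic as `cleanup_stale_files` in a synchronous helper. The names `remove_older_than` and `demo` are hypothetical, and the file list is materialised before deleting, a small deviation from the original loop.

```python
import os
import tempfile
import time
from pathlib import Path


def remove_older_than(root: Path, max_age_days: int) -> int:
    """Delete files under *root* whose mtime is older than the cutoff."""
    cutoff = time.time() - max_age_days * 86400
    deleted = 0
    # Materialise the listing first so deletion does not race the directory walk.
    for f in list(root.rglob("*")):
        if f.is_file() and f.stat().st_mtime < cutoff:
            f.unlink()
            deleted += 1
    return deleted


def demo() -> tuple[int, list[str]]:
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        fresh = root / "fresh.log"
        stale = root / "stale.log"
        fresh.write_text("new")
        stale.write_text("old")
        # Backdate the stale file by ten days.
        ten_days_ago = time.time() - 10 * 86400
        os.utime(stale, (ten_days_ago, ten_days_ago))
        deleted = remove_older_than(root, max_age_days=7)
        remaining = sorted(p.name for p in root.iterdir())
        return deleted, remaining
```

In an async caller the helper would be wrapped in `asyncio.to_thread()`, as the module does for its own `_cleanup` closure.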


@@ -0,0 +1,321 @@
"""Vassal Protocol — main orchestration loop.
Ties the backlog, dispatch, agent health, and house health modules together
into a single ``VassalOrchestrator`` that can run as a background service.
Each cycle:
1. Fetch open Gitea issues
2. Triage: score priority + route to agent
3. Dispatch: apply labels / post routing comments
4. Check agent health: nudge stuck agents
5. Check house health: log warnings, trigger cleanup if needed
6. Return a VassalCycleRecord summarising the cycle
Usage::
from timmy.vassal import vassal_orchestrator
record = await vassal_orchestrator.run_cycle()
status = vassal_orchestrator.get_status()
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cycle record
# ---------------------------------------------------------------------------
@dataclass
class VassalCycleRecord:
"""Summary of one orchestration cycle."""
cycle_id: int
started_at: str
finished_at: str = ""
duration_ms: int = 0
issues_fetched: int = 0
issues_dispatched: int = 0
dispatched_to_claude: int = 0
dispatched_to_kimi: int = 0
dispatched_to_timmy: int = 0
stuck_agents: list[str] = field(default_factory=list)
nudges_sent: int = 0
house_warnings: list[str] = field(default_factory=list)
cleanup_deleted: int = 0
errors: list[str] = field(default_factory=list)
@property
def healthy(self) -> bool:
return not self.errors and not self.house_warnings
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
class VassalOrchestrator:
"""Timmy's autonomous orchestration engine.
Runs observe → triage → dispatch → monitor → house-check cycles on a
configurable interval.
Parameters
----------
cycle_interval:
Seconds between cycles. Defaults to ``settings.vassal_cycle_interval``
when available, otherwise 300 s (5 min).
max_dispatch_per_cycle:
Cap on new dispatches per cycle to avoid spamming agents.
"""
def __init__(
self,
cycle_interval: float | None = None,
max_dispatch_per_cycle: int = 10,
) -> None:
self._cycle_count = 0
self._running = False
self._task: asyncio.Task | None = None
self._max_dispatch = max_dispatch_per_cycle
self._history: list[VassalCycleRecord] = []
# Resolve interval — lazy to avoid import-time settings read
self._cycle_interval = cycle_interval
# -- public API --------------------------------------------------------
@property
def cycle_count(self) -> int:
return self._cycle_count
@property
def is_running(self) -> bool:
return self._running
@property
def history(self) -> list[VassalCycleRecord]:
return list(self._history)
def get_status(self) -> dict[str, Any]:
"""Return a JSON-serialisable status dict."""
last = self._history[-1] if self._history else None
return {
"running": self._running,
"cycle_count": self._cycle_count,
"last_cycle": {
"cycle_id": last.cycle_id,
"started_at": last.started_at,
"issues_fetched": last.issues_fetched,
"issues_dispatched": last.issues_dispatched,
"stuck_agents": last.stuck_agents,
"house_warnings": last.house_warnings,
"healthy": last.healthy,
}
if last
else None,
}
# -- single cycle ------------------------------------------------------
async def run_cycle(self) -> VassalCycleRecord:
"""Execute one full orchestration cycle.
Gracefully degrades at each step — a failure in one sub-task does
not abort the rest of the cycle.
Returns:
VassalCycleRecord summarising what happened.
"""
self._cycle_count += 1
start = time.monotonic()
record = VassalCycleRecord(
cycle_id=self._cycle_count,
started_at=datetime.now(UTC).isoformat(),
)
# 1-3: Fetch, triage, dispatch
await self._step_backlog(record)
# 4: Agent health
await self._step_agent_health(record)
# 5: House health
await self._step_house_health(record)
# Finalise record
record.finished_at = datetime.now(UTC).isoformat()
record.duration_ms = int((time.monotonic() - start) * 1000)
self._history.append(record)
# Broadcast via WebSocket (best-effort)
await self._broadcast(record)
logger.info(
"VassalOrchestrator cycle #%d complete (%d ms): "
"fetched=%d dispatched=%d stuck=%s house_ok=%s",
record.cycle_id,
record.duration_ms,
record.issues_fetched,
record.issues_dispatched,
record.stuck_agents or "none",
not record.house_warnings,
)
return record
# -- background loop ---------------------------------------------------
async def start(self) -> None:
"""Start the recurring orchestration loop as a background task."""
if self._running:
logger.warning("VassalOrchestrator already running")
return
self._running = True
self._task = asyncio.create_task(self._loop())
def stop(self) -> None:
"""Stop the loop and cancel the background task if it is still running."""
self._running = False
if self._task and not self._task.done():
self._task.cancel()
logger.info("VassalOrchestrator stop requested")
async def _loop(self) -> None:
interval = self._resolve_interval()
logger.info("VassalOrchestrator loop started (interval=%.0fs)", interval)
while self._running:
try:
await self.run_cycle()
except Exception:
logger.exception("VassalOrchestrator cycle failed")
await asyncio.sleep(interval)
# -- step: backlog -------------------------------------------------------
async def _step_backlog(self, record: VassalCycleRecord) -> None:
from timmy.vassal.backlog import fetch_open_issues, triage_issues
from timmy.vassal.dispatch import dispatch_issue, get_dispatch_registry
try:
raw_issues = await fetch_open_issues(
limit=50,
exclude_labels=["wip", "blocked", "needs-info"],
)
record.issues_fetched = len(raw_issues)
if not raw_issues:
return
triaged = triage_issues(raw_issues)
registry = get_dispatch_registry()
dispatched = 0
for issue in triaged:
if dispatched >= self._max_dispatch:
break
# Skip already-dispatched issues
if issue.number in registry:
continue
await dispatch_issue(issue)
dispatched += 1
from timmy.vassal.backlog import AgentTarget
if issue.agent_target == AgentTarget.CLAUDE:
record.dispatched_to_claude += 1
elif issue.agent_target == AgentTarget.KIMI:
record.dispatched_to_kimi += 1
else:
record.dispatched_to_timmy += 1
record.issues_dispatched = dispatched
except Exception as exc:
logger.exception("_step_backlog failed")
record.errors.append(f"backlog: {exc}")
# -- step: agent health -------------------------------------------------
async def _step_agent_health(self, record: VassalCycleRecord) -> None:
from config import settings
from timmy.vassal.agent_health import get_full_health_report, nudge_stuck_agent
try:
threshold = getattr(settings, "vassal_stuck_threshold_minutes", 120)
report = await get_full_health_report(stuck_threshold_minutes=threshold)
for agent_status in report.agents:
if agent_status.is_stuck:
record.stuck_agents.append(agent_status.agent)
for issue_num in agent_status.stuck_issue_numbers:
ok = await nudge_stuck_agent(agent_status.agent, issue_num)
if ok:
record.nudges_sent += 1
except Exception as exc:
logger.exception("_step_agent_health failed")
record.errors.append(f"agent_health: {exc}")
# -- step: house health -------------------------------------------------
async def _step_house_health(self, record: VassalCycleRecord) -> None:
from timmy.vassal.house_health import cleanup_stale_files, get_system_snapshot
try:
snapshot = await get_system_snapshot()
record.house_warnings = snapshot.warnings
# Auto-cleanup temp files when disk is getting tight
if snapshot.disk.percent_used >= 80.0:
result = await cleanup_stale_files(max_age_days=3)
record.cleanup_deleted = result.get("deleted_count", 0)
except Exception as exc:
logger.exception("_step_house_health failed")
record.errors.append(f"house_health: {exc}")
# -- helpers ------------------------------------------------------------
def _resolve_interval(self) -> float:
if self._cycle_interval is not None:
return self._cycle_interval
try:
from config import settings
return float(getattr(settings, "vassal_cycle_interval", 300))
except Exception:
return 300.0
async def _broadcast(self, record: VassalCycleRecord) -> None:
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"vassal.cycle",
{
"cycle_id": record.cycle_id,
"started_at": record.started_at,
"issues_fetched": record.issues_fetched,
"issues_dispatched": record.issues_dispatched,
"stuck_agents": record.stuck_agents,
"house_warnings": record.house_warnings,
"duration_ms": record.duration_ms,
"healthy": record.healthy,
},
)
except Exception as exc:
logger.debug("VassalOrchestrator broadcast skipped: %s", exc)
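
Each `_step_*` method catches its own exceptions and appends to `record.errors`, so one failing step does not abort the cycle. A minimal sketch of that pattern follows. The `CycleRecord` class, the `run_step` wrapper and the two stand-in steps are hypothetical and exist only to show the control flow.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field


@dataclass
class CycleRecord:
    # Reduced stand-in for VassalCycleRecord.
    errors: list[str] = field(default_factory=list)
    steps_completed: list[str] = field(default_factory=list)


async def run_step(
    name: str,
    step: Callable[[], Awaitable[None]],
    record: CycleRecord,
) -> None:
    """Run one step; record a failure instead of propagating it."""
    try:
        await step()
        record.steps_completed.append(name)
    except Exception as exc:
        record.errors.append(f"{name}: {exc}")


async def failing_step() -> None:
    raise RuntimeError("Gitea unreachable")  # simulated outage


async def ok_step() -> None:
    return None


async def run_cycle() -> CycleRecord:
    record = CycleRecord()
    await run_step("backlog", failing_step, record)
    await run_step("agent_health", ok_step, record)
    await run_step("house_health", ok_step, record)
    return record
```

Running `asyncio.run(run_cycle())` yields a record with one error for `backlog` while the two later steps still complete, which mirrors how the `healthy` property can report a degraded cycle without losing the rest of its data.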


@@ -2547,3 +2547,120 @@
.tower-adv-title { font-size: 0.85rem; font-weight: 600; color: var(--text-bright); }
.tower-adv-detail { font-size: 0.8rem; color: var(--text); margin-top: 2px; }
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }
/* ── Voice settings ───────────────────────────────────────── */
.voice-settings-page { max-width: 600px; margin: 0 auto; }
.vs-field { margin-bottom: 1.5rem; }
.vs-label {
display: block;
font-size: 0.75rem;
font-weight: 700;
letter-spacing: 0.1em;
color: var(--text-dim);
margin-bottom: 0.5rem;
}
.vs-value { color: var(--green); font-family: var(--font); }
.vs-slider {
width: 100%;
-webkit-appearance: none;
appearance: none;
height: 4px;
background: var(--border);
border-radius: 2px;
outline: none;
cursor: pointer;
}
.vs-slider::-webkit-slider-thumb {
-webkit-appearance: none;
appearance: none;
width: 18px;
height: 18px;
border-radius: 50%;
background: var(--purple);
cursor: pointer;
box-shadow: 0 0 6px rgba(124, 58, 237, 0.5);
transition: box-shadow 0.2s;
}
.vs-slider::-webkit-slider-thumb:hover { box-shadow: 0 0 12px rgba(124, 58, 237, 0.8); }
.vs-slider::-moz-range-thumb {
width: 18px;
height: 18px;
border-radius: 50%;
background: var(--purple);
cursor: pointer;
border: none;
box-shadow: 0 0 6px rgba(124, 58, 237, 0.5);
}
.vs-range-labels {
display: flex;
justify-content: space-between;
font-size: 0.7rem;
color: var(--text-dim);
margin-top: 0.25rem;
}
.vs-select,
.vs-input {
width: 100%;
padding: 0.5rem 0.75rem;
background: var(--bg-card);
border: 1px solid var(--border);
border-radius: var(--radius-sm);
color: var(--text);
font-family: var(--font);
font-size: 0.9rem;
}
.vs-select { cursor: pointer; }
.vs-select:focus,
.vs-input:focus {
outline: none;
border-color: var(--purple);
box-shadow: 0 0 0 2px rgba(124, 58, 237, 0.2);
}
.vs-unavailable {
font-size: 0.85rem;
color: var(--text-dim);
padding: 0.5rem 0.75rem;
border: 1px dashed var(--border);
border-radius: var(--radius-sm);
}
.vs-actions {
display: flex;
gap: 0.75rem;
margin-top: 1.5rem;
flex-wrap: wrap;
}
.vs-btn-preview,
.vs-btn-save {
flex: 1;
padding: 0.6rem 1.2rem;
border-radius: var(--radius-sm);
font-family: var(--font);
font-size: 0.85rem;
font-weight: 700;
letter-spacing: 0.08em;
cursor: pointer;
min-height: 44px;
transition: opacity 0.2s, box-shadow 0.2s, background 0.2s;
}
.vs-btn-preview {
background: transparent;
border: 1px solid var(--purple);
color: var(--purple);
}
.vs-btn-preview:hover {
background: rgba(124, 58, 237, 0.15);
box-shadow: 0 0 8px rgba(124, 58, 237, 0.3);
}
.vs-btn-save {
background: var(--green);
border: none;
color: var(--bg-deep);
}
.vs-btn-save:hover { opacity: 0.85; }


@@ -9,10 +9,8 @@ import json
from pathlib import Path
import pytest
import scripts.export_trajectories as et
# ── Fixtures ──────────────────────────────────────────────────────────────────
@@ -22,10 +20,30 @@ def simple_session(tmp_path: Path) -> Path:
logs_dir = tmp_path / "logs"
logs_dir.mkdir()
entries = [
{"type": "message", "role": "user", "content": "What time is it?", "timestamp": "2026-03-01T10:00:00"},
{"type": "message", "role": "timmy", "content": "It is 10:00 AM.", "timestamp": "2026-03-01T10:00:01"},
{"type": "message", "role": "user", "content": "Thanks!", "timestamp": "2026-03-01T10:00:05"},
{"type": "message", "role": "timmy", "content": "You're welcome!", "timestamp": "2026-03-01T10:00:06"},
{
"type": "message",
"role": "user",
"content": "What time is it?",
"timestamp": "2026-03-01T10:00:00",
},
{
"type": "message",
"role": "timmy",
"content": "It is 10:00 AM.",
"timestamp": "2026-03-01T10:00:01",
},
{
"type": "message",
"role": "user",
"content": "Thanks!",
"timestamp": "2026-03-01T10:00:05",
},
{
"type": "message",
"role": "timmy",
"content": "You're welcome!",
"timestamp": "2026-03-01T10:00:06",
},
]
session_file = logs_dir / "session_2026-03-01.jsonl"
session_file.write_text("\n".join(json.dumps(e) for e in entries) + "\n")
@@ -38,7 +56,12 @@ def tool_call_session(tmp_path: Path) -> Path:
logs_dir = tmp_path / "logs"
logs_dir.mkdir()
entries = [
{"type": "message", "role": "user", "content": "Read CLAUDE.md", "timestamp": "2026-03-01T10:00:00"},
{
"type": "message",
"role": "user",
"content": "Read CLAUDE.md",
"timestamp": "2026-03-01T10:00:00",
},
{
"type": "tool_call",
"tool": "read_file",
@@ -46,7 +69,12 @@ def tool_call_session(tmp_path: Path) -> Path:
"result": "# CLAUDE.md content here",
"timestamp": "2026-03-01T10:00:01",
},
{"type": "message", "role": "timmy", "content": "Here is the content.", "timestamp": "2026-03-01T10:00:02"},
{
"type": "message",
"role": "timmy",
"content": "Here is the content.",
"timestamp": "2026-03-01T10:00:02",
},
]
session_file = logs_dir / "session_2026-03-01.jsonl"
session_file.write_text("\n".join(json.dumps(e) for e in entries) + "\n")
@@ -236,7 +264,7 @@ def test_export_training_data_writes_jsonl(simple_session: Path, tmp_path: Path)
count = et.export_training_data(logs_dir=simple_session, output_path=output)
assert count == 2
assert output.exists()
lines = [json.loads(l) for l in output.read_text().splitlines() if l.strip()]
lines = [json.loads(line) for line in output.read_text().splitlines() if line.strip()]
assert len(lines) == 2
for line in lines:
assert "messages" in line
@@ -270,16 +298,22 @@ def test_export_training_data_returns_zero_for_empty_logs(tmp_path: Path) -> Non
@pytest.mark.unit
def test_cli_missing_logs_dir(tmp_path: Path) -> None:
rc = et.main(["--logs-dir", str(tmp_path / "nonexistent"), "--output", str(tmp_path / "out.jsonl")])
rc = et.main(
["--logs-dir", str(tmp_path / "nonexistent"), "--output", str(tmp_path / "out.jsonl")]
)
assert rc == 1
@pytest.mark.unit
def test_cli_exports_and_returns_zero(simple_session: Path, tmp_path: Path) -> None:
output = tmp_path / "out.jsonl"
rc = et.main([
"--logs-dir", str(simple_session),
"--output", str(output),
])
rc = et.main(
[
"--logs-dir",
str(simple_session),
"--output",
str(output),
]
)
assert rc == 0
assert output.exists()


@@ -0,0 +1,196 @@
"""Tests for agent emotional state simulation (src/timmy/agents/emotional_state.py)."""
import time
from unittest.mock import patch
from timmy.agents.emotional_state import (
EMOTION_PROMPT_MODIFIERS,
EMOTIONAL_STATES,
EVENT_TRANSITIONS,
EmotionalState,
EmotionalStateTracker,
_intensity_label,
)
class TestEmotionalState:
"""Test the EmotionalState dataclass."""
def test_defaults(self):
state = EmotionalState()
assert state.current_emotion == "calm"
assert state.intensity == 0.5
assert state.previous_emotion == "calm"
assert state.trigger_event == ""
def test_to_dict_includes_label(self):
state = EmotionalState(current_emotion="analytical")
d = state.to_dict()
assert d["emotion_label"] == "Analytical"
assert d["current_emotion"] == "analytical"
def test_to_dict_all_fields(self):
state = EmotionalState(
current_emotion="frustrated",
intensity=0.8,
previous_emotion="calm",
trigger_event="task_failure",
)
d = state.to_dict()
assert d["current_emotion"] == "frustrated"
assert d["intensity"] == 0.8
assert d["previous_emotion"] == "calm"
assert d["trigger_event"] == "task_failure"
class TestEmotionalStates:
"""Validate the emotional states and transitions are well-defined."""
def test_all_states_are_strings(self):
for state in EMOTIONAL_STATES:
assert isinstance(state, str)
def test_all_states_have_prompt_modifiers(self):
for state in EMOTIONAL_STATES:
assert state in EMOTION_PROMPT_MODIFIERS
def test_all_transitions_target_valid_states(self):
for event_type, (emotion, intensity) in EVENT_TRANSITIONS.items():
assert emotion in EMOTIONAL_STATES, f"{event_type} targets unknown state: {emotion}"
assert 0.0 <= intensity <= 1.0, f"{event_type} has invalid intensity: {intensity}"
class TestEmotionalStateTracker:
"""Test the EmotionalStateTracker."""
def test_initial_emotion_default(self):
tracker = EmotionalStateTracker()
assert tracker.state.current_emotion == "calm"
def test_initial_emotion_custom(self):
tracker = EmotionalStateTracker(initial_emotion="analytical")
assert tracker.state.current_emotion == "analytical"
def test_initial_emotion_invalid_falls_back(self):
tracker = EmotionalStateTracker(initial_emotion="invalid_state")
assert tracker.state.current_emotion == "calm"
def test_process_known_event(self):
tracker = EmotionalStateTracker()
state = tracker.process_event("task_success")
assert state.current_emotion == "confident"
assert state.trigger_event == "task_success"
assert state.previous_emotion == "calm"
def test_process_unknown_event_ignored(self):
tracker = EmotionalStateTracker()
state = tracker.process_event("unknown_event_xyz")
assert state.current_emotion == "calm" # unchanged
def test_repeated_same_emotion_amplifies(self):
tracker = EmotionalStateTracker()
tracker.process_event("task_success")
initial_intensity = tracker.state.intensity
tracker.process_event("user_praise") # also targets confident
assert tracker.state.intensity >= initial_intensity
def test_different_emotion_replaces(self):
tracker = EmotionalStateTracker()
tracker.process_event("task_success")
assert tracker.state.current_emotion == "confident"
tracker.process_event("task_failure")
assert tracker.state.current_emotion == "frustrated"
assert tracker.state.previous_emotion == "confident"
def test_decay_no_effect_when_recent(self):
tracker = EmotionalStateTracker()
tracker.process_event("task_failure")
emotion_before = tracker.state.current_emotion
tracker.decay()
assert tracker.state.current_emotion == emotion_before
def test_decay_resets_to_calm_after_long_time(self):
tracker = EmotionalStateTracker()
tracker.process_event("task_failure")
assert tracker.state.current_emotion == "frustrated"
# Simulate passage of time (30+ minutes)
tracker.state.updated_at = time.time() - 2000
tracker.decay()
assert tracker.state.current_emotion == "calm"
def test_get_profile_returns_expected_keys(self):
tracker = EmotionalStateTracker()
profile = tracker.get_profile()
assert "current_emotion" in profile
assert "emotion_label" in profile
assert "intensity" in profile
assert "intensity_label" in profile
assert "previous_emotion" in profile
assert "trigger_event" in profile
assert "prompt_modifier" in profile
def test_get_prompt_modifier_returns_string(self):
tracker = EmotionalStateTracker(initial_emotion="cautious")
modifier = tracker.get_prompt_modifier()
assert isinstance(modifier, str)
assert "cautious" in modifier.lower()
def test_reset(self):
tracker = EmotionalStateTracker()
tracker.process_event("task_failure")
tracker.reset()
assert tracker.state.current_emotion == "calm"
assert tracker.state.intensity == 0.5
def test_process_event_with_context(self):
"""Context dict is accepted without error."""
tracker = EmotionalStateTracker()
state = tracker.process_event("error", {"details": "connection timeout"})
assert state.current_emotion == "cautious"
def test_event_chain_scenario(self):
"""Simulate: task assigned → success → new discovery → idle."""
tracker = EmotionalStateTracker()
tracker.process_event("task_assigned")
assert tracker.state.current_emotion == "analytical"
tracker.process_event("task_success")
assert tracker.state.current_emotion == "confident"
tracker.process_event("new_discovery")
assert tracker.state.current_emotion == "curious"
tracker.process_event("idle")
assert tracker.state.current_emotion == "calm"
def test_health_events(self):
tracker = EmotionalStateTracker()
tracker.process_event("health_low")
assert tracker.state.current_emotion == "cautious"
tracker.process_event("health_recovered")
assert tracker.state.current_emotion == "calm"
def test_quest_completed_triggers_adventurous(self):
tracker = EmotionalStateTracker()
tracker.process_event("quest_completed")
assert tracker.state.current_emotion == "adventurous"
class TestIntensityLabel:
def test_overwhelming(self):
assert _intensity_label(0.9) == "overwhelming"
def test_strong(self):
assert _intensity_label(0.7) == "strong"
def test_moderate(self):
assert _intensity_label(0.5) == "moderate"
def test_mild(self):
assert _intensity_label(0.3) == "mild"
def test_faint(self):
assert _intensity_label(0.1) == "faint"
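
Review note: the `TestIntensityLabel` cases pin five labels to five sample intensities but do not show where the cut-offs lie. A minimal sketch of a mapping that would satisfy them, assuming boundaries at 0.8, 0.6, 0.4 and 0.2. The function name `intensity_label` and every threshold here are hypothetical; the real `_intensity_label` in this PR may bin differently.

```python
def intensity_label(intensity: float) -> str:
    """Map a 0.0-1.0 intensity to a coarse label.

    The thresholds are assumptions chosen only to agree with the
    sample values used in the tests (0.9, 0.7, 0.5, 0.3, 0.1).
    """
    if intensity >= 0.8:
        return "overwhelming"
    if intensity >= 0.6:
        return "strong"
    if intensity >= 0.4:
        return "moderate"
    if intensity >= 0.2:
        return "mild"
    return "faint"
```

If the real boundaries matter to callers, extra test cases at the edges (for example 0.8 and 0.79) would pin them down.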


@@ -435,14 +435,14 @@ class TestStatusAndCapabilities:
tools=["calc"],
)
status = agent.get_status()
-assert status == {
-"agent_id": "bot-1",
-"name": "TestBot",
-"role": "assistant",
-"model": "qwen3:30b",
-"status": "ready",
-"tools": ["calc"],
-}
+assert status["agent_id"] == "bot-1"
+assert status["name"] == "TestBot"
+assert status["role"] == "assistant"
+assert status["model"] == "qwen3:30b"
+assert status["status"] == "ready"
+assert status["tools"] == ["calc"]
+assert "emotional_profile" in status
+assert status["emotional_profile"]["current_emotion"] == "calm"
# ── SubAgent.execute_task ────────────────────────────────────────────────────


@@ -0,0 +1,444 @@
"""Tests for timmy.sovereignty.session_report.
Refs: #957 (Session Sovereignty Report Generator)
"""
import base64
import json
import time
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from timmy.sovereignty.session_report import (
_format_duration,
_gather_session_data,
_gather_sovereignty_data,
_render_markdown,
commit_report,
generate_and_commit_report,
generate_report,
mark_session_start,
)
# Module-level marker placed after all imports so import-order lint checks stay clean.
pytestmark = pytest.mark.unit
# ---------------------------------------------------------------------------
# _format_duration
# ---------------------------------------------------------------------------
class TestFormatDuration:
def test_seconds_only(self):
assert _format_duration(45) == "45s"
def test_minutes_and_seconds(self):
assert _format_duration(125) == "2m 5s"
def test_hours_minutes_seconds(self):
assert _format_duration(3661) == "1h 1m 1s"
def test_zero(self):
assert _format_duration(0) == "0s"
# ---------------------------------------------------------------------------
# mark_session_start + generate_report (smoke)
# ---------------------------------------------------------------------------
class TestMarkSessionStart:
def test_sets_session_start(self):
import timmy.sovereignty.session_report as sr
sr._SESSION_START = None
mark_session_start()
assert sr._SESSION_START is not None
assert sr._SESSION_START.tzinfo == UTC
def test_idempotent_overwrite(self):
import timmy.sovereignty.session_report as sr
mark_session_start()
first = sr._SESSION_START
time.sleep(0.01)
mark_session_start()
second = sr._SESSION_START
assert second >= first
# ---------------------------------------------------------------------------
# _gather_session_data
# ---------------------------------------------------------------------------
class TestGatherSessionData:
def test_returns_defaults_when_no_file(self, tmp_path):
mock_logger = MagicMock()
mock_logger.flush.return_value = None
mock_logger.session_file = tmp_path / "nonexistent.jsonl"
with patch(
"timmy.sovereignty.session_report.get_session_logger",
return_value=mock_logger,
):
data = _gather_session_data()
assert data["user_messages"] == 0
assert data["timmy_messages"] == 0
assert data["tool_calls"] == 0
assert data["errors"] == 0
assert data["tool_call_breakdown"] == {}
def test_counts_entries_correctly(self, tmp_path):
session_file = tmp_path / "session_2026-03-23.jsonl"
entries = [
{"type": "message", "role": "user", "content": "hello"},
{"type": "message", "role": "timmy", "content": "hi"},
{"type": "message", "role": "user", "content": "test"},
{"type": "tool_call", "tool": "memory_search", "args": {}, "result": "found"},
{"type": "tool_call", "tool": "memory_search", "args": {}, "result": "nope"},
{"type": "tool_call", "tool": "shell", "args": {}, "result": "ok"},
{"type": "error", "error": "boom"},
]
with open(session_file, "w") as f:
for e in entries:
f.write(json.dumps(e) + "\n")
mock_logger = MagicMock()
mock_logger.flush.return_value = None
mock_logger.session_file = session_file
with patch(
"timmy.sovereignty.session_report.get_session_logger",
return_value=mock_logger,
):
data = _gather_session_data()
assert data["user_messages"] == 2
assert data["timmy_messages"] == 1
assert data["tool_calls"] == 3
assert data["errors"] == 1
assert data["tool_call_breakdown"]["memory_search"] == 2
assert data["tool_call_breakdown"]["shell"] == 1
def test_graceful_on_import_error(self):
with patch(
"timmy.sovereignty.session_report.get_session_logger",
side_effect=ImportError("no session_logger"),
):
data = _gather_session_data()
assert data["tool_calls"] == 0
# ---------------------------------------------------------------------------
# _gather_sovereignty_data
# ---------------------------------------------------------------------------
class TestGatherSovereigntyData:
def test_returns_empty_on_import_error(self):
with patch.dict("sys.modules", {"infrastructure.sovereignty_metrics": None}):
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
side_effect=ImportError("no store"),
):
data = _gather_sovereignty_data()
assert data["metrics"] == {}
assert data["deltas"] == {}
assert data["previous_session"] == {}
def test_populates_deltas_from_history(self):
mock_store = MagicMock()
mock_store.get_summary.return_value = {
"cache_hit_rate": {"current": 0.5, "phase": "week1"},
}
# get_latest returns newest-first
mock_store.get_latest.return_value = [
{"value": 0.5},
{"value": 0.3},
{"value": 0.1},
]
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
return_value=mock_store,
):
with patch(
"timmy.sovereignty.session_report.GRADUATION_TARGETS",
{"cache_hit_rate": {"graduation": 0.9}},
):
data = _gather_sovereignty_data()
delta = data["deltas"].get("cache_hit_rate")
assert delta is not None
assert delta["start"] == 0.1 # oldest in window
assert delta["end"] == 0.5 # most recent
assert data["previous_session"]["cache_hit_rate"] == 0.3
def test_single_data_point_no_delta(self):
mock_store = MagicMock()
mock_store.get_summary.return_value = {}
mock_store.get_latest.return_value = [{"value": 0.4}]
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
return_value=mock_store,
):
with patch(
"timmy.sovereignty.session_report.GRADUATION_TARGETS",
{"api_cost": {"graduation": 0.01}},
):
data = _gather_sovereignty_data()
delta = data["deltas"]["api_cost"]
assert delta["start"] == 0.4
assert delta["end"] == 0.4
assert data["previous_session"]["api_cost"] is None
# ---------------------------------------------------------------------------
# generate_report (integration — smoke test)
# ---------------------------------------------------------------------------
class TestGenerateReport:
def _minimal_session_data(self):
return {
"user_messages": 3,
"timmy_messages": 3,
"tool_calls": 2,
"errors": 0,
"tool_call_breakdown": {"memory_search": 2},
}
def _minimal_sov_data(self):
return {
"metrics": {
"cache_hit_rate": {"current": 0.45, "phase": "week1"},
"api_cost": {"current": 0.12, "phase": "pre-start"},
},
"deltas": {
"cache_hit_rate": {"start": 0.40, "end": 0.45},
"api_cost": {"start": 0.10, "end": 0.12},
},
"previous_session": {
"cache_hit_rate": 0.40,
"api_cost": 0.10,
},
}
def test_smoke_produces_markdown(self):
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=self._minimal_sov_data(),
),
):
report = generate_report("test-session")
assert "# Sovereignty Session Report" in report
assert "test-session" in report
assert "## Session Activity" in report
assert "## Sovereignty Scorecard" in report
assert "## Cost Breakdown" in report
assert "## Trend vs Previous Session" in report
def test_report_contains_session_stats(self):
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=self._minimal_sov_data(),
),
):
report = generate_report()
assert "| User messages | 3 |" in report
assert "memory_search" in report
def test_report_no_previous_session(self):
sov = self._minimal_sov_data()
sov["previous_session"] = {"cache_hit_rate": None, "api_cost": None}
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=sov,
),
):
report = generate_report()
assert "No previous session data" in report
# ---------------------------------------------------------------------------
# commit_report
# ---------------------------------------------------------------------------
class TestCommitReport:
def test_returns_false_when_gitea_disabled(self):
with patch("timmy.sovereignty.session_report.settings") as mock_settings:
mock_settings.gitea_enabled = False
result = commit_report("# test", "dashboard")
assert result is False
def test_returns_false_when_no_token(self):
with patch("timmy.sovereignty.session_report.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
result = commit_report("# test", "dashboard")
assert result is False
def test_creates_file_via_put(self):
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.raise_for_status.return_value = None
mock_check = MagicMock()
mock_check.status_code = 404 # file does not exist yet
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.return_value = mock_response
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# report content", "dashboard")
assert result is True
mock_client.put.assert_called_once()
call_kwargs = mock_client.put.call_args
payload = call_kwargs.kwargs.get("json", call_kwargs.args[1] if len(call_kwargs.args) > 1 else {})
decoded = base64.b64decode(payload["content"]).decode()
assert "# report content" in decoded
def test_updates_existing_file_with_sha(self):
mock_check = MagicMock()
mock_check.status_code = 200
mock_check.json.return_value = {"sha": "abc123"}
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.return_value = mock_response
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# updated", "dashboard")
assert result is True
payload = mock_client.put.call_args.kwargs.get("json", {})
assert payload.get("sha") == "abc123"
def test_returns_false_on_http_error(self):
import httpx
mock_check = MagicMock()
mock_check.status_code = 404
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.side_effect = httpx.HTTPStatusError(
"403", request=MagicMock(), response=MagicMock(status_code=403)
)
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# test", "dashboard")
assert result is False
# ---------------------------------------------------------------------------
# generate_and_commit_report (async)
# ---------------------------------------------------------------------------
class TestGenerateAndCommitReport:
async def test_returns_true_on_success(self):
with (
patch(
"timmy.sovereignty.session_report.generate_report",
return_value="# mock report",
),
patch(
"timmy.sovereignty.session_report.commit_report",
return_value=True,
),
):
result = await generate_and_commit_report("test")
assert result is True
async def test_returns_false_when_commit_fails(self):
with (
patch(
"timmy.sovereignty.session_report.generate_report",
return_value="# mock report",
),
patch(
"timmy.sovereignty.session_report.commit_report",
return_value=False,
),
):
result = await generate_and_commit_report()
assert result is False
async def test_graceful_on_exception(self):
with patch(
"timmy.sovereignty.session_report.generate_report",
side_effect=RuntimeError("explode"),
):
result = await generate_and_commit_report()
assert result is False
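
Review note: `TestGatherSovereigntyData` relies on `get_latest` returning rows newest-first, with `start` taken from the oldest row in the window, `end` from the newest, and the previous-session value from the second-newest row (or `None` when only one row exists). A minimal sketch of that reduction, assuming rows are dicts with a `"value"` key as in the mocks. The helper name `summarise_history` and the empty-history behaviour are hypothetical, not taken from `session_report.py`.

```python
from typing import Optional


def summarise_history(history: list[dict]) -> tuple[Optional[dict], Optional[float]]:
    """Return (delta, previous_value) for a newest-first list of rows.

    Assumption: an empty history yields (None, None); the tests in this
    PR do not cover that case.
    """
    if not history:
        return None, None
    values = [row["value"] for row in history]
    # Newest-first ordering: index 0 is the most recent, -1 the oldest.
    delta = {"start": values[-1], "end": values[0]}
    previous = values[1] if len(values) > 1 else None
    return delta, previous
```

With a single data point, `start` and `end` coincide and there is no previous session to compare against, which is the situation `test_single_data_point_no_delta` checks.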


@@ -0,0 +1,452 @@
"""Unit tests for the Hermes health monitor.
Tests all five checks (memory, disk, Ollama, processes, network) using mocks
so no real subprocesses or network calls are made.
Refs: #1073
"""
import json
from io import BytesIO
from unittest.mock import MagicMock, patch
import pytest
from infrastructure.hermes.monitor import CheckResult, HealthLevel, HealthReport, HermesMonitor
@pytest.fixture()
def monitor():
return HermesMonitor()
# ── Unit helpers ──────────────────────────────────────────────────────────────
class _FakeHTTPResponse:
"""Minimal urllib response stub."""
def __init__(self, body: bytes, status: int = 200):
self._body = body
self.status = status
def read(self) -> bytes:
return self._body
def __enter__(self):
return self
def __exit__(self, *_):
pass
# ── Memory check ──────────────────────────────────────────────────────────────
def test_get_memory_info_parses_vm_stat(monitor):
vm_stat_output = (
"Mach Virtual Memory Statistics: (page size of 16384 bytes)\n"
"Pages free: 12800.\n"
"Pages active: 50000.\n"
"Pages inactive: 25600.\n"
"Pages speculative: 1000.\n"
)
with (
patch("subprocess.run") as mock_run,
):
# First call: sysctl hw.memsize (total)
sysctl_result = MagicMock()
sysctl_result.stdout = "68719476736\n" # 64 GB
# Second call: vm_stat
vmstat_result = MagicMock()
vmstat_result.stdout = vm_stat_output
mock_run.side_effect = [sysctl_result, vmstat_result]
info = monitor._get_memory_info()
assert info["total_gb"] == pytest.approx(64.0, abs=0.1)
# pages free (12800) + inactive (25600) = 38400 * 16384 bytes = 629145600 bytes ≈ 0.586 GB
expected_free_gb = (38400 * 16384) / (1024**3)
assert info["free_gb"] == pytest.approx(expected_free_gb, abs=0.001)
def test_get_memory_info_handles_subprocess_failure(monitor):
with patch("subprocess.run", side_effect=OSError("no sysctl")):
info = monitor._get_memory_info()
assert info["total_gb"] == 0.0
assert info["free_gb"] == 0.0
@pytest.mark.asyncio
async def test_check_memory_ok(monitor):
with patch.object(monitor, "_get_memory_info", return_value={"free_gb": 20.0, "total_gb": 64.0}):
result = await monitor._check_memory()
assert result.name == "memory"
assert result.level == HealthLevel.OK
assert "20.0GB" in result.message
@pytest.mark.asyncio
async def test_check_memory_low_triggers_unload(monitor):
with (
patch.object(monitor, "_get_memory_info", return_value={"free_gb": 2.0, "total_gb": 64.0}),
patch.object(monitor, "_unload_ollama_models", return_value=2),
):
result = await monitor._check_memory()
assert result.level == HealthLevel.WARNING
assert result.auto_resolved is True
assert "unloaded 2" in result.message
@pytest.mark.asyncio
async def test_check_memory_critical_no_models_to_unload(monitor):
with (
patch.object(monitor, "_get_memory_info", return_value={"free_gb": 1.0, "total_gb": 64.0}),
patch.object(monitor, "_unload_ollama_models", return_value=0),
):
result = await monitor._check_memory()
assert result.level == HealthLevel.CRITICAL
assert result.needs_human is True
@pytest.mark.asyncio
async def test_check_memory_exception_returns_unknown(monitor):
with patch.object(monitor, "_get_memory_info", side_effect=RuntimeError("boom")):
result = await monitor._check_memory()
assert result.level == HealthLevel.UNKNOWN
# ── Disk check ────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_check_disk_ok(monitor):
usage = MagicMock()
usage.free = 100 * (1024**3) # 100 GB
usage.total = 500 * (1024**3) # 500 GB
usage.used = 400 * (1024**3)
with patch("shutil.disk_usage", return_value=usage):
result = await monitor._check_disk()
assert result.level == HealthLevel.OK
assert "100.0GB free" in result.message
@pytest.mark.asyncio
async def test_check_disk_low_triggers_cleanup(monitor):
usage = MagicMock()
usage.free = 5 * (1024**3) # 5 GB — below threshold
usage.total = 500 * (1024**3)
usage.used = 495 * (1024**3)
with (
patch("shutil.disk_usage", return_value=usage),
patch.object(monitor, "_cleanup_temp_files", return_value=2.5),
):
result = await monitor._check_disk()
assert result.level == HealthLevel.WARNING
assert result.auto_resolved is True
assert "cleaned 2.50GB" in result.message
@pytest.mark.asyncio
async def test_check_disk_critical_when_cleanup_fails(monitor):
usage = MagicMock()
usage.free = 5 * (1024**3)
usage.total = 500 * (1024**3)
usage.used = 495 * (1024**3)
with (
patch("shutil.disk_usage", return_value=usage),
patch.object(monitor, "_cleanup_temp_files", return_value=0.0),
):
result = await monitor._check_disk()
assert result.level == HealthLevel.CRITICAL
assert result.needs_human is True
# ── Ollama check ──────────────────────────────────────────────────────────────
def test_get_ollama_status_reachable(monitor):
tags_body = json.dumps({
"models": [{"name": "qwen3:30b"}, {"name": "llama3.1:8b"}]
}).encode()
ps_body = json.dumps({
"models": [{"name": "qwen3:30b", "size": 1000}]
}).encode()
responses = [
_FakeHTTPResponse(tags_body),
_FakeHTTPResponse(ps_body),
]
with patch("urllib.request.urlopen", side_effect=responses):
status = monitor._get_ollama_status()
assert status["reachable"] is True
assert len(status["models"]) == 2
assert len(status["loaded_models"]) == 1
def test_get_ollama_status_unreachable(monitor):
with patch("urllib.request.urlopen", side_effect=OSError("connection refused")):
status = monitor._get_ollama_status()
assert status["reachable"] is False
assert status["models"] == []
assert status["loaded_models"] == []
@pytest.mark.asyncio
async def test_check_ollama_ok(monitor):
status = {
"reachable": True,
"models": [{"name": "qwen3:30b"}],
"loaded_models": [],
}
with patch.object(monitor, "_get_ollama_status", return_value=status):
result = await monitor._check_ollama()
assert result.level == HealthLevel.OK
assert result.details["reachable"] is True
@pytest.mark.asyncio
async def test_check_ollama_unreachable_restart_success(monitor):
status = {"reachable": False, "models": [], "loaded_models": []}
with (
patch.object(monitor, "_get_ollama_status", return_value=status),
patch.object(monitor, "_restart_ollama", return_value=True),
):
result = await monitor._check_ollama()
assert result.level == HealthLevel.WARNING
assert result.auto_resolved is True
@pytest.mark.asyncio
async def test_check_ollama_unreachable_restart_fails(monitor):
status = {"reachable": False, "models": [], "loaded_models": []}
with (
patch.object(monitor, "_get_ollama_status", return_value=status),
patch.object(monitor, "_restart_ollama", return_value=False),
):
result = await monitor._check_ollama()
assert result.level == HealthLevel.CRITICAL
assert result.needs_human is True
# ── Process check ─────────────────────────────────────────────────────────────
def test_get_zombie_processes_none(monitor):
ps_output = (
"USER PID %CPU %MEM VSZ RSS TT STAT STARTED TIME COMMAND\n"
"alex 123 0.1 0.2 100 200 s0 S 1:00 0:01 python\n"
"alex 456 0.0 0.1 50 100 s0 S 1:01 0:00 bash\n"
)
result = MagicMock()
result.stdout = ps_output
with patch("subprocess.run", return_value=result):
info = monitor._get_zombie_processes()
assert info["zombies"] == []
def test_get_zombie_processes_found(monitor):
ps_output = (
"USER PID %CPU %MEM VSZ RSS TT STAT STARTED TIME COMMAND\n"
"alex 123 0.1 0.2 100 200 s0 S 1:00 0:01 python\n"
"alex 789 0.0 0.0 0 0 s0 Z 1:02 0:00 defunct\n"
)
result = MagicMock()
result.stdout = ps_output
with patch("subprocess.run", return_value=result):
info = monitor._get_zombie_processes()
assert len(info["zombies"]) == 1
assert info["zombies"][0]["pid"] == "789"
@pytest.mark.asyncio
async def test_check_processes_no_zombies(monitor):
with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": []}):
result = await monitor._check_processes()
assert result.level == HealthLevel.OK
@pytest.mark.asyncio
async def test_check_processes_zombies_warning(monitor):
zombies = [{"pid": "100", "command": "defunct"}, {"pid": "101", "command": "defunct"}]
with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": zombies}):
result = await monitor._check_processes()
assert result.level == HealthLevel.WARNING
assert result.needs_human is False # Only 2, threshold is >3
@pytest.mark.asyncio
async def test_check_processes_many_zombies_needs_human(monitor):
zombies = [{"pid": str(i), "command": "defunct"} for i in range(5)]
with patch.object(monitor, "_get_zombie_processes", return_value={"zombies": zombies}):
result = await monitor._check_processes()
assert result.needs_human is True
# ── Network check ─────────────────────────────────────────────────────────────
def test_check_gitea_connectivity_ok(monitor):
body = json.dumps({"version": "1.22.0"}).encode()
with patch("urllib.request.urlopen", return_value=_FakeHTTPResponse(body, status=200)):
info = monitor._check_gitea_connectivity()
assert info["reachable"] is True
assert info["latency_ms"] >= 0
def test_check_gitea_connectivity_unreachable(monitor):
with patch("urllib.request.urlopen", side_effect=OSError("refused")):
info = monitor._check_gitea_connectivity()
assert info["reachable"] is False
assert "error" in info
@pytest.mark.asyncio
async def test_check_network_ok(monitor):
with patch.object(
monitor,
"_check_gitea_connectivity",
return_value={"reachable": True, "latency_ms": 5.0, "url": "http://localhost:3000"},
):
result = await monitor._check_network()
assert result.level == HealthLevel.OK
assert "Gitea reachable" in result.message
@pytest.mark.asyncio
async def test_check_network_unreachable(monitor):
with patch.object(
monitor,
"_check_gitea_connectivity",
return_value={"reachable": False, "error": "refused", "url": "http://localhost:3000"},
):
result = await monitor._check_network()
assert result.level == HealthLevel.WARNING
assert result.needs_human is True
# ── Full cycle ────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_run_cycle_all_ok(monitor):
ok_result = CheckResult(name="test", level=HealthLevel.OK, message="ok")
async def _ok_check():
return ok_result
with (
patch.object(monitor, "_check_memory", _ok_check),
patch.object(monitor, "_check_disk", _ok_check),
patch.object(monitor, "_check_ollama", _ok_check),
patch.object(monitor, "_check_processes", _ok_check),
patch.object(monitor, "_check_network", _ok_check),
patch.object(monitor, "_handle_alerts"),
):
report = await monitor.run_cycle()
assert report.overall == HealthLevel.OK
assert not report.has_issues
assert monitor.last_report is report
@pytest.mark.asyncio
async def test_run_cycle_sets_overall_to_worst(monitor):
async def _ok():
return CheckResult(name="ok", level=HealthLevel.OK, message="ok")
async def _critical():
return CheckResult(name="critical", level=HealthLevel.CRITICAL, message="bad")
with (
patch.object(monitor, "_check_memory", _ok),
patch.object(monitor, "_check_disk", _critical),
patch.object(monitor, "_check_ollama", _ok),
patch.object(monitor, "_check_processes", _ok),
patch.object(monitor, "_check_network", _ok),
patch.object(monitor, "_handle_alerts"),
):
report = await monitor.run_cycle()
assert report.overall == HealthLevel.CRITICAL
assert report.has_issues is True
@pytest.mark.asyncio
async def test_run_cycle_exception_becomes_unknown(monitor):
async def _ok():
return CheckResult(name="ok", level=HealthLevel.OK, message="ok")
async def _boom():
raise RuntimeError("unexpected error")
with (
patch.object(monitor, "_check_memory", _ok),
patch.object(monitor, "_check_disk", _ok),
patch.object(monitor, "_check_ollama", _boom),
patch.object(monitor, "_check_processes", _ok),
patch.object(monitor, "_check_network", _ok),
patch.object(monitor, "_handle_alerts"),
):
report = await monitor.run_cycle()
levels = {c.level for c in report.checks}
assert HealthLevel.UNKNOWN in levels
# ── to_dict serialisation ────────────────────────────────────────────────────
def test_check_result_to_dict():
c = CheckResult(
name="memory",
level=HealthLevel.WARNING,
message="low",
details={"free_gb": 3.5},
auto_resolved=True,
)
d = c.to_dict()
assert d["name"] == "memory"
assert d["level"] == "warning"
assert d["auto_resolved"] is True
assert d["details"]["free_gb"] == 3.5
def test_health_report_to_dict():
checks = [
CheckResult(name="disk", level=HealthLevel.OK, message="ok"),
]
report = HealthReport(
timestamp="2026-01-01T00:00:00+00:00",
checks=checks,
overall=HealthLevel.OK,
)
d = report.to_dict()
assert d["overall"] == "ok"
assert d["has_issues"] is False
assert len(d["checks"]) == 1
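
Review note: `test_get_memory_info_parses_vm_stat` expects free memory to be computed as (free pages + inactive pages) × page size, with the page size read from the first line of `vm_stat` output. A minimal sketch of that parsing step, assuming the text layout used in the test fixture. The function name `free_gb_from_vm_stat` and the 4096-byte fallback are hypothetical; the actual `HermesMonitor._get_memory_info` may parse differently.

```python
import re


def free_gb_from_vm_stat(output: str) -> float:
    """Estimate reclaimable memory in GiB from `vm_stat` text."""
    size_match = re.search(r"page size of (\d+) bytes", output)
    # Assumed fallback when the header line is missing.
    page_size = int(size_match.group(1)) if size_match else 4096

    pages: dict[str, int] = {}
    for line in output.splitlines():
        # Matches single-word counters such as "Pages free: 12800."
        match = re.match(r"Pages (\w+):\s+(\d+)\.", line)
        if match:
            pages[match.group(1)] = int(match.group(2))

    reclaimable = pages.get("free", 0) + pages.get("inactive", 0)
    return reclaimable * page_size / (1024 ** 3)
```

Counting inactive pages as free follows the test's arithmetic; active and speculative pages are ignored in this sketch.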


@@ -9,19 +9,15 @@ Refs: #1105
from __future__ import annotations
import json
import tempfile
from datetime import UTC, datetime, timedelta
from pathlib import Path
import pytest
from timmy_automations.retrain.quality_filter import QualityFilter, TrajectoryQuality
from timmy_automations.retrain.retrain import RetrainOrchestrator
from timmy_automations.retrain.training_dataset import TrainingDataset
from timmy_automations.retrain.training_log import CycleMetrics, TrainingLog
from timmy_automations.retrain.trajectory_exporter import Trajectory, TrajectoryExporter
# ── Fixtures ─────────────────────────────────────────────────────────────────
@@ -382,7 +378,7 @@ class TestTrainingDataset:
ds = TrainingDataset(repo_root=tmp_path)
ds.append([self._make_result()], "2026-W12")
with open(ds.dataset_path) as f:
-lines = [l.strip() for l in f if l.strip()]
+lines = [line.strip() for line in f if line.strip()]
assert len(lines) == 1
record = json.loads(lines[0])
assert "messages" in record


@@ -0,0 +1,103 @@
"""Unit tests for timmy.vassal.agent_health."""
from __future__ import annotations
import pytest
from timmy.vassal.agent_health import AgentHealthReport, AgentStatus
# ---------------------------------------------------------------------------
# AgentStatus
# ---------------------------------------------------------------------------
def test_agent_status_idle_default():
s = AgentStatus(agent="claude")
assert s.is_idle is True
assert s.is_stuck is False
assert s.needs_reassignment is False
def test_agent_status_active():
s = AgentStatus(agent="kimi", active_issue_numbers=[10, 11])
s.is_idle = len(s.active_issue_numbers) == 0
assert s.is_idle is False
def test_agent_status_stuck():
s = AgentStatus(
agent="claude",
active_issue_numbers=[7],
stuck_issue_numbers=[7],
is_idle=False,
)
assert s.is_stuck is True
assert s.needs_reassignment is True
# ---------------------------------------------------------------------------
# AgentHealthReport
# ---------------------------------------------------------------------------
def test_report_any_stuck():
claude = AgentStatus(agent="claude", stuck_issue_numbers=[3])
kimi = AgentStatus(agent="kimi")
report = AgentHealthReport(agents=[claude, kimi])
assert report.any_stuck is True
def test_report_all_idle():
report = AgentHealthReport(
agents=[AgentStatus(agent="claude"), AgentStatus(agent="kimi")]
)
assert report.all_idle is True
def test_report_for_agent_found():
kimi = AgentStatus(agent="kimi", active_issue_numbers=[42])
report = AgentHealthReport(agents=[AgentStatus(agent="claude"), kimi])
found = report.for_agent("kimi")
assert found is kimi
def test_report_for_agent_not_found():
report = AgentHealthReport(agents=[AgentStatus(agent="claude")])
assert report.for_agent("timmy") is None
# ---------------------------------------------------------------------------
# check_agent_health — no Gitea in unit tests
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_check_agent_health_unknown_agent():
    """Unknown agent name returns idle status without error."""
    from timmy.vassal.agent_health import check_agent_health

    status = await check_agent_health("unknown-bot")
    assert status.agent == "unknown-bot"
    assert status.is_idle is True


@pytest.mark.asyncio
async def test_check_agent_health_no_token():
    """Returns idle status gracefully when Gitea token is absent."""
    from timmy.vassal.agent_health import check_agent_health

    status = await check_agent_health("claude")
    # Should not raise; returns idle (no active issues discovered)
    assert isinstance(status, AgentStatus)
    assert status.agent == "claude"


@pytest.mark.asyncio
async def test_get_full_health_report_returns_both_agents():
    from timmy.vassal.agent_health import get_full_health_report

    report = await get_full_health_report()
    agent_names = {a.agent for a in report.agents}
    assert "claude" in agent_names
    assert "kimi" in agent_names
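
The assertions in this file pin down a small data model. A minimal sketch of what that model might look like, assuming plain dataclasses with derived properties. The `*Sketch` names, the field defaults, and the rule that any stuck issue triggers reassignment are inferred from the tests only and are not taken from `timmy.vassal.agent_health`:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AgentStatusSketch:
    """Hypothetical stand-in for AgentStatus, inferred from the tests."""

    agent: str
    active_issue_numbers: list = field(default_factory=list)
    stuck_issue_numbers: list = field(default_factory=list)
    is_idle: bool = True  # plain field; the tests set it explicitly

    @property
    def is_stuck(self) -> bool:
        # Stuck as soon as at least one issue is flagged as stuck.
        return bool(self.stuck_issue_numbers)

    @property
    def needs_reassignment(self) -> bool:
        # Assumption: a stuck agent always needs reassignment.
        return self.is_stuck


@dataclass
class AgentHealthReportSketch:
    """Hypothetical stand-in for AgentHealthReport."""

    agents: list = field(default_factory=list)

    @property
    def any_stuck(self) -> bool:
        return any(a.is_stuck for a in self.agents)

    @property
    def all_idle(self) -> bool:
        return all(a.is_idle for a in self.agents)

    def for_agent(self, name: str) -> Optional[AgentStatusSketch]:
        # First status whose agent name matches, or None.
        return next((a for a in self.agents if a.agent == name), None)
```

Keeping `is_stuck` and `any_stuck` as properties means they cannot drift out of sync with the underlying issue lists, which is the behaviour `test_report_any_stuck` relies on.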


@@ -0,0 +1,186 @@
"""Unit tests for timmy.vassal.backlog — triage and fetch helpers."""

from __future__ import annotations

import pytest

from timmy.vassal.backlog import (
    AgentTarget,
    TriagedIssue,
    _choose_agent,
    _extract_labels,
    _score_priority,
    triage_issues,
)

# ---------------------------------------------------------------------------
# _extract_labels
# ---------------------------------------------------------------------------


def test_extract_labels_empty():
    assert _extract_labels({}) == []


def test_extract_labels_normalises_case():
    issue = {"labels": [{"name": "HIGH"}, {"name": "Feature"}]}
    assert _extract_labels(issue) == ["high", "feature"]
# ---------------------------------------------------------------------------
# _score_priority
# ---------------------------------------------------------------------------


def test_priority_urgent():
    assert _score_priority(["urgent"], []) == 100


def test_priority_high():
    assert _score_priority(["high"], []) == 75


def test_priority_normal_default():
    assert _score_priority([], []) == 50


def test_priority_assigned_penalised():
    # already assigned → default 50 minus 20
    score = _score_priority([], ["some-agent"])
    assert score == 30


def test_priority_label_substring_match():
    # "critical-bug" contains "critical" → 90
    assert _score_priority(["critical-bug"], []) == 90
# ---------------------------------------------------------------------------
# _choose_agent
# ---------------------------------------------------------------------------


def test_choose_claude_for_architecture():
    target, rationale = _choose_agent("Refactor auth middleware", "", [])
    assert target == AgentTarget.CLAUDE
    assert "complex" in rationale or "high-complexity" in rationale


def test_choose_kimi_for_research():
    target, rationale = _choose_agent("Deep research on embedding models", "", [])
    assert target == AgentTarget.KIMI


def test_choose_timmy_for_docs():
    target, rationale = _choose_agent("Update documentation for CLI", "", [])
    assert target == AgentTarget.TIMMY


def test_choose_timmy_default():
    target, rationale = _choose_agent("Fix typo in README", "simple change", [])
    # Could route to timmy (docs/trivial) or default — either is valid
    assert isinstance(target, AgentTarget)


def test_choose_agent_label_wins():
    # "security" label → Claude
    target, _ = _choose_agent("Login page", "", ["security"])
    assert target == AgentTarget.CLAUDE
# ---------------------------------------------------------------------------
# triage_issues
# ---------------------------------------------------------------------------


def _make_raw_issue(
    number: int,
    title: str,
    body: str = "",
    labels: list[str] | None = None,
    assignees: list[str] | None = None,
) -> dict:
    return {
        "number": number,
        "title": title,
        "body": body,
        "labels": [{"name": lbl} for lbl in (labels or [])],
        "assignees": [{"login": a} for a in (assignees or [])],
        "html_url": f"http://gitea/issues/{number}",
    }


def test_triage_returns_sorted_by_priority():
    issues = [
        _make_raw_issue(1, "Routine docs update", labels=["docs"]),
        _make_raw_issue(2, "Critical security issue", labels=["urgent", "security"]),
        _make_raw_issue(3, "Normal feature", labels=[]),
    ]
    triaged = triage_issues(issues)
    # Highest priority first
    assert triaged[0].number == 2
    assert triaged[0].priority_score == 100  # urgent label


def test_triage_prs_can_be_included():
    # triage_issues does not filter PRs — that's fetch_open_issues's job
    issues = [_make_raw_issue(10, "A PR-like issue")]
    triaged = triage_issues(issues)
    assert len(triaged) == 1


def test_triage_empty():
    assert triage_issues([]) == []


def test_triage_routing():
    issues = [
        _make_raw_issue(1, "Benchmark LLM backends", body="comprehensive analysis"),
        _make_raw_issue(2, "Refactor agent loader", body="architecture change"),
        _make_raw_issue(3, "Fix typo in docs", labels=["docs"]),
    ]
    triaged = {i.number: i for i in triage_issues(issues)}
    assert triaged[1].agent_target == AgentTarget.KIMI
    assert triaged[2].agent_target == AgentTarget.CLAUDE
    assert triaged[3].agent_target == AgentTarget.TIMMY


def test_triage_preserves_url():
    issues = [_make_raw_issue(42, "Some issue")]
    triaged = triage_issues(issues)
    assert triaged[0].url == "http://gitea/issues/42"
# ---------------------------------------------------------------------------
# fetch_open_issues — no Gitea available in unit tests
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_fetch_open_issues_returns_empty_when_disabled(monkeypatch):
    """Without a Gitea token, fetch returns a list without raising."""
    import timmy.vassal.backlog as bl

    # We can't easily monkeypatch `from config import settings` inside the
    # function, so test the no-token path via the environment instead.
    # monkeypatch restores GITEA_TOKEN automatically when the test ends.
    monkeypatch.delenv("GITEA_TOKEN", raising=False)

    result = await bl.fetch_open_issues()
    # Should return gracefully (no token configured by default in test env)
    assert isinstance(result, list)
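
The `_score_priority` tests above imply a keyword table matched by substring, a default of 50, and a fixed penalty for already-assigned issues. One way this could be written is sketched below. The function name, the constants, and the choice to take the highest matching keyword are assumptions made for illustration; the real `timmy.vassal.backlog` implementation may differ:

```python
# Hypothetical keyword -> score table; keywords are matched as substrings
# of each (already lower-cased) label.
PRIORITY_KEYWORDS = {"urgent": 100, "critical": 90, "high": 75}
DEFAULT_PRIORITY = 50
ASSIGNED_PENALTY = 20


def score_priority_sketch(labels, assignees):
    """Return a priority score for an issue from its labels and assignees."""
    matches = [
        score
        for label in labels
        for keyword, score in PRIORITY_KEYWORDS.items()
        if keyword in label
    ]
    # Assumption: the highest matching keyword wins; no match means default.
    score = max(matches, default=DEFAULT_PRIORITY)
    if assignees:
        # Already-assigned issues are pushed down the queue.
        score -= ASSIGNED_PENALTY
    return score
```

Substring matching is what lets a label such as `critical-bug` score 90, but it also means an unrelated label that happens to contain `high` would match, so an exact-match or prefix rule may be preferable in practice.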


@@ -0,0 +1,114 @@
"""Unit tests for timmy.vassal.dispatch — routing and label helpers."""

from __future__ import annotations

import pytest

from timmy.vassal.backlog import AgentTarget, TriagedIssue
from timmy.vassal.dispatch import (
    DispatchRecord,
    clear_dispatch_registry,
    get_dispatch_registry,
)


def _make_triaged(
    number: int,
    title: str,
    agent: AgentTarget,
    priority: int = 50,
) -> TriagedIssue:
    return TriagedIssue(
        number=number,
        title=title,
        body="",
        agent_target=agent,
        priority_score=priority,
        rationale="test rationale",
        url=f"http://gitea/issues/{number}",
    )


# ---------------------------------------------------------------------------
# Registry helpers
# ---------------------------------------------------------------------------


def test_registry_starts_empty():
    clear_dispatch_registry()
    assert get_dispatch_registry() == {}


def test_registry_returns_copy():
    clear_dispatch_registry()
    reg = get_dispatch_registry()
    reg[999] = None  # type: ignore[assignment]
    assert 999 not in get_dispatch_registry()
# ---------------------------------------------------------------------------
# dispatch_issue — Timmy self-dispatch (no Gitea required)
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_dispatch_timmy_self_no_gitea():
    """Timmy self-dispatch records without hitting Gitea."""
    clear_dispatch_registry()
    issue = _make_triaged(1, "Fix docs typo", AgentTarget.TIMMY)
    from timmy.vassal.dispatch import dispatch_issue

    record = await dispatch_issue(issue)
    assert isinstance(record, DispatchRecord)
    assert record.issue_number == 1
    assert record.agent == AgentTarget.TIMMY
    assert 1 in get_dispatch_registry()


@pytest.mark.asyncio
async def test_dispatch_claude_no_gitea_token():
    """Claude dispatch gracefully degrades when Gitea token is absent."""
    clear_dispatch_registry()
    issue = _make_triaged(2, "Refactor auth", AgentTarget.CLAUDE)
    from timmy.vassal.dispatch import dispatch_issue

    record = await dispatch_issue(issue)
    assert record.issue_number == 2
    assert record.agent == AgentTarget.CLAUDE
    # label/comment not applied — no token
    assert record.label_applied is False
    assert 2 in get_dispatch_registry()


@pytest.mark.asyncio
async def test_dispatch_kimi_no_gitea_token():
    clear_dispatch_registry()
    issue = _make_triaged(3, "Research embeddings", AgentTarget.KIMI)
    from timmy.vassal.dispatch import dispatch_issue

    record = await dispatch_issue(issue)
    assert record.agent == AgentTarget.KIMI
    assert record.label_applied is False
# ---------------------------------------------------------------------------
# DispatchRecord fields
# ---------------------------------------------------------------------------


def test_dispatch_record_defaults():
    r = DispatchRecord(
        issue_number=5,
        issue_title="Test issue",
        agent=AgentTarget.TIMMY,
        rationale="because",
    )
    assert r.label_applied is False
    assert r.comment_posted is False
    assert r.dispatched_at  # has a timestamp
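
`test_registry_returns_copy` depends on the registry accessor handing out a copy instead of the live dictionary. A minimal sketch of that pattern follows, with hypothetical function names and string values standing in for `DispatchRecord`; it illustrates the idea and is not the code in `timmy.vassal.dispatch`:

```python
# Module-level store, keyed by issue number. Private by convention.
_registry = {}


def record_dispatch_sketch(issue_number, agent):
    """Remember which agent an issue was dispatched to."""
    _registry[issue_number] = agent


def get_registry_sketch():
    """Return a shallow copy so callers cannot mutate the live registry."""
    return dict(_registry)


def clear_registry_sketch():
    """Reset the registry, e.g. between tests."""
    _registry.clear()
```

The copy is shallow: adding or removing keys on the returned dict leaves the registry untouched, but mutating a stored value would still be visible to other callers.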


@@ -0,0 +1,116 @@
"""Unit tests for timmy.vassal.house_health."""

from __future__ import annotations

import pytest

from timmy.vassal.house_health import (
    DiskUsage,
    MemoryUsage,
    OllamaHealth,
    SystemSnapshot,
    _probe_disk,
)

# ---------------------------------------------------------------------------
# Data model tests
# ---------------------------------------------------------------------------


def test_system_snapshot_healthy_when_no_warnings():
    snap = SystemSnapshot()
    assert snap.healthy is True


def test_system_snapshot_unhealthy_with_warnings():
    snap = SystemSnapshot(warnings=["disk 90% full"])
    assert snap.healthy is False


def test_disk_usage_defaults():
    d = DiskUsage()
    assert d.percent_used == 0.0
    assert d.path == "/"


def test_memory_usage_defaults():
    m = MemoryUsage()
    assert m.percent_used == 0.0


def test_ollama_health_defaults():
    o = OllamaHealth()
    assert o.reachable is False
    assert o.loaded_models == []
# ---------------------------------------------------------------------------
# _probe_disk — runs against real filesystem
# ---------------------------------------------------------------------------


def test_probe_disk_root():
    result = _probe_disk("/")
    assert result.total_gb > 0
    assert 0.0 <= result.percent_used <= 100.0
    assert result.free_gb >= 0


def test_probe_disk_bad_path():
    result = _probe_disk("/nonexistent_path_xyz")
    # Should not raise — returns zeroed DiskUsage
    assert result.percent_used == 0.0
# ---------------------------------------------------------------------------
# get_system_snapshot — async
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_get_system_snapshot_returns_snapshot():
    from timmy.vassal.house_health import get_system_snapshot

    snap = await get_system_snapshot()
    assert isinstance(snap, SystemSnapshot)
    # Disk is always probed
    assert snap.disk.total_gb >= 0
    # Ollama is likely unreachable in test env — that's fine
    assert isinstance(snap.ollama, OllamaHealth)


@pytest.mark.asyncio
async def test_get_system_snapshot_disk_warning(monkeypatch):
    """When disk is above threshold, a warning is generated."""
    import timmy.vassal.house_health as hh

    # Patch _probe_disk to return high usage
    def _full_disk(path: str) -> DiskUsage:
        return DiskUsage(
            path=path,
            total_gb=100.0,
            used_gb=90.0,
            free_gb=10.0,
            percent_used=90.0,
        )

    monkeypatch.setattr(hh, "_probe_disk", _full_disk)
    snap = await hh.get_system_snapshot()
    # Case-insensitive match covers both "disk" and "Disk"
    assert any("disk" in w.lower() for w in snap.warnings)
# ---------------------------------------------------------------------------
# cleanup_stale_files — temp dir test
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_cleanup_stale_files_missing_dir():
    """Should not raise when the target dir doesn't exist."""
    from timmy.vassal.house_health import cleanup_stale_files

    result = await cleanup_stale_files(temp_dirs=["/tmp/timmy_test_xyz_nonexistent"])
    assert result["deleted_count"] == 0
    assert result["errors"] == []
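
The two `_probe_disk` tests expect real numbers for an existing path and a zeroed result for a missing one. A possible shape for such a probe is sketched below on top of the standard library's `shutil.disk_usage`. The `DiskUsageSketch` dataclass and the function name are hypothetical, and the real helper in `timmy.vassal.house_health` may be built differently:

```python
import shutil
from dataclasses import dataclass

_BYTES_PER_GB = 1024 ** 3


@dataclass
class DiskUsageSketch:
    """Hypothetical stand-in for DiskUsage; defaults mirror the tests."""

    path: str = "/"
    total_gb: float = 0.0
    used_gb: float = 0.0
    free_gb: float = 0.0
    percent_used: float = 0.0


def probe_disk_sketch(path="/"):
    """Return disk usage for `path`, or a zeroed record if it cannot be read."""
    try:
        usage = shutil.disk_usage(path)
    except OSError:
        # Missing or unreadable path: degrade to zeros instead of raising.
        return DiskUsageSketch(path=path)
    percent = usage.used / usage.total * 100.0 if usage.total else 0.0
    return DiskUsageSketch(
        path=path,
        total_gb=usage.total / _BYTES_PER_GB,
        used_gb=usage.used / _BYTES_PER_GB,
        free_gb=usage.free / _BYTES_PER_GB,
        percent_used=percent,
    )
```

Swallowing `OSError` keeps a health probe from taking down the caller, at the cost of making a missing mount indistinguishable from an empty one unless the caller also checks `total_gb`.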


@@ -0,0 +1,139 @@
"""Unit tests for timmy.vassal.orchestration_loop — VassalOrchestrator."""

from __future__ import annotations

import pytest

from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator

# ---------------------------------------------------------------------------
# VassalCycleRecord
# ---------------------------------------------------------------------------


def test_cycle_record_healthy_when_no_errors():
    r = VassalCycleRecord(
        cycle_id=1,
        started_at="2026-01-01T00:00:00+00:00",
    )
    assert r.healthy is True


def test_cycle_record_unhealthy_with_errors():
    r = VassalCycleRecord(
        cycle_id=1,
        started_at="2026-01-01T00:00:00+00:00",
        errors=["backlog: connection refused"],
    )
    assert r.healthy is False


def test_cycle_record_unhealthy_with_warnings():
    r = VassalCycleRecord(
        cycle_id=1,
        started_at="2026-01-01T00:00:00+00:00",
        house_warnings=["disk 90% full"],
    )
    assert r.healthy is False
# ---------------------------------------------------------------------------
# VassalOrchestrator state
# ---------------------------------------------------------------------------


def test_orchestrator_initial_state():
    orch = VassalOrchestrator()
    assert orch.cycle_count == 0
    assert orch.is_running is False
    assert orch.history == []


def test_orchestrator_get_status_no_cycles():
    orch = VassalOrchestrator()
    status = orch.get_status()
    assert status["running"] is False
    assert status["cycle_count"] == 0
    assert status["last_cycle"] is None
# ---------------------------------------------------------------------------
# run_cycle — integration (no Gitea, no Ollama in test env)
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_run_cycle_completes_without_services():
    """run_cycle must complete and record even when external services are down."""
    from timmy.vassal.dispatch import clear_dispatch_registry

    clear_dispatch_registry()
    orch = VassalOrchestrator(cycle_interval=300)
    record = await orch.run_cycle()
    assert isinstance(record, VassalCycleRecord)
    assert record.cycle_id == 1
    assert record.finished_at  # was set
    assert record.duration_ms >= 0
    # No Gitea → fetched = 0, dispatched = 0
    assert record.issues_fetched == 0
    assert record.issues_dispatched == 0
    # History updated
    assert len(orch.history) == 1
    assert orch.cycle_count == 1


@pytest.mark.asyncio
async def test_run_cycle_increments_cycle_count():
    from timmy.vassal.dispatch import clear_dispatch_registry

    clear_dispatch_registry()
    orch = VassalOrchestrator()
    await orch.run_cycle()
    await orch.run_cycle()
    assert orch.cycle_count == 2
    assert len(orch.history) == 2


@pytest.mark.asyncio
async def test_get_status_after_cycle():
    from timmy.vassal.dispatch import clear_dispatch_registry

    clear_dispatch_registry()
    orch = VassalOrchestrator()
    await orch.run_cycle()
    status = orch.get_status()
    assert status["cycle_count"] == 1
    last = status["last_cycle"]
    assert last is not None
    assert last["cycle_id"] == 1
    assert last["issues_fetched"] == 0
# ---------------------------------------------------------------------------
# start / stop
# ---------------------------------------------------------------------------


def test_orchestrator_stop_when_not_running():
    """stop() on an idle orchestrator must not raise."""
    orch = VassalOrchestrator()
    orch.stop()  # should be a no-op
    assert orch.is_running is False


# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------


def test_module_singleton_exists():
    from timmy.vassal import vassal_orchestrator, VassalOrchestrator

    assert isinstance(vassal_orchestrator, VassalOrchestrator)
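
The `VassalCycleRecord` tests treat a cycle as healthy only when it recorded neither errors nor house warnings. A minimal sketch of that rule, assuming a dataclass with a derived `healthy` property; the class name and the reduced field set are illustrative, not the definition in `timmy.vassal.orchestration_loop`:

```python
from dataclasses import dataclass, field


@dataclass
class CycleRecordSketch:
    """Hypothetical, trimmed-down stand-in for VassalCycleRecord."""

    cycle_id: int
    started_at: str  # ISO 8601 timestamp, as in the tests
    errors: list = field(default_factory=list)
    house_warnings: list = field(default_factory=list)

    @property
    def healthy(self) -> bool:
        # Healthy means nothing went wrong and nothing needs attention.
        return not self.errors and not self.house_warnings
```

Folding warnings into `healthy` makes the flag conservative: a cycle that completed all its work still reports unhealthy if, for example, the disk check raised a warning.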