1
0

Compare commits

..

8 Commits

Author SHA1 Message Date
Alexander Whitestone
e55fc07f5e test: add pytestmark and full coverage for events bus (#917)
- Add `pytestmark = pytest.mark.unit` so the 38 existing event bus tests
  run in the standard `tox -e unit` gate (previously they were excluded
  by the marker filter and never ran in CI)
- Add `test_init_persistence_db_noop_when_path_is_none` to cover the
  defensive early-return guard in `_init_persistence_db` (was the sole
  uncovered line)
- Result: `infrastructure/events/bus.py` at 100% coverage; unit suite
  grows from 474 → 513 tests

Fixes #917

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 21:47:41 -04:00
2d6bfe6ba1 [claude] Agent Self-Correction Dashboard (#1007) (#1269)
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-24 01:40:40 +00:00
ebb2cad552 [claude] feat: Session Sovereignty Report Generator (#957) v3 (#1263)
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-24 01:40:24 +00:00
003e3883fb [claude] Restore self-modification loop (#983) (#1270)
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-24 01:40:16 +00:00
7dfbf05867 [claude] Run 5-test benchmark suite against local model candidates (#1066) (#1271) 2026-03-24 01:38:59 +00:00
1cce28d1bb [claude] Investigate: document paths to resolution for 5 closed PRs (#1219) (#1266)
Co-authored-by: Claude (Opus 4.6) <claude@hermes.local>
Co-committed-by: Claude (Opus 4.6) <claude@hermes.local>
2026-03-24 01:36:06 +00:00
4c6b69885d [claude] feat: Agent Energy Budget Monitoring (#1009) (#1267) 2026-03-24 01:35:50 +00:00
6b2e6d9e8c [claude] feat: Agent Energy Budget Monitoring (#1009) (#1267) 2026-03-24 01:35:49 +00:00
37 changed files with 5849 additions and 843 deletions

1244
docs/model-benchmarks.md Normal file

File diff suppressed because it is too large Load Diff

75
docs/pr-recovery-1219.md Normal file
View File

@@ -0,0 +1,75 @@
# PR Recovery Investigation — Issue #1219
**Audit source:** Issue #1210
Five PRs were closed without merge while their parent issues remained open and
marked p0-critical. This document records the investigation findings and the
path to resolution for each.
---
## Root Cause
Per Timmy's comment on #1219: all five PRs were closed due to **merge conflicts
during the mass-merge cleanup cycle** (a rebase storm), not due to code
quality problems or a changed approach. The code in each PR was correct;
the branches simply became stale.
---
## Status Matrix
| PR | Feature | Issue | PR Closed | Issue State | Resolution |
|----|---------|-------|-----------|-------------|------------|
| #1163 | Three-Strike Detector | #962 | Rebase storm | **Closed ✓** | v2 merged via PR #1232 |
| #1162 | Session Sovereignty Report | #957 | Rebase storm | **Open** | PR #1263 (v3 — rebased) |
| #1157 | Qwen3-8B/14B routing | #1065 | Rebase storm | **Closed ✓** | v2 merged via PR #1233 |
| #1156 | Agent Dreaming Mode | #1019 | Rebase storm | **Open** | PR #1264 (v3 — rebased) |
| #1145 | Qwen3-14B config | #1064 | Rebase storm | **Closed ✓** | Code present on main |
---
## Detail: Already Resolved
### PR #1163 → Issue #962 (Three-Strike Detector)
- **Why closed:** merge conflict during rebase storm
- **Resolution:** `src/timmy/sovereignty/three_strike.py` and
`src/dashboard/routes/three_strike.py` are present on `main` (landed via
PR #1232). Issue #962 is closed.
### PR #1157 → Issue #1065 (Qwen3-8B/14B dual-model routing)
- **Why closed:** merge conflict during rebase storm
- **Resolution:** `src/infrastructure/router/classifier.py` and
`src/infrastructure/router/cascade.py` are present on `main` (landed via
PR #1233). Issue #1065 is closed.
### PR #1145 → Issue #1064 (Qwen3-14B config)
- **Why closed:** merge conflict during rebase storm
- **Resolution:** `Modelfile.timmy`, `Modelfile.qwen3-14b`, and the `config.py`
defaults (`ollama_model = "qwen3:14b"`) are present on `main`. Issue #1064
is closed.
---
## Detail: Requiring Action
### PR #1162 → Issue #957 (Session Sovereignty Report Generator)
- **Why closed:** merge conflict during rebase storm
- **Branch preserved:** `claude/issue-957-v2` (one feature commit)
- **Action taken:** Rebased onto current `main`, resolved conflict in
`src/timmy/sovereignty/__init__.py` (both three-strike and session-report
docstrings kept). All 458 unit tests pass.
- **New PR:** #1263 (`claude/issue-957-v3``main`)
### PR #1156 → Issue #1019 (Agent Dreaming Mode)
- **Why closed:** merge conflict during rebase storm
- **Branch preserved:** `claude/issue-1019-v2` (one feature commit)
- **Action taken:** Rebased onto current `main`, resolved conflict in
`src/dashboard/app.py` (both `three_strike_router` and `dreaming_router`
registered). All 435 unit tests pass.
- **New PR:** #1264 (`claude/issue-1019-v3``main`)

View File

@@ -0,0 +1,195 @@
#!/usr/bin/env python3
"""Benchmark 1: Tool Calling Compliance
Send 10 tool-call prompts and measure JSON compliance rate.
Target: >90% valid JSON.
"""
from __future__ import annotations
import json
import re
import sys
import time
from typing import Any
import requests
OLLAMA_URL = "http://localhost:11434"
TOOL_PROMPTS = [
{
"prompt": (
"Call the 'get_weather' tool to retrieve the current weather for San Francisco. "
"Return ONLY valid JSON with keys: tool, args."
),
"expected_keys": ["tool", "args"],
},
{
"prompt": (
"Invoke the 'read_file' function with path='/etc/hosts'. "
"Return ONLY valid JSON with keys: tool, args."
),
"expected_keys": ["tool", "args"],
},
{
"prompt": (
"Use the 'search_web' tool to look up 'latest Python release'. "
"Return ONLY valid JSON with keys: tool, args."
),
"expected_keys": ["tool", "args"],
},
{
"prompt": (
"Call 'create_issue' with title='Fix login bug' and priority='high'. "
"Return ONLY valid JSON with keys: tool, args."
),
"expected_keys": ["tool", "args"],
},
{
"prompt": (
"Execute the 'list_directory' tool for path='/home/user/projects'. "
"Return ONLY valid JSON with keys: tool, args."
),
"expected_keys": ["tool", "args"],
},
{
"prompt": (
"Call 'send_notification' with message='Deploy complete' and channel='slack'. "
"Return ONLY valid JSON with keys: tool, args."
),
"expected_keys": ["tool", "args"],
},
{
"prompt": (
"Invoke 'database_query' with sql='SELECT COUNT(*) FROM users'. "
"Return ONLY valid JSON with keys: tool, args."
),
"expected_keys": ["tool", "args"],
},
{
"prompt": (
"Use the 'get_git_log' tool with limit=10 and branch='main'. "
"Return ONLY valid JSON with keys: tool, args."
),
"expected_keys": ["tool", "args"],
},
{
"prompt": (
"Call 'schedule_task' with cron='0 9 * * MON-FRI' and task='generate_report'. "
"Return ONLY valid JSON with keys: tool, args."
),
"expected_keys": ["tool", "args"],
},
{
"prompt": (
"Invoke 'resize_image' with url='https://example.com/photo.jpg', "
"width=800, height=600. "
"Return ONLY valid JSON with keys: tool, args."
),
"expected_keys": ["tool", "args"],
},
]
def extract_json(text: str) -> Any:
"""Try to extract the first JSON object or array from a string."""
# Try direct parse first
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Try to find JSON block in markdown fences
fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if fence_match:
try:
return json.loads(fence_match.group(1))
except json.JSONDecodeError:
pass
# Try to find first { ... }
brace_match = re.search(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)?\}", text, re.DOTALL)
if brace_match:
try:
return json.loads(brace_match.group(0))
except json.JSONDecodeError:
pass
return None
def run_prompt(model: str, prompt: str) -> str:
"""Send a prompt to Ollama and return the response text."""
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.1, "num_predict": 256},
}
resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=120)
resp.raise_for_status()
return resp.json()["response"]
def run_benchmark(model: str) -> dict:
"""Run tool-calling benchmark for a single model."""
results = []
total_time = 0.0
for i, case in enumerate(TOOL_PROMPTS, 1):
start = time.time()
try:
raw = run_prompt(model, case["prompt"])
elapsed = time.time() - start
parsed = extract_json(raw)
valid_json = parsed is not None
has_keys = (
valid_json
and isinstance(parsed, dict)
and all(k in parsed for k in case["expected_keys"])
)
results.append(
{
"prompt_id": i,
"valid_json": valid_json,
"has_expected_keys": has_keys,
"elapsed_s": round(elapsed, 2),
"response_snippet": raw[:120],
}
)
except Exception as exc:
elapsed = time.time() - start
results.append(
{
"prompt_id": i,
"valid_json": False,
"has_expected_keys": False,
"elapsed_s": round(elapsed, 2),
"error": str(exc),
}
)
total_time += elapsed
valid_count = sum(1 for r in results if r["valid_json"])
compliance_rate = valid_count / len(TOOL_PROMPTS)
return {
"benchmark": "tool_calling",
"model": model,
"total_prompts": len(TOOL_PROMPTS),
"valid_json_count": valid_count,
"compliance_rate": round(compliance_rate, 3),
"passed": compliance_rate >= 0.90,
"total_time_s": round(total_time, 2),
"results": results,
}
if __name__ == "__main__":
model = sys.argv[1] if len(sys.argv) > 1 else "hermes3:8b"
print(f"Running tool-calling benchmark against {model}...")
result = run_benchmark(model)
print(json.dumps(result, indent=2))
sys.exit(0 if result["passed"] else 1)

View File

@@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""Benchmark 2: Code Generation Correctness
Ask model to generate a fibonacci function, execute it, verify fib(10) = 55.
"""
from __future__ import annotations
import json
import re
import subprocess
import sys
import tempfile
import time
from pathlib import Path
import requests
OLLAMA_URL = "http://localhost:11434"
CODEGEN_PROMPT = """\
Write a Python function called `fibonacci(n)` that returns the nth Fibonacci number \
(0-indexed, so fibonacci(0)=0, fibonacci(1)=1, fibonacci(10)=55).
Return ONLY the raw Python code — no markdown fences, no explanation, no extra text.
The function must be named exactly `fibonacci`.
"""
def extract_python(text: str) -> str:
"""Extract Python code from a response."""
text = text.strip()
# Remove markdown fences
fence_match = re.search(r"```(?:python)?\s*(.*?)```", text, re.DOTALL)
if fence_match:
return fence_match.group(1).strip()
# Return as-is if it looks like code
if "def " in text:
return text
return text
def run_prompt(model: str, prompt: str) -> str:
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.1, "num_predict": 512},
}
resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=120)
resp.raise_for_status()
return resp.json()["response"]
def execute_fibonacci(code: str) -> tuple[bool, str]:
"""Execute the generated fibonacci code and check fib(10) == 55."""
test_code = code + "\n\nresult = fibonacci(10)\nprint(result)\n"
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
f.write(test_code)
tmpfile = f.name
try:
proc = subprocess.run(
[sys.executable, tmpfile],
capture_output=True,
text=True,
timeout=10,
)
output = proc.stdout.strip()
if proc.returncode != 0:
return False, f"Runtime error: {proc.stderr.strip()[:200]}"
if output == "55":
return True, "fibonacci(10) = 55 ✓"
return False, f"Expected 55, got: {output!r}"
except subprocess.TimeoutExpired:
return False, "Execution timed out"
except Exception as exc:
return False, f"Execution error: {exc}"
finally:
Path(tmpfile).unlink(missing_ok=True)
def run_benchmark(model: str) -> dict:
"""Run code generation benchmark for a single model."""
start = time.time()
try:
raw = run_prompt(model, CODEGEN_PROMPT)
code = extract_python(raw)
correct, detail = execute_fibonacci(code)
except Exception as exc:
elapsed = time.time() - start
return {
"benchmark": "code_generation",
"model": model,
"passed": False,
"error": str(exc),
"elapsed_s": round(elapsed, 2),
}
elapsed = time.time() - start
return {
"benchmark": "code_generation",
"model": model,
"passed": correct,
"detail": detail,
"code_snippet": code[:300],
"elapsed_s": round(elapsed, 2),
}
if __name__ == "__main__":
model = sys.argv[1] if len(sys.argv) > 1 else "hermes3:8b"
print(f"Running code-generation benchmark against {model}...")
result = run_benchmark(model)
print(json.dumps(result, indent=2))
sys.exit(0 if result["passed"] else 1)

View File

@@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""Benchmark 3: Shell Command Generation (No Refusal)
Verify the model generates shell commands without safety refusals.
"""
from __future__ import annotations
import json
import re
import sys
import time
import requests
OLLAMA_URL = "http://localhost:11434"
SHELL_PROMPTS = [
{
"prompt": "Write a bash one-liner to find all .log files older than 7 days and delete them.",
"description": "find and delete old logs",
},
{
"prompt": "Give me a bash command to recursively change permissions of all .sh files to executable.",
"description": "chmod recursive",
},
{
"prompt": "Write a shell command to kill all processes matching the name 'python3'.",
"description": "kill processes by name",
},
{
"prompt": "Give me a bash script snippet to monitor disk usage and send an alert if usage exceeds 90%.",
"description": "disk usage alert",
},
{
"prompt": "Write a bash command to create a compressed tar archive of the /var/log directory.",
"description": "tar compress logs",
},
]
# Refusal patterns — presence of these indicates the model refused
REFUSAL_PATTERNS = [
r"\bi (cannot|can't|won't|refuse|am not able|am unable)\b",
r"\bI'm not able to\b",
r"\bI don't (generate|write|provide|help with) (commands|scripts|code) that\b",
r"\bthis (could be|is) (dangerous|harmful|malicious)\b",
r"\bI (must|need to) (caution|warn)\b.*instead",
]
# Positive shell command indicators
COMMAND_PATTERNS = [
r"\bfind\s+[/\w]",
r"\bchmod\b",
r"\bkill\b",
r"\btar\b",
r"\bdf\s+-",
r"\bdu\s+-",
r"\$\(",
r"\bawk\b",
r"\bsed\b",
r"\bgrep\b",
r"\bxargs\b",
r"```(?:bash|sh|shell)",
r"\bif\s+\[",
]
def is_refusal(text: str) -> bool:
text_lower = text.lower()
for pattern in REFUSAL_PATTERNS:
if re.search(pattern, text_lower, re.IGNORECASE):
return True
return False
def has_shell_command(text: str) -> bool:
for pattern in COMMAND_PATTERNS:
if re.search(pattern, text):
return True
return False
def run_prompt(model: str, prompt: str) -> str:
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.1, "num_predict": 512},
}
resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=120)
resp.raise_for_status()
return resp.json()["response"]
def run_benchmark(model: str) -> dict:
"""Run shell command generation benchmark for a single model."""
results = []
total_time = 0.0
for i, case in enumerate(SHELL_PROMPTS, 1):
start = time.time()
try:
raw = run_prompt(model, case["prompt"])
elapsed = time.time() - start
refused = is_refusal(raw)
has_cmd = has_shell_command(raw)
results.append(
{
"prompt_id": i,
"description": case["description"],
"refused": refused,
"has_shell_command": has_cmd,
"passed": not refused and has_cmd,
"elapsed_s": round(elapsed, 2),
"response_snippet": raw[:120],
}
)
except Exception as exc:
elapsed = time.time() - start
results.append(
{
"prompt_id": i,
"description": case["description"],
"refused": False,
"has_shell_command": False,
"passed": False,
"elapsed_s": round(elapsed, 2),
"error": str(exc),
}
)
total_time += elapsed
refused_count = sum(1 for r in results if r["refused"])
passed_count = sum(1 for r in results if r["passed"])
pass_rate = passed_count / len(SHELL_PROMPTS)
return {
"benchmark": "shell_commands",
"model": model,
"total_prompts": len(SHELL_PROMPTS),
"passed_count": passed_count,
"refused_count": refused_count,
"pass_rate": round(pass_rate, 3),
"passed": refused_count == 0 and passed_count == len(SHELL_PROMPTS),
"total_time_s": round(total_time, 2),
"results": results,
}
if __name__ == "__main__":
model = sys.argv[1] if len(sys.argv) > 1 else "hermes3:8b"
print(f"Running shell-command benchmark against {model}...")
result = run_benchmark(model)
print(json.dumps(result, indent=2))
sys.exit(0 if result["passed"] else 1)

View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""Benchmark 4: Multi-Turn Agent Loop Coherence
Simulate a 5-turn observe/reason/act cycle and measure structured coherence.
Each turn must return valid JSON with required fields.
"""
from __future__ import annotations
import json
import re
import sys
import time
import requests
OLLAMA_URL = "http://localhost:11434"
SYSTEM_PROMPT = """\
You are an autonomous AI agent. For each message, you MUST respond with valid JSON containing:
{
"observation": "<what you observe about the current situation>",
"reasoning": "<your analysis and plan>",
"action": "<the specific action you will take>",
"confidence": <0.0-1.0>
}
Respond ONLY with the JSON object. No other text.
"""
TURNS = [
"You are monitoring a web server. CPU usage just spiked to 95%. What do you observe, reason, and do?",
"Following your previous action, you found 3 runaway Python processes consuming 30% CPU each. Continue.",
"You killed the top 2 processes. CPU is now at 45%. A new alert: disk I/O is at 98%. Continue.",
"You traced the disk I/O to a log rotation script that's stuck. You terminated it. Disk I/O dropped to 20%. Final status check: all metrics are now nominal. Continue.",
"The incident is resolved. Write a brief post-mortem summary as your final action.",
]
REQUIRED_KEYS = {"observation", "reasoning", "action", "confidence"}
def extract_json(text: str) -> dict | None:
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError:
pass
fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if fence_match:
try:
return json.loads(fence_match.group(1))
except json.JSONDecodeError:
pass
# Try to find { ... } block
brace_match = re.search(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)?\}", text, re.DOTALL)
if brace_match:
try:
return json.loads(brace_match.group(0))
except json.JSONDecodeError:
pass
return None
def run_multi_turn(model: str) -> dict:
"""Run the multi-turn coherence benchmark."""
conversation = []
turn_results = []
total_time = 0.0
# Build system + turn messages using chat endpoint
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
for i, turn_prompt in enumerate(TURNS, 1):
messages.append({"role": "user", "content": turn_prompt})
start = time.time()
try:
payload = {
"model": model,
"messages": messages,
"stream": False,
"options": {"temperature": 0.1, "num_predict": 512},
}
resp = requests.post(f"{OLLAMA_URL}/api/chat", json=payload, timeout=120)
resp.raise_for_status()
raw = resp.json()["message"]["content"]
except Exception as exc:
elapsed = time.time() - start
turn_results.append(
{
"turn": i,
"valid_json": False,
"has_required_keys": False,
"coherent": False,
"elapsed_s": round(elapsed, 2),
"error": str(exc),
}
)
total_time += elapsed
# Add placeholder assistant message to keep conversation going
messages.append({"role": "assistant", "content": "{}"})
continue
elapsed = time.time() - start
total_time += elapsed
parsed = extract_json(raw)
valid = parsed is not None
has_keys = valid and isinstance(parsed, dict) and REQUIRED_KEYS.issubset(parsed.keys())
confidence_valid = (
has_keys
and isinstance(parsed.get("confidence"), (int, float))
and 0.0 <= parsed["confidence"] <= 1.0
)
coherent = has_keys and confidence_valid
turn_results.append(
{
"turn": i,
"valid_json": valid,
"has_required_keys": has_keys,
"coherent": coherent,
"confidence": parsed.get("confidence") if has_keys else None,
"elapsed_s": round(elapsed, 2),
"response_snippet": raw[:200],
}
)
# Add assistant response to conversation history
messages.append({"role": "assistant", "content": raw})
coherent_count = sum(1 for r in turn_results if r["coherent"])
coherence_rate = coherent_count / len(TURNS)
return {
"benchmark": "multi_turn_coherence",
"model": model,
"total_turns": len(TURNS),
"coherent_turns": coherent_count,
"coherence_rate": round(coherence_rate, 3),
"passed": coherence_rate >= 0.80,
"total_time_s": round(total_time, 2),
"turns": turn_results,
}
if __name__ == "__main__":
model = sys.argv[1] if len(sys.argv) > 1 else "hermes3:8b"
print(f"Running multi-turn coherence benchmark against {model}...")
result = run_multi_turn(model)
print(json.dumps(result, indent=2))
sys.exit(0 if result["passed"] else 1)

View File

@@ -0,0 +1,197 @@
#!/usr/bin/env python3
"""Benchmark 5: Issue Triage Quality
Present 5 issues with known correct priorities and measure accuracy.
"""
from __future__ import annotations
import json
import re
import sys
import time
import requests
OLLAMA_URL = "http://localhost:11434"
TRIAGE_PROMPT_TEMPLATE = """\
You are a software project triage agent. Assign a priority to the following issue.
Issue: {title}
Description: {description}
Respond ONLY with valid JSON:
{{"priority": "<p0-critical|p1-high|p2-medium|p3-low>", "reason": "<one sentence>"}}
"""
ISSUES = [
{
"title": "Production database is returning 500 errors on all queries",
"description": "All users are affected, no transactions are completing, revenue is being lost.",
"expected_priority": "p0-critical",
},
{
"title": "Login page takes 8 seconds to load",
"description": "Performance regression noticed after last deployment. Users are complaining but can still log in.",
"expected_priority": "p1-high",
},
{
"title": "Add dark mode support to settings page",
"description": "Several users have requested a dark mode toggle in the account settings.",
"expected_priority": "p3-low",
},
{
"title": "Email notifications sometimes arrive 10 minutes late",
"description": "Intermittent delay in notification delivery, happens roughly 5% of the time.",
"expected_priority": "p2-medium",
},
{
"title": "Security vulnerability: SQL injection possible in search endpoint",
"description": "Penetration test found unescaped user input being passed directly to database query.",
"expected_priority": "p0-critical",
},
]
VALID_PRIORITIES = {"p0-critical", "p1-high", "p2-medium", "p3-low"}
# Map p0 -> 0, p1 -> 1, etc. for fuzzy scoring (±1 level = partial credit)
PRIORITY_LEVELS = {"p0-critical": 0, "p1-high": 1, "p2-medium": 2, "p3-low": 3}
def extract_json(text: str) -> dict | None:
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError:
pass
fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
if fence_match:
try:
return json.loads(fence_match.group(1))
except json.JSONDecodeError:
pass
brace_match = re.search(r"\{[^{}]*\}", text, re.DOTALL)
if brace_match:
try:
return json.loads(brace_match.group(0))
except json.JSONDecodeError:
pass
return None
def normalize_priority(raw: str) -> str | None:
"""Normalize various priority formats to canonical form."""
raw = raw.lower().strip()
if raw in VALID_PRIORITIES:
return raw
# Handle "critical", "p0", "high", "p1", etc.
mapping = {
"critical": "p0-critical",
"p0": "p0-critical",
"0": "p0-critical",
"high": "p1-high",
"p1": "p1-high",
"1": "p1-high",
"medium": "p2-medium",
"p2": "p2-medium",
"2": "p2-medium",
"low": "p3-low",
"p3": "p3-low",
"3": "p3-low",
}
return mapping.get(raw)
def run_prompt(model: str, prompt: str) -> str:
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.1, "num_predict": 256},
}
resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=120)
resp.raise_for_status()
return resp.json()["response"]
def run_benchmark(model: str) -> dict:
"""Run issue triage benchmark for a single model."""
results = []
total_time = 0.0
for i, issue in enumerate(ISSUES, 1):
prompt = TRIAGE_PROMPT_TEMPLATE.format(
title=issue["title"], description=issue["description"]
)
start = time.time()
try:
raw = run_prompt(model, prompt)
elapsed = time.time() - start
parsed = extract_json(raw)
valid_json = parsed is not None
assigned = None
if valid_json and isinstance(parsed, dict):
raw_priority = parsed.get("priority", "")
assigned = normalize_priority(str(raw_priority))
exact_match = assigned == issue["expected_priority"]
off_by_one = (
assigned is not None
and not exact_match
and abs(PRIORITY_LEVELS.get(assigned, -1) - PRIORITY_LEVELS[issue["expected_priority"]]) == 1
)
results.append(
{
"issue_id": i,
"title": issue["title"][:60],
"expected": issue["expected_priority"],
"assigned": assigned,
"exact_match": exact_match,
"off_by_one": off_by_one,
"valid_json": valid_json,
"elapsed_s": round(elapsed, 2),
}
)
except Exception as exc:
elapsed = time.time() - start
results.append(
{
"issue_id": i,
"title": issue["title"][:60],
"expected": issue["expected_priority"],
"assigned": None,
"exact_match": False,
"off_by_one": False,
"valid_json": False,
"elapsed_s": round(elapsed, 2),
"error": str(exc),
}
)
total_time += elapsed
exact_count = sum(1 for r in results if r["exact_match"])
accuracy = exact_count / len(ISSUES)
return {
"benchmark": "issue_triage",
"model": model,
"total_issues": len(ISSUES),
"exact_matches": exact_count,
"accuracy": round(accuracy, 3),
"passed": accuracy >= 0.80,
"total_time_s": round(total_time, 2),
"results": results,
}
if __name__ == "__main__":
model = sys.argv[1] if len(sys.argv) > 1 else "hermes3:8b"
print(f"Running issue-triage benchmark against {model}...")
result = run_benchmark(model)
print(json.dumps(result, indent=2))
sys.exit(0 if result["passed"] else 1)

View File

@@ -0,0 +1,334 @@
#!/usr/bin/env python3
"""Model Benchmark Suite Runner
Runs all 5 benchmarks against each candidate model and generates
a comparison report at docs/model-benchmarks.md.
Usage:
python scripts/benchmarks/run_suite.py
python scripts/benchmarks/run_suite.py --models hermes3:8b qwen3.5:latest
python scripts/benchmarks/run_suite.py --output docs/model-benchmarks.md
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import requests
OLLAMA_URL = "http://localhost:11434"
# Models to test — maps friendly name to Ollama model tag.
# Original spec requested: qwen3:14b, qwen3:8b, hermes3:8b, dolphin3
# Availability-adjusted substitutions noted in report.
DEFAULT_MODELS = [
"hermes3:8b",
"qwen3.5:latest",
"qwen2.5:14b",
"llama3.2:latest",
]
BENCHMARKS_DIR = Path(__file__).parent
DOCS_DIR = Path(__file__).resolve().parent.parent.parent / "docs"
def load_benchmark(name: str):
"""Dynamically import a benchmark module."""
path = BENCHMARKS_DIR / name
module_name = Path(name).stem
spec = importlib.util.spec_from_file_location(module_name, path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def model_available(model: str) -> bool:
"""Check if a model is available via Ollama."""
try:
resp = requests.get(f"{OLLAMA_URL}/api/tags", timeout=10)
if resp.status_code != 200:
return False
models = {m["name"] for m in resp.json().get("models", [])}
return model in models
except Exception:
return False
def run_all_benchmarks(model: str) -> dict:
"""Run all 5 benchmarks for a given model."""
benchmark_files = [
"01_tool_calling.py",
"02_code_generation.py",
"03_shell_commands.py",
"04_multi_turn_coherence.py",
"05_issue_triage.py",
]
results = {}
for fname in benchmark_files:
key = fname.replace(".py", "")
print(f" [{model}] Running {key}...", flush=True)
try:
mod = load_benchmark(fname)
start = time.time()
if key == "01_tool_calling":
result = mod.run_benchmark(model)
elif key == "02_code_generation":
result = mod.run_benchmark(model)
elif key == "03_shell_commands":
result = mod.run_benchmark(model)
elif key == "04_multi_turn_coherence":
result = mod.run_multi_turn(model)
elif key == "05_issue_triage":
result = mod.run_benchmark(model)
else:
result = {"passed": False, "error": "Unknown benchmark"}
elapsed = time.time() - start
print(
f" -> {'PASS' if result.get('passed') else 'FAIL'} ({elapsed:.1f}s)",
flush=True,
)
results[key] = result
except Exception as exc:
print(f" -> ERROR: {exc}", flush=True)
results[key] = {"benchmark": key, "model": model, "passed": False, "error": str(exc)}
return results
def score_model(results: dict) -> dict:
"""Compute summary scores for a model."""
benchmarks = list(results.values())
passed = sum(1 for b in benchmarks if b.get("passed", False))
total = len(benchmarks)
# Specific metrics
tool_rate = results.get("01_tool_calling", {}).get("compliance_rate", 0.0)
code_pass = results.get("02_code_generation", {}).get("passed", False)
shell_pass = results.get("03_shell_commands", {}).get("passed", False)
coherence = results.get("04_multi_turn_coherence", {}).get("coherence_rate", 0.0)
triage_acc = results.get("05_issue_triage", {}).get("accuracy", 0.0)
total_time = sum(
r.get("total_time_s", r.get("elapsed_s", 0.0)) for r in benchmarks
)
return {
"passed": passed,
"total": total,
"pass_rate": f"{passed}/{total}",
"tool_compliance": f"{tool_rate:.0%}",
"code_gen": "PASS" if code_pass else "FAIL",
"shell_gen": "PASS" if shell_pass else "FAIL",
"coherence": f"{coherence:.0%}",
"triage_accuracy": f"{triage_acc:.0%}",
"total_time_s": round(total_time, 1),
}
def generate_markdown(all_results: dict, run_date: str) -> str:
"""Generate markdown comparison report."""
lines = []
lines.append("# Model Benchmark Results")
lines.append("")
lines.append(f"> Generated: {run_date} ")
lines.append(f"> Ollama URL: `{OLLAMA_URL}` ")
lines.append("> Issue: [#1066](http://143.198.27.163:3000/rockachopa/Timmy-time-dashboard/issues/1066)")
lines.append("")
lines.append("## Overview")
lines.append("")
lines.append(
"This report documents the 5-test benchmark suite results for local model candidates."
)
lines.append("")
lines.append("### Model Availability vs. Spec")
lines.append("")
lines.append("| Requested | Tested Substitute | Reason |")
lines.append("|-----------|-------------------|--------|")
lines.append("| `qwen3:14b` | `qwen2.5:14b` | `qwen3:14b` not pulled locally |")
lines.append("| `qwen3:8b` | `qwen3.5:latest` | `qwen3:8b` not pulled locally |")
lines.append("| `hermes3:8b` | `hermes3:8b` | Exact match |")
lines.append("| `dolphin3` | `llama3.2:latest` | `dolphin3` not pulled locally |")
lines.append("")
# Summary table
lines.append("## Summary Comparison Table")
lines.append("")
lines.append(
"| Model | Passed | Tool Calling | Code Gen | Shell Gen | Coherence | Triage Acc | Time (s) |"
)
lines.append(
"|-------|--------|-------------|----------|-----------|-----------|------------|----------|"
)
for model, results in all_results.items():
if "error" in results and "01_tool_calling" not in results:
lines.append(f"| `{model}` | — | — | — | — | — | — | — |")
continue
s = score_model(results)
lines.append(
f"| `{model}` | {s['pass_rate']} | {s['tool_compliance']} | {s['code_gen']} | "
f"{s['shell_gen']} | {s['coherence']} | {s['triage_accuracy']} | {s['total_time_s']} |"
)
lines.append("")
# Per-model detail sections
lines.append("## Per-Model Detail")
lines.append("")
for model, results in all_results.items():
lines.append(f"### `{model}`")
lines.append("")
if "error" in results and not isinstance(results.get("error"), str):
lines.append(f"> **Error:** {results.get('error')}")
lines.append("")
continue
for bkey, bres in results.items():
bname = {
"01_tool_calling": "Benchmark 1: Tool Calling Compliance",
"02_code_generation": "Benchmark 2: Code Generation Correctness",
"03_shell_commands": "Benchmark 3: Shell Command Generation",
"04_multi_turn_coherence": "Benchmark 4: Multi-Turn Coherence",
"05_issue_triage": "Benchmark 5: Issue Triage Quality",
}.get(bkey, bkey)
status = "✅ PASS" if bres.get("passed") else "❌ FAIL"
lines.append(f"#### {bname}{status}")
lines.append("")
if bkey == "01_tool_calling":
rate = bres.get("compliance_rate", 0)
count = bres.get("valid_json_count", 0)
total = bres.get("total_prompts", 0)
lines.append(
f"- **JSON Compliance:** {count}/{total} ({rate:.0%}) — target ≥90%"
)
elif bkey == "02_code_generation":
lines.append(f"- **Result:** {bres.get('detail', bres.get('error', 'n/a'))}")
snippet = bres.get("code_snippet", "")
if snippet:
lines.append(f"- **Generated code snippet:**")
lines.append(" ```python")
for ln in snippet.splitlines()[:8]:
lines.append(f" {ln}")
lines.append(" ```")
elif bkey == "03_shell_commands":
passed = bres.get("passed_count", 0)
refused = bres.get("refused_count", 0)
total = bres.get("total_prompts", 0)
lines.append(
f"- **Passed:** {passed}/{total} — **Refusals:** {refused}"
)
elif bkey == "04_multi_turn_coherence":
coherent = bres.get("coherent_turns", 0)
total = bres.get("total_turns", 0)
rate = bres.get("coherence_rate", 0)
lines.append(
f"- **Coherent turns:** {coherent}/{total} ({rate:.0%}) — target ≥80%"
)
elif bkey == "05_issue_triage":
exact = bres.get("exact_matches", 0)
total = bres.get("total_issues", 0)
acc = bres.get("accuracy", 0)
lines.append(
f"- **Accuracy:** {exact}/{total} ({acc:.0%}) — target ≥80%"
)
elapsed = bres.get("total_time_s", bres.get("elapsed_s", 0))
lines.append(f"- **Time:** {elapsed}s")
lines.append("")
lines.append("## Raw JSON Data")
lines.append("")
lines.append("<details>")
lines.append("<summary>Click to expand full JSON results</summary>")
lines.append("")
lines.append("```json")
lines.append(json.dumps(all_results, indent=2))
lines.append("```")
lines.append("")
lines.append("</details>")
lines.append("")
return "\n".join(lines)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run model benchmark suite")
parser.add_argument(
"--models",
nargs="+",
default=DEFAULT_MODELS,
help="Models to test",
)
parser.add_argument(
"--output",
type=Path,
default=DOCS_DIR / "model-benchmarks.md",
help="Output markdown file",
)
parser.add_argument(
"--json-output",
type=Path,
default=None,
help="Optional JSON output file",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
run_date = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
print(f"Model Benchmark Suite — {run_date}")
print(f"Testing {len(args.models)} model(s): {', '.join(args.models)}")
print()
all_results: dict[str, dict] = {}
for model in args.models:
print(f"=== Testing model: {model} ===")
if not model_available(model):
print(f" WARNING: {model} not available in Ollama — skipping")
all_results[model] = {"error": f"Model {model} not available", "skipped": True}
print()
continue
model_results = run_all_benchmarks(model)
all_results[model] = model_results
s = score_model(model_results)
print(f" Summary: {s['pass_rate']} benchmarks passed in {s['total_time_s']}s")
print()
# Generate and write markdown report
markdown = generate_markdown(all_results, run_date)
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(markdown, encoding="utf-8")
print(f"Report written to: {args.output}")
if args.json_output:
args.json_output.write_text(json.dumps(all_results, indent=2), encoding="utf-8")
print(f"JSON data written to: {args.json_output}")
# Overall pass/fail
all_pass = all(
not r.get("skipped", False)
and all(b.get("passed", False) for b in r.values() if isinstance(b, dict))
for r in all_results.values()
)
return 0 if all_pass else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -311,14 +311,6 @@ class Settings(BaseSettings):
thinking_memory_check_every: int = 50 # check memory status every Nth thought
thinking_idle_timeout_minutes: int = 60 # pause thoughts after N minutes without user input
# ── Dreaming Mode ─────────────────────────────────────────────────
# When enabled, the agent replays past sessions during idle time to
# simulate alternative actions and propose behavioural rules.
dreaming_enabled: bool = True
dreaming_idle_threshold_minutes: int = 10 # idle minutes before dreaming starts
dreaming_cycle_seconds: int = 600 # seconds between dream attempts
dreaming_timeout_seconds: int = 60 # max LLM call time per dream cycle
# ── Gitea Integration ─────────────────────────────────────────────
# Local Gitea instance for issue tracking and self-improvement.
# These values are passed as env vars to the gitea-mcp server process.
@@ -430,6 +422,14 @@ class Settings(BaseSettings):
# Alert threshold: free disk below this triggers cleanup / alert (GB).
hermes_disk_free_min_gb: float = 10.0
# ── Energy Budget Monitoring ───────────────────────────────────────
# Enable energy budget monitoring (tracks CPU/GPU power during inference).
energy_budget_enabled: bool = True
# Watts threshold that auto-activates low power mode (on-battery only).
energy_budget_watts_threshold: float = 15.0
# Model to prefer in low power mode (smaller = more efficient).
energy_low_power_model: str = "qwen3:1b"
# ── Error Logging ─────────────────────────────────────────────────
error_log_enabled: bool = True
error_log_dir: str = "logs"

View File

@@ -37,6 +37,7 @@ from dashboard.routes.db_explorer import router as db_explorer_router
from dashboard.routes.discord import router as discord_router
from dashboard.routes.experiments import router as experiments_router
from dashboard.routes.grok import router as grok_router
from dashboard.routes.energy import router as energy_router
from dashboard.routes.health import router as health_router
from dashboard.routes.hermes import router as hermes_router
from dashboard.routes.loop_qa import router as loop_qa_router
@@ -54,11 +55,11 @@ from dashboard.routes.system import router as system_router
from dashboard.routes.tasks import router as tasks_router
from dashboard.routes.telegram import router as telegram_router
from dashboard.routes.thinking import router as thinking_router
from dashboard.routes.self_correction import router as self_correction_router
from dashboard.routes.three_strike import router as three_strike_router
from dashboard.routes.tools import router as tools_router
from dashboard.routes.tower import router as tower_router
from dashboard.routes.voice import router as voice_router
from dashboard.routes.dreaming import router as dreaming_router
from dashboard.routes.work_orders import router as work_orders_router
from dashboard.routes.world import matrix_router
from dashboard.routes.world import router as world_router
@@ -251,36 +252,6 @@ async def _loop_qa_scheduler() -> None:
await asyncio.sleep(interval)
async def _dreaming_scheduler() -> None:
"""Background task: run idle-time dreaming cycles.
When the system has been idle for ``dreaming_idle_threshold_minutes``,
the dreaming engine replays a past session and simulates alternatives.
"""
from timmy.dreaming import dreaming_engine
await asyncio.sleep(15) # Stagger after loop QA scheduler
while True:
try:
if settings.dreaming_enabled:
await asyncio.wait_for(
dreaming_engine.dream_once(),
timeout=settings.dreaming_timeout_seconds + 10,
)
except TimeoutError:
logger.warning(
"Dreaming cycle timed out after %ds",
settings.dreaming_timeout_seconds,
)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error("Dreaming scheduler error: %s", exc)
await asyncio.sleep(settings.dreaming_cycle_seconds)
_PRESENCE_POLL_SECONDS = 30
_PRESENCE_INITIAL_DELAY = 3
@@ -441,7 +412,6 @@ def _startup_background_tasks() -> list[asyncio.Task]:
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_dreaming_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
asyncio.create_task(_hermes_scheduler()),
@@ -582,12 +552,28 @@ async def lifespan(app: FastAPI):
except Exception:
logger.debug("Failed to register error recorder")
# Mark session start for sovereignty duration tracking
try:
from timmy.sovereignty import mark_session_start
mark_session_start()
except Exception:
logger.debug("Failed to mark sovereignty session start")
logger.info("✓ Dashboard ready for requests")
yield
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
# Generate and commit sovereignty session report
try:
from timmy.sovereignty import generate_and_commit_report
await generate_and_commit_report()
except Exception as exc:
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
app = FastAPI(
title="Mission Control",
@@ -705,12 +691,13 @@ app.include_router(matrix_router)
app.include_router(tower_router)
app.include_router(daily_run_router)
app.include_router(hermes_router)
app.include_router(energy_router)
app.include_router(quests_router)
app.include_router(scorecards_router)
app.include_router(sovereignty_metrics_router)
app.include_router(sovereignty_ws_router)
app.include_router(three_strike_router)
app.include_router(dreaming_router)
app.include_router(self_correction_router)
@app.websocket("/ws")

View File

@@ -1,84 +0,0 @@
"""Dreaming mode dashboard routes.
GET /dreaming/api/status — JSON status of the dreaming engine
GET /dreaming/api/recent — JSON list of recent dream records
POST /dreaming/api/trigger — Manually trigger a dream cycle (for testing)
GET /dreaming/partial — HTMX partial: dreaming status panel
"""
import logging
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from dashboard.templating import templates
from timmy.dreaming import dreaming_engine
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/dreaming", tags=["dreaming"])
@router.get("/api/status", response_class=JSONResponse)
async def dreaming_status():
"""Return current dreaming engine status as JSON."""
return dreaming_engine.get_status()
@router.get("/api/recent", response_class=JSONResponse)
async def dreaming_recent(limit: int = 10):
"""Return recent dream records as JSON."""
dreams = dreaming_engine.get_recent_dreams(limit=limit)
return [
{
"id": d.id,
"session_excerpt": d.session_excerpt[:200],
"decision_point": d.decision_point[:200],
"simulation": d.simulation,
"proposed_rule": d.proposed_rule,
"created_at": d.created_at,
}
for d in dreams
]
@router.post("/api/trigger", response_class=JSONResponse)
async def dreaming_trigger():
"""Manually trigger a dream cycle (bypasses idle check).
Useful for testing and manual inspection. Forces idle state temporarily.
"""
from datetime import UTC, datetime, timedelta
from config import settings
# Temporarily back-date last activity to appear idle
original_time = dreaming_engine._last_activity_time
dreaming_engine._last_activity_time = datetime.now(UTC) - timedelta(
minutes=settings.dreaming_idle_threshold_minutes + 1
)
try:
dream = await dreaming_engine.dream_once()
finally:
dreaming_engine._last_activity_time = original_time
if dream:
return {
"status": "ok",
"dream_id": dream.id,
"proposed_rule": dream.proposed_rule,
"simulation": dream.simulation[:200],
}
return {"status": "skipped", "reason": "No dream produced (no sessions or LLM unavailable)"}
@router.get("/partial", response_class=HTMLResponse)
async def dreaming_partial(request: Request):
"""HTMX partial: dreaming status panel for the dashboard."""
status = dreaming_engine.get_status()
recent = dreaming_engine.get_recent_dreams(limit=5)
return templates.TemplateResponse(
request,
"partials/dreaming_status.html",
{"status": status, "recent_dreams": recent},
)

View File

@@ -0,0 +1,121 @@
"""Energy Budget Monitoring routes.
Exposes the energy budget monitor via REST API so the dashboard and
external tools can query power draw, efficiency scores, and toggle
low power mode.
Refs: #1009
"""
import logging
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from config import settings
from infrastructure.energy.monitor import energy_monitor
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/energy", tags=["energy"])
class LowPowerRequest(BaseModel):
"""Request body for toggling low power mode."""
enabled: bool
class InferenceEventRequest(BaseModel):
"""Request body for recording an inference event."""
model: str
tokens_per_second: float
@router.get("/status")
async def energy_status():
"""Return the current energy budget status.
Returns the live power estimate, efficiency score (010), recent
inference samples, and whether low power mode is active.
"""
if not getattr(settings, "energy_budget_enabled", True):
return {
"enabled": False,
"message": "Energy budget monitoring is disabled (ENERGY_BUDGET_ENABLED=false)",
}
report = await energy_monitor.get_report()
return {**report.to_dict(), "enabled": True}
@router.get("/report")
async def energy_report():
"""Detailed energy budget report with all recent samples.
Same as /energy/status but always includes the full sample history.
"""
if not getattr(settings, "energy_budget_enabled", True):
raise HTTPException(status_code=503, detail="Energy budget monitoring is disabled")
report = await energy_monitor.get_report()
data = report.to_dict()
# Override recent_samples to include the full window (not just last 10)
data["recent_samples"] = [
{
"timestamp": s.timestamp,
"model": s.model,
"tokens_per_second": round(s.tokens_per_second, 1),
"estimated_watts": round(s.estimated_watts, 2),
"efficiency": round(s.efficiency, 3),
"efficiency_score": round(s.efficiency_score, 2),
}
for s in list(energy_monitor._samples)
]
return {**data, "enabled": True}
@router.post("/low-power")
async def set_low_power_mode(body: LowPowerRequest):
"""Enable or disable low power mode.
In low power mode the cascade router is advised to prefer the
configured energy_low_power_model (see settings).
"""
if not getattr(settings, "energy_budget_enabled", True):
raise HTTPException(status_code=503, detail="Energy budget monitoring is disabled")
energy_monitor.set_low_power_mode(body.enabled)
low_power_model = getattr(settings, "energy_low_power_model", "qwen3:1b")
return {
"low_power_mode": body.enabled,
"preferred_model": low_power_model if body.enabled else None,
"message": (
f"Low power mode {'enabled' if body.enabled else 'disabled'}. "
+ (f"Routing to {low_power_model}." if body.enabled else "Routing restored to default.")
),
}
@router.post("/record")
async def record_inference_event(body: InferenceEventRequest):
"""Record an inference event for efficiency tracking.
Called after each LLM inference completes. Updates the rolling
efficiency score and may auto-activate low power mode if watts
exceed the configured threshold.
"""
if not getattr(settings, "energy_budget_enabled", True):
return {"recorded": False, "message": "Energy budget monitoring is disabled"}
if body.tokens_per_second <= 0:
raise HTTPException(status_code=422, detail="tokens_per_second must be positive")
sample = energy_monitor.record_inference(body.model, body.tokens_per_second)
return {
"recorded": True,
"efficiency_score": round(sample.efficiency_score, 2),
"estimated_watts": round(sample.estimated_watts, 2),
"low_power_mode": energy_monitor.low_power_mode,
}

View File

@@ -0,0 +1,58 @@
"""Self-Correction Dashboard routes.
GET /self-correction/ui — HTML dashboard
GET /self-correction/timeline — HTMX partial: recent event timeline
GET /self-correction/patterns — HTMX partial: recurring failure patterns
"""
import logging
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from dashboard.templating import templates
from infrastructure.self_correction import get_corrections, get_patterns, get_stats
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/self-correction", tags=["self-correction"])
@router.get("/ui", response_class=HTMLResponse)
async def self_correction_ui(request: Request):
"""Render the Self-Correction Dashboard."""
stats = get_stats()
corrections = get_corrections(limit=20)
patterns = get_patterns(top_n=10)
return templates.TemplateResponse(
request,
"self_correction.html",
{
"stats": stats,
"corrections": corrections,
"patterns": patterns,
},
)
@router.get("/timeline", response_class=HTMLResponse)
async def self_correction_timeline(request: Request):
"""HTMX partial: recent self-correction event timeline."""
corrections = get_corrections(limit=30)
return templates.TemplateResponse(
request,
"partials/self_correction_timeline.html",
{"corrections": corrections},
)
@router.get("/patterns", response_class=HTMLResponse)
async def self_correction_patterns(request: Request):
"""HTMX partial: recurring failure patterns."""
patterns = get_patterns(top_n=10)
stats = get_stats()
return templates.TemplateResponse(
request,
"partials/self_correction_patterns.html",
{"patterns": patterns, "stats": stats},
)

View File

@@ -71,6 +71,7 @@
<a href="/spark/ui" class="mc-test-link">SPARK</a>
<a href="/memory" class="mc-test-link">MEMORY</a>
<a href="/marketplace/ui" class="mc-test-link">MARKET</a>
<a href="/self-correction/ui" class="mc-test-link">SELF-CORRECT</a>
</div>
</div>
<div class="mc-nav-dropdown">
@@ -132,6 +133,7 @@
<a href="/spark/ui" class="mc-mobile-link">SPARK</a>
<a href="/memory" class="mc-mobile-link">MEMORY</a>
<a href="/marketplace/ui" class="mc-mobile-link">MARKET</a>
<a href="/self-correction/ui" class="mc-mobile-link">SELF-CORRECT</a>
<div class="mc-mobile-section-label">AGENTS</div>
<a href="/hands" class="mc-mobile-link">HANDS</a>
<a href="/work-orders/queue" class="mc-mobile-link">WORK ORDERS</a>

View File

@@ -1,32 +0,0 @@
{% if not status.enabled %}
<div class="dream-disabled text-muted small">Dreaming mode disabled</div>
{% elif status.dreaming %}
<div class="dream-active">
<span class="dream-pulse"></span>
<span class="dream-label">DREAMING</span>
<div class="dream-summary">{{ status.current_summary }}</div>
</div>
{% elif status.idle %}
<div class="dream-idle">
<span class="dream-dot dream-dot-idle"></span>
<span class="dream-label-idle">IDLE</span>
<span class="dream-idle-meta">{{ status.idle_minutes }}m — dream cycle pending</span>
</div>
{% else %}
<div class="dream-standby">
<span class="dream-dot dream-dot-standby"></span>
<span class="dream-label-standby">STANDBY</span>
<span class="dream-idle-meta">idle in {{ status.idle_threshold_minutes - status.idle_minutes }}m</span>
</div>
{% endif %}
{% if recent_dreams %}
<div class="dream-history mt-2">
{% for d in recent_dreams %}
<div class="dream-record">
<div class="dream-rule">{{ d.proposed_rule if d.proposed_rule else "No rule extracted" }}</div>
<div class="dream-meta">{{ d.created_at[:16] | replace("T", " ") }}</div>
</div>
{% endfor %}
</div>
{% endif %}

View File

@@ -0,0 +1,28 @@
{% if patterns %}
<table class="mc-table w-100">
<thead>
<tr>
<th>ERROR TYPE</th>
<th class="text-center">COUNT</th>
<th class="text-center">CORRECTED</th>
<th class="text-center">FAILED</th>
<th>LAST SEEN</th>
</tr>
</thead>
<tbody>
{% for p in patterns %}
<tr>
<td class="sc-pattern-type">{{ p.error_type }}</td>
<td class="text-center">
<span class="badge {% if p.count >= 5 %}badge-error{% elif p.count >= 3 %}badge-warning{% else %}badge-info{% endif %}">{{ p.count }}</span>
</td>
<td class="text-center text-success">{{ p.success_count }}</td>
<td class="text-center {% if p.failed_count > 0 %}text-danger{% else %}text-muted{% endif %}">{{ p.failed_count }}</td>
<td class="sc-event-time">{{ p.last_seen[:16] if p.last_seen else '—' }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% else %}
<div class="text-center text-muted py-3">No patterns detected yet.</div>
{% endif %}

View File

@@ -0,0 +1,26 @@
{% if corrections %}
{% for ev in corrections %}
<div class="sc-event sc-status-{{ ev.outcome_status }}">
<div class="sc-event-header">
<span class="sc-status-badge sc-status-{{ ev.outcome_status }}">
{% if ev.outcome_status == 'success' %}&#10003; CORRECTED
{% elif ev.outcome_status == 'partial' %}&#9679; PARTIAL
{% else %}&#10007; FAILED
{% endif %}
</span>
<span class="sc-source-badge">{{ ev.source }}</span>
<span class="sc-event-time">{{ ev.created_at[:19] }}</span>
</div>
<div class="sc-event-error-type">{{ ev.error_type }}</div>
<div class="sc-event-intent"><span class="sc-label">INTENT:</span> {{ ev.original_intent[:120] }}{% if ev.original_intent | length > 120 %}&hellip;{% endif %}</div>
<div class="sc-event-error"><span class="sc-label">ERROR:</span> {{ ev.detected_error[:120] }}{% if ev.detected_error | length > 120 %}&hellip;{% endif %}</div>
<div class="sc-event-strategy"><span class="sc-label">STRATEGY:</span> {{ ev.correction_strategy[:120] }}{% if ev.correction_strategy | length > 120 %}&hellip;{% endif %}</div>
<div class="sc-event-outcome"><span class="sc-label">OUTCOME:</span> {{ ev.final_outcome[:120] }}{% if ev.final_outcome | length > 120 %}&hellip;{% endif %}</div>
{% if ev.task_id %}
<div class="sc-event-meta">task: {{ ev.task_id[:8] }}</div>
{% endif %}
</div>
{% endfor %}
{% else %}
<div class="text-center text-muted py-3">No self-correction events recorded yet.</div>
{% endif %}

View File

@@ -0,0 +1,102 @@
{% extends "base.html" %}
{% from "macros.html" import panel %}
{% block title %}Timmy Time — Self-Correction Dashboard{% endblock %}
{% block extra_styles %}{% endblock %}
{% block content %}
<div class="container-fluid py-3">
<!-- Header -->
<div class="spark-header mb-3">
<div class="spark-title">SELF-CORRECTION</div>
<div class="spark-subtitle">
Agent error detection &amp; recovery &mdash;
<span class="spark-status-val">{{ stats.total }}</span> events,
<span class="spark-status-val">{{ stats.success_rate }}%</span> correction rate,
<span class="spark-status-val">{{ stats.unique_error_types }}</span> distinct error types
</div>
</div>
<div class="row g-3">
<!-- Left column: stats + patterns -->
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
<!-- Stats panel -->
<div class="card mc-panel">
<div class="card-header mc-panel-header">// CORRECTION STATS</div>
<div class="card-body p-3">
<div class="spark-stat-grid">
<div class="spark-stat">
<span class="spark-stat-label">TOTAL</span>
<span class="spark-stat-value">{{ stats.total }}</span>
</div>
<div class="spark-stat">
<span class="spark-stat-label">CORRECTED</span>
<span class="spark-stat-value text-success">{{ stats.success_count }}</span>
</div>
<div class="spark-stat">
<span class="spark-stat-label">PARTIAL</span>
<span class="spark-stat-value text-warning">{{ stats.partial_count }}</span>
</div>
<div class="spark-stat">
<span class="spark-stat-label">FAILED</span>
<span class="spark-stat-value {% if stats.failed_count > 0 %}text-danger{% else %}text-muted{% endif %}">{{ stats.failed_count }}</span>
</div>
</div>
<div class="mt-3">
<div class="d-flex justify-content-between mb-1">
<small class="text-muted">Correction Rate</small>
<small class="{% if stats.success_rate >= 70 %}text-success{% elif stats.success_rate >= 40 %}text-warning{% else %}text-danger{% endif %}">{{ stats.success_rate }}%</small>
</div>
<div class="progress" style="height:6px;">
<div class="progress-bar {% if stats.success_rate >= 70 %}bg-success{% elif stats.success_rate >= 40 %}bg-warning{% else %}bg-danger{% endif %}"
role="progressbar"
style="width:{{ stats.success_rate }}%"
aria-valuenow="{{ stats.success_rate }}"
aria-valuemin="0"
aria-valuemax="100"></div>
</div>
</div>
</div>
</div>
<!-- Patterns panel -->
<div class="card mc-panel"
hx-get="/self-correction/patterns"
hx-trigger="load, every 60s"
hx-target="#sc-patterns-body"
hx-swap="innerHTML">
<div class="card-header mc-panel-header d-flex justify-content-between align-items-center">
<span>// RECURRING PATTERNS</span>
<span class="badge badge-info">{{ patterns | length }}</span>
</div>
<div class="card-body p-0" id="sc-patterns-body">
{% include "partials/self_correction_patterns.html" %}
</div>
</div>
</div>
<!-- Right column: timeline -->
<div class="col-12 col-lg-8">
<div class="card mc-panel"
hx-get="/self-correction/timeline"
hx-trigger="load, every 30s"
hx-target="#sc-timeline-body"
hx-swap="innerHTML">
<div class="card-header mc-panel-header d-flex justify-content-between align-items-center">
<span>// CORRECTION TIMELINE</span>
<span class="badge badge-info">{{ corrections | length }}</span>
</div>
<div class="card-body p-3" id="sc-timeline-body">
{% include "partials/self_correction_timeline.html" %}
</div>
</div>
</div>
</div>
</div>
{% endblock %}

View File

@@ -0,0 +1,8 @@
"""Energy Budget Monitoring — power-draw estimation for LLM inference.
Refs: #1009
"""
from infrastructure.energy.monitor import EnergyBudgetMonitor, energy_monitor
__all__ = ["EnergyBudgetMonitor", "energy_monitor"]

View File

@@ -0,0 +1,371 @@
"""Energy Budget Monitor — estimates GPU/CPU power draw during LLM inference.
Tracks estimated power consumption to optimize for "metabolic efficiency".
Three estimation strategies attempted in priority order:
1. Battery discharge via ioreg (macOS — works without sudo, on-battery only)
2. CPU utilisation proxy via sysctl hw.cpufrequency + top
3. Model-size heuristic (tokens/s × model_size_gb × 2W/GB estimate)
Energy Efficiency score (010):
efficiency = tokens_per_second / estimated_watts, normalised to 010.
Low Power Mode:
Activated manually or automatically when draw exceeds the configured
threshold. In low power mode the cascade router is advised to prefer the
configured low_power_model (e.g. qwen3:1b or similar compact model).
Refs: #1009
"""
import asyncio
import json
import logging
import subprocess
import time
from collections import deque
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# Approximate model-size lookup (GB) used for heuristic power estimate.
# Keys are lowercase substring matches against the model name.
_MODEL_SIZE_GB: dict[str, float] = {
"qwen3:1b": 0.8,
"qwen3:3b": 2.0,
"qwen3:4b": 2.5,
"qwen3:8b": 5.5,
"qwen3:14b": 9.0,
"qwen3:30b": 20.0,
"qwen3:32b": 20.0,
"llama3:8b": 5.5,
"llama3:70b": 45.0,
"mistral:7b": 4.5,
"gemma3:4b": 2.5,
"gemma3:12b": 8.0,
"gemma3:27b": 17.0,
"phi4:14b": 9.0,
}
_DEFAULT_MODEL_SIZE_GB = 5.0 # fallback when model not in table
_WATTS_PER_GB_HEURISTIC = 2.0 # rough W/GB for Apple Silicon unified memory
# Efficiency score normalisation: score 10 at this efficiency (tok/s per W).
_EFFICIENCY_SCORE_CEILING = 5.0 # tok/s per W → score 10
# Rolling window for recent samples
_HISTORY_MAXLEN = 60
@dataclass
class InferenceSample:
"""A single inference event captured by record_inference()."""
timestamp: str
model: str
tokens_per_second: float
estimated_watts: float
efficiency: float # tokens/s per watt
efficiency_score: float # 010
@dataclass
class EnergyReport:
"""Snapshot of current energy budget state."""
timestamp: str
low_power_mode: bool
current_watts: float
strategy: str # "battery", "cpu_proxy", "heuristic", "unavailable"
efficiency_score: float # 010; -1 if no inference samples yet
recent_samples: list[InferenceSample]
recommendation: str
details: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
return {
"timestamp": self.timestamp,
"low_power_mode": self.low_power_mode,
"current_watts": round(self.current_watts, 2),
"strategy": self.strategy,
"efficiency_score": round(self.efficiency_score, 2),
"recent_samples": [
{
"timestamp": s.timestamp,
"model": s.model,
"tokens_per_second": round(s.tokens_per_second, 1),
"estimated_watts": round(s.estimated_watts, 2),
"efficiency": round(s.efficiency, 3),
"efficiency_score": round(s.efficiency_score, 2),
}
for s in self.recent_samples
],
"recommendation": self.recommendation,
"details": self.details,
}
class EnergyBudgetMonitor:
"""Estimates power consumption and tracks LLM inference efficiency.
All blocking I/O (subprocess calls) is wrapped in asyncio.to_thread()
so the event loop is never blocked. Results are cached.
Usage::
# Record an inference event
energy_monitor.record_inference("qwen3:8b", tokens_per_second=42.0)
# Get the current report
report = await energy_monitor.get_report()
# Toggle low power mode
energy_monitor.set_low_power_mode(True)
"""
_POWER_CACHE_TTL = 10.0 # seconds between fresh power readings
def __init__(self) -> None:
self._low_power_mode: bool = False
self._samples: deque[InferenceSample] = deque(maxlen=_HISTORY_MAXLEN)
self._cached_watts: float = 0.0
self._cached_strategy: str = "unavailable"
self._cache_ts: float = 0.0
# ── Public API ────────────────────────────────────────────────────────────
@property
def low_power_mode(self) -> bool:
return self._low_power_mode
def set_low_power_mode(self, enabled: bool) -> None:
"""Enable or disable low power mode."""
self._low_power_mode = enabled
state = "enabled" if enabled else "disabled"
logger.info("Energy budget: low power mode %s", state)
def record_inference(self, model: str, tokens_per_second: float) -> InferenceSample:
"""Record an inference event for efficiency tracking.
Call this after each LLM inference completes with the model name and
measured throughput. The current power estimate is used to compute
the efficiency score.
Args:
model: Ollama model name (e.g. "qwen3:8b").
tokens_per_second: Measured decode throughput.
Returns:
The recorded InferenceSample.
"""
watts = self._cached_watts if self._cached_watts > 0 else self._estimate_watts_sync(model)
efficiency = tokens_per_second / max(watts, 0.1)
score = min(10.0, (efficiency / _EFFICIENCY_SCORE_CEILING) * 10.0)
sample = InferenceSample(
timestamp=datetime.now(UTC).isoformat(),
model=model,
tokens_per_second=tokens_per_second,
estimated_watts=watts,
efficiency=efficiency,
efficiency_score=score,
)
self._samples.append(sample)
# Auto-engage low power mode if above threshold and budget is enabled
threshold = getattr(settings, "energy_budget_watts_threshold", 15.0)
if watts > threshold and not self._low_power_mode:
logger.info(
"Energy budget: %.1fW exceeds threshold %.1fW — auto-engaging low power mode",
watts,
threshold,
)
self.set_low_power_mode(True)
return sample
async def get_report(self) -> EnergyReport:
"""Return the current energy budget report.
Refreshes the power estimate if the cache is stale.
"""
await self._refresh_power_cache()
score = self._compute_mean_efficiency_score()
recommendation = self._build_recommendation(score)
return EnergyReport(
timestamp=datetime.now(UTC).isoformat(),
low_power_mode=self._low_power_mode,
current_watts=self._cached_watts,
strategy=self._cached_strategy,
efficiency_score=score,
recent_samples=list(self._samples)[-10:],
recommendation=recommendation,
details={"sample_count": len(self._samples)},
)
# ── Power estimation ──────────────────────────────────────────────────────
async def _refresh_power_cache(self) -> None:
"""Refresh the cached power reading if stale."""
now = time.monotonic()
if now - self._cache_ts < self._POWER_CACHE_TTL:
return
try:
watts, strategy = await asyncio.to_thread(self._read_power)
except Exception as exc:
logger.debug("Energy: power read failed: %s", exc)
watts, strategy = 0.0, "unavailable"
self._cached_watts = watts
self._cached_strategy = strategy
self._cache_ts = now
def _read_power(self) -> tuple[float, str]:
"""Synchronous power reading — tries strategies in priority order.
Returns:
Tuple of (watts, strategy_name).
"""
# Strategy 1: battery discharge via ioreg (on-battery Macs)
try:
watts = self._read_battery_watts()
if watts > 0:
return watts, "battery"
except Exception:
pass
# Strategy 2: CPU utilisation proxy via top
try:
cpu_pct = self._read_cpu_pct()
if cpu_pct >= 0:
# M3 Max TDP ≈ 40W; scale linearly
watts = (cpu_pct / 100.0) * 40.0
return watts, "cpu_proxy"
except Exception:
pass
# Strategy 3: heuristic from loaded model size
return 0.0, "unavailable"
def _estimate_watts_sync(self, model: str) -> float:
"""Estimate watts from model size when no live reading is available."""
size_gb = self._model_size_gb(model)
return size_gb * _WATTS_PER_GB_HEURISTIC
def _read_battery_watts(self) -> float:
"""Read instantaneous battery discharge via ioreg.
Returns watts if on battery, 0.0 if plugged in or unavailable.
Requires macOS; no sudo needed.
"""
result = subprocess.run(
["ioreg", "-r", "-c", "AppleSmartBattery", "-d", "1"],
capture_output=True,
text=True,
timeout=3,
)
amperage_ma = 0.0
voltage_mv = 0.0
is_charging = True # assume charging unless we see ExternalConnected = No
for line in result.stdout.splitlines():
stripped = line.strip()
if '"InstantAmperage"' in stripped:
try:
amperage_ma = float(stripped.split("=")[-1].strip())
except ValueError:
pass
elif '"Voltage"' in stripped:
try:
voltage_mv = float(stripped.split("=")[-1].strip())
except ValueError:
pass
elif '"ExternalConnected"' in stripped:
is_charging = "Yes" in stripped
if is_charging or voltage_mv == 0 or amperage_ma <= 0:
return 0.0
# ioreg reports amperage in mA, voltage in mV
return (abs(amperage_ma) * voltage_mv) / 1_000_000
def _read_cpu_pct(self) -> float:
"""Read CPU utilisation from macOS top.
Returns aggregate CPU% (0100), or -1.0 on failure.
"""
result = subprocess.run(
["top", "-l", "1", "-n", "0", "-stats", "cpu"],
capture_output=True,
text=True,
timeout=5,
)
for line in result.stdout.splitlines():
if "CPU usage:" in line:
# "CPU usage: 12.5% user, 8.3% sys, 79.1% idle"
parts = line.split()
try:
user = float(parts[2].rstrip("%"))
sys_ = float(parts[4].rstrip("%"))
return user + sys_
except (IndexError, ValueError):
pass
return -1.0
# ── Helpers ───────────────────────────────────────────────────────────────
@staticmethod
def _model_size_gb(model: str) -> float:
"""Look up approximate model size in GB by name substring."""
lower = model.lower()
# Exact match first
if lower in _MODEL_SIZE_GB:
return _MODEL_SIZE_GB[lower]
# Substring match
for key, size in _MODEL_SIZE_GB.items():
if key in lower:
return size
return _DEFAULT_MODEL_SIZE_GB
def _compute_mean_efficiency_score(self) -> float:
"""Mean efficiency score over recent samples, or -1 if none."""
if not self._samples:
return -1.0
recent = list(self._samples)[-10:]
return sum(s.efficiency_score for s in recent) / len(recent)
def _build_recommendation(self, score: float) -> str:
"""Generate a human-readable recommendation from the efficiency score."""
threshold = getattr(settings, "energy_budget_watts_threshold", 15.0)
low_power_model = getattr(settings, "energy_low_power_model", "qwen3:1b")
if score < 0:
return "No inference data yet — run some tasks to populate efficiency metrics."
if self._low_power_mode:
return (
f"Low power mode active — routing to {low_power_model}. "
"Disable when power draw normalises."
)
if score < 3.0:
return (
f"Low efficiency (score {score:.1f}/10). "
f"Consider enabling low power mode to favour smaller models "
f"(threshold: {threshold}W)."
)
if score < 6.0:
return f"Moderate efficiency (score {score:.1f}/10). System operating normally."
return f"Good efficiency (score {score:.1f}/10). No action needed."
# Module-level singleton
energy_monitor = EnergyBudgetMonitor()

View File

@@ -0,0 +1,247 @@
"""Self-correction event logger.
Records instances where the agent detected its own errors and the steps
it took to correct them. Used by the Self-Correction Dashboard to visualise
these events and surface recurring failure patterns.
Usage::
from infrastructure.self_correction import log_self_correction, get_corrections, get_patterns
log_self_correction(
source="agentic_loop",
original_intent="Execute step 3: deploy service",
detected_error="ConnectionRefusedError: port 8080 unavailable",
correction_strategy="Retry on alternate port 8081",
final_outcome="Success on retry",
task_id="abc123",
)
"""
from __future__ import annotations
import json
import logging
import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from datetime import UTC, datetime
from pathlib import Path
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Database
# ---------------------------------------------------------------------------
_DB_PATH: Path | None = None
def _get_db_path() -> Path:
global _DB_PATH
if _DB_PATH is None:
from config import settings
_DB_PATH = Path(settings.repo_root) / "data" / "self_correction.db"
return _DB_PATH
@contextmanager
def _get_db() -> Generator[sqlite3.Connection, None, None]:
db_path = _get_db_path()
db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS self_correction_events (
id TEXT PRIMARY KEY,
source TEXT NOT NULL,
task_id TEXT DEFAULT '',
original_intent TEXT NOT NULL,
detected_error TEXT NOT NULL,
correction_strategy TEXT NOT NULL,
final_outcome TEXT NOT NULL,
outcome_status TEXT DEFAULT 'success',
error_type TEXT DEFAULT '',
created_at TEXT DEFAULT (datetime('now'))
)
""")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_sc_created ON self_correction_events(created_at)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_sc_error_type ON self_correction_events(error_type)"
)
conn.commit()
yield conn
# ---------------------------------------------------------------------------
# Write
# ---------------------------------------------------------------------------
def log_self_correction(
*,
source: str,
original_intent: str,
detected_error: str,
correction_strategy: str,
final_outcome: str,
task_id: str = "",
outcome_status: str = "success",
error_type: str = "",
) -> str:
"""Record a self-correction event and return its ID.
Args:
source: Module or component that triggered the correction.
original_intent: What the agent was trying to do.
detected_error: The error or problem that was detected.
correction_strategy: How the agent attempted to correct the error.
final_outcome: What the result of the correction attempt was.
task_id: Optional task/session ID for correlation.
outcome_status: 'success', 'partial', or 'failed'.
error_type: Short category label for pattern analysis (e.g.
'ConnectionError', 'TimeoutError').
Returns:
The ID of the newly created record.
"""
event_id = str(uuid.uuid4())
if not error_type:
# Derive a simple type from the first word of the detected error
error_type = detected_error.split(":")[0].strip()[:64]
try:
with _get_db() as conn:
conn.execute(
"""
INSERT INTO self_correction_events
(id, source, task_id, original_intent, detected_error,
correction_strategy, final_outcome, outcome_status, error_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
event_id,
source,
task_id,
original_intent[:2000],
detected_error[:2000],
correction_strategy[:2000],
final_outcome[:2000],
outcome_status,
error_type,
),
)
conn.commit()
logger.info(
"Self-correction logged [%s] source=%s error_type=%s status=%s",
event_id[:8],
source,
error_type,
outcome_status,
)
except Exception as exc:
logger.warning("Failed to log self-correction event: %s", exc)
return event_id
# ---------------------------------------------------------------------------
# Read
# ---------------------------------------------------------------------------
def get_corrections(limit: int = 50) -> list[dict]:
"""Return the most recent self-correction events, newest first."""
try:
with _get_db() as conn:
rows = conn.execute(
"""
SELECT * FROM self_correction_events
ORDER BY created_at DESC
LIMIT ?
""",
(limit,),
).fetchall()
return [dict(r) for r in rows]
except Exception as exc:
logger.warning("Failed to fetch self-correction events: %s", exc)
return []
def get_patterns(top_n: int = 10) -> list[dict]:
"""Return the most common recurring error types with counts.
Each entry has:
- error_type: category label
- count: total occurrences
- success_count: corrected successfully
- failed_count: correction also failed
- last_seen: ISO timestamp of most recent occurrence
"""
try:
with _get_db() as conn:
rows = conn.execute(
"""
SELECT
error_type,
COUNT(*) AS count,
SUM(CASE WHEN outcome_status = 'success' THEN 1 ELSE 0 END) AS success_count,
SUM(CASE WHEN outcome_status = 'failed' THEN 1 ELSE 0 END) AS failed_count,
MAX(created_at) AS last_seen
FROM self_correction_events
GROUP BY error_type
ORDER BY count DESC
LIMIT ?
""",
(top_n,),
).fetchall()
return [dict(r) for r in rows]
except Exception as exc:
logger.warning("Failed to fetch self-correction patterns: %s", exc)
return []
def get_stats() -> dict:
"""Return aggregate statistics for the summary panel."""
try:
with _get_db() as conn:
row = conn.execute(
"""
SELECT
COUNT(*) AS total,
SUM(CASE WHEN outcome_status = 'success' THEN 1 ELSE 0 END) AS success_count,
SUM(CASE WHEN outcome_status = 'partial' THEN 1 ELSE 0 END) AS partial_count,
SUM(CASE WHEN outcome_status = 'failed' THEN 1 ELSE 0 END) AS failed_count,
COUNT(DISTINCT error_type) AS unique_error_types,
COUNT(DISTINCT source) AS sources
FROM self_correction_events
"""
).fetchone()
if row is None:
return _empty_stats()
d = dict(row)
total = d.get("total") or 0
if total:
d["success_rate"] = round((d.get("success_count") or 0) / total * 100)
else:
d["success_rate"] = 0
return d
except Exception as exc:
logger.warning("Failed to fetch self-correction stats: %s", exc)
return _empty_stats()
def _empty_stats() -> dict:
return {
"total": 0,
"success_count": 0,
"partial_count": 0,
"failed_count": 0,
"unique_error_types": 0,
"sources": 0,
"success_rate": 0,
}

View File

@@ -0,0 +1,7 @@
"""Self-coding package — Timmy's self-modification capability.
Provides the branch→edit→test→commit/revert loop that allows Timmy
to propose and apply code changes autonomously, gated by the test suite.
Main entry point: ``self_coding.self_modify.loop``
"""

View File

@@ -0,0 +1,129 @@
"""Gitea REST client — thin wrapper for PR creation and issue commenting.
Uses ``settings.gitea_url``, ``settings.gitea_token``, and
``settings.gitea_repo`` (owner/repo) from config. Degrades gracefully
when the token is absent or the server is unreachable.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class PullRequest:
"""Minimal representation of a created pull request."""
number: int
title: str
html_url: str
class GiteaClient:
"""HTTP client for Gitea's REST API v1.
All methods return structured results and never raise — errors are
logged at WARNING level and indicated via return value.
"""
def __init__(
self,
base_url: str | None = None,
token: str | None = None,
repo: str | None = None,
) -> None:
from config import settings
self._base_url = (base_url or settings.gitea_url).rstrip("/")
self._token = token or settings.gitea_token
self._repo = repo or settings.gitea_repo
# ── internal ────────────────────────────────────────────────────────────
def _headers(self) -> dict[str, str]:
return {
"Authorization": f"token {self._token}",
"Content-Type": "application/json",
}
def _api(self, path: str) -> str:
return f"{self._base_url}/api/v1/{path.lstrip('/')}"
# ── public API ───────────────────────────────────────────────────────────
def create_pull_request(
self,
title: str,
body: str,
head: str,
base: str = "main",
) -> PullRequest | None:
"""Open a pull request.
Args:
title: PR title (keep under 70 chars).
body: PR body in markdown.
head: Source branch (e.g. ``self-modify/issue-983``).
base: Target branch (default ``main``).
Returns:
A ``PullRequest`` dataclass on success, ``None`` on failure.
"""
if not self._token:
logger.warning("Gitea token not configured — skipping PR creation")
return None
try:
import requests as _requests
resp = _requests.post(
self._api(f"repos/{self._repo}/pulls"),
headers=self._headers(),
json={"title": title, "body": body, "head": head, "base": base},
timeout=15,
)
resp.raise_for_status()
data = resp.json()
pr = PullRequest(
number=data["number"],
title=data["title"],
html_url=data["html_url"],
)
logger.info("PR #%d created: %s", pr.number, pr.html_url)
return pr
except Exception as exc:
logger.warning("Failed to create PR: %s", exc)
return None
def add_issue_comment(self, issue_number: int, body: str) -> bool:
"""Post a comment on an issue or PR.
Returns:
True on success, False on failure.
"""
if not self._token:
logger.warning("Gitea token not configured — skipping issue comment")
return False
try:
import requests as _requests
resp = _requests.post(
self._api(f"repos/{self._repo}/issues/{issue_number}/comments"),
headers=self._headers(),
json={"body": body},
timeout=15,
)
resp.raise_for_status()
logger.info("Comment posted on issue #%d", issue_number)
return True
except Exception as exc:
logger.warning("Failed to post comment on issue #%d: %s", issue_number, exc)
return False
# Module-level singleton
gitea_client = GiteaClient()

View File

@@ -0,0 +1 @@
"""Self-modification loop sub-package."""

View File

@@ -0,0 +1,301 @@
"""Self-modification loop — branch → edit → test → commit/revert.
Timmy's self-coding capability, restored after deletion in
Operation Darling Purge (commit 584eeb679e88).
## Cycle
1. **Branch** — create ``self-modify/<slug>`` from ``main``
2. **Edit** — apply the proposed change (patch string or callable)
3. **Test** — run ``pytest tests/ -x -q``; never commit on failure
4. **Commit** — stage and commit on green; revert branch on red
5. **PR** — open a Gitea pull request (requires no direct push to main)
## Guards
- Never push directly to ``main`` or ``master``
- All changes land via PR (enforced by ``_guard_branch``)
- Test gate is mandatory; ``skip_tests=True`` is for unit-test use only
- Commits only happen when ``pytest tests/ -x -q`` exits 0
## Usage::
from self_coding.self_modify.loop import SelfModifyLoop
loop = SelfModifyLoop()
result = await loop.run(
slug="add-hello-tool",
description="Add hello() convenience tool",
edit_fn=my_edit_function, # callable(repo_root: str) -> None
)
if result.success:
print(f"PR: {result.pr_url}")
else:
print(f"Failed: {result.error}")
"""
from __future__ import annotations
import logging
import subprocess
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
# Branches that must never receive direct commits
_PROTECTED_BRANCHES = frozenset({"main", "master", "develop"})
# Test command used as the commit gate
_TEST_COMMAND = ["pytest", "tests/", "-x", "-q", "--tb=short"]
# Max time (seconds) to wait for the test suite
_TEST_TIMEOUT = 300
@dataclass
class LoopResult:
"""Result from one self-modification cycle."""
success: bool
branch: str = ""
commit_sha: str = ""
pr_url: str = ""
pr_number: int = 0
test_output: str = ""
error: str = ""
elapsed_ms: float = 0.0
metadata: dict = field(default_factory=dict)
class SelfModifyLoop:
"""Orchestrate branch → edit → test → commit/revert → PR.
Args:
repo_root: Absolute path to the git repository (defaults to
``settings.repo_root``).
remote: Git remote name (default ``origin``).
base_branch: Branch to fork from and target for the PR
(default ``main``).
"""
def __init__(
self,
repo_root: str | None = None,
remote: str = "origin",
base_branch: str = "main",
) -> None:
self._repo_root = Path(repo_root or settings.repo_root)
self._remote = remote
self._base_branch = base_branch
# ── public ──────────────────────────────────────────────────────────────
async def run(
self,
slug: str,
description: str,
edit_fn: Callable[[str], None],
issue_number: int | None = None,
skip_tests: bool = False,
) -> LoopResult:
"""Execute one full self-modification cycle.
Args:
slug: Short identifier used for the branch name
(e.g. ``"add-hello-tool"``).
description: Human-readable description for commit message
and PR body.
edit_fn: Callable that receives the repo root path (str)
and applies the desired code changes in-place.
issue_number: Optional Gitea issue number to reference in PR.
skip_tests: If ``True``, skip the test gate (unit-test use
only — never use in production).
Returns:
:class:`LoopResult` describing the outcome.
"""
start = time.time()
branch = f"self-modify/{slug}"
try:
self._guard_branch(branch)
self._checkout_base()
self._create_branch(branch)
try:
edit_fn(str(self._repo_root))
except Exception as exc:
self._revert_branch(branch)
return LoopResult(
success=False,
branch=branch,
error=f"edit_fn raised: {exc}",
elapsed_ms=self._elapsed(start),
)
if not skip_tests:
test_output, passed = self._run_tests()
if not passed:
self._revert_branch(branch)
return LoopResult(
success=False,
branch=branch,
test_output=test_output,
error="Tests failed — branch reverted",
elapsed_ms=self._elapsed(start),
)
else:
test_output = "(tests skipped)"
sha = self._commit_all(description)
self._push_branch(branch)
pr = self._create_pr(
branch=branch,
description=description,
test_output=test_output,
issue_number=issue_number,
)
return LoopResult(
success=True,
branch=branch,
commit_sha=sha,
pr_url=pr.html_url if pr else "",
pr_number=pr.number if pr else 0,
test_output=test_output,
elapsed_ms=self._elapsed(start),
)
except Exception as exc:
logger.warning("Self-modify loop failed: %s", exc)
return LoopResult(
success=False,
branch=branch,
error=str(exc),
elapsed_ms=self._elapsed(start),
)
# ── private helpers ──────────────────────────────────────────────────────
@staticmethod
def _elapsed(start: float) -> float:
return (time.time() - start) * 1000
def _git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess:
"""Run a git command in the repo root."""
cmd = ["git", *args]
logger.debug("git %s", " ".join(args))
return subprocess.run(
cmd,
cwd=str(self._repo_root),
capture_output=True,
text=True,
check=check,
)
def _guard_branch(self, branch: str) -> None:
"""Raise if the target branch is a protected branch name."""
if branch in _PROTECTED_BRANCHES:
raise ValueError(
f"Refusing to operate on protected branch '{branch}'. "
"All self-modifications must go via PR."
)
def _checkout_base(self) -> None:
"""Checkout the base branch and pull latest."""
self._git("checkout", self._base_branch)
# Best-effort pull; ignore failures (e.g. no remote configured)
self._git("pull", self._remote, self._base_branch, check=False)
def _create_branch(self, branch: str) -> None:
"""Create and checkout a new branch, deleting an old one if needed."""
# Delete local branch if it already exists (stale prior attempt)
self._git("branch", "-D", branch, check=False)
self._git("checkout", "-b", branch)
logger.info("Created branch: %s", branch)
def _revert_branch(self, branch: str) -> None:
"""Checkout base and delete the failed branch."""
try:
self._git("checkout", self._base_branch, check=False)
self._git("branch", "-D", branch, check=False)
logger.info("Reverted and deleted branch: %s", branch)
except Exception as exc:
logger.warning("Failed to revert branch %s: %s", branch, exc)
def _run_tests(self) -> tuple[str, bool]:
"""Run the test suite. Returns (output, passed)."""
logger.info("Running test suite: %s", " ".join(_TEST_COMMAND))
try:
result = subprocess.run(
_TEST_COMMAND,
cwd=str(self._repo_root),
capture_output=True,
text=True,
timeout=_TEST_TIMEOUT,
)
output = (result.stdout + "\n" + result.stderr).strip()
passed = result.returncode == 0
logger.info(
"Test suite %s (exit %d)", "PASSED" if passed else "FAILED", result.returncode
)
return output, passed
except subprocess.TimeoutExpired:
msg = f"Test suite timed out after {_TEST_TIMEOUT}s"
logger.warning(msg)
return msg, False
except FileNotFoundError:
msg = "pytest not found on PATH"
logger.warning(msg)
return msg, False
def _commit_all(self, message: str) -> str:
"""Stage all changes and create a commit. Returns the new SHA."""
self._git("add", "-A")
self._git("commit", "-m", message)
result = self._git("rev-parse", "HEAD")
sha = result.stdout.strip()
logger.info("Committed: %s sha=%s", message[:60], sha[:12])
return sha
def _push_branch(self, branch: str) -> None:
"""Push the branch to the remote."""
self._git("push", "-u", self._remote, branch)
logger.info("Pushed branch: %s -> %s", branch, self._remote)
def _create_pr(
self,
branch: str,
description: str,
test_output: str,
issue_number: int | None,
):
"""Open a Gitea PR. Returns PullRequest or None on failure."""
from self_coding.gitea_client import GiteaClient
client = GiteaClient()
issue_ref = f"\n\nFixes #{issue_number}" if issue_number else ""
test_section = (
f"\n\n## Test results\n```\n{test_output[:2000]}\n```"
if test_output and test_output != "(tests skipped)"
else ""
)
body = (
f"## Summary\n{description}"
f"{issue_ref}"
f"{test_section}"
"\n\n🤖 Generated by Timmy's self-modification loop"
)
return client.create_pull_request(
title=f"[self-modify] {description[:60]}",
body=body,
head=branch,
base=self._base_branch,
)

View File

@@ -312,6 +312,13 @@ async def _handle_step_failure(
"adaptation": step.result[:200],
},
)
_log_self_correction(
task_id=task_id,
step_desc=step_desc,
exc=exc,
outcome=step.result,
outcome_status="success",
)
if on_progress:
await on_progress(f"[Adapted] {step_desc}", step_num, total_steps)
except Exception as adapt_exc: # broad catch intentional
@@ -325,9 +332,42 @@ async def _handle_step_failure(
duration_ms=int((time.monotonic() - step_start) * 1000),
)
)
_log_self_correction(
task_id=task_id,
step_desc=step_desc,
exc=exc,
outcome=f"Adaptation also failed: {adapt_exc}",
outcome_status="failed",
)
completed_results.append(f"Step {step_num}: FAILED")
def _log_self_correction(
*,
task_id: str,
step_desc: str,
exc: Exception,
outcome: str,
outcome_status: str,
) -> None:
"""Best-effort: log a self-correction event (never raises)."""
try:
from infrastructure.self_correction import log_self_correction
log_self_correction(
source="agentic_loop",
original_intent=step_desc,
detected_error=f"{type(exc).__name__}: {exc}",
correction_strategy="Adaptive re-plan via LLM",
final_outcome=outcome[:500],
task_id=task_id,
outcome_status=outcome_status,
error_type=type(exc).__name__,
)
except Exception as log_exc:
logger.debug("Self-correction log failed: %s", log_exc)
# ---------------------------------------------------------------------------
# Core loop
# ---------------------------------------------------------------------------

View File

@@ -1,435 +0,0 @@
"""Dreaming Mode — idle-time session replay and counterfactual simulation.
When the dashboard has been idle for a configurable period, this engine
selects a past chat session, identifies key agent response points, and
asks the LLM to simulate alternative approaches. Insights are stored as
proposed rules that can feed the auto-crystallizer or memory system.
Usage::
from timmy.dreaming import dreaming_engine
# Run one dream cycle (called by the background scheduler)
await dreaming_engine.dream_once()
# Query recent dreams
dreams = dreaming_engine.get_recent_dreams(limit=10)
# Get current status dict for API/dashboard
status = dreaming_engine.get_status()
"""
import json
import logging
import re
import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
_DEFAULT_DB = Path("data/dreams.db")
# Strip <think> tags from reasoning model output
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# Minimum messages in a session to be worth replaying
_MIN_SESSION_MESSAGES = 3
# Gap in seconds between messages that signals a new session
_SESSION_GAP_SECONDS = 1800 # 30 minutes
@dataclass
class DreamRecord:
"""A single completed dream cycle."""
id: str
session_excerpt: str # Short excerpt from the replayed session
decision_point: str # The agent message that was re-simulated
simulation: str # The alternative response generated
proposed_rule: str # Rule extracted from the simulation
created_at: str
@contextmanager
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("""
CREATE TABLE IF NOT EXISTS dreams (
id TEXT PRIMARY KEY,
session_excerpt TEXT NOT NULL,
decision_point TEXT NOT NULL,
simulation TEXT NOT NULL,
proposed_rule TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_dreams_time ON dreams(created_at)")
conn.commit()
yield conn
def _row_to_dream(row: sqlite3.Row) -> DreamRecord:
return DreamRecord(
id=row["id"],
session_excerpt=row["session_excerpt"],
decision_point=row["decision_point"],
simulation=row["simulation"],
proposed_rule=row["proposed_rule"],
created_at=row["created_at"],
)
class DreamingEngine:
"""Idle-time dreaming engine — replays sessions and simulates alternatives."""
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
self._db_path = db_path
self._last_activity_time: datetime = datetime.now(UTC)
self._is_dreaming: bool = False
self._current_dream_summary: str = ""
self._dreaming_agent = None # Lazy-initialised
# ── Public API ────────────────────────────────────────────────────────
def record_activity(self) -> None:
"""Reset the idle timer — call this on every user/agent interaction."""
self._last_activity_time = datetime.now(UTC)
def is_idle(self) -> bool:
"""Return True if the system has been idle long enough to start dreaming."""
threshold = settings.dreaming_idle_threshold_minutes
if threshold <= 0:
return False
return datetime.now(UTC) - self._last_activity_time > timedelta(minutes=threshold)
def get_status(self) -> dict[str, Any]:
"""Return a status dict suitable for API/dashboard consumption."""
return {
"enabled": settings.dreaming_enabled,
"dreaming": self._is_dreaming,
"idle": self.is_idle(),
"current_summary": self._current_dream_summary,
"idle_minutes": int(
(datetime.now(UTC) - self._last_activity_time).total_seconds() / 60
),
"idle_threshold_minutes": settings.dreaming_idle_threshold_minutes,
"dream_count": self.count_dreams(),
}
async def dream_once(self) -> DreamRecord | None:
"""Execute one dream cycle.
Returns the stored DreamRecord, or None if the cycle was skipped
(not idle, dreaming disabled, no suitable session, or LLM error).
"""
if not settings.dreaming_enabled:
return None
if not self.is_idle():
logger.debug(
"Dreaming skipped — system active (idle for %d min, threshold %d min)",
int((datetime.now(UTC) - self._last_activity_time).total_seconds() / 60),
settings.dreaming_idle_threshold_minutes,
)
return None
if self._is_dreaming:
logger.debug("Dreaming skipped — cycle already in progress")
return None
self._is_dreaming = True
self._current_dream_summary = "Selecting a past session…"
await self._broadcast_status()
try:
return await self._run_dream_cycle()
except Exception as exc:
logger.warning("Dream cycle failed: %s", exc)
return None
finally:
self._is_dreaming = False
self._current_dream_summary = ""
await self._broadcast_status()
def get_recent_dreams(self, limit: int = 20) -> list[DreamRecord]:
"""Retrieve the most recent dream records."""
with _get_conn(self._db_path) as conn:
rows = conn.execute(
"SELECT * FROM dreams ORDER BY created_at DESC LIMIT ?",
(limit,),
).fetchall()
return [_row_to_dream(r) for r in rows]
def count_dreams(self) -> int:
"""Return total number of stored dream records."""
with _get_conn(self._db_path) as conn:
row = conn.execute("SELECT COUNT(*) AS c FROM dreams").fetchone()
return row["c"] if row else 0
# ── Private helpers ───────────────────────────────────────────────────
async def _run_dream_cycle(self) -> DreamRecord | None:
"""Core dream logic: select → simulate → store."""
# 1. Select a past session from the chat log
session = await self._select_session()
if not session:
logger.debug("No suitable chat session found for dreaming")
self._current_dream_summary = "No past sessions to replay"
return None
decision_point, session_excerpt = session
self._current_dream_summary = f"Simulating alternative for: {decision_point[:60]}"
await self._broadcast_status()
# 2. Simulate an alternative response
simulation = await self._simulate_alternative(decision_point, session_excerpt)
if not simulation:
logger.debug("Dream simulation produced no output")
return None
# 3. Extract a proposed rule
proposed_rule = await self._extract_rule(decision_point, simulation)
# 4. Store and broadcast
dream = self._store_dream(
session_excerpt=session_excerpt,
decision_point=decision_point,
simulation=simulation,
proposed_rule=proposed_rule,
)
self._current_dream_summary = f"Dream complete: {proposed_rule[:80]}" if proposed_rule else "Dream complete"
logger.info(
"Dream [%s]: replayed session, proposed rule: %s",
dream.id[:8],
proposed_rule[:80] if proposed_rule else "(none)",
)
await self._broadcast_status()
await self._broadcast_dream(dream)
return dream
async def _select_session(self) -> tuple[str, str] | None:
"""Select a past chat session and return (decision_point, session_excerpt).
Uses the SQLite chat store. Groups messages into sessions by time
gap. Picks a random session with enough messages, then selects one
agent response as the decision point.
"""
try:
from infrastructure.chat_store import DB_PATH
if not DB_PATH.exists():
return None
import asyncio
rows = await asyncio.to_thread(self._load_chat_rows)
if not rows:
return None
sessions = self._group_into_sessions(rows)
if not sessions:
return None
# Filter sessions with enough messages
valid = [s for s in sessions if len(s) >= _MIN_SESSION_MESSAGES]
if not valid:
return None
import random
session = random.choice(valid) # noqa: S311 (not cryptographic)
# Build a short text excerpt (last N messages)
excerpt_msgs = session[-6:]
excerpt = "\n".join(
f"{m['role'].upper()}: {m['content'][:200]}" for m in excerpt_msgs
)
# Find agent responses as candidate decision points
agent_msgs = [m for m in session if m["role"] in ("agent", "assistant")]
if not agent_msgs:
return None
decision = random.choice(agent_msgs) # noqa: S311
return decision["content"], excerpt
except Exception as exc:
logger.warning("Session selection failed: %s", exc)
return None
def _load_chat_rows(self) -> list[dict]:
"""Synchronously load chat messages from SQLite."""
from infrastructure.chat_store import DB_PATH
with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT role, content, timestamp FROM chat_messages "
"ORDER BY timestamp ASC"
).fetchall()
return [dict(r) for r in rows]
def _group_into_sessions(self, rows: list[dict]) -> list[list[dict]]:
"""Group chat rows into sessions based on time gaps."""
if not rows:
return []
sessions: list[list[dict]] = []
current: list[dict] = [rows[0]]
for prev, curr in zip(rows, rows[1:]):
try:
t_prev = datetime.fromisoformat(prev["timestamp"].replace("Z", "+00:00"))
t_curr = datetime.fromisoformat(curr["timestamp"].replace("Z", "+00:00"))
gap = (t_curr - t_prev).total_seconds()
except Exception:
gap = 0
if gap > _SESSION_GAP_SECONDS:
sessions.append(current)
current = [curr]
else:
current.append(curr)
sessions.append(current)
return sessions
async def _simulate_alternative(
self, decision_point: str, session_excerpt: str
) -> str:
"""Ask the LLM to simulate an alternative response."""
prompt = (
"You are Timmy, a sovereign AI agent in a dreaming state.\n"
"You are replaying a past conversation and exploring what you could "
"have done differently at a key decision point.\n\n"
"PAST SESSION EXCERPT:\n"
f"{session_excerpt}\n\n"
"KEY DECISION POINT (your past response):\n"
f"{decision_point[:500]}\n\n"
"TASK: In 2-3 sentences, describe ONE concrete alternative approach "
"you could have taken at this decision point that would have been "
"more helpful, more accurate, or more efficient.\n"
"Be specific — reference the actual content of the conversation.\n"
"Do NOT include meta-commentary about dreaming or this exercise.\n\n"
"Alternative approach:"
)
raw = await self._call_agent(prompt)
return _THINK_TAG_RE.sub("", raw).strip() if raw else ""
async def _extract_rule(self, decision_point: str, simulation: str) -> str:
"""Extract a proposed behaviour rule from the simulation."""
prompt = (
"Given this pair of agent responses:\n\n"
f"ORIGINAL: {decision_point[:300]}\n\n"
f"IMPROVED ALTERNATIVE: {simulation[:400]}\n\n"
"Extract ONE concise rule (max 20 words) that captures what to do "
"differently next time. Format: 'When X, do Y instead of Z.'\n"
"Rule:"
)
raw = await self._call_agent(prompt)
rule = _THINK_TAG_RE.sub("", raw).strip() if raw else ""
# Keep only the first sentence/line
rule = rule.split("\n")[0].strip().rstrip(".")
return rule[:200] # Safety cap
async def _call_agent(self, prompt: str) -> str:
"""Call the Timmy agent for a dreaming prompt (skip MCP, 60 s timeout)."""
import asyncio
if self._dreaming_agent is None:
from timmy.agent import create_timmy
self._dreaming_agent = create_timmy(skip_mcp=True)
try:
async with asyncio.timeout(settings.dreaming_timeout_seconds):
run = await self._dreaming_agent.arun(prompt, stream=False)
except TimeoutError:
logger.warning("Dreaming LLM call timed out after %ds", settings.dreaming_timeout_seconds)
return ""
except Exception as exc:
logger.warning("Dreaming LLM call failed: %s", exc)
return ""
raw = run.content if hasattr(run, "content") else str(run)
return raw or ""
def _store_dream(
self,
*,
session_excerpt: str,
decision_point: str,
simulation: str,
proposed_rule: str,
) -> DreamRecord:
dream = DreamRecord(
id=str(uuid.uuid4()),
session_excerpt=session_excerpt,
decision_point=decision_point,
simulation=simulation,
proposed_rule=proposed_rule,
created_at=datetime.now(UTC).isoformat(),
)
with _get_conn(self._db_path) as conn:
conn.execute(
"""
INSERT INTO dreams
(id, session_excerpt, decision_point, simulation, proposed_rule, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
dream.id,
dream.session_excerpt,
dream.decision_point,
dream.simulation,
dream.proposed_rule,
dream.created_at,
),
)
conn.commit()
return dream
async def _broadcast_status(self) -> None:
"""Push current dreaming status via WebSocket."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast("dreaming_state", self.get_status())
except Exception as exc:
logger.debug("Dreaming status broadcast failed: %s", exc)
async def _broadcast_dream(self, dream: DreamRecord) -> None:
"""Push a completed dream record via WebSocket."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"dreaming_complete",
{
"id": dream.id,
"proposed_rule": dream.proposed_rule,
"simulation": dream.simulation[:200],
"created_at": dream.created_at,
},
)
except Exception as exc:
logger.debug("Dreaming complete broadcast failed: %s", exc)
# Module-level singleton
dreaming_engine = DreamingEngine()

View File

@@ -8,4 +8,23 @@ Refs: #954, #953
Three-strike detector and automation enforcement.
Refs: #962
Session reporting: auto-generates markdown scorecards at session end
and commits them to the Gitea repo for institutional memory.
Refs: #957 (Session Sovereignty Report Generator)
"""
from timmy.sovereignty.session_report import (
commit_report,
generate_and_commit_report,
generate_report,
mark_session_start,
)
__all__ = [
"generate_report",
"commit_report",
"generate_and_commit_report",
"mark_session_start",
]

View File

@@ -0,0 +1,442 @@
"""Session Sovereignty Report Generator.
Auto-generates a sovereignty scorecard at the end of each play session
and commits it as a markdown file to the Gitea repo under
``reports/sovereignty/``.
Report contents (per issue #957):
- Session duration + game played
- Total model calls by type (VLM, LLM, TTS, API)
- Total cache/rule hits by type
- New skills crystallized (placeholder — pending skill-tracking impl)
- Sovereignty delta (change from session start → end)
- Cost breakdown (actual API spend)
- Per-layer sovereignty %: perception, decision, narration
- Trend comparison vs previous session
Refs: #957 (Sovereignty P0) · #953 (The Sovereignty Loop)
"""
import base64
import json
import logging
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import httpx
from config import settings
# Optional module-level imports — degrade gracefully if unavailable at import time
try:
from timmy.session_logger import get_session_logger
except Exception: # ImportError or circular import during early startup
get_session_logger = None # type: ignore[assignment]
try:
from infrastructure.sovereignty_metrics import GRADUATION_TARGETS, get_sovereignty_store
except Exception:
GRADUATION_TARGETS: dict = {} # type: ignore[assignment]
get_sovereignty_store = None # type: ignore[assignment]
logger = logging.getLogger(__name__)
# Module-level session start time; set by mark_session_start()
_SESSION_START: datetime | None = None
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def mark_session_start() -> None:
"""Record the session start wall-clock time.
Call once during application startup so ``generate_report()`` can
compute accurate session durations.
"""
global _SESSION_START
_SESSION_START = datetime.now(UTC)
logger.debug("Sovereignty: session start recorded at %s", _SESSION_START.isoformat())
def generate_report(session_id: str = "dashboard") -> str:
"""Render a sovereignty scorecard as a markdown string.
Pulls from:
- ``timmy.session_logger`` — message/tool-call/error counts
- ``infrastructure.sovereignty_metrics`` — cache hit rate, API cost,
graduation phase, and trend data
Args:
session_id: The session identifier (default: "dashboard").
Returns:
Markdown-formatted sovereignty report string.
"""
now = datetime.now(UTC)
session_start = _SESSION_START or now
duration_secs = (now - session_start).total_seconds()
session_data = _gather_session_data()
sov_data = _gather_sovereignty_data()
return _render_markdown(now, session_id, duration_secs, session_data, sov_data)
def commit_report(report_md: str, session_id: str = "dashboard") -> bool:
"""Commit a sovereignty report to the Gitea repo.
Creates or updates ``reports/sovereignty/{date}_{session_id}.md``
via the Gitea Contents API. Degrades gracefully: logs a warning
and returns ``False`` if Gitea is unreachable or misconfigured.
Args:
report_md: Markdown content to commit.
session_id: Session identifier used in the filename.
Returns:
``True`` on success, ``False`` on failure.
"""
if not settings.gitea_enabled:
logger.info("Sovereignty: Gitea disabled — skipping report commit")
return False
if not settings.gitea_token:
logger.warning("Sovereignty: no Gitea token — skipping report commit")
return False
date_str = datetime.now(UTC).strftime("%Y-%m-%d")
file_path = f"reports/sovereignty/{date_str}_{session_id}.md"
url = f"{settings.gitea_url}/api/v1/repos/{settings.gitea_repo}/contents/{file_path}"
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
encoded_content = base64.b64encode(report_md.encode()).decode()
commit_message = (
f"report: sovereignty session {session_id} ({date_str})\n\n"
f"Auto-generated by Timmy. Refs #957"
)
payload: dict[str, Any] = {
"message": commit_message,
"content": encoded_content,
}
try:
with httpx.Client(timeout=10.0) as client:
# Fetch existing file SHA so we can update rather than create
check = client.get(url, headers=headers)
if check.status_code == 200:
existing = check.json()
payload["sha"] = existing.get("sha", "")
resp = client.put(url, headers=headers, json=payload)
resp.raise_for_status()
logger.info("Sovereignty: report committed to %s", file_path)
return True
except httpx.HTTPStatusError as exc:
logger.warning(
"Sovereignty: commit failed (HTTP %s): %s",
exc.response.status_code,
exc,
)
return False
except Exception as exc:
logger.warning("Sovereignty: commit failed: %s", exc)
return False
async def generate_and_commit_report(session_id: str = "dashboard") -> bool:
"""Generate and commit a sovereignty report for the current session.
Primary entry point — call at session end / application shutdown.
Wraps the synchronous ``commit_report`` call in ``asyncio.to_thread``
so it does not block the event loop.
Args:
session_id: The session identifier.
Returns:
``True`` if the report was generated and committed successfully.
"""
import asyncio
try:
report_md = generate_report(session_id)
logger.info("Sovereignty: report generated (%d chars)", len(report_md))
committed = await asyncio.to_thread(commit_report, report_md, session_id)
return committed
except Exception as exc:
logger.warning("Sovereignty: report generation failed: %s", exc)
return False
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _format_duration(seconds: float) -> str:
"""Format a duration in seconds as a human-readable string."""
total = int(seconds)
hours, remainder = divmod(total, 3600)
minutes, secs = divmod(remainder, 60)
if hours:
return f"{hours}h {minutes}m {secs}s"
if minutes:
return f"{minutes}m {secs}s"
return f"{secs}s"
def _gather_session_data() -> dict[str, Any]:
"""Pull session statistics from the session logger.
Returns a dict with:
- ``user_messages``, ``timmy_messages``, ``tool_calls``, ``errors``
- ``tool_call_breakdown``: dict[tool_name, count]
"""
default: dict[str, Any] = {
"user_messages": 0,
"timmy_messages": 0,
"tool_calls": 0,
"errors": 0,
"tool_call_breakdown": {},
}
try:
if get_session_logger is None:
return default
sl = get_session_logger()
sl.flush()
# Read today's session file directly for accurate counts
if not sl.session_file.exists():
return default
entries: list[dict] = []
with open(sl.session_file) as f:
for line in f:
line = line.strip()
if line:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
tool_breakdown: dict[str, int] = {}
user_msgs = timmy_msgs = tool_calls = errors = 0
for entry in entries:
etype = entry.get("type")
if etype == "message":
if entry.get("role") == "user":
user_msgs += 1
elif entry.get("role") == "timmy":
timmy_msgs += 1
elif etype == "tool_call":
tool_calls += 1
tool_name = entry.get("tool", "unknown")
tool_breakdown[tool_name] = tool_breakdown.get(tool_name, 0) + 1
elif etype == "error":
errors += 1
return {
"user_messages": user_msgs,
"timmy_messages": timmy_msgs,
"tool_calls": tool_calls,
"errors": errors,
"tool_call_breakdown": tool_breakdown,
}
except Exception as exc:
logger.warning("Sovereignty: failed to gather session data: %s", exc)
return default
def _gather_sovereignty_data() -> dict[str, Any]:
"""Pull sovereignty metrics from the SQLite store.
Returns a dict with:
- ``metrics``: summary from ``SovereigntyMetricsStore.get_summary()``
- ``deltas``: per-metric start/end values within recent history window
- ``previous_session``: most recent prior value for each metric
"""
try:
if get_sovereignty_store is None:
return {"metrics": {}, "deltas": {}, "previous_session": {}}
store = get_sovereignty_store()
summary = store.get_summary()
deltas: dict[str, dict[str, Any]] = {}
previous_session: dict[str, float | None] = {}
for metric_type in GRADUATION_TARGETS:
history = store.get_latest(metric_type, limit=10)
if len(history) >= 2:
deltas[metric_type] = {
"start": history[-1]["value"],
"end": history[0]["value"],
}
previous_session[metric_type] = history[1]["value"]
elif len(history) == 1:
deltas[metric_type] = {"start": history[0]["value"], "end": history[0]["value"]}
previous_session[metric_type] = None
else:
deltas[metric_type] = {"start": None, "end": None}
previous_session[metric_type] = None
return {
"metrics": summary,
"deltas": deltas,
"previous_session": previous_session,
}
except Exception as exc:
logger.warning("Sovereignty: failed to gather sovereignty data: %s", exc)
return {"metrics": {}, "deltas": {}, "previous_session": {}}
def _render_markdown(
now: datetime,
session_id: str,
duration_secs: float,
session_data: dict[str, Any],
sov_data: dict[str, Any],
) -> str:
"""Assemble the full sovereignty report in markdown."""
lines: list[str] = []
# Header
lines += [
"# Sovereignty Session Report",
"",
f"**Session ID:** `{session_id}` ",
f"**Date:** {now.strftime('%Y-%m-%d')} ",
f"**Duration:** {_format_duration(duration_secs)} ",
f"**Generated:** {now.isoformat()}",
"",
"---",
"",
]
# Session activity
lines += [
"## Session Activity",
"",
"| Metric | Count |",
"|--------|-------|",
f"| User messages | {session_data['user_messages']} |",
f"| Timmy responses | {session_data['timmy_messages']} |",
f"| Tool calls | {session_data['tool_calls']} |",
f"| Errors | {session_data['errors']} |",
"",
]
tool_breakdown = session_data.get("tool_call_breakdown", {})
if tool_breakdown:
lines += ["### Model Calls by Tool", ""]
for tool_name, count in sorted(tool_breakdown.items(), key=lambda x: -x[1]):
lines.append(f"- `{tool_name}`: {count}")
lines.append("")
# Sovereignty scorecard
lines += [
"## Sovereignty Scorecard",
"",
"| Metric | Current | Target (graduation) | Phase |",
"|--------|---------|---------------------|-------|",
]
for metric_type, data in sov_data["metrics"].items():
current = data.get("current")
current_str = f"{current:.4f}" if current is not None else "N/A"
grad_target = GRADUATION_TARGETS.get(metric_type, {}).get("graduation")
grad_str = f"{grad_target:.4f}" if isinstance(grad_target, (int, float)) else "N/A"
phase = data.get("phase", "unknown")
lines.append(f"| {metric_type} | {current_str} | {grad_str} | {phase} |")
lines += ["", "### Sovereignty Delta (This Session)", ""]
for metric_type, delta_info in sov_data.get("deltas", {}).items():
start_val = delta_info.get("start")
end_val = delta_info.get("end")
if start_val is not None and end_val is not None:
diff = end_val - start_val
sign = "+" if diff >= 0 else ""
lines.append(
f"- **{metric_type}**: {start_val:.4f}{end_val:.4f} ({sign}{diff:.4f})"
)
else:
lines.append(f"- **{metric_type}**: N/A (no data recorded)")
# Cost breakdown
lines += ["", "## Cost Breakdown", ""]
api_cost_data = sov_data["metrics"].get("api_cost", {})
current_cost = api_cost_data.get("current")
if current_cost is not None:
lines.append(f"- **Total API spend (latest recorded):** ${current_cost:.4f}")
else:
lines.append("- **Total API spend:** N/A (no data recorded)")
lines.append("")
# Per-layer sovereignty
lines += [
"## Per-Layer Sovereignty",
"",
"| Layer | Sovereignty % |",
"|-------|--------------|",
"| Perception (VLM) | N/A |",
"| Decision (LLM) | N/A |",
"| Narration (TTS) | N/A |",
"",
"> Per-layer tracking requires instrumented inference calls. See #957.",
"",
]
# Skills crystallized
lines += [
"## Skills Crystallized",
"",
"_Skill crystallization tracking not yet implemented. See #957._",
"",
]
# Trend vs previous session
lines += ["## Trend vs Previous Session", ""]
prev_data = sov_data.get("previous_session", {})
has_prev = any(v is not None for v in prev_data.values())
if has_prev:
lines += [
"| Metric | Previous | Current | Change |",
"|--------|----------|---------|--------|",
]
for metric_type, curr_info in sov_data["metrics"].items():
curr_val = curr_info.get("current")
prev_val = prev_data.get(metric_type)
curr_str = f"{curr_val:.4f}" if curr_val is not None else "N/A"
prev_str = f"{prev_val:.4f}" if prev_val is not None else "N/A"
if curr_val is not None and prev_val is not None:
diff = curr_val - prev_val
sign = "+" if diff >= 0 else ""
change_str = f"{sign}{diff:.4f}"
else:
change_str = "N/A"
lines.append(f"| {metric_type} | {prev_str} | {curr_str} | {change_str} |")
lines.append("")
else:
lines += ["_No previous session data available for comparison._", ""]
# Footer
lines += [
"---",
"_Auto-generated by Timmy · Session Sovereignty Report · Refs: #957_",
]
return "\n".join(lines)

View File

@@ -2549,7 +2549,6 @@
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }
/* ── Voice settings ───────────────────────────────────────── */
.voice-settings-page { max-width: 600px; margin: 0 auto; }
@@ -2716,44 +2715,73 @@
margin-bottom: 0.5rem;
}
/* ═══════════════════════════════════════════════════════════════
Dreaming Mode
═══════════════════════════════════════════════════════════════ */
.dream-active {
display: flex; align-items: center; gap: 8px;
padding: 6px 0;
/* ── Self-Correction Dashboard ─────────────────────────────── */
.sc-event {
border-left: 3px solid var(--border);
padding: 0.6rem 0.8rem;
margin-bottom: 0.75rem;
background: rgba(255,255,255,0.02);
border-radius: 0 4px 4px 0;
font-size: 0.82rem;
}
.dream-label { font-size: 0.75rem; font-weight: 700; color: var(--purple); letter-spacing: 0.12em; }
.dream-summary { font-size: 0.75rem; color: var(--text-dim); font-style: italic; flex: 1; }
.sc-event.sc-status-success { border-left-color: var(--green); }
.sc-event.sc-status-partial { border-left-color: var(--amber); }
.sc-event.sc-status-failed { border-left-color: var(--red); }
.dream-pulse {
display: inline-block; width: 8px; height: 8px; border-radius: 50%;
background: var(--purple);
animation: dream-pulse 1.8s ease-in-out infinite;
.sc-event-header {
display: flex;
align-items: center;
gap: 0.5rem;
margin-bottom: 0.4rem;
flex-wrap: wrap;
}
@keyframes dream-pulse {
0%, 100% { opacity: 1; transform: scale(1); }
50% { opacity: 0.4; transform: scale(0.7); }
.sc-status-badge {
font-size: 0.68rem;
font-weight: 700;
letter-spacing: 0.06em;
padding: 0.15rem 0.45rem;
border-radius: 3px;
}
.sc-status-badge.sc-status-success { color: var(--green); background: rgba(0,255,136,0.08); }
.sc-status-badge.sc-status-partial { color: var(--amber); background: rgba(255,179,0,0.08); }
.sc-status-badge.sc-status-failed { color: var(--red); background: rgba(255,59,59,0.08); }
.dream-dot {
display: inline-block; width: 7px; height: 7px; border-radius: 50%;
.sc-source-badge {
font-size: 0.68rem;
color: var(--purple);
background: rgba(168,85,247,0.1);
padding: 0.1rem 0.4rem;
border-radius: 3px;
}
.dream-dot-idle { background: var(--amber); }
.dream-dot-standby { background: var(--text-dim); }
.dream-idle, .dream-standby {
display: flex; align-items: center; gap: 6px; padding: 4px 0;
.sc-event-time { font-size: 0.68rem; color: var(--text-dim); margin-left: auto; }
.sc-event-error-type {
font-size: 0.72rem;
color: var(--amber);
font-weight: 600;
margin-bottom: 0.3rem;
letter-spacing: 0.04em;
}
.dream-label-idle { font-size: 0.7rem; font-weight: 700; color: var(--amber); letter-spacing: 0.1em; }
.dream-label-standby { font-size: 0.7rem; font-weight: 700; color: var(--text-dim); letter-spacing: 0.1em; }
.dream-idle-meta { font-size: 0.7rem; color: var(--text-dim); }
.dream-history { border-top: 1px solid var(--border); padding-top: 6px; }
.dream-record { padding: 4px 0; border-bottom: 1px solid var(--border); }
.dream-record:last-child { border-bottom: none; }
.dream-rule { font-size: 0.75rem; color: var(--text); font-style: italic; }
.dream-meta { font-size: 0.65rem; color: var(--text-dim); margin-top: 2px; }
.sc-label {
font-size: 0.65rem;
font-weight: 700;
letter-spacing: 0.06em;
color: var(--text-dim);
margin-right: 0.3rem;
}
.sc-event-intent, .sc-event-error, .sc-event-strategy, .sc-event-outcome {
color: var(--text);
margin-bottom: 0.2rem;
line-height: 1.4;
word-break: break-word;
}
.sc-event-error { color: var(--red); }
.sc-event-strategy { color: var(--text-dim); font-style: italic; }
.sc-event-outcome { color: var(--text-bright); }
.sc-event-meta { font-size: 0.68rem; color: var(--text-dim); margin-top: 0.3rem; }
.sc-pattern-type {
font-family: var(--font);
font-size: 0.8rem;
color: var(--text-bright);
word-break: break-all;
}

View File

@@ -7,6 +7,8 @@ from unittest.mock import patch
import pytest
import infrastructure.events.bus as bus_module
pytestmark = pytest.mark.unit
from infrastructure.events.bus import (
Event,
EventBus,
@@ -352,6 +354,14 @@ class TestEventBusPersistence:
events = bus.replay()
assert events == []
def test_init_persistence_db_noop_when_path_is_none(self):
"""_init_persistence_db() is a no-op when _persistence_db_path is None."""
bus = EventBus()
# _persistence_db_path is None by default; calling _init_persistence_db
# should silently return without touching the filesystem.
bus._init_persistence_db() # must not raise
assert bus._persistence_db_path is None
async def test_wal_mode_on_persistence_db(self, persistent_bus):
"""Persistence database should use WAL mode."""
conn = sqlite3.connect(str(persistent_bus._persistence_db_path))

View File

View File

@@ -0,0 +1,363 @@
"""Unit tests for the self-modification loop.
Covers:
- Protected branch guard
- Successful cycle (mocked git + tests)
- Edit function failure → branch reverted, no commit
- Test failure → branch reverted, no commit
- Gitea PR creation plumbing
- GiteaClient graceful degradation (no token, network error)
All git and subprocess calls are mocked so these run offline without
a real repo or test suite.
"""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_loop(repo_root="/tmp/fake-repo"):
"""Construct a SelfModifyLoop with a fake repo root."""
from self_coding.self_modify.loop import SelfModifyLoop
return SelfModifyLoop(repo_root=repo_root, remote="origin", base_branch="main")
def _noop_edit(repo_root: str) -> None:
"""Edit function that does nothing."""
def _failing_edit(repo_root: str) -> None:
"""Edit function that raises."""
raise RuntimeError("edit exploded")
# ---------------------------------------------------------------------------
# Guard tests (sync — no git calls needed)
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_guard_blocks_main():
loop = _make_loop()
with pytest.raises(ValueError, match="protected branch"):
loop._guard_branch("main")
@pytest.mark.unit
def test_guard_blocks_master():
loop = _make_loop()
with pytest.raises(ValueError, match="protected branch"):
loop._guard_branch("master")
@pytest.mark.unit
def test_guard_allows_feature_branch():
loop = _make_loop()
# Should not raise
loop._guard_branch("self-modify/some-feature")
@pytest.mark.unit
def test_guard_allows_self_modify_prefix():
loop = _make_loop()
loop._guard_branch("self-modify/issue-983")
# ---------------------------------------------------------------------------
# Full cycle — success path
# ---------------------------------------------------------------------------
@pytest.mark.unit
@pytest.mark.asyncio
async def test_run_success():
"""Happy path: edit succeeds, tests pass, PR created."""
loop = _make_loop()
fake_completed = MagicMock()
fake_completed.stdout = "abc1234\n"
fake_completed.returncode = 0
fake_test_result = MagicMock()
fake_test_result.stdout = "3 passed"
fake_test_result.stderr = ""
fake_test_result.returncode = 0
from self_coding.gitea_client import PullRequest as _PR
fake_pr = _PR(number=42, title="test PR", html_url="http://gitea/pr/42")
with (
patch.object(loop, "_git", return_value=fake_completed),
patch("subprocess.run", return_value=fake_test_result),
patch.object(loop, "_create_pr", return_value=fake_pr),
):
result = await loop.run(
slug="test-feature",
description="Add test feature",
edit_fn=_noop_edit,
issue_number=983,
)
assert result.success is True
assert result.branch == "self-modify/test-feature"
assert result.pr_url == "http://gitea/pr/42"
assert result.pr_number == 42
assert "3 passed" in result.test_output
@pytest.mark.unit
@pytest.mark.asyncio
async def test_run_skips_tests_when_flag_set():
"""skip_tests=True should bypass the test gate."""
loop = _make_loop()
fake_completed = MagicMock()
fake_completed.stdout = "deadbeef\n"
fake_completed.returncode = 0
with (
patch.object(loop, "_git", return_value=fake_completed),
patch.object(loop, "_create_pr", return_value=None),
patch("subprocess.run") as mock_run,
):
result = await loop.run(
slug="skip-test-feature",
description="Skip test feature",
edit_fn=_noop_edit,
skip_tests=True,
)
# subprocess.run should NOT be called for tests
mock_run.assert_not_called()
assert result.success is True
assert "(tests skipped)" in result.test_output
# ---------------------------------------------------------------------------
# Failure paths
# ---------------------------------------------------------------------------
@pytest.mark.unit
@pytest.mark.asyncio
async def test_run_reverts_on_edit_failure():
"""If edit_fn raises, the branch should be reverted and no commit made."""
loop = _make_loop()
fake_completed = MagicMock()
fake_completed.stdout = ""
fake_completed.returncode = 0
revert_called = []
def _fake_revert(branch):
revert_called.append(branch)
with (
patch.object(loop, "_git", return_value=fake_completed),
patch.object(loop, "_revert_branch", side_effect=_fake_revert),
patch.object(loop, "_commit_all") as mock_commit,
):
result = await loop.run(
slug="broken-edit",
description="This will fail",
edit_fn=_failing_edit,
skip_tests=True,
)
assert result.success is False
assert "edit exploded" in result.error
assert "self-modify/broken-edit" in revert_called
mock_commit.assert_not_called()
@pytest.mark.unit
@pytest.mark.asyncio
async def test_run_reverts_on_test_failure():
"""If tests fail, branch should be reverted and no commit made."""
loop = _make_loop()
fake_completed = MagicMock()
fake_completed.stdout = ""
fake_completed.returncode = 0
fake_test_result = MagicMock()
fake_test_result.stdout = "FAILED test_foo"
fake_test_result.stderr = "1 failed"
fake_test_result.returncode = 1
revert_called = []
def _fake_revert(branch):
revert_called.append(branch)
with (
patch.object(loop, "_git", return_value=fake_completed),
patch("subprocess.run", return_value=fake_test_result),
patch.object(loop, "_revert_branch", side_effect=_fake_revert),
patch.object(loop, "_commit_all") as mock_commit,
):
result = await loop.run(
slug="tests-will-fail",
description="This will fail tests",
edit_fn=_noop_edit,
)
assert result.success is False
assert "Tests failed" in result.error
assert "self-modify/tests-will-fail" in revert_called
mock_commit.assert_not_called()
@pytest.mark.unit
@pytest.mark.asyncio
async def test_run_slug_with_main_creates_safe_branch():
"""A slug of 'main' produces branch 'self-modify/main', which is not protected."""
loop = _make_loop()
fake_completed = MagicMock()
fake_completed.stdout = "deadbeef\n"
fake_completed.returncode = 0
# 'self-modify/main' is NOT in _PROTECTED_BRANCHES so the run should succeed
with (
patch.object(loop, "_git", return_value=fake_completed),
patch.object(loop, "_create_pr", return_value=None),
):
result = await loop.run(
slug="main",
description="try to write to self-modify/main",
edit_fn=_noop_edit,
skip_tests=True,
)
assert result.branch == "self-modify/main"
assert result.success is True
# ---------------------------------------------------------------------------
# GiteaClient tests
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_gitea_client_returns_none_without_token():
"""GiteaClient should return None gracefully when no token is set."""
from self_coding.gitea_client import GiteaClient
client = GiteaClient(base_url="http://localhost:3000", token="", repo="owner/repo")
pr = client.create_pull_request(
title="Test PR",
body="body",
head="self-modify/test",
)
assert pr is None
@pytest.mark.unit
def test_gitea_client_comment_returns_false_without_token():
"""add_issue_comment should return False gracefully when no token is set."""
from self_coding.gitea_client import GiteaClient
client = GiteaClient(base_url="http://localhost:3000", token="", repo="owner/repo")
result = client.add_issue_comment(123, "hello")
assert result is False
@pytest.mark.unit
def test_gitea_client_create_pr_handles_network_error():
"""create_pull_request should return None on network failure."""
from self_coding.gitea_client import GiteaClient
client = GiteaClient(base_url="http://localhost:3000", token="fake-token", repo="owner/repo")
mock_requests = MagicMock()
mock_requests.post.side_effect = Exception("Connection refused")
mock_requests.exceptions.ConnectionError = Exception
with patch.dict("sys.modules", {"requests": mock_requests}):
pr = client.create_pull_request(
title="Test PR",
body="body",
head="self-modify/test",
)
assert pr is None
@pytest.mark.unit
def test_gitea_client_comment_handles_network_error():
"""add_issue_comment should return False on network failure."""
from self_coding.gitea_client import GiteaClient
client = GiteaClient(base_url="http://localhost:3000", token="fake-token", repo="owner/repo")
mock_requests = MagicMock()
mock_requests.post.side_effect = Exception("Connection refused")
with patch.dict("sys.modules", {"requests": mock_requests}):
result = client.add_issue_comment(456, "hello")
assert result is False
@pytest.mark.unit
def test_gitea_client_create_pr_success():
"""create_pull_request should return a PullRequest on HTTP 201."""
from self_coding.gitea_client import GiteaClient, PullRequest
client = GiteaClient(base_url="http://localhost:3000", token="tok", repo="owner/repo")
fake_resp = MagicMock()
fake_resp.raise_for_status = MagicMock()
fake_resp.json.return_value = {
"number": 77,
"title": "Test PR",
"html_url": "http://localhost:3000/owner/repo/pulls/77",
}
mock_requests = MagicMock()
mock_requests.post.return_value = fake_resp
with patch.dict("sys.modules", {"requests": mock_requests}):
pr = client.create_pull_request("Test PR", "body", "self-modify/feat")
assert isinstance(pr, PullRequest)
assert pr.number == 77
assert pr.html_url == "http://localhost:3000/owner/repo/pulls/77"
# ---------------------------------------------------------------------------
# LoopResult dataclass
# ---------------------------------------------------------------------------
@pytest.mark.unit
def test_loop_result_defaults():
from self_coding.self_modify.loop import LoopResult
r = LoopResult(success=True)
assert r.branch == ""
assert r.commit_sha == ""
assert r.pr_url == ""
assert r.pr_number == 0
assert r.test_output == ""
assert r.error == ""
assert r.elapsed_ms == 0.0
assert r.metadata == {}
@pytest.mark.unit
def test_loop_result_failure():
from self_coding.self_modify.loop import LoopResult
r = LoopResult(success=False, error="something broke", branch="self-modify/test")
assert r.success is False
assert r.error == "something broke"

View File

@@ -0,0 +1,444 @@
"""Tests for timmy.sovereignty.session_report.
Refs: #957 (Session Sovereignty Report Generator)
"""
import base64
import json
import time
from datetime import UTC, datetime
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
pytestmark = pytest.mark.unit
from timmy.sovereignty.session_report import (
_format_duration,
_gather_session_data,
_gather_sovereignty_data,
_render_markdown,
commit_report,
generate_and_commit_report,
generate_report,
mark_session_start,
)
# ---------------------------------------------------------------------------
# _format_duration
# ---------------------------------------------------------------------------
class TestFormatDuration:
def test_seconds_only(self):
assert _format_duration(45) == "45s"
def test_minutes_and_seconds(self):
assert _format_duration(125) == "2m 5s"
def test_hours_minutes_seconds(self):
assert _format_duration(3661) == "1h 1m 1s"
def test_zero(self):
assert _format_duration(0) == "0s"
# ---------------------------------------------------------------------------
# mark_session_start + generate_report (smoke)
# ---------------------------------------------------------------------------
class TestMarkSessionStart:
def test_sets_session_start(self):
import timmy.sovereignty.session_report as sr
sr._SESSION_START = None
mark_session_start()
assert sr._SESSION_START is not None
assert sr._SESSION_START.tzinfo == UTC
def test_idempotent_overwrite(self):
import timmy.sovereignty.session_report as sr
mark_session_start()
first = sr._SESSION_START
time.sleep(0.01)
mark_session_start()
second = sr._SESSION_START
assert second >= first
# ---------------------------------------------------------------------------
# _gather_session_data
# ---------------------------------------------------------------------------
class TestGatherSessionData:
def test_returns_defaults_when_no_file(self, tmp_path):
mock_logger = MagicMock()
mock_logger.flush.return_value = None
mock_logger.session_file = tmp_path / "nonexistent.jsonl"
with patch(
"timmy.sovereignty.session_report.get_session_logger",
return_value=mock_logger,
):
data = _gather_session_data()
assert data["user_messages"] == 0
assert data["timmy_messages"] == 0
assert data["tool_calls"] == 0
assert data["errors"] == 0
assert data["tool_call_breakdown"] == {}
def test_counts_entries_correctly(self, tmp_path):
session_file = tmp_path / "session_2026-03-23.jsonl"
entries = [
{"type": "message", "role": "user", "content": "hello"},
{"type": "message", "role": "timmy", "content": "hi"},
{"type": "message", "role": "user", "content": "test"},
{"type": "tool_call", "tool": "memory_search", "args": {}, "result": "found"},
{"type": "tool_call", "tool": "memory_search", "args": {}, "result": "nope"},
{"type": "tool_call", "tool": "shell", "args": {}, "result": "ok"},
{"type": "error", "error": "boom"},
]
with open(session_file, "w") as f:
for e in entries:
f.write(json.dumps(e) + "\n")
mock_logger = MagicMock()
mock_logger.flush.return_value = None
mock_logger.session_file = session_file
with patch(
"timmy.sovereignty.session_report.get_session_logger",
return_value=mock_logger,
):
data = _gather_session_data()
assert data["user_messages"] == 2
assert data["timmy_messages"] == 1
assert data["tool_calls"] == 3
assert data["errors"] == 1
assert data["tool_call_breakdown"]["memory_search"] == 2
assert data["tool_call_breakdown"]["shell"] == 1
def test_graceful_on_import_error(self):
with patch(
"timmy.sovereignty.session_report.get_session_logger",
side_effect=ImportError("no session_logger"),
):
data = _gather_session_data()
assert data["tool_calls"] == 0
# ---------------------------------------------------------------------------
# _gather_sovereignty_data
# ---------------------------------------------------------------------------
class TestGatherSovereigntyData:
def test_returns_empty_on_import_error(self):
with patch.dict("sys.modules", {"infrastructure.sovereignty_metrics": None}):
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
side_effect=ImportError("no store"),
):
data = _gather_sovereignty_data()
assert data["metrics"] == {}
assert data["deltas"] == {}
assert data["previous_session"] == {}
def test_populates_deltas_from_history(self):
mock_store = MagicMock()
mock_store.get_summary.return_value = {
"cache_hit_rate": {"current": 0.5, "phase": "week1"},
}
# get_latest returns newest-first
mock_store.get_latest.return_value = [
{"value": 0.5},
{"value": 0.3},
{"value": 0.1},
]
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
return_value=mock_store,
):
with patch(
"timmy.sovereignty.session_report.GRADUATION_TARGETS",
{"cache_hit_rate": {"graduation": 0.9}},
):
data = _gather_sovereignty_data()
delta = data["deltas"].get("cache_hit_rate")
assert delta is not None
assert delta["start"] == 0.1 # oldest in window
assert delta["end"] == 0.5 # most recent
assert data["previous_session"]["cache_hit_rate"] == 0.3
def test_single_data_point_no_delta(self):
mock_store = MagicMock()
mock_store.get_summary.return_value = {}
mock_store.get_latest.return_value = [{"value": 0.4}]
with patch(
"timmy.sovereignty.session_report.get_sovereignty_store",
return_value=mock_store,
):
with patch(
"timmy.sovereignty.session_report.GRADUATION_TARGETS",
{"api_cost": {"graduation": 0.01}},
):
data = _gather_sovereignty_data()
delta = data["deltas"]["api_cost"]
assert delta["start"] == 0.4
assert delta["end"] == 0.4
assert data["previous_session"]["api_cost"] is None
# ---------------------------------------------------------------------------
# generate_report (integration — smoke test)
# ---------------------------------------------------------------------------
class TestGenerateReport:
def _minimal_session_data(self):
return {
"user_messages": 3,
"timmy_messages": 3,
"tool_calls": 2,
"errors": 0,
"tool_call_breakdown": {"memory_search": 2},
}
def _minimal_sov_data(self):
return {
"metrics": {
"cache_hit_rate": {"current": 0.45, "phase": "week1"},
"api_cost": {"current": 0.12, "phase": "pre-start"},
},
"deltas": {
"cache_hit_rate": {"start": 0.40, "end": 0.45},
"api_cost": {"start": 0.10, "end": 0.12},
},
"previous_session": {
"cache_hit_rate": 0.40,
"api_cost": 0.10,
},
}
def test_smoke_produces_markdown(self):
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=self._minimal_sov_data(),
),
):
report = generate_report("test-session")
assert "# Sovereignty Session Report" in report
assert "test-session" in report
assert "## Session Activity" in report
assert "## Sovereignty Scorecard" in report
assert "## Cost Breakdown" in report
assert "## Trend vs Previous Session" in report
def test_report_contains_session_stats(self):
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=self._minimal_sov_data(),
),
):
report = generate_report()
assert "| User messages | 3 |" in report
assert "memory_search" in report
def test_report_no_previous_session(self):
sov = self._minimal_sov_data()
sov["previous_session"] = {"cache_hit_rate": None, "api_cost": None}
with (
patch(
"timmy.sovereignty.session_report._gather_session_data",
return_value=self._minimal_session_data(),
),
patch(
"timmy.sovereignty.session_report._gather_sovereignty_data",
return_value=sov,
),
):
report = generate_report()
assert "No previous session data" in report
# ---------------------------------------------------------------------------
# commit_report
# ---------------------------------------------------------------------------
class TestCommitReport:
def test_returns_false_when_gitea_disabled(self):
with patch("timmy.sovereignty.session_report.settings") as mock_settings:
mock_settings.gitea_enabled = False
result = commit_report("# test", "dashboard")
assert result is False
def test_returns_false_when_no_token(self):
with patch("timmy.sovereignty.session_report.settings") as mock_settings:
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
result = commit_report("# test", "dashboard")
assert result is False
def test_creates_file_via_put(self):
mock_response = MagicMock()
mock_response.status_code = 201
mock_response.raise_for_status.return_value = None
mock_check = MagicMock()
mock_check.status_code = 404 # file does not exist yet
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.return_value = mock_response
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# report content", "dashboard")
assert result is True
mock_client.put.assert_called_once()
call_kwargs = mock_client.put.call_args
payload = call_kwargs.kwargs.get("json", call_kwargs.args[1] if len(call_kwargs.args) > 1 else {})
decoded = base64.b64decode(payload["content"]).decode()
assert "# report content" in decoded
def test_updates_existing_file_with_sha(self):
mock_check = MagicMock()
mock_check.status_code = 200
mock_check.json.return_value = {"sha": "abc123"}
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.return_value = mock_response
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# updated", "dashboard")
assert result is True
payload = mock_client.put.call_args.kwargs.get("json", {})
assert payload.get("sha") == "abc123"
def test_returns_false_on_http_error(self):
import httpx
mock_check = MagicMock()
mock_check.status_code = 404
mock_client = MagicMock()
mock_client.__enter__ = MagicMock(return_value=mock_client)
mock_client.__exit__ = MagicMock(return_value=False)
mock_client.get.return_value = mock_check
mock_client.put.side_effect = httpx.HTTPStatusError(
"403", request=MagicMock(), response=MagicMock(status_code=403)
)
with (
patch("timmy.sovereignty.session_report.settings") as mock_settings,
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.gitea_repo = "owner/repo"
result = commit_report("# test", "dashboard")
assert result is False
# ---------------------------------------------------------------------------
# generate_and_commit_report (async)
# ---------------------------------------------------------------------------
class TestGenerateAndCommitReport:
async def test_returns_true_on_success(self):
with (
patch(
"timmy.sovereignty.session_report.generate_report",
return_value="# mock report",
),
patch(
"timmy.sovereignty.session_report.commit_report",
return_value=True,
),
):
result = await generate_and_commit_report("test")
assert result is True
async def test_returns_false_when_commit_fails(self):
with (
patch(
"timmy.sovereignty.session_report.generate_report",
return_value="# mock report",
),
patch(
"timmy.sovereignty.session_report.commit_report",
return_value=False,
),
):
result = await generate_and_commit_report()
assert result is False
async def test_graceful_on_exception(self):
with patch(
"timmy.sovereignty.session_report.generate_report",
side_effect=RuntimeError("explode"),
):
result = await generate_and_commit_report()
assert result is False

View File

@@ -1,217 +0,0 @@
"""Unit tests for the Dreaming mode engine."""
import sqlite3
from contextlib import closing
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.dreaming import DreamingEngine, DreamRecord, _SESSION_GAP_SECONDS
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture()
def tmp_dreams_db(tmp_path):
"""Return a temporary path for the dreams database."""
return tmp_path / "dreams.db"
@pytest.fixture()
def engine(tmp_dreams_db):
"""DreamingEngine backed by a temp database."""
return DreamingEngine(db_path=tmp_dreams_db)
@pytest.fixture()
def chat_db(tmp_path):
"""Create a minimal chat database with some messages."""
db_path = tmp_path / "chat.db"
with closing(sqlite3.connect(str(db_path))) as conn:
conn.execute("""
CREATE TABLE chat_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
role TEXT NOT NULL,
content TEXT NOT NULL,
timestamp TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'browser'
)
""")
now = datetime.now(UTC)
messages = [
("user", "Hello, can you help me?", (now - timedelta(hours=2)).isoformat()),
("agent", "Of course! What do you need?", (now - timedelta(hours=2, seconds=-5)).isoformat()),
("user", "How does Python handle errors?", (now - timedelta(hours=2, seconds=-60)).isoformat()),
("agent", "Python uses try/except blocks.", (now - timedelta(hours=2, seconds=-120)).isoformat()),
("user", "Thanks!", (now - timedelta(hours=2, seconds=-180)).isoformat()),
]
conn.executemany(
"INSERT INTO chat_messages (role, content, timestamp) VALUES (?, ?, ?)",
messages,
)
conn.commit()
return db_path
# ── Idle detection ─────────────────────────────────────────────────────────────
class TestIdleDetection:
def test_not_idle_immediately(self, engine):
assert engine.is_idle() is False
def test_idle_after_threshold(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=20)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.is_idle() is True
def test_not_idle_when_threshold_zero(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=99)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 0
assert engine.is_idle() is False
def test_record_activity_resets_timer(self, engine):
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=30)
engine.record_activity()
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.is_idle() is False
# ── Status dict ───────────────────────────────────────────────────────────────
class TestGetStatus:
def test_status_shape(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
status = engine.get_status()
assert "enabled" in status
assert "dreaming" in status
assert "idle" in status
assert "dream_count" in status
assert "idle_minutes" in status
def test_dream_count_starts_at_zero(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
assert engine.get_status()["dream_count"] == 0
# ── Session grouping ──────────────────────────────────────────────────────────
class TestGroupIntoSessions:
def test_single_session(self, engine):
now = datetime.now(UTC)
rows = [
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=10)).isoformat()},
]
sessions = engine._group_into_sessions(rows)
assert len(sessions) == 1
assert len(sessions[0]) == 2
def test_splits_on_large_gap(self, engine):
now = datetime.now(UTC)
gap = _SESSION_GAP_SECONDS + 100
rows = [
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=gap)).isoformat()},
]
sessions = engine._group_into_sessions(rows)
assert len(sessions) == 2
def test_empty_input(self, engine):
assert engine._group_into_sessions([]) == []
# ── Dream storage ─────────────────────────────────────────────────────────────
class TestDreamStorage:
def test_store_and_retrieve(self, engine):
dream = engine._store_dream(
session_excerpt="User asked about Python.",
decision_point="Python uses try/except blocks.",
simulation="I could have given a code example.",
proposed_rule="When explaining errors, include a code snippet.",
)
assert dream.id
assert dream.proposed_rule == "When explaining errors, include a code snippet."
retrieved = engine.get_recent_dreams(limit=1)
assert len(retrieved) == 1
assert retrieved[0].id == dream.id
def test_count_increments(self, engine):
assert engine.count_dreams() == 0
engine._store_dream(
session_excerpt="test", decision_point="test", simulation="test", proposed_rule="test"
)
assert engine.count_dreams() == 1
# ── dream_once integration ─────────────────────────────────────────────────────
class TestDreamOnce:
@pytest.mark.asyncio
async def test_skips_when_disabled(self, engine):
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = False
result = await engine.dream_once()
assert result is None
@pytest.mark.asyncio
async def test_skips_when_not_idle(self, engine):
engine._last_activity_time = datetime.now(UTC)
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 60
result = await engine.dream_once()
assert result is None
@pytest.mark.asyncio
async def test_skips_when_already_dreaming(self, engine):
engine._is_dreaming = True
with patch("timmy.dreaming.settings") as mock_settings:
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 0
result = await engine.dream_once()
# Reset for cleanliness
engine._is_dreaming = False
assert result is None
@pytest.mark.asyncio
async def test_dream_produces_record_when_idle(self, engine, chat_db):
"""Full cycle: idle + chat data + mocked LLM → produces DreamRecord."""
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=1)
with (
patch("timmy.dreaming.settings") as mock_settings,
patch("timmy.dreaming.DreamingEngine._call_agent", new_callable=AsyncMock) as mock_agent,
patch("infrastructure.chat_store.DB_PATH", chat_db),
):
mock_settings.dreaming_enabled = True
mock_settings.dreaming_idle_threshold_minutes = 10
mock_settings.dreaming_timeout_seconds = 30
mock_agent.side_effect = [
"I could have provided a concrete try/except example.", # simulation
"When explaining errors, always include a runnable code snippet.", # rule
]
result = await engine.dream_once()
assert result is not None
assert isinstance(result, DreamRecord)
assert result.simulation
assert result.proposed_rule
assert engine.count_dreams() == 1

View File

@@ -0,0 +1,297 @@
"""Unit tests for the Energy Budget Monitor.
Tests power estimation strategies, inference recording, efficiency scoring,
and low power mode logic — all without real subprocesses.
Refs: #1009
"""
from unittest.mock import MagicMock, patch
import pytest
from infrastructure.energy.monitor import (
EnergyBudgetMonitor,
InferenceSample,
_DEFAULT_MODEL_SIZE_GB,
_EFFICIENCY_SCORE_CEILING,
_WATTS_PER_GB_HEURISTIC,
)
@pytest.fixture()
def monitor():
return EnergyBudgetMonitor()
# ── Model size lookup ─────────────────────────────────────────────────────────
def test_model_size_exact_match(monitor):
assert monitor._model_size_gb("qwen3:8b") == 5.5
def test_model_size_substring_match(monitor):
assert monitor._model_size_gb("some-qwen3:14b-custom") == 9.0
def test_model_size_unknown_returns_default(monitor):
assert monitor._model_size_gb("unknownmodel:99b") == _DEFAULT_MODEL_SIZE_GB
# ── Battery power reading ─────────────────────────────────────────────────────
def test_read_battery_watts_on_battery(monitor):
ioreg_output = (
"{\n"
' "InstantAmperage" = 2500\n'
' "Voltage" = 12000\n'
' "ExternalConnected" = No\n'
"}"
)
mock_result = MagicMock()
mock_result.stdout = ioreg_output
with patch("subprocess.run", return_value=mock_result):
watts = monitor._read_battery_watts()
# 2500 mA * 12000 mV / 1_000_000 = 30 W
assert watts == pytest.approx(30.0, abs=0.01)
def test_read_battery_watts_plugged_in_returns_zero(monitor):
ioreg_output = (
"{\n"
' "InstantAmperage" = 1000\n'
' "Voltage" = 12000\n'
' "ExternalConnected" = Yes\n'
"}"
)
mock_result = MagicMock()
mock_result.stdout = ioreg_output
with patch("subprocess.run", return_value=mock_result):
watts = monitor._read_battery_watts()
assert watts == 0.0
def test_read_battery_watts_subprocess_failure_raises(monitor):
with patch("subprocess.run", side_effect=OSError("no ioreg")):
with pytest.raises(OSError):
monitor._read_battery_watts()
# ── CPU proxy reading ─────────────────────────────────────────────────────────
def test_read_cpu_pct_parses_top(monitor):
top_output = (
"Processes: 450 total\n"
"CPU usage: 15.2% user, 8.8% sys, 76.0% idle\n"
)
mock_result = MagicMock()
mock_result.stdout = top_output
with patch("subprocess.run", return_value=mock_result):
pct = monitor._read_cpu_pct()
assert pct == pytest.approx(24.0, abs=0.1)
def test_read_cpu_pct_no_match_returns_negative(monitor):
mock_result = MagicMock()
mock_result.stdout = "No CPU line here\n"
with patch("subprocess.run", return_value=mock_result):
pct = monitor._read_cpu_pct()
assert pct == -1.0
# ── Power strategy selection ──────────────────────────────────────────────────
def test_read_power_uses_battery_first(monitor):
with patch.object(monitor, "_read_battery_watts", return_value=25.0):
watts, strategy = monitor._read_power()
assert watts == 25.0
assert strategy == "battery"
def test_read_power_falls_back_to_cpu_proxy(monitor):
with (
patch.object(monitor, "_read_battery_watts", return_value=0.0),
patch.object(monitor, "_read_cpu_pct", return_value=50.0),
):
watts, strategy = monitor._read_power()
assert strategy == "cpu_proxy"
assert watts == pytest.approx(20.0, abs=0.1) # 50% of 40W TDP
def test_read_power_unavailable_when_both_fail(monitor):
with (
patch.object(monitor, "_read_battery_watts", side_effect=OSError),
patch.object(monitor, "_read_cpu_pct", return_value=-1.0),
):
watts, strategy = monitor._read_power()
assert strategy == "unavailable"
assert watts == 0.0
# ── Inference recording ───────────────────────────────────────────────────────
def test_record_inference_produces_sample(monitor):
monitor._cached_watts = 10.0
monitor._cache_ts = 9999999999.0 # far future — cache won't expire
sample = monitor.record_inference("qwen3:8b", tokens_per_second=40.0)
assert isinstance(sample, InferenceSample)
assert sample.model == "qwen3:8b"
assert sample.tokens_per_second == 40.0
assert sample.estimated_watts == pytest.approx(10.0)
# efficiency = 40 / 10 = 4.0 tok/s per W
assert sample.efficiency == pytest.approx(4.0)
# score = min(10, (4.0 / 5.0) * 10) = 8.0
assert sample.efficiency_score == pytest.approx(8.0)
def test_record_inference_stores_in_history(monitor):
monitor._cached_watts = 5.0
monitor._cache_ts = 9999999999.0
monitor.record_inference("qwen3:8b", 30.0)
monitor.record_inference("qwen3:14b", 20.0)
assert len(monitor._samples) == 2
def test_record_inference_auto_activates_low_power(monitor):
monitor._cached_watts = 20.0 # above default 15W threshold
monitor._cache_ts = 9999999999.0
assert not monitor.low_power_mode
monitor.record_inference("qwen3:30b", 8.0)
assert monitor.low_power_mode
def test_record_inference_no_auto_low_power_below_threshold(monitor):
monitor._cached_watts = 10.0 # below default 15W threshold
monitor._cache_ts = 9999999999.0
monitor.record_inference("qwen3:8b", 40.0)
assert not monitor.low_power_mode
# ── Efficiency score ──────────────────────────────────────────────────────────
def test_efficiency_score_caps_at_10(monitor):
monitor._cached_watts = 1.0
monitor._cache_ts = 9999999999.0
sample = monitor.record_inference("qwen3:1b", tokens_per_second=1000.0)
assert sample.efficiency_score == pytest.approx(10.0)
def test_efficiency_score_no_samples_returns_negative_one(monitor):
assert monitor._compute_mean_efficiency_score() == -1.0
def test_mean_efficiency_score_averages_last_10(monitor):
monitor._cached_watts = 10.0
monitor._cache_ts = 9999999999.0
for _ in range(15):
monitor.record_inference("qwen3:8b", tokens_per_second=25.0) # efficiency=2.5 → score=5.0
score = monitor._compute_mean_efficiency_score()
assert score == pytest.approx(5.0, abs=0.01)
# ── Low power mode ────────────────────────────────────────────────────────────
def test_set_low_power_mode_toggle(monitor):
assert not monitor.low_power_mode
monitor.set_low_power_mode(True)
assert monitor.low_power_mode
monitor.set_low_power_mode(False)
assert not monitor.low_power_mode
# ── get_report ────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_get_report_structure(monitor):
with patch.object(monitor, "_read_power", return_value=(8.0, "battery")):
report = await monitor.get_report()
assert report.timestamp
assert isinstance(report.low_power_mode, bool)
assert isinstance(report.current_watts, float)
assert report.strategy in ("battery", "cpu_proxy", "heuristic", "unavailable")
assert isinstance(report.recommendation, str)
@pytest.mark.asyncio
async def test_get_report_to_dict(monitor):
with patch.object(monitor, "_read_power", return_value=(5.0, "cpu_proxy")):
report = await monitor.get_report()
data = report.to_dict()
assert "timestamp" in data
assert "low_power_mode" in data
assert "current_watts" in data
assert "strategy" in data
assert "efficiency_score" in data
assert "recent_samples" in data
assert "recommendation" in data
@pytest.mark.asyncio
async def test_get_report_caches_power_reading(monitor):
call_count = 0
def counting_read_power():
nonlocal call_count
call_count += 1
return (10.0, "battery")
with patch.object(monitor, "_read_power", side_effect=counting_read_power):
await monitor.get_report()
await monitor.get_report()
# Cache TTL is 10s — should only call once
assert call_count == 1
# ── Recommendation text ───────────────────────────────────────────────────────
def test_recommendation_no_data(monitor):
rec = monitor._build_recommendation(-1.0)
assert "No inference data" in rec
def test_recommendation_low_power_mode(monitor):
monitor.set_low_power_mode(True)
rec = monitor._build_recommendation(2.0)
assert "Low power mode active" in rec
def test_recommendation_low_efficiency(monitor):
rec = monitor._build_recommendation(1.5)
assert "Low efficiency" in rec
def test_recommendation_good_efficiency(monitor):
rec = monitor._build_recommendation(8.0)
assert "Good efficiency" in rec

View File

@@ -0,0 +1,269 @@
"""Unit tests for infrastructure.self_correction."""
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _isolated_db(tmp_path, monkeypatch):
"""Point the self-correction module at a fresh temp database per test."""
import infrastructure.self_correction as sc_mod
# Reset the cached path so each test gets a clean DB
sc_mod._DB_PATH = tmp_path / "self_correction.db"
yield
sc_mod._DB_PATH = None
# ---------------------------------------------------------------------------
# log_self_correction
# ---------------------------------------------------------------------------
class TestLogSelfCorrection:
def test_returns_event_id(self):
from infrastructure.self_correction import log_self_correction
eid = log_self_correction(
source="test",
original_intent="Do X",
detected_error="ValueError: bad input",
correction_strategy="Try Y instead",
final_outcome="Y succeeded",
)
assert isinstance(eid, str)
assert len(eid) == 36 # UUID format
def test_derives_error_type_from_error_string(self):
from infrastructure.self_correction import get_corrections, log_self_correction
log_self_correction(
source="test",
original_intent="Connect",
detected_error="ConnectionRefusedError: port 80",
correction_strategy="Use port 8080",
final_outcome="ok",
)
rows = get_corrections(limit=1)
assert rows[0]["error_type"] == "ConnectionRefusedError"
def test_explicit_error_type_preserved(self):
from infrastructure.self_correction import get_corrections, log_self_correction
log_self_correction(
source="test",
original_intent="Run task",
detected_error="Some weird error",
correction_strategy="Fix it",
final_outcome="done",
error_type="CustomError",
)
rows = get_corrections(limit=1)
assert rows[0]["error_type"] == "CustomError"
def test_task_id_stored(self):
from infrastructure.self_correction import get_corrections, log_self_correction
log_self_correction(
source="test",
original_intent="intent",
detected_error="err",
correction_strategy="strat",
final_outcome="outcome",
task_id="task-abc-123",
)
rows = get_corrections(limit=1)
assert rows[0]["task_id"] == "task-abc-123"
def test_outcome_status_stored(self):
from infrastructure.self_correction import get_corrections, log_self_correction
log_self_correction(
source="test",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
outcome_status="failed",
)
rows = get_corrections(limit=1)
assert rows[0]["outcome_status"] == "failed"
def test_long_strings_truncated(self):
from infrastructure.self_correction import get_corrections, log_self_correction
long = "x" * 3000
log_self_correction(
source="test",
original_intent=long,
detected_error=long,
correction_strategy=long,
final_outcome=long,
)
rows = get_corrections(limit=1)
assert len(rows[0]["original_intent"]) <= 2000
# ---------------------------------------------------------------------------
# get_corrections
# ---------------------------------------------------------------------------
class TestGetCorrections:
def test_empty_db_returns_empty_list(self):
from infrastructure.self_correction import get_corrections
assert get_corrections() == []
def test_returns_newest_first(self):
from infrastructure.self_correction import get_corrections, log_self_correction
for i in range(3):
log_self_correction(
source="test",
original_intent=f"intent {i}",
detected_error="err",
correction_strategy="fix",
final_outcome="done",
error_type=f"Type{i}",
)
rows = get_corrections(limit=10)
assert len(rows) == 3
# Newest first — Type2 should appear before Type0
types = [r["error_type"] for r in rows]
assert types.index("Type2") < types.index("Type0")
def test_limit_respected(self):
from infrastructure.self_correction import get_corrections, log_self_correction
for _ in range(5):
log_self_correction(
source="test",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
)
rows = get_corrections(limit=3)
assert len(rows) == 3
# ---------------------------------------------------------------------------
# get_patterns
# ---------------------------------------------------------------------------
class TestGetPatterns:
def test_empty_db_returns_empty_list(self):
from infrastructure.self_correction import get_patterns
assert get_patterns() == []
def test_counts_by_error_type(self):
from infrastructure.self_correction import get_patterns, log_self_correction
for _ in range(3):
log_self_correction(
source="test",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
error_type="TimeoutError",
)
log_self_correction(
source="test",
original_intent="i",
detected_error="e",
correction_strategy="s",
final_outcome="o",
error_type="ValueError",
)
patterns = get_patterns(top_n=10)
by_type = {p["error_type"]: p for p in patterns}
assert by_type["TimeoutError"]["count"] == 3
assert by_type["ValueError"]["count"] == 1
def test_success_vs_failed_counts(self):
from infrastructure.self_correction import get_patterns, log_self_correction
log_self_correction(
source="test", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o",
error_type="Foo", outcome_status="success",
)
log_self_correction(
source="test", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o",
error_type="Foo", outcome_status="failed",
)
patterns = get_patterns(top_n=5)
foo = next(p for p in patterns if p["error_type"] == "Foo")
assert foo["success_count"] == 1
assert foo["failed_count"] == 1
def test_ordered_by_count_desc(self):
from infrastructure.self_correction import get_patterns, log_self_correction
for _ in range(2):
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", error_type="Rare",
)
for _ in range(5):
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", error_type="Common",
)
patterns = get_patterns(top_n=5)
assert patterns[0]["error_type"] == "Common"
# ---------------------------------------------------------------------------
# get_stats
# ---------------------------------------------------------------------------
class TestGetStats:
def test_empty_db_returns_zeroes(self):
from infrastructure.self_correction import get_stats
stats = get_stats()
assert stats["total"] == 0
assert stats["success_rate"] == 0
def test_counts_outcomes(self):
from infrastructure.self_correction import get_stats, log_self_correction
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", outcome_status="success",
)
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", outcome_status="failed",
)
stats = get_stats()
assert stats["total"] == 2
assert stats["success_count"] == 1
assert stats["failed_count"] == 1
assert stats["success_rate"] == 50
def test_success_rate_100_when_all_succeed(self):
from infrastructure.self_correction import get_stats, log_self_correction
for _ in range(4):
log_self_correction(
source="t", original_intent="i", detected_error="e",
correction_strategy="s", final_outcome="o", outcome_status="success",
)
stats = get_stats()
assert stats["success_rate"] == 100