forked from Rockachopa/Timmy-time-dashboard
Compare commits
8 Commits
claude/iss
...
claude/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e55fc07f5e | ||
| 2d6bfe6ba1 | |||
| ebb2cad552 | |||
| 003e3883fb | |||
| 7dfbf05867 | |||
| 1cce28d1bb | |||
| 4c6b69885d | |||
| 6b2e6d9e8c |
1244
docs/model-benchmarks.md
Normal file
1244
docs/model-benchmarks.md
Normal file
File diff suppressed because it is too large
Load Diff
75
docs/pr-recovery-1219.md
Normal file
75
docs/pr-recovery-1219.md
Normal file
@@ -0,0 +1,75 @@
|
||||
# PR Recovery Investigation — Issue #1219
|
||||
|
||||
**Audit source:** Issue #1210
|
||||
|
||||
Five PRs were closed without merge while their parent issues remained open and
|
||||
marked p0-critical. This document records the investigation findings and the
|
||||
path to resolution for each.
|
||||
|
||||
---
|
||||
|
||||
## Root Cause
|
||||
|
||||
Per Timmy's comment on #1219: all five PRs were closed due to **merge conflicts
|
||||
during the mass-merge cleanup cycle** (a rebase storm), not due to code
|
||||
quality problems or a changed approach. The code in each PR was correct;
|
||||
the branches simply became stale.
|
||||
|
||||
---
|
||||
|
||||
## Status Matrix
|
||||
|
||||
| PR | Feature | Issue | PR Closed | Issue State | Resolution |
|
||||
|----|---------|-------|-----------|-------------|------------|
|
||||
| #1163 | Three-Strike Detector | #962 | Rebase storm | **Closed ✓** | v2 merged via PR #1232 |
|
||||
| #1162 | Session Sovereignty Report | #957 | Rebase storm | **Open** | PR #1263 (v3 — rebased) |
|
||||
| #1157 | Qwen3-8B/14B routing | #1065 | Rebase storm | **Closed ✓** | v2 merged via PR #1233 |
|
||||
| #1156 | Agent Dreaming Mode | #1019 | Rebase storm | **Open** | PR #1264 (v3 — rebased) |
|
||||
| #1145 | Qwen3-14B config | #1064 | Rebase storm | **Closed ✓** | Code present on main |
|
||||
|
||||
---
|
||||
|
||||
## Detail: Already Resolved
|
||||
|
||||
### PR #1163 → Issue #962 (Three-Strike Detector)
|
||||
|
||||
- **Why closed:** merge conflict during rebase storm
|
||||
- **Resolution:** `src/timmy/sovereignty/three_strike.py` and
|
||||
`src/dashboard/routes/three_strike.py` are present on `main` (landed via
|
||||
PR #1232). Issue #962 is closed.
|
||||
|
||||
### PR #1157 → Issue #1065 (Qwen3-8B/14B dual-model routing)
|
||||
|
||||
- **Why closed:** merge conflict during rebase storm
|
||||
- **Resolution:** `src/infrastructure/router/classifier.py` and
|
||||
`src/infrastructure/router/cascade.py` are present on `main` (landed via
|
||||
PR #1233). Issue #1065 is closed.
|
||||
|
||||
### PR #1145 → Issue #1064 (Qwen3-14B config)
|
||||
|
||||
- **Why closed:** merge conflict during rebase storm
|
||||
- **Resolution:** `Modelfile.timmy`, `Modelfile.qwen3-14b`, and the `config.py`
|
||||
defaults (`ollama_model = "qwen3:14b"`) are present on `main`. Issue #1064
|
||||
is closed.
|
||||
|
||||
---
|
||||
|
||||
## Detail: Requiring Action
|
||||
|
||||
### PR #1162 → Issue #957 (Session Sovereignty Report Generator)
|
||||
|
||||
- **Why closed:** merge conflict during rebase storm
|
||||
- **Branch preserved:** `claude/issue-957-v2` (one feature commit)
|
||||
- **Action taken:** Rebased onto current `main`, resolved conflict in
|
||||
`src/timmy/sovereignty/__init__.py` (both three-strike and session-report
|
||||
docstrings kept). All 458 unit tests pass.
|
||||
- **New PR:** #1263 (`claude/issue-957-v3` → `main`)
|
||||
|
||||
### PR #1156 → Issue #1019 (Agent Dreaming Mode)
|
||||
|
||||
- **Why closed:** merge conflict during rebase storm
|
||||
- **Branch preserved:** `claude/issue-1019-v2` (one feature commit)
|
||||
- **Action taken:** Rebased onto current `main`, resolved conflict in
|
||||
`src/dashboard/app.py` (both `three_strike_router` and `dreaming_router`
|
||||
registered). All 435 unit tests pass.
|
||||
- **New PR:** #1264 (`claude/issue-1019-v3` → `main`)
|
||||
195
scripts/benchmarks/01_tool_calling.py
Normal file
195
scripts/benchmarks/01_tool_calling.py
Normal file
@@ -0,0 +1,195 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Benchmark 1: Tool Calling Compliance
|
||||
|
||||
Send 10 tool-call prompts and measure JSON compliance rate.
|
||||
Target: >90% valid JSON.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
|
||||
OLLAMA_URL = "http://localhost:11434"
|
||||
|
||||
TOOL_PROMPTS = [
|
||||
{
|
||||
"prompt": (
|
||||
"Call the 'get_weather' tool to retrieve the current weather for San Francisco. "
|
||||
"Return ONLY valid JSON with keys: tool, args."
|
||||
),
|
||||
"expected_keys": ["tool", "args"],
|
||||
},
|
||||
{
|
||||
"prompt": (
|
||||
"Invoke the 'read_file' function with path='/etc/hosts'. "
|
||||
"Return ONLY valid JSON with keys: tool, args."
|
||||
),
|
||||
"expected_keys": ["tool", "args"],
|
||||
},
|
||||
{
|
||||
"prompt": (
|
||||
"Use the 'search_web' tool to look up 'latest Python release'. "
|
||||
"Return ONLY valid JSON with keys: tool, args."
|
||||
),
|
||||
"expected_keys": ["tool", "args"],
|
||||
},
|
||||
{
|
||||
"prompt": (
|
||||
"Call 'create_issue' with title='Fix login bug' and priority='high'. "
|
||||
"Return ONLY valid JSON with keys: tool, args."
|
||||
),
|
||||
"expected_keys": ["tool", "args"],
|
||||
},
|
||||
{
|
||||
"prompt": (
|
||||
"Execute the 'list_directory' tool for path='/home/user/projects'. "
|
||||
"Return ONLY valid JSON with keys: tool, args."
|
||||
),
|
||||
"expected_keys": ["tool", "args"],
|
||||
},
|
||||
{
|
||||
"prompt": (
|
||||
"Call 'send_notification' with message='Deploy complete' and channel='slack'. "
|
||||
"Return ONLY valid JSON with keys: tool, args."
|
||||
),
|
||||
"expected_keys": ["tool", "args"],
|
||||
},
|
||||
{
|
||||
"prompt": (
|
||||
"Invoke 'database_query' with sql='SELECT COUNT(*) FROM users'. "
|
||||
"Return ONLY valid JSON with keys: tool, args."
|
||||
),
|
||||
"expected_keys": ["tool", "args"],
|
||||
},
|
||||
{
|
||||
"prompt": (
|
||||
"Use the 'get_git_log' tool with limit=10 and branch='main'. "
|
||||
"Return ONLY valid JSON with keys: tool, args."
|
||||
),
|
||||
"expected_keys": ["tool", "args"],
|
||||
},
|
||||
{
|
||||
"prompt": (
|
||||
"Call 'schedule_task' with cron='0 9 * * MON-FRI' and task='generate_report'. "
|
||||
"Return ONLY valid JSON with keys: tool, args."
|
||||
),
|
||||
"expected_keys": ["tool", "args"],
|
||||
},
|
||||
{
|
||||
"prompt": (
|
||||
"Invoke 'resize_image' with url='https://example.com/photo.jpg', "
|
||||
"width=800, height=600. "
|
||||
"Return ONLY valid JSON with keys: tool, args."
|
||||
),
|
||||
"expected_keys": ["tool", "args"],
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def extract_json(text: str) -> Any:
|
||||
"""Try to extract the first JSON object or array from a string."""
|
||||
# Try direct parse first
|
||||
text = text.strip()
|
||||
try:
|
||||
return json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Try to find JSON block in markdown fences
|
||||
fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
|
||||
if fence_match:
|
||||
try:
|
||||
return json.loads(fence_match.group(1))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Try to find first { ... }
|
||||
brace_match = re.search(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)?\}", text, re.DOTALL)
|
||||
if brace_match:
|
||||
try:
|
||||
return json.loads(brace_match.group(0))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def run_prompt(model: str, prompt: str) -> str:
|
||||
"""Send a prompt to Ollama and return the response text."""
|
||||
payload = {
|
||||
"model": model,
|
||||
"prompt": prompt,
|
||||
"stream": False,
|
||||
"options": {"temperature": 0.1, "num_predict": 256},
|
||||
}
|
||||
resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=120)
|
||||
resp.raise_for_status()
|
||||
return resp.json()["response"]
|
||||
|
||||
|
||||
def run_benchmark(model: str) -> dict:
|
||||
"""Run tool-calling benchmark for a single model."""
|
||||
results = []
|
||||
total_time = 0.0
|
||||
|
||||
for i, case in enumerate(TOOL_PROMPTS, 1):
|
||||
start = time.time()
|
||||
try:
|
||||
raw = run_prompt(model, case["prompt"])
|
||||
elapsed = time.time() - start
|
||||
parsed = extract_json(raw)
|
||||
valid_json = parsed is not None
|
||||
has_keys = (
|
||||
valid_json
|
||||
and isinstance(parsed, dict)
|
||||
and all(k in parsed for k in case["expected_keys"])
|
||||
)
|
||||
results.append(
|
||||
{
|
||||
"prompt_id": i,
|
||||
"valid_json": valid_json,
|
||||
"has_expected_keys": has_keys,
|
||||
"elapsed_s": round(elapsed, 2),
|
||||
"response_snippet": raw[:120],
|
||||
}
|
||||
)
|
||||
except Exception as exc:
|
||||
elapsed = time.time() - start
|
||||
results.append(
|
||||
{
|
||||
"prompt_id": i,
|
||||
"valid_json": False,
|
||||
"has_expected_keys": False,
|
||||
"elapsed_s": round(elapsed, 2),
|
||||
"error": str(exc),
|
||||
}
|
||||
)
|
||||
total_time += elapsed
|
||||
|
||||
valid_count = sum(1 for r in results if r["valid_json"])
|
||||
compliance_rate = valid_count / len(TOOL_PROMPTS)
|
||||
|
||||
return {
|
||||
"benchmark": "tool_calling",
|
||||
"model": model,
|
||||
"total_prompts": len(TOOL_PROMPTS),
|
||||
"valid_json_count": valid_count,
|
||||
"compliance_rate": round(compliance_rate, 3),
|
||||
"passed": compliance_rate >= 0.90,
|
||||
"total_time_s": round(total_time, 2),
|
||||
"results": results,
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
model = sys.argv[1] if len(sys.argv) > 1 else "hermes3:8b"
|
||||
print(f"Running tool-calling benchmark against {model}...")
|
||||
result = run_benchmark(model)
|
||||
print(json.dumps(result, indent=2))
|
||||
sys.exit(0 if result["passed"] else 1)
|
||||
120
scripts/benchmarks/02_code_generation.py
Normal file
120
scripts/benchmarks/02_code_generation.py
Normal file
@@ -0,0 +1,120 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Benchmark 2: Code Generation Correctness
|
||||
|
||||
Ask model to generate a fibonacci function, execute it, verify fib(10) = 55.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
|
||||
OLLAMA_URL = "http://localhost:11434"
|
||||
|
||||
CODEGEN_PROMPT = """\
|
||||
Write a Python function called `fibonacci(n)` that returns the nth Fibonacci number \
|
||||
(0-indexed, so fibonacci(0)=0, fibonacci(1)=1, fibonacci(10)=55).
|
||||
|
||||
Return ONLY the raw Python code — no markdown fences, no explanation, no extra text.
|
||||
The function must be named exactly `fibonacci`.
|
||||
"""
|
||||
|
||||
|
||||
def extract_python(text: str) -> str:
|
||||
"""Extract Python code from a response."""
|
||||
text = text.strip()
|
||||
|
||||
# Remove markdown fences
|
||||
fence_match = re.search(r"```(?:python)?\s*(.*?)```", text, re.DOTALL)
|
||||
if fence_match:
|
||||
return fence_match.group(1).strip()
|
||||
|
||||
# Return as-is if it looks like code
|
||||
if "def " in text:
|
||||
return text
|
||||
|
||||
return text
|
||||
|
||||
|
||||
def run_prompt(model: str, prompt: str) -> str:
|
||||
payload = {
|
||||
"model": model,
|
||||
"prompt": prompt,
|
||||
"stream": False,
|
||||
"options": {"temperature": 0.1, "num_predict": 512},
|
||||
}
|
||||
resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=120)
|
||||
resp.raise_for_status()
|
||||
return resp.json()["response"]
|
||||
|
||||
|
||||
def execute_fibonacci(code: str) -> tuple[bool, str]:
|
||||
"""Execute the generated fibonacci code and check fib(10) == 55."""
|
||||
test_code = code + "\n\nresult = fibonacci(10)\nprint(result)\n"
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
|
||||
f.write(test_code)
|
||||
tmpfile = f.name
|
||||
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
[sys.executable, tmpfile],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=10,
|
||||
)
|
||||
output = proc.stdout.strip()
|
||||
if proc.returncode != 0:
|
||||
return False, f"Runtime error: {proc.stderr.strip()[:200]}"
|
||||
if output == "55":
|
||||
return True, "fibonacci(10) = 55 ✓"
|
||||
return False, f"Expected 55, got: {output!r}"
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, "Execution timed out"
|
||||
except Exception as exc:
|
||||
return False, f"Execution error: {exc}"
|
||||
finally:
|
||||
Path(tmpfile).unlink(missing_ok=True)
|
||||
|
||||
|
||||
def run_benchmark(model: str) -> dict:
|
||||
"""Run code generation benchmark for a single model."""
|
||||
start = time.time()
|
||||
try:
|
||||
raw = run_prompt(model, CODEGEN_PROMPT)
|
||||
code = extract_python(raw)
|
||||
correct, detail = execute_fibonacci(code)
|
||||
except Exception as exc:
|
||||
elapsed = time.time() - start
|
||||
return {
|
||||
"benchmark": "code_generation",
|
||||
"model": model,
|
||||
"passed": False,
|
||||
"error": str(exc),
|
||||
"elapsed_s": round(elapsed, 2),
|
||||
}
|
||||
|
||||
elapsed = time.time() - start
|
||||
return {
|
||||
"benchmark": "code_generation",
|
||||
"model": model,
|
||||
"passed": correct,
|
||||
"detail": detail,
|
||||
"code_snippet": code[:300],
|
||||
"elapsed_s": round(elapsed, 2),
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
model = sys.argv[1] if len(sys.argv) > 1 else "hermes3:8b"
|
||||
print(f"Running code-generation benchmark against {model}...")
|
||||
result = run_benchmark(model)
|
||||
print(json.dumps(result, indent=2))
|
||||
sys.exit(0 if result["passed"] else 1)
|
||||
155
scripts/benchmarks/03_shell_commands.py
Normal file
155
scripts/benchmarks/03_shell_commands.py
Normal file
@@ -0,0 +1,155 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Benchmark 3: Shell Command Generation (No Refusal)
|
||||
|
||||
Verify the model generates shell commands without safety refusals.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
OLLAMA_URL = "http://localhost:11434"
|
||||
|
||||
SHELL_PROMPTS = [
|
||||
{
|
||||
"prompt": "Write a bash one-liner to find all .log files older than 7 days and delete them.",
|
||||
"description": "find and delete old logs",
|
||||
},
|
||||
{
|
||||
"prompt": "Give me a bash command to recursively change permissions of all .sh files to executable.",
|
||||
"description": "chmod recursive",
|
||||
},
|
||||
{
|
||||
"prompt": "Write a shell command to kill all processes matching the name 'python3'.",
|
||||
"description": "kill processes by name",
|
||||
},
|
||||
{
|
||||
"prompt": "Give me a bash script snippet to monitor disk usage and send an alert if usage exceeds 90%.",
|
||||
"description": "disk usage alert",
|
||||
},
|
||||
{
|
||||
"prompt": "Write a bash command to create a compressed tar archive of the /var/log directory.",
|
||||
"description": "tar compress logs",
|
||||
},
|
||||
]
|
||||
|
||||
# Refusal patterns — presence of these indicates the model refused
|
||||
REFUSAL_PATTERNS = [
|
||||
r"\bi (cannot|can't|won't|refuse|am not able|am unable)\b",
|
||||
r"\bI'm not able to\b",
|
||||
r"\bI don't (generate|write|provide|help with) (commands|scripts|code) that\b",
|
||||
r"\bthis (could be|is) (dangerous|harmful|malicious)\b",
|
||||
r"\bI (must|need to) (caution|warn)\b.*instead",
|
||||
]
|
||||
|
||||
# Positive shell command indicators
|
||||
COMMAND_PATTERNS = [
|
||||
r"\bfind\s+[/\w]",
|
||||
r"\bchmod\b",
|
||||
r"\bkill\b",
|
||||
r"\btar\b",
|
||||
r"\bdf\s+-",
|
||||
r"\bdu\s+-",
|
||||
r"\$\(",
|
||||
r"\bawk\b",
|
||||
r"\bsed\b",
|
||||
r"\bgrep\b",
|
||||
r"\bxargs\b",
|
||||
r"```(?:bash|sh|shell)",
|
||||
r"\bif\s+\[",
|
||||
]
|
||||
|
||||
|
||||
def is_refusal(text: str) -> bool:
|
||||
text_lower = text.lower()
|
||||
for pattern in REFUSAL_PATTERNS:
|
||||
if re.search(pattern, text_lower, re.IGNORECASE):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def has_shell_command(text: str) -> bool:
|
||||
for pattern in COMMAND_PATTERNS:
|
||||
if re.search(pattern, text):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def run_prompt(model: str, prompt: str) -> str:
|
||||
payload = {
|
||||
"model": model,
|
||||
"prompt": prompt,
|
||||
"stream": False,
|
||||
"options": {"temperature": 0.1, "num_predict": 512},
|
||||
}
|
||||
resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=120)
|
||||
resp.raise_for_status()
|
||||
return resp.json()["response"]
|
||||
|
||||
|
||||
def run_benchmark(model: str) -> dict:
|
||||
"""Run shell command generation benchmark for a single model."""
|
||||
results = []
|
||||
total_time = 0.0
|
||||
|
||||
for i, case in enumerate(SHELL_PROMPTS, 1):
|
||||
start = time.time()
|
||||
try:
|
||||
raw = run_prompt(model, case["prompt"])
|
||||
elapsed = time.time() - start
|
||||
refused = is_refusal(raw)
|
||||
has_cmd = has_shell_command(raw)
|
||||
results.append(
|
||||
{
|
||||
"prompt_id": i,
|
||||
"description": case["description"],
|
||||
"refused": refused,
|
||||
"has_shell_command": has_cmd,
|
||||
"passed": not refused and has_cmd,
|
||||
"elapsed_s": round(elapsed, 2),
|
||||
"response_snippet": raw[:120],
|
||||
}
|
||||
)
|
||||
except Exception as exc:
|
||||
elapsed = time.time() - start
|
||||
results.append(
|
||||
{
|
||||
"prompt_id": i,
|
||||
"description": case["description"],
|
||||
"refused": False,
|
||||
"has_shell_command": False,
|
||||
"passed": False,
|
||||
"elapsed_s": round(elapsed, 2),
|
||||
"error": str(exc),
|
||||
}
|
||||
)
|
||||
total_time += elapsed
|
||||
|
||||
refused_count = sum(1 for r in results if r["refused"])
|
||||
passed_count = sum(1 for r in results if r["passed"])
|
||||
pass_rate = passed_count / len(SHELL_PROMPTS)
|
||||
|
||||
return {
|
||||
"benchmark": "shell_commands",
|
||||
"model": model,
|
||||
"total_prompts": len(SHELL_PROMPTS),
|
||||
"passed_count": passed_count,
|
||||
"refused_count": refused_count,
|
||||
"pass_rate": round(pass_rate, 3),
|
||||
"passed": refused_count == 0 and passed_count == len(SHELL_PROMPTS),
|
||||
"total_time_s": round(total_time, 2),
|
||||
"results": results,
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
model = sys.argv[1] if len(sys.argv) > 1 else "hermes3:8b"
|
||||
print(f"Running shell-command benchmark against {model}...")
|
||||
result = run_benchmark(model)
|
||||
print(json.dumps(result, indent=2))
|
||||
sys.exit(0 if result["passed"] else 1)
|
||||
154
scripts/benchmarks/04_multi_turn_coherence.py
Normal file
154
scripts/benchmarks/04_multi_turn_coherence.py
Normal file
@@ -0,0 +1,154 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Benchmark 4: Multi-Turn Agent Loop Coherence
|
||||
|
||||
Simulate a 5-turn observe/reason/act cycle and measure structured coherence.
|
||||
Each turn must return valid JSON with required fields.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
OLLAMA_URL = "http://localhost:11434"
|
||||
|
||||
SYSTEM_PROMPT = """\
|
||||
You are an autonomous AI agent. For each message, you MUST respond with valid JSON containing:
|
||||
{
|
||||
"observation": "<what you observe about the current situation>",
|
||||
"reasoning": "<your analysis and plan>",
|
||||
"action": "<the specific action you will take>",
|
||||
"confidence": <0.0-1.0>
|
||||
}
|
||||
Respond ONLY with the JSON object. No other text.
|
||||
"""
|
||||
|
||||
TURNS = [
|
||||
"You are monitoring a web server. CPU usage just spiked to 95%. What do you observe, reason, and do?",
|
||||
"Following your previous action, you found 3 runaway Python processes consuming 30% CPU each. Continue.",
|
||||
"You killed the top 2 processes. CPU is now at 45%. A new alert: disk I/O is at 98%. Continue.",
|
||||
"You traced the disk I/O to a log rotation script that's stuck. You terminated it. Disk I/O dropped to 20%. Final status check: all metrics are now nominal. Continue.",
|
||||
"The incident is resolved. Write a brief post-mortem summary as your final action.",
|
||||
]
|
||||
|
||||
REQUIRED_KEYS = {"observation", "reasoning", "action", "confidence"}
|
||||
|
||||
|
||||
def extract_json(text: str) -> dict | None:
|
||||
text = text.strip()
|
||||
try:
|
||||
return json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
|
||||
if fence_match:
|
||||
try:
|
||||
return json.loads(fence_match.group(1))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
# Try to find { ... } block
|
||||
brace_match = re.search(r"\{[^{}]*(?:\{[^{}]*\}[^{}]*)?\}", text, re.DOTALL)
|
||||
if brace_match:
|
||||
try:
|
||||
return json.loads(brace_match.group(0))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def run_multi_turn(model: str) -> dict:
|
||||
"""Run the multi-turn coherence benchmark."""
|
||||
conversation = []
|
||||
turn_results = []
|
||||
total_time = 0.0
|
||||
|
||||
# Build system + turn messages using chat endpoint
|
||||
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
|
||||
|
||||
for i, turn_prompt in enumerate(TURNS, 1):
|
||||
messages.append({"role": "user", "content": turn_prompt})
|
||||
start = time.time()
|
||||
|
||||
try:
|
||||
payload = {
|
||||
"model": model,
|
||||
"messages": messages,
|
||||
"stream": False,
|
||||
"options": {"temperature": 0.1, "num_predict": 512},
|
||||
}
|
||||
resp = requests.post(f"{OLLAMA_URL}/api/chat", json=payload, timeout=120)
|
||||
resp.raise_for_status()
|
||||
raw = resp.json()["message"]["content"]
|
||||
except Exception as exc:
|
||||
elapsed = time.time() - start
|
||||
turn_results.append(
|
||||
{
|
||||
"turn": i,
|
||||
"valid_json": False,
|
||||
"has_required_keys": False,
|
||||
"coherent": False,
|
||||
"elapsed_s": round(elapsed, 2),
|
||||
"error": str(exc),
|
||||
}
|
||||
)
|
||||
total_time += elapsed
|
||||
# Add placeholder assistant message to keep conversation going
|
||||
messages.append({"role": "assistant", "content": "{}"})
|
||||
continue
|
||||
|
||||
elapsed = time.time() - start
|
||||
total_time += elapsed
|
||||
|
||||
parsed = extract_json(raw)
|
||||
valid = parsed is not None
|
||||
has_keys = valid and isinstance(parsed, dict) and REQUIRED_KEYS.issubset(parsed.keys())
|
||||
confidence_valid = (
|
||||
has_keys
|
||||
and isinstance(parsed.get("confidence"), (int, float))
|
||||
and 0.0 <= parsed["confidence"] <= 1.0
|
||||
)
|
||||
coherent = has_keys and confidence_valid
|
||||
|
||||
turn_results.append(
|
||||
{
|
||||
"turn": i,
|
||||
"valid_json": valid,
|
||||
"has_required_keys": has_keys,
|
||||
"coherent": coherent,
|
||||
"confidence": parsed.get("confidence") if has_keys else None,
|
||||
"elapsed_s": round(elapsed, 2),
|
||||
"response_snippet": raw[:200],
|
||||
}
|
||||
)
|
||||
|
||||
# Add assistant response to conversation history
|
||||
messages.append({"role": "assistant", "content": raw})
|
||||
|
||||
coherent_count = sum(1 for r in turn_results if r["coherent"])
|
||||
coherence_rate = coherent_count / len(TURNS)
|
||||
|
||||
return {
|
||||
"benchmark": "multi_turn_coherence",
|
||||
"model": model,
|
||||
"total_turns": len(TURNS),
|
||||
"coherent_turns": coherent_count,
|
||||
"coherence_rate": round(coherence_rate, 3),
|
||||
"passed": coherence_rate >= 0.80,
|
||||
"total_time_s": round(total_time, 2),
|
||||
"turns": turn_results,
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
model = sys.argv[1] if len(sys.argv) > 1 else "hermes3:8b"
|
||||
print(f"Running multi-turn coherence benchmark against {model}...")
|
||||
result = run_multi_turn(model)
|
||||
print(json.dumps(result, indent=2))
|
||||
sys.exit(0 if result["passed"] else 1)
|
||||
197
scripts/benchmarks/05_issue_triage.py
Normal file
197
scripts/benchmarks/05_issue_triage.py
Normal file
@@ -0,0 +1,197 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Benchmark 5: Issue Triage Quality
|
||||
|
||||
Present 5 issues with known correct priorities and measure accuracy.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
OLLAMA_URL = "http://localhost:11434"
|
||||
|
||||
TRIAGE_PROMPT_TEMPLATE = """\
|
||||
You are a software project triage agent. Assign a priority to the following issue.
|
||||
|
||||
Issue: {title}
|
||||
Description: {description}
|
||||
|
||||
Respond ONLY with valid JSON:
|
||||
{{"priority": "<p0-critical|p1-high|p2-medium|p3-low>", "reason": "<one sentence>"}}
|
||||
"""
|
||||
|
||||
ISSUES = [
|
||||
{
|
||||
"title": "Production database is returning 500 errors on all queries",
|
||||
"description": "All users are affected, no transactions are completing, revenue is being lost.",
|
||||
"expected_priority": "p0-critical",
|
||||
},
|
||||
{
|
||||
"title": "Login page takes 8 seconds to load",
|
||||
"description": "Performance regression noticed after last deployment. Users are complaining but can still log in.",
|
||||
"expected_priority": "p1-high",
|
||||
},
|
||||
{
|
||||
"title": "Add dark mode support to settings page",
|
||||
"description": "Several users have requested a dark mode toggle in the account settings.",
|
||||
"expected_priority": "p3-low",
|
||||
},
|
||||
{
|
||||
"title": "Email notifications sometimes arrive 10 minutes late",
|
||||
"description": "Intermittent delay in notification delivery, happens roughly 5% of the time.",
|
||||
"expected_priority": "p2-medium",
|
||||
},
|
||||
{
|
||||
"title": "Security vulnerability: SQL injection possible in search endpoint",
|
||||
"description": "Penetration test found unescaped user input being passed directly to database query.",
|
||||
"expected_priority": "p0-critical",
|
||||
},
|
||||
]
|
||||
|
||||
VALID_PRIORITIES = {"p0-critical", "p1-high", "p2-medium", "p3-low"}
|
||||
|
||||
# Map p0 -> 0, p1 -> 1, etc. for fuzzy scoring (±1 level = partial credit)
|
||||
PRIORITY_LEVELS = {"p0-critical": 0, "p1-high": 1, "p2-medium": 2, "p3-low": 3}
|
||||
|
||||
|
||||
def extract_json(text: str) -> dict | None:
|
||||
text = text.strip()
|
||||
try:
|
||||
return json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
fence_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, re.DOTALL)
|
||||
if fence_match:
|
||||
try:
|
||||
return json.loads(fence_match.group(1))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
brace_match = re.search(r"\{[^{}]*\}", text, re.DOTALL)
|
||||
if brace_match:
|
||||
try:
|
||||
return json.loads(brace_match.group(0))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def normalize_priority(raw: str) -> str | None:
|
||||
"""Normalize various priority formats to canonical form."""
|
||||
raw = raw.lower().strip()
|
||||
if raw in VALID_PRIORITIES:
|
||||
return raw
|
||||
# Handle "critical", "p0", "high", "p1", etc.
|
||||
mapping = {
|
||||
"critical": "p0-critical",
|
||||
"p0": "p0-critical",
|
||||
"0": "p0-critical",
|
||||
"high": "p1-high",
|
||||
"p1": "p1-high",
|
||||
"1": "p1-high",
|
||||
"medium": "p2-medium",
|
||||
"p2": "p2-medium",
|
||||
"2": "p2-medium",
|
||||
"low": "p3-low",
|
||||
"p3": "p3-low",
|
||||
"3": "p3-low",
|
||||
}
|
||||
return mapping.get(raw)
|
||||
|
||||
|
||||
def run_prompt(model: str, prompt: str) -> str:
|
||||
payload = {
|
||||
"model": model,
|
||||
"prompt": prompt,
|
||||
"stream": False,
|
||||
"options": {"temperature": 0.1, "num_predict": 256},
|
||||
}
|
||||
resp = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=120)
|
||||
resp.raise_for_status()
|
||||
return resp.json()["response"]
|
||||
|
||||
|
||||
def run_benchmark(model: str) -> dict:
|
||||
"""Run issue triage benchmark for a single model."""
|
||||
results = []
|
||||
total_time = 0.0
|
||||
|
||||
for i, issue in enumerate(ISSUES, 1):
|
||||
prompt = TRIAGE_PROMPT_TEMPLATE.format(
|
||||
title=issue["title"], description=issue["description"]
|
||||
)
|
||||
start = time.time()
|
||||
try:
|
||||
raw = run_prompt(model, prompt)
|
||||
elapsed = time.time() - start
|
||||
parsed = extract_json(raw)
|
||||
valid_json = parsed is not None
|
||||
assigned = None
|
||||
if valid_json and isinstance(parsed, dict):
|
||||
raw_priority = parsed.get("priority", "")
|
||||
assigned = normalize_priority(str(raw_priority))
|
||||
|
||||
exact_match = assigned == issue["expected_priority"]
|
||||
off_by_one = (
|
||||
assigned is not None
|
||||
and not exact_match
|
||||
and abs(PRIORITY_LEVELS.get(assigned, -1) - PRIORITY_LEVELS[issue["expected_priority"]]) == 1
|
||||
)
|
||||
|
||||
results.append(
|
||||
{
|
||||
"issue_id": i,
|
||||
"title": issue["title"][:60],
|
||||
"expected": issue["expected_priority"],
|
||||
"assigned": assigned,
|
||||
"exact_match": exact_match,
|
||||
"off_by_one": off_by_one,
|
||||
"valid_json": valid_json,
|
||||
"elapsed_s": round(elapsed, 2),
|
||||
}
|
||||
)
|
||||
except Exception as exc:
|
||||
elapsed = time.time() - start
|
||||
results.append(
|
||||
{
|
||||
"issue_id": i,
|
||||
"title": issue["title"][:60],
|
||||
"expected": issue["expected_priority"],
|
||||
"assigned": None,
|
||||
"exact_match": False,
|
||||
"off_by_one": False,
|
||||
"valid_json": False,
|
||||
"elapsed_s": round(elapsed, 2),
|
||||
"error": str(exc),
|
||||
}
|
||||
)
|
||||
total_time += elapsed
|
||||
|
||||
exact_count = sum(1 for r in results if r["exact_match"])
|
||||
accuracy = exact_count / len(ISSUES)
|
||||
|
||||
return {
|
||||
"benchmark": "issue_triage",
|
||||
"model": model,
|
||||
"total_issues": len(ISSUES),
|
||||
"exact_matches": exact_count,
|
||||
"accuracy": round(accuracy, 3),
|
||||
"passed": accuracy >= 0.80,
|
||||
"total_time_s": round(total_time, 2),
|
||||
"results": results,
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
model = sys.argv[1] if len(sys.argv) > 1 else "hermes3:8b"
|
||||
print(f"Running issue-triage benchmark against {model}...")
|
||||
result = run_benchmark(model)
|
||||
print(json.dumps(result, indent=2))
|
||||
sys.exit(0 if result["passed"] else 1)
|
||||
334
scripts/benchmarks/run_suite.py
Normal file
334
scripts/benchmarks/run_suite.py
Normal file
@@ -0,0 +1,334 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Model Benchmark Suite Runner
|
||||
|
||||
Runs all 5 benchmarks against each candidate model and generates
|
||||
a comparison report at docs/model-benchmarks.md.
|
||||
|
||||
Usage:
|
||||
python scripts/benchmarks/run_suite.py
|
||||
python scripts/benchmarks/run_suite.py --models hermes3:8b qwen3.5:latest
|
||||
python scripts/benchmarks/run_suite.py --output docs/model-benchmarks.md
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import requests
|
||||
|
||||
OLLAMA_URL = "http://localhost:11434"
|
||||
|
||||
# Models to test — maps friendly name to Ollama model tag.
|
||||
# Original spec requested: qwen3:14b, qwen3:8b, hermes3:8b, dolphin3
|
||||
# Availability-adjusted substitutions noted in report.
|
||||
DEFAULT_MODELS = [
|
||||
"hermes3:8b",
|
||||
"qwen3.5:latest",
|
||||
"qwen2.5:14b",
|
||||
"llama3.2:latest",
|
||||
]
|
||||
|
||||
BENCHMARKS_DIR = Path(__file__).parent
|
||||
DOCS_DIR = Path(__file__).resolve().parent.parent.parent / "docs"
|
||||
|
||||
|
||||
def load_benchmark(name: str):
|
||||
"""Dynamically import a benchmark module."""
|
||||
path = BENCHMARKS_DIR / name
|
||||
module_name = Path(name).stem
|
||||
spec = importlib.util.spec_from_file_location(module_name, path)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
def model_available(model: str) -> bool:
|
||||
"""Check if a model is available via Ollama."""
|
||||
try:
|
||||
resp = requests.get(f"{OLLAMA_URL}/api/tags", timeout=10)
|
||||
if resp.status_code != 200:
|
||||
return False
|
||||
models = {m["name"] for m in resp.json().get("models", [])}
|
||||
return model in models
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def run_all_benchmarks(model: str) -> dict:
|
||||
"""Run all 5 benchmarks for a given model."""
|
||||
benchmark_files = [
|
||||
"01_tool_calling.py",
|
||||
"02_code_generation.py",
|
||||
"03_shell_commands.py",
|
||||
"04_multi_turn_coherence.py",
|
||||
"05_issue_triage.py",
|
||||
]
|
||||
|
||||
results = {}
|
||||
for fname in benchmark_files:
|
||||
key = fname.replace(".py", "")
|
||||
print(f" [{model}] Running {key}...", flush=True)
|
||||
try:
|
||||
mod = load_benchmark(fname)
|
||||
start = time.time()
|
||||
if key == "01_tool_calling":
|
||||
result = mod.run_benchmark(model)
|
||||
elif key == "02_code_generation":
|
||||
result = mod.run_benchmark(model)
|
||||
elif key == "03_shell_commands":
|
||||
result = mod.run_benchmark(model)
|
||||
elif key == "04_multi_turn_coherence":
|
||||
result = mod.run_multi_turn(model)
|
||||
elif key == "05_issue_triage":
|
||||
result = mod.run_benchmark(model)
|
||||
else:
|
||||
result = {"passed": False, "error": "Unknown benchmark"}
|
||||
elapsed = time.time() - start
|
||||
print(
|
||||
f" -> {'PASS' if result.get('passed') else 'FAIL'} ({elapsed:.1f}s)",
|
||||
flush=True,
|
||||
)
|
||||
results[key] = result
|
||||
except Exception as exc:
|
||||
print(f" -> ERROR: {exc}", flush=True)
|
||||
results[key] = {"benchmark": key, "model": model, "passed": False, "error": str(exc)}
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def score_model(results: dict) -> dict:
|
||||
"""Compute summary scores for a model."""
|
||||
benchmarks = list(results.values())
|
||||
passed = sum(1 for b in benchmarks if b.get("passed", False))
|
||||
total = len(benchmarks)
|
||||
|
||||
# Specific metrics
|
||||
tool_rate = results.get("01_tool_calling", {}).get("compliance_rate", 0.0)
|
||||
code_pass = results.get("02_code_generation", {}).get("passed", False)
|
||||
shell_pass = results.get("03_shell_commands", {}).get("passed", False)
|
||||
coherence = results.get("04_multi_turn_coherence", {}).get("coherence_rate", 0.0)
|
||||
triage_acc = results.get("05_issue_triage", {}).get("accuracy", 0.0)
|
||||
|
||||
total_time = sum(
|
||||
r.get("total_time_s", r.get("elapsed_s", 0.0)) for r in benchmarks
|
||||
)
|
||||
|
||||
return {
|
||||
"passed": passed,
|
||||
"total": total,
|
||||
"pass_rate": f"{passed}/{total}",
|
||||
"tool_compliance": f"{tool_rate:.0%}",
|
||||
"code_gen": "PASS" if code_pass else "FAIL",
|
||||
"shell_gen": "PASS" if shell_pass else "FAIL",
|
||||
"coherence": f"{coherence:.0%}",
|
||||
"triage_accuracy": f"{triage_acc:.0%}",
|
||||
"total_time_s": round(total_time, 1),
|
||||
}
|
||||
|
||||
|
||||
def generate_markdown(all_results: dict, run_date: str) -> str:
|
||||
"""Generate markdown comparison report."""
|
||||
lines = []
|
||||
lines.append("# Model Benchmark Results")
|
||||
lines.append("")
|
||||
lines.append(f"> Generated: {run_date} ")
|
||||
lines.append(f"> Ollama URL: `{OLLAMA_URL}` ")
|
||||
lines.append("> Issue: [#1066](http://143.198.27.163:3000/rockachopa/Timmy-time-dashboard/issues/1066)")
|
||||
lines.append("")
|
||||
lines.append("## Overview")
|
||||
lines.append("")
|
||||
lines.append(
|
||||
"This report documents the 5-test benchmark suite results for local model candidates."
|
||||
)
|
||||
lines.append("")
|
||||
lines.append("### Model Availability vs. Spec")
|
||||
lines.append("")
|
||||
lines.append("| Requested | Tested Substitute | Reason |")
|
||||
lines.append("|-----------|-------------------|--------|")
|
||||
lines.append("| `qwen3:14b` | `qwen2.5:14b` | `qwen3:14b` not pulled locally |")
|
||||
lines.append("| `qwen3:8b` | `qwen3.5:latest` | `qwen3:8b` not pulled locally |")
|
||||
lines.append("| `hermes3:8b` | `hermes3:8b` | Exact match |")
|
||||
lines.append("| `dolphin3` | `llama3.2:latest` | `dolphin3` not pulled locally |")
|
||||
lines.append("")
|
||||
|
||||
# Summary table
|
||||
lines.append("## Summary Comparison Table")
|
||||
lines.append("")
|
||||
lines.append(
|
||||
"| Model | Passed | Tool Calling | Code Gen | Shell Gen | Coherence | Triage Acc | Time (s) |"
|
||||
)
|
||||
lines.append(
|
||||
"|-------|--------|-------------|----------|-----------|-----------|------------|----------|"
|
||||
)
|
||||
|
||||
for model, results in all_results.items():
|
||||
if "error" in results and "01_tool_calling" not in results:
|
||||
lines.append(f"| `{model}` | — | — | — | — | — | — | — |")
|
||||
continue
|
||||
s = score_model(results)
|
||||
lines.append(
|
||||
f"| `{model}` | {s['pass_rate']} | {s['tool_compliance']} | {s['code_gen']} | "
|
||||
f"{s['shell_gen']} | {s['coherence']} | {s['triage_accuracy']} | {s['total_time_s']} |"
|
||||
)
|
||||
|
||||
lines.append("")
|
||||
|
||||
# Per-model detail sections
|
||||
lines.append("## Per-Model Detail")
|
||||
lines.append("")
|
||||
|
||||
for model, results in all_results.items():
|
||||
lines.append(f"### `{model}`")
|
||||
lines.append("")
|
||||
|
||||
if "error" in results and not isinstance(results.get("error"), str):
|
||||
lines.append(f"> **Error:** {results.get('error')}")
|
||||
lines.append("")
|
||||
continue
|
||||
|
||||
for bkey, bres in results.items():
|
||||
bname = {
|
||||
"01_tool_calling": "Benchmark 1: Tool Calling Compliance",
|
||||
"02_code_generation": "Benchmark 2: Code Generation Correctness",
|
||||
"03_shell_commands": "Benchmark 3: Shell Command Generation",
|
||||
"04_multi_turn_coherence": "Benchmark 4: Multi-Turn Coherence",
|
||||
"05_issue_triage": "Benchmark 5: Issue Triage Quality",
|
||||
}.get(bkey, bkey)
|
||||
|
||||
status = "✅ PASS" if bres.get("passed") else "❌ FAIL"
|
||||
lines.append(f"#### {bname} — {status}")
|
||||
lines.append("")
|
||||
|
||||
if bkey == "01_tool_calling":
|
||||
rate = bres.get("compliance_rate", 0)
|
||||
count = bres.get("valid_json_count", 0)
|
||||
total = bres.get("total_prompts", 0)
|
||||
lines.append(
|
||||
f"- **JSON Compliance:** {count}/{total} ({rate:.0%}) — target ≥90%"
|
||||
)
|
||||
elif bkey == "02_code_generation":
|
||||
lines.append(f"- **Result:** {bres.get('detail', bres.get('error', 'n/a'))}")
|
||||
snippet = bres.get("code_snippet", "")
|
||||
if snippet:
|
||||
lines.append(f"- **Generated code snippet:**")
|
||||
lines.append(" ```python")
|
||||
for ln in snippet.splitlines()[:8]:
|
||||
lines.append(f" {ln}")
|
||||
lines.append(" ```")
|
||||
elif bkey == "03_shell_commands":
|
||||
passed = bres.get("passed_count", 0)
|
||||
refused = bres.get("refused_count", 0)
|
||||
total = bres.get("total_prompts", 0)
|
||||
lines.append(
|
||||
f"- **Passed:** {passed}/{total} — **Refusals:** {refused}"
|
||||
)
|
||||
elif bkey == "04_multi_turn_coherence":
|
||||
coherent = bres.get("coherent_turns", 0)
|
||||
total = bres.get("total_turns", 0)
|
||||
rate = bres.get("coherence_rate", 0)
|
||||
lines.append(
|
||||
f"- **Coherent turns:** {coherent}/{total} ({rate:.0%}) — target ≥80%"
|
||||
)
|
||||
elif bkey == "05_issue_triage":
|
||||
exact = bres.get("exact_matches", 0)
|
||||
total = bres.get("total_issues", 0)
|
||||
acc = bres.get("accuracy", 0)
|
||||
lines.append(
|
||||
f"- **Accuracy:** {exact}/{total} ({acc:.0%}) — target ≥80%"
|
||||
)
|
||||
|
||||
elapsed = bres.get("total_time_s", bres.get("elapsed_s", 0))
|
||||
lines.append(f"- **Time:** {elapsed}s")
|
||||
lines.append("")
|
||||
|
||||
lines.append("## Raw JSON Data")
|
||||
lines.append("")
|
||||
lines.append("<details>")
|
||||
lines.append("<summary>Click to expand full JSON results</summary>")
|
||||
lines.append("")
|
||||
lines.append("```json")
|
||||
lines.append(json.dumps(all_results, indent=2))
|
||||
lines.append("```")
|
||||
lines.append("")
|
||||
lines.append("</details>")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description="Run model benchmark suite")
|
||||
parser.add_argument(
|
||||
"--models",
|
||||
nargs="+",
|
||||
default=DEFAULT_MODELS,
|
||||
help="Models to test",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
type=Path,
|
||||
default=DOCS_DIR / "model-benchmarks.md",
|
||||
help="Output markdown file",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--json-output",
|
||||
type=Path,
|
||||
default=None,
|
||||
help="Optional JSON output file",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
run_date = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
|
||||
|
||||
print(f"Model Benchmark Suite — {run_date}")
|
||||
print(f"Testing {len(args.models)} model(s): {', '.join(args.models)}")
|
||||
print()
|
||||
|
||||
all_results: dict[str, dict] = {}
|
||||
|
||||
for model in args.models:
|
||||
print(f"=== Testing model: {model} ===")
|
||||
if not model_available(model):
|
||||
print(f" WARNING: {model} not available in Ollama — skipping")
|
||||
all_results[model] = {"error": f"Model {model} not available", "skipped": True}
|
||||
print()
|
||||
continue
|
||||
|
||||
model_results = run_all_benchmarks(model)
|
||||
all_results[model] = model_results
|
||||
|
||||
s = score_model(model_results)
|
||||
print(f" Summary: {s['pass_rate']} benchmarks passed in {s['total_time_s']}s")
|
||||
print()
|
||||
|
||||
# Generate and write markdown report
|
||||
markdown = generate_markdown(all_results, run_date)
|
||||
|
||||
args.output.parent.mkdir(parents=True, exist_ok=True)
|
||||
args.output.write_text(markdown, encoding="utf-8")
|
||||
print(f"Report written to: {args.output}")
|
||||
|
||||
if args.json_output:
|
||||
args.json_output.write_text(json.dumps(all_results, indent=2), encoding="utf-8")
|
||||
print(f"JSON data written to: {args.json_output}")
|
||||
|
||||
# Overall pass/fail
|
||||
all_pass = all(
|
||||
not r.get("skipped", False)
|
||||
and all(b.get("passed", False) for b in r.values() if isinstance(b, dict))
|
||||
for r in all_results.values()
|
||||
)
|
||||
return 0 if all_pass else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -311,14 +311,6 @@ class Settings(BaseSettings):
|
||||
thinking_memory_check_every: int = 50 # check memory status every Nth thought
|
||||
thinking_idle_timeout_minutes: int = 60 # pause thoughts after N minutes without user input
|
||||
|
||||
# ── Dreaming Mode ─────────────────────────────────────────────────
|
||||
# When enabled, the agent replays past sessions during idle time to
|
||||
# simulate alternative actions and propose behavioural rules.
|
||||
dreaming_enabled: bool = True
|
||||
dreaming_idle_threshold_minutes: int = 10 # idle minutes before dreaming starts
|
||||
dreaming_cycle_seconds: int = 600 # seconds between dream attempts
|
||||
dreaming_timeout_seconds: int = 60 # max LLM call time per dream cycle
|
||||
|
||||
# ── Gitea Integration ─────────────────────────────────────────────
|
||||
# Local Gitea instance for issue tracking and self-improvement.
|
||||
# These values are passed as env vars to the gitea-mcp server process.
|
||||
@@ -430,6 +422,14 @@ class Settings(BaseSettings):
|
||||
# Alert threshold: free disk below this triggers cleanup / alert (GB).
|
||||
hermes_disk_free_min_gb: float = 10.0
|
||||
|
||||
# ── Energy Budget Monitoring ───────────────────────────────────────
|
||||
# Enable energy budget monitoring (tracks CPU/GPU power during inference).
|
||||
energy_budget_enabled: bool = True
|
||||
# Watts threshold that auto-activates low power mode (on-battery only).
|
||||
energy_budget_watts_threshold: float = 15.0
|
||||
# Model to prefer in low power mode (smaller = more efficient).
|
||||
energy_low_power_model: str = "qwen3:1b"
|
||||
|
||||
# ── Error Logging ─────────────────────────────────────────────────
|
||||
error_log_enabled: bool = True
|
||||
error_log_dir: str = "logs"
|
||||
|
||||
@@ -37,6 +37,7 @@ from dashboard.routes.db_explorer import router as db_explorer_router
|
||||
from dashboard.routes.discord import router as discord_router
|
||||
from dashboard.routes.experiments import router as experiments_router
|
||||
from dashboard.routes.grok import router as grok_router
|
||||
from dashboard.routes.energy import router as energy_router
|
||||
from dashboard.routes.health import router as health_router
|
||||
from dashboard.routes.hermes import router as hermes_router
|
||||
from dashboard.routes.loop_qa import router as loop_qa_router
|
||||
@@ -54,11 +55,11 @@ from dashboard.routes.system import router as system_router
|
||||
from dashboard.routes.tasks import router as tasks_router
|
||||
from dashboard.routes.telegram import router as telegram_router
|
||||
from dashboard.routes.thinking import router as thinking_router
|
||||
from dashboard.routes.self_correction import router as self_correction_router
|
||||
from dashboard.routes.three_strike import router as three_strike_router
|
||||
from dashboard.routes.tools import router as tools_router
|
||||
from dashboard.routes.tower import router as tower_router
|
||||
from dashboard.routes.voice import router as voice_router
|
||||
from dashboard.routes.dreaming import router as dreaming_router
|
||||
from dashboard.routes.work_orders import router as work_orders_router
|
||||
from dashboard.routes.world import matrix_router
|
||||
from dashboard.routes.world import router as world_router
|
||||
@@ -251,36 +252,6 @@ async def _loop_qa_scheduler() -> None:
|
||||
await asyncio.sleep(interval)
|
||||
|
||||
|
||||
async def _dreaming_scheduler() -> None:
|
||||
"""Background task: run idle-time dreaming cycles.
|
||||
|
||||
When the system has been idle for ``dreaming_idle_threshold_minutes``,
|
||||
the dreaming engine replays a past session and simulates alternatives.
|
||||
"""
|
||||
from timmy.dreaming import dreaming_engine
|
||||
|
||||
await asyncio.sleep(15) # Stagger after loop QA scheduler
|
||||
|
||||
while True:
|
||||
try:
|
||||
if settings.dreaming_enabled:
|
||||
await asyncio.wait_for(
|
||||
dreaming_engine.dream_once(),
|
||||
timeout=settings.dreaming_timeout_seconds + 10,
|
||||
)
|
||||
except TimeoutError:
|
||||
logger.warning(
|
||||
"Dreaming cycle timed out after %ds",
|
||||
settings.dreaming_timeout_seconds,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.error("Dreaming scheduler error: %s", exc)
|
||||
|
||||
await asyncio.sleep(settings.dreaming_cycle_seconds)
|
||||
|
||||
|
||||
_PRESENCE_POLL_SECONDS = 30
|
||||
_PRESENCE_INITIAL_DELAY = 3
|
||||
|
||||
@@ -441,7 +412,6 @@ def _startup_background_tasks() -> list[asyncio.Task]:
|
||||
asyncio.create_task(_briefing_scheduler()),
|
||||
asyncio.create_task(_thinking_scheduler()),
|
||||
asyncio.create_task(_loop_qa_scheduler()),
|
||||
asyncio.create_task(_dreaming_scheduler()),
|
||||
asyncio.create_task(_presence_watcher()),
|
||||
asyncio.create_task(_start_chat_integrations_background()),
|
||||
asyncio.create_task(_hermes_scheduler()),
|
||||
@@ -582,12 +552,28 @@ async def lifespan(app: FastAPI):
|
||||
except Exception:
|
||||
logger.debug("Failed to register error recorder")
|
||||
|
||||
# Mark session start for sovereignty duration tracking
|
||||
try:
|
||||
from timmy.sovereignty import mark_session_start
|
||||
|
||||
mark_session_start()
|
||||
except Exception:
|
||||
logger.debug("Failed to mark sovereignty session start")
|
||||
|
||||
logger.info("✓ Dashboard ready for requests")
|
||||
|
||||
yield
|
||||
|
||||
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
|
||||
|
||||
# Generate and commit sovereignty session report
|
||||
try:
|
||||
from timmy.sovereignty import generate_and_commit_report
|
||||
|
||||
await generate_and_commit_report()
|
||||
except Exception as exc:
|
||||
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="Mission Control",
|
||||
@@ -705,12 +691,13 @@ app.include_router(matrix_router)
|
||||
app.include_router(tower_router)
|
||||
app.include_router(daily_run_router)
|
||||
app.include_router(hermes_router)
|
||||
app.include_router(energy_router)
|
||||
app.include_router(quests_router)
|
||||
app.include_router(scorecards_router)
|
||||
app.include_router(sovereignty_metrics_router)
|
||||
app.include_router(sovereignty_ws_router)
|
||||
app.include_router(three_strike_router)
|
||||
app.include_router(dreaming_router)
|
||||
app.include_router(self_correction_router)
|
||||
|
||||
|
||||
@app.websocket("/ws")
|
||||
|
||||
@@ -1,84 +0,0 @@
|
||||
"""Dreaming mode dashboard routes.
|
||||
|
||||
GET /dreaming/api/status — JSON status of the dreaming engine
|
||||
GET /dreaming/api/recent — JSON list of recent dream records
|
||||
POST /dreaming/api/trigger — Manually trigger a dream cycle (for testing)
|
||||
GET /dreaming/partial — HTMX partial: dreaming status panel
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import HTMLResponse, JSONResponse
|
||||
|
||||
from dashboard.templating import templates
|
||||
from timmy.dreaming import dreaming_engine
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/dreaming", tags=["dreaming"])
|
||||
|
||||
|
||||
@router.get("/api/status", response_class=JSONResponse)
|
||||
async def dreaming_status():
|
||||
"""Return current dreaming engine status as JSON."""
|
||||
return dreaming_engine.get_status()
|
||||
|
||||
|
||||
@router.get("/api/recent", response_class=JSONResponse)
|
||||
async def dreaming_recent(limit: int = 10):
|
||||
"""Return recent dream records as JSON."""
|
||||
dreams = dreaming_engine.get_recent_dreams(limit=limit)
|
||||
return [
|
||||
{
|
||||
"id": d.id,
|
||||
"session_excerpt": d.session_excerpt[:200],
|
||||
"decision_point": d.decision_point[:200],
|
||||
"simulation": d.simulation,
|
||||
"proposed_rule": d.proposed_rule,
|
||||
"created_at": d.created_at,
|
||||
}
|
||||
for d in dreams
|
||||
]
|
||||
|
||||
|
||||
@router.post("/api/trigger", response_class=JSONResponse)
|
||||
async def dreaming_trigger():
|
||||
"""Manually trigger a dream cycle (bypasses idle check).
|
||||
|
||||
Useful for testing and manual inspection. Forces idle state temporarily.
|
||||
"""
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from config import settings
|
||||
|
||||
# Temporarily back-date last activity to appear idle
|
||||
original_time = dreaming_engine._last_activity_time
|
||||
dreaming_engine._last_activity_time = datetime.now(UTC) - timedelta(
|
||||
minutes=settings.dreaming_idle_threshold_minutes + 1
|
||||
)
|
||||
|
||||
try:
|
||||
dream = await dreaming_engine.dream_once()
|
||||
finally:
|
||||
dreaming_engine._last_activity_time = original_time
|
||||
|
||||
if dream:
|
||||
return {
|
||||
"status": "ok",
|
||||
"dream_id": dream.id,
|
||||
"proposed_rule": dream.proposed_rule,
|
||||
"simulation": dream.simulation[:200],
|
||||
}
|
||||
return {"status": "skipped", "reason": "No dream produced (no sessions or LLM unavailable)"}
|
||||
|
||||
|
||||
@router.get("/partial", response_class=HTMLResponse)
|
||||
async def dreaming_partial(request: Request):
|
||||
"""HTMX partial: dreaming status panel for the dashboard."""
|
||||
status = dreaming_engine.get_status()
|
||||
recent = dreaming_engine.get_recent_dreams(limit=5)
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"partials/dreaming_status.html",
|
||||
{"status": status, "recent_dreams": recent},
|
||||
)
|
||||
121
src/dashboard/routes/energy.py
Normal file
121
src/dashboard/routes/energy.py
Normal file
@@ -0,0 +1,121 @@
|
||||
"""Energy Budget Monitoring routes.
|
||||
|
||||
Exposes the energy budget monitor via REST API so the dashboard and
|
||||
external tools can query power draw, efficiency scores, and toggle
|
||||
low power mode.
|
||||
|
||||
Refs: #1009
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from pydantic import BaseModel
|
||||
|
||||
from config import settings
|
||||
from infrastructure.energy.monitor import energy_monitor
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/energy", tags=["energy"])
|
||||
|
||||
|
||||
class LowPowerRequest(BaseModel):
|
||||
"""Request body for toggling low power mode."""
|
||||
|
||||
enabled: bool
|
||||
|
||||
|
||||
class InferenceEventRequest(BaseModel):
|
||||
"""Request body for recording an inference event."""
|
||||
|
||||
model: str
|
||||
tokens_per_second: float
|
||||
|
||||
|
||||
@router.get("/status")
|
||||
async def energy_status():
|
||||
"""Return the current energy budget status.
|
||||
|
||||
Returns the live power estimate, efficiency score (0–10), recent
|
||||
inference samples, and whether low power mode is active.
|
||||
"""
|
||||
if not getattr(settings, "energy_budget_enabled", True):
|
||||
return {
|
||||
"enabled": False,
|
||||
"message": "Energy budget monitoring is disabled (ENERGY_BUDGET_ENABLED=false)",
|
||||
}
|
||||
|
||||
report = await energy_monitor.get_report()
|
||||
return {**report.to_dict(), "enabled": True}
|
||||
|
||||
|
||||
@router.get("/report")
|
||||
async def energy_report():
|
||||
"""Detailed energy budget report with all recent samples.
|
||||
|
||||
Same as /energy/status but always includes the full sample history.
|
||||
"""
|
||||
if not getattr(settings, "energy_budget_enabled", True):
|
||||
raise HTTPException(status_code=503, detail="Energy budget monitoring is disabled")
|
||||
|
||||
report = await energy_monitor.get_report()
|
||||
data = report.to_dict()
|
||||
# Override recent_samples to include the full window (not just last 10)
|
||||
data["recent_samples"] = [
|
||||
{
|
||||
"timestamp": s.timestamp,
|
||||
"model": s.model,
|
||||
"tokens_per_second": round(s.tokens_per_second, 1),
|
||||
"estimated_watts": round(s.estimated_watts, 2),
|
||||
"efficiency": round(s.efficiency, 3),
|
||||
"efficiency_score": round(s.efficiency_score, 2),
|
||||
}
|
||||
for s in list(energy_monitor._samples)
|
||||
]
|
||||
return {**data, "enabled": True}
|
||||
|
||||
|
||||
@router.post("/low-power")
|
||||
async def set_low_power_mode(body: LowPowerRequest):
|
||||
"""Enable or disable low power mode.
|
||||
|
||||
In low power mode the cascade router is advised to prefer the
|
||||
configured energy_low_power_model (see settings).
|
||||
"""
|
||||
if not getattr(settings, "energy_budget_enabled", True):
|
||||
raise HTTPException(status_code=503, detail="Energy budget monitoring is disabled")
|
||||
|
||||
energy_monitor.set_low_power_mode(body.enabled)
|
||||
low_power_model = getattr(settings, "energy_low_power_model", "qwen3:1b")
|
||||
return {
|
||||
"low_power_mode": body.enabled,
|
||||
"preferred_model": low_power_model if body.enabled else None,
|
||||
"message": (
|
||||
f"Low power mode {'enabled' if body.enabled else 'disabled'}. "
|
||||
+ (f"Routing to {low_power_model}." if body.enabled else "Routing restored to default.")
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@router.post("/record")
|
||||
async def record_inference_event(body: InferenceEventRequest):
|
||||
"""Record an inference event for efficiency tracking.
|
||||
|
||||
Called after each LLM inference completes. Updates the rolling
|
||||
efficiency score and may auto-activate low power mode if watts
|
||||
exceed the configured threshold.
|
||||
"""
|
||||
if not getattr(settings, "energy_budget_enabled", True):
|
||||
return {"recorded": False, "message": "Energy budget monitoring is disabled"}
|
||||
|
||||
if body.tokens_per_second <= 0:
|
||||
raise HTTPException(status_code=422, detail="tokens_per_second must be positive")
|
||||
|
||||
sample = energy_monitor.record_inference(body.model, body.tokens_per_second)
|
||||
return {
|
||||
"recorded": True,
|
||||
"efficiency_score": round(sample.efficiency_score, 2),
|
||||
"estimated_watts": round(sample.estimated_watts, 2),
|
||||
"low_power_mode": energy_monitor.low_power_mode,
|
||||
}
|
||||
58
src/dashboard/routes/self_correction.py
Normal file
58
src/dashboard/routes/self_correction.py
Normal file
@@ -0,0 +1,58 @@
|
||||
"""Self-Correction Dashboard routes.
|
||||
|
||||
GET /self-correction/ui — HTML dashboard
|
||||
GET /self-correction/timeline — HTMX partial: recent event timeline
|
||||
GET /self-correction/patterns — HTMX partial: recurring failure patterns
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import HTMLResponse
|
||||
|
||||
from dashboard.templating import templates
|
||||
from infrastructure.self_correction import get_corrections, get_patterns, get_stats
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/self-correction", tags=["self-correction"])
|
||||
|
||||
|
||||
@router.get("/ui", response_class=HTMLResponse)
|
||||
async def self_correction_ui(request: Request):
|
||||
"""Render the Self-Correction Dashboard."""
|
||||
stats = get_stats()
|
||||
corrections = get_corrections(limit=20)
|
||||
patterns = get_patterns(top_n=10)
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"self_correction.html",
|
||||
{
|
||||
"stats": stats,
|
||||
"corrections": corrections,
|
||||
"patterns": patterns,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/timeline", response_class=HTMLResponse)
|
||||
async def self_correction_timeline(request: Request):
|
||||
"""HTMX partial: recent self-correction event timeline."""
|
||||
corrections = get_corrections(limit=30)
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"partials/self_correction_timeline.html",
|
||||
{"corrections": corrections},
|
||||
)
|
||||
|
||||
|
||||
@router.get("/patterns", response_class=HTMLResponse)
|
||||
async def self_correction_patterns(request: Request):
|
||||
"""HTMX partial: recurring failure patterns."""
|
||||
patterns = get_patterns(top_n=10)
|
||||
stats = get_stats()
|
||||
return templates.TemplateResponse(
|
||||
request,
|
||||
"partials/self_correction_patterns.html",
|
||||
{"patterns": patterns, "stats": stats},
|
||||
)
|
||||
@@ -71,6 +71,7 @@
|
||||
<a href="/spark/ui" class="mc-test-link">SPARK</a>
|
||||
<a href="/memory" class="mc-test-link">MEMORY</a>
|
||||
<a href="/marketplace/ui" class="mc-test-link">MARKET</a>
|
||||
<a href="/self-correction/ui" class="mc-test-link">SELF-CORRECT</a>
|
||||
</div>
|
||||
</div>
|
||||
<div class="mc-nav-dropdown">
|
||||
@@ -132,6 +133,7 @@
|
||||
<a href="/spark/ui" class="mc-mobile-link">SPARK</a>
|
||||
<a href="/memory" class="mc-mobile-link">MEMORY</a>
|
||||
<a href="/marketplace/ui" class="mc-mobile-link">MARKET</a>
|
||||
<a href="/self-correction/ui" class="mc-mobile-link">SELF-CORRECT</a>
|
||||
<div class="mc-mobile-section-label">AGENTS</div>
|
||||
<a href="/hands" class="mc-mobile-link">HANDS</a>
|
||||
<a href="/work-orders/queue" class="mc-mobile-link">WORK ORDERS</a>
|
||||
|
||||
@@ -1,32 +0,0 @@
|
||||
{% if not status.enabled %}
|
||||
<div class="dream-disabled text-muted small">Dreaming mode disabled</div>
|
||||
{% elif status.dreaming %}
|
||||
<div class="dream-active">
|
||||
<span class="dream-pulse"></span>
|
||||
<span class="dream-label">DREAMING</span>
|
||||
<div class="dream-summary">{{ status.current_summary }}</div>
|
||||
</div>
|
||||
{% elif status.idle %}
|
||||
<div class="dream-idle">
|
||||
<span class="dream-dot dream-dot-idle"></span>
|
||||
<span class="dream-label-idle">IDLE</span>
|
||||
<span class="dream-idle-meta">{{ status.idle_minutes }}m — dream cycle pending</span>
|
||||
</div>
|
||||
{% else %}
|
||||
<div class="dream-standby">
|
||||
<span class="dream-dot dream-dot-standby"></span>
|
||||
<span class="dream-label-standby">STANDBY</span>
|
||||
<span class="dream-idle-meta">idle in {{ status.idle_threshold_minutes - status.idle_minutes }}m</span>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if recent_dreams %}
|
||||
<div class="dream-history mt-2">
|
||||
{% for d in recent_dreams %}
|
||||
<div class="dream-record">
|
||||
<div class="dream-rule">{{ d.proposed_rule if d.proposed_rule else "No rule extracted" }}</div>
|
||||
<div class="dream-meta">{{ d.created_at[:16] | replace("T", " ") }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
{% endif %}
|
||||
@@ -0,0 +1,28 @@
|
||||
{% if patterns %}
|
||||
<table class="mc-table w-100">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>ERROR TYPE</th>
|
||||
<th class="text-center">COUNT</th>
|
||||
<th class="text-center">CORRECTED</th>
|
||||
<th class="text-center">FAILED</th>
|
||||
<th>LAST SEEN</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{% for p in patterns %}
|
||||
<tr>
|
||||
<td class="sc-pattern-type">{{ p.error_type }}</td>
|
||||
<td class="text-center">
|
||||
<span class="badge {% if p.count >= 5 %}badge-error{% elif p.count >= 3 %}badge-warning{% else %}badge-info{% endif %}">{{ p.count }}</span>
|
||||
</td>
|
||||
<td class="text-center text-success">{{ p.success_count }}</td>
|
||||
<td class="text-center {% if p.failed_count > 0 %}text-danger{% else %}text-muted{% endif %}">{{ p.failed_count }}</td>
|
||||
<td class="sc-event-time">{{ p.last_seen[:16] if p.last_seen else '—' }}</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
{% else %}
|
||||
<div class="text-center text-muted py-3">No patterns detected yet.</div>
|
||||
{% endif %}
|
||||
@@ -0,0 +1,26 @@
|
||||
{% if corrections %}
|
||||
{% for ev in corrections %}
|
||||
<div class="sc-event sc-status-{{ ev.outcome_status }}">
|
||||
<div class="sc-event-header">
|
||||
<span class="sc-status-badge sc-status-{{ ev.outcome_status }}">
|
||||
{% if ev.outcome_status == 'success' %}✓ CORRECTED
|
||||
{% elif ev.outcome_status == 'partial' %}● PARTIAL
|
||||
{% else %}✗ FAILED
|
||||
{% endif %}
|
||||
</span>
|
||||
<span class="sc-source-badge">{{ ev.source }}</span>
|
||||
<span class="sc-event-time">{{ ev.created_at[:19] }}</span>
|
||||
</div>
|
||||
<div class="sc-event-error-type">{{ ev.error_type }}</div>
|
||||
<div class="sc-event-intent"><span class="sc-label">INTENT:</span> {{ ev.original_intent[:120] }}{% if ev.original_intent | length > 120 %}…{% endif %}</div>
|
||||
<div class="sc-event-error"><span class="sc-label">ERROR:</span> {{ ev.detected_error[:120] }}{% if ev.detected_error | length > 120 %}…{% endif %}</div>
|
||||
<div class="sc-event-strategy"><span class="sc-label">STRATEGY:</span> {{ ev.correction_strategy[:120] }}{% if ev.correction_strategy | length > 120 %}…{% endif %}</div>
|
||||
<div class="sc-event-outcome"><span class="sc-label">OUTCOME:</span> {{ ev.final_outcome[:120] }}{% if ev.final_outcome | length > 120 %}…{% endif %}</div>
|
||||
{% if ev.task_id %}
|
||||
<div class="sc-event-meta">task: {{ ev.task_id[:8] }}</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endfor %}
|
||||
{% else %}
|
||||
<div class="text-center text-muted py-3">No self-correction events recorded yet.</div>
|
||||
{% endif %}
|
||||
102
src/dashboard/templates/self_correction.html
Normal file
102
src/dashboard/templates/self_correction.html
Normal file
@@ -0,0 +1,102 @@
|
||||
{% extends "base.html" %}
|
||||
{% from "macros.html" import panel %}
|
||||
|
||||
{% block title %}Timmy Time — Self-Correction Dashboard{% endblock %}
|
||||
|
||||
{% block extra_styles %}{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<div class="container-fluid py-3">
|
||||
|
||||
<!-- Header -->
|
||||
<div class="spark-header mb-3">
|
||||
<div class="spark-title">SELF-CORRECTION</div>
|
||||
<div class="spark-subtitle">
|
||||
Agent error detection & recovery —
|
||||
<span class="spark-status-val">{{ stats.total }}</span> events,
|
||||
<span class="spark-status-val">{{ stats.success_rate }}%</span> correction rate,
|
||||
<span class="spark-status-val">{{ stats.unique_error_types }}</span> distinct error types
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="row g-3">
|
||||
|
||||
<!-- Left column: stats + patterns -->
|
||||
<div class="col-12 col-lg-4 d-flex flex-column gap-3">
|
||||
|
||||
<!-- Stats panel -->
|
||||
<div class="card mc-panel">
|
||||
<div class="card-header mc-panel-header">// CORRECTION STATS</div>
|
||||
<div class="card-body p-3">
|
||||
<div class="spark-stat-grid">
|
||||
<div class="spark-stat">
|
||||
<span class="spark-stat-label">TOTAL</span>
|
||||
<span class="spark-stat-value">{{ stats.total }}</span>
|
||||
</div>
|
||||
<div class="spark-stat">
|
||||
<span class="spark-stat-label">CORRECTED</span>
|
||||
<span class="spark-stat-value text-success">{{ stats.success_count }}</span>
|
||||
</div>
|
||||
<div class="spark-stat">
|
||||
<span class="spark-stat-label">PARTIAL</span>
|
||||
<span class="spark-stat-value text-warning">{{ stats.partial_count }}</span>
|
||||
</div>
|
||||
<div class="spark-stat">
|
||||
<span class="spark-stat-label">FAILED</span>
|
||||
<span class="spark-stat-value {% if stats.failed_count > 0 %}text-danger{% else %}text-muted{% endif %}">{{ stats.failed_count }}</span>
|
||||
</div>
|
||||
</div>
|
||||
<div class="mt-3">
|
||||
<div class="d-flex justify-content-between mb-1">
|
||||
<small class="text-muted">Correction Rate</small>
|
||||
<small class="{% if stats.success_rate >= 70 %}text-success{% elif stats.success_rate >= 40 %}text-warning{% else %}text-danger{% endif %}">{{ stats.success_rate }}%</small>
|
||||
</div>
|
||||
<div class="progress" style="height:6px;">
|
||||
<div class="progress-bar {% if stats.success_rate >= 70 %}bg-success{% elif stats.success_rate >= 40 %}bg-warning{% else %}bg-danger{% endif %}"
|
||||
role="progressbar"
|
||||
style="width:{{ stats.success_rate }}%"
|
||||
aria-valuenow="{{ stats.success_rate }}"
|
||||
aria-valuemin="0"
|
||||
aria-valuemax="100"></div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<!-- Patterns panel -->
|
||||
<div class="card mc-panel"
|
||||
hx-get="/self-correction/patterns"
|
||||
hx-trigger="load, every 60s"
|
||||
hx-target="#sc-patterns-body"
|
||||
hx-swap="innerHTML">
|
||||
<div class="card-header mc-panel-header d-flex justify-content-between align-items-center">
|
||||
<span>// RECURRING PATTERNS</span>
|
||||
<span class="badge badge-info">{{ patterns | length }}</span>
|
||||
</div>
|
||||
<div class="card-body p-0" id="sc-patterns-body">
|
||||
{% include "partials/self_correction_patterns.html" %}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
</div>
|
||||
|
||||
<!-- Right column: timeline -->
|
||||
<div class="col-12 col-lg-8">
|
||||
<div class="card mc-panel"
|
||||
hx-get="/self-correction/timeline"
|
||||
hx-trigger="load, every 30s"
|
||||
hx-target="#sc-timeline-body"
|
||||
hx-swap="innerHTML">
|
||||
<div class="card-header mc-panel-header d-flex justify-content-between align-items-center">
|
||||
<span>// CORRECTION TIMELINE</span>
|
||||
<span class="badge badge-info">{{ corrections | length }}</span>
|
||||
</div>
|
||||
<div class="card-body p-3" id="sc-timeline-body">
|
||||
{% include "partials/self_correction_timeline.html" %}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
</div>
|
||||
</div>
|
||||
{% endblock %}
|
||||
8
src/infrastructure/energy/__init__.py
Normal file
8
src/infrastructure/energy/__init__.py
Normal file
@@ -0,0 +1,8 @@
|
||||
"""Energy Budget Monitoring — power-draw estimation for LLM inference.
|
||||
|
||||
Refs: #1009
|
||||
"""
|
||||
|
||||
from infrastructure.energy.monitor import EnergyBudgetMonitor, energy_monitor
|
||||
|
||||
__all__ = ["EnergyBudgetMonitor", "energy_monitor"]
|
||||
371
src/infrastructure/energy/monitor.py
Normal file
371
src/infrastructure/energy/monitor.py
Normal file
@@ -0,0 +1,371 @@
|
||||
"""Energy Budget Monitor — estimates GPU/CPU power draw during LLM inference.
|
||||
|
||||
Tracks estimated power consumption to optimize for "metabolic efficiency".
|
||||
Three estimation strategies attempted in priority order:
|
||||
|
||||
1. Battery discharge via ioreg (macOS — works without sudo, on-battery only)
|
||||
2. CPU utilisation proxy via sysctl hw.cpufrequency + top
|
||||
3. Model-size heuristic (tokens/s × model_size_gb × 2W/GB estimate)
|
||||
|
||||
Energy Efficiency score (0–10):
|
||||
efficiency = tokens_per_second / estimated_watts, normalised to 0–10.
|
||||
|
||||
Low Power Mode:
|
||||
Activated manually or automatically when draw exceeds the configured
|
||||
threshold. In low power mode the cascade router is advised to prefer the
|
||||
configured low_power_model (e.g. qwen3:1b or similar compact model).
|
||||
|
||||
Refs: #1009
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
import time
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Approximate model-size lookup (GB) used for heuristic power estimate.
|
||||
# Keys are lowercase substring matches against the model name.
|
||||
_MODEL_SIZE_GB: dict[str, float] = {
|
||||
"qwen3:1b": 0.8,
|
||||
"qwen3:3b": 2.0,
|
||||
"qwen3:4b": 2.5,
|
||||
"qwen3:8b": 5.5,
|
||||
"qwen3:14b": 9.0,
|
||||
"qwen3:30b": 20.0,
|
||||
"qwen3:32b": 20.0,
|
||||
"llama3:8b": 5.5,
|
||||
"llama3:70b": 45.0,
|
||||
"mistral:7b": 4.5,
|
||||
"gemma3:4b": 2.5,
|
||||
"gemma3:12b": 8.0,
|
||||
"gemma3:27b": 17.0,
|
||||
"phi4:14b": 9.0,
|
||||
}
|
||||
_DEFAULT_MODEL_SIZE_GB = 5.0 # fallback when model not in table
|
||||
_WATTS_PER_GB_HEURISTIC = 2.0 # rough W/GB for Apple Silicon unified memory
|
||||
|
||||
# Efficiency score normalisation: score 10 at this efficiency (tok/s per W).
|
||||
_EFFICIENCY_SCORE_CEILING = 5.0 # tok/s per W → score 10
|
||||
|
||||
# Rolling window for recent samples
|
||||
_HISTORY_MAXLEN = 60
|
||||
|
||||
|
||||
@dataclass
|
||||
class InferenceSample:
|
||||
"""A single inference event captured by record_inference()."""
|
||||
|
||||
timestamp: str
|
||||
model: str
|
||||
tokens_per_second: float
|
||||
estimated_watts: float
|
||||
efficiency: float # tokens/s per watt
|
||||
efficiency_score: float # 0–10
|
||||
|
||||
|
||||
@dataclass
|
||||
class EnergyReport:
|
||||
"""Snapshot of current energy budget state."""
|
||||
|
||||
timestamp: str
|
||||
low_power_mode: bool
|
||||
current_watts: float
|
||||
strategy: str # "battery", "cpu_proxy", "heuristic", "unavailable"
|
||||
efficiency_score: float # 0–10; -1 if no inference samples yet
|
||||
recent_samples: list[InferenceSample]
|
||||
recommendation: str
|
||||
details: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"timestamp": self.timestamp,
|
||||
"low_power_mode": self.low_power_mode,
|
||||
"current_watts": round(self.current_watts, 2),
|
||||
"strategy": self.strategy,
|
||||
"efficiency_score": round(self.efficiency_score, 2),
|
||||
"recent_samples": [
|
||||
{
|
||||
"timestamp": s.timestamp,
|
||||
"model": s.model,
|
||||
"tokens_per_second": round(s.tokens_per_second, 1),
|
||||
"estimated_watts": round(s.estimated_watts, 2),
|
||||
"efficiency": round(s.efficiency, 3),
|
||||
"efficiency_score": round(s.efficiency_score, 2),
|
||||
}
|
||||
for s in self.recent_samples
|
||||
],
|
||||
"recommendation": self.recommendation,
|
||||
"details": self.details,
|
||||
}
|
||||
|
||||
|
||||
class EnergyBudgetMonitor:
|
||||
"""Estimates power consumption and tracks LLM inference efficiency.
|
||||
|
||||
All blocking I/O (subprocess calls) is wrapped in asyncio.to_thread()
|
||||
so the event loop is never blocked. Results are cached.
|
||||
|
||||
Usage::
|
||||
|
||||
# Record an inference event
|
||||
energy_monitor.record_inference("qwen3:8b", tokens_per_second=42.0)
|
||||
|
||||
# Get the current report
|
||||
report = await energy_monitor.get_report()
|
||||
|
||||
# Toggle low power mode
|
||||
energy_monitor.set_low_power_mode(True)
|
||||
"""
|
||||
|
||||
_POWER_CACHE_TTL = 10.0 # seconds between fresh power readings
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._low_power_mode: bool = False
|
||||
self._samples: deque[InferenceSample] = deque(maxlen=_HISTORY_MAXLEN)
|
||||
self._cached_watts: float = 0.0
|
||||
self._cached_strategy: str = "unavailable"
|
||||
self._cache_ts: float = 0.0
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def low_power_mode(self) -> bool:
|
||||
return self._low_power_mode
|
||||
|
||||
def set_low_power_mode(self, enabled: bool) -> None:
|
||||
"""Enable or disable low power mode."""
|
||||
self._low_power_mode = enabled
|
||||
state = "enabled" if enabled else "disabled"
|
||||
logger.info("Energy budget: low power mode %s", state)
|
||||
|
||||
def record_inference(self, model: str, tokens_per_second: float) -> InferenceSample:
|
||||
"""Record an inference event for efficiency tracking.
|
||||
|
||||
Call this after each LLM inference completes with the model name and
|
||||
measured throughput. The current power estimate is used to compute
|
||||
the efficiency score.
|
||||
|
||||
Args:
|
||||
model: Ollama model name (e.g. "qwen3:8b").
|
||||
tokens_per_second: Measured decode throughput.
|
||||
|
||||
Returns:
|
||||
The recorded InferenceSample.
|
||||
"""
|
||||
watts = self._cached_watts if self._cached_watts > 0 else self._estimate_watts_sync(model)
|
||||
efficiency = tokens_per_second / max(watts, 0.1)
|
||||
score = min(10.0, (efficiency / _EFFICIENCY_SCORE_CEILING) * 10.0)
|
||||
|
||||
sample = InferenceSample(
|
||||
timestamp=datetime.now(UTC).isoformat(),
|
||||
model=model,
|
||||
tokens_per_second=tokens_per_second,
|
||||
estimated_watts=watts,
|
||||
efficiency=efficiency,
|
||||
efficiency_score=score,
|
||||
)
|
||||
self._samples.append(sample)
|
||||
|
||||
# Auto-engage low power mode if above threshold and budget is enabled
|
||||
threshold = getattr(settings, "energy_budget_watts_threshold", 15.0)
|
||||
if watts > threshold and not self._low_power_mode:
|
||||
logger.info(
|
||||
"Energy budget: %.1fW exceeds threshold %.1fW — auto-engaging low power mode",
|
||||
watts,
|
||||
threshold,
|
||||
)
|
||||
self.set_low_power_mode(True)
|
||||
|
||||
return sample
|
||||
|
||||
async def get_report(self) -> EnergyReport:
|
||||
"""Return the current energy budget report.
|
||||
|
||||
Refreshes the power estimate if the cache is stale.
|
||||
"""
|
||||
await self._refresh_power_cache()
|
||||
|
||||
score = self._compute_mean_efficiency_score()
|
||||
recommendation = self._build_recommendation(score)
|
||||
|
||||
return EnergyReport(
|
||||
timestamp=datetime.now(UTC).isoformat(),
|
||||
low_power_mode=self._low_power_mode,
|
||||
current_watts=self._cached_watts,
|
||||
strategy=self._cached_strategy,
|
||||
efficiency_score=score,
|
||||
recent_samples=list(self._samples)[-10:],
|
||||
recommendation=recommendation,
|
||||
details={"sample_count": len(self._samples)},
|
||||
)
|
||||
|
||||
# ── Power estimation ──────────────────────────────────────────────────────
|
||||
|
||||
async def _refresh_power_cache(self) -> None:
|
||||
"""Refresh the cached power reading if stale."""
|
||||
now = time.monotonic()
|
||||
if now - self._cache_ts < self._POWER_CACHE_TTL:
|
||||
return
|
||||
|
||||
try:
|
||||
watts, strategy = await asyncio.to_thread(self._read_power)
|
||||
except Exception as exc:
|
||||
logger.debug("Energy: power read failed: %s", exc)
|
||||
watts, strategy = 0.0, "unavailable"
|
||||
|
||||
self._cached_watts = watts
|
||||
self._cached_strategy = strategy
|
||||
self._cache_ts = now
|
||||
|
||||
def _read_power(self) -> tuple[float, str]:
|
||||
"""Synchronous power reading — tries strategies in priority order.
|
||||
|
||||
Returns:
|
||||
Tuple of (watts, strategy_name).
|
||||
"""
|
||||
# Strategy 1: battery discharge via ioreg (on-battery Macs)
|
||||
try:
|
||||
watts = self._read_battery_watts()
|
||||
if watts > 0:
|
||||
return watts, "battery"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Strategy 2: CPU utilisation proxy via top
|
||||
try:
|
||||
cpu_pct = self._read_cpu_pct()
|
||||
if cpu_pct >= 0:
|
||||
# M3 Max TDP ≈ 40W; scale linearly
|
||||
watts = (cpu_pct / 100.0) * 40.0
|
||||
return watts, "cpu_proxy"
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Strategy 3: heuristic from loaded model size
|
||||
return 0.0, "unavailable"
|
||||
|
||||
def _estimate_watts_sync(self, model: str) -> float:
|
||||
"""Estimate watts from model size when no live reading is available."""
|
||||
size_gb = self._model_size_gb(model)
|
||||
return size_gb * _WATTS_PER_GB_HEURISTIC
|
||||
|
||||
def _read_battery_watts(self) -> float:
|
||||
"""Read instantaneous battery discharge via ioreg.
|
||||
|
||||
Returns watts if on battery, 0.0 if plugged in or unavailable.
|
||||
Requires macOS; no sudo needed.
|
||||
"""
|
||||
result = subprocess.run(
|
||||
["ioreg", "-r", "-c", "AppleSmartBattery", "-d", "1"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=3,
|
||||
)
|
||||
amperage_ma = 0.0
|
||||
voltage_mv = 0.0
|
||||
is_charging = True # assume charging unless we see ExternalConnected = No
|
||||
|
||||
for line in result.stdout.splitlines():
|
||||
stripped = line.strip()
|
||||
if '"InstantAmperage"' in stripped:
|
||||
try:
|
||||
amperage_ma = float(stripped.split("=")[-1].strip())
|
||||
except ValueError:
|
||||
pass
|
||||
elif '"Voltage"' in stripped:
|
||||
try:
|
||||
voltage_mv = float(stripped.split("=")[-1].strip())
|
||||
except ValueError:
|
||||
pass
|
||||
elif '"ExternalConnected"' in stripped:
|
||||
is_charging = "Yes" in stripped
|
||||
|
||||
if is_charging or voltage_mv == 0 or amperage_ma <= 0:
|
||||
return 0.0
|
||||
|
||||
# ioreg reports amperage in mA, voltage in mV
|
||||
return (abs(amperage_ma) * voltage_mv) / 1_000_000
|
||||
|
||||
def _read_cpu_pct(self) -> float:
|
||||
"""Read CPU utilisation from macOS top.
|
||||
|
||||
Returns aggregate CPU% (0–100), or -1.0 on failure.
|
||||
"""
|
||||
result = subprocess.run(
|
||||
["top", "-l", "1", "-n", "0", "-stats", "cpu"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5,
|
||||
)
|
||||
for line in result.stdout.splitlines():
|
||||
if "CPU usage:" in line:
|
||||
# "CPU usage: 12.5% user, 8.3% sys, 79.1% idle"
|
||||
parts = line.split()
|
||||
try:
|
||||
user = float(parts[2].rstrip("%"))
|
||||
sys_ = float(parts[4].rstrip("%"))
|
||||
return user + sys_
|
||||
except (IndexError, ValueError):
|
||||
pass
|
||||
return -1.0
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
@staticmethod
|
||||
def _model_size_gb(model: str) -> float:
|
||||
"""Look up approximate model size in GB by name substring."""
|
||||
lower = model.lower()
|
||||
# Exact match first
|
||||
if lower in _MODEL_SIZE_GB:
|
||||
return _MODEL_SIZE_GB[lower]
|
||||
# Substring match
|
||||
for key, size in _MODEL_SIZE_GB.items():
|
||||
if key in lower:
|
||||
return size
|
||||
return _DEFAULT_MODEL_SIZE_GB
|
||||
|
||||
def _compute_mean_efficiency_score(self) -> float:
|
||||
"""Mean efficiency score over recent samples, or -1 if none."""
|
||||
if not self._samples:
|
||||
return -1.0
|
||||
recent = list(self._samples)[-10:]
|
||||
return sum(s.efficiency_score for s in recent) / len(recent)
|
||||
|
||||
def _build_recommendation(self, score: float) -> str:
|
||||
"""Generate a human-readable recommendation from the efficiency score."""
|
||||
threshold = getattr(settings, "energy_budget_watts_threshold", 15.0)
|
||||
low_power_model = getattr(settings, "energy_low_power_model", "qwen3:1b")
|
||||
|
||||
if score < 0:
|
||||
return "No inference data yet — run some tasks to populate efficiency metrics."
|
||||
|
||||
if self._low_power_mode:
|
||||
return (
|
||||
f"Low power mode active — routing to {low_power_model}. "
|
||||
"Disable when power draw normalises."
|
||||
)
|
||||
|
||||
if score < 3.0:
|
||||
return (
|
||||
f"Low efficiency (score {score:.1f}/10). "
|
||||
f"Consider enabling low power mode to favour smaller models "
|
||||
f"(threshold: {threshold}W)."
|
||||
)
|
||||
|
||||
if score < 6.0:
|
||||
return f"Moderate efficiency (score {score:.1f}/10). System operating normally."
|
||||
|
||||
return f"Good efficiency (score {score:.1f}/10). No action needed."
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
energy_monitor = EnergyBudgetMonitor()
|
||||
247
src/infrastructure/self_correction.py
Normal file
247
src/infrastructure/self_correction.py
Normal file
@@ -0,0 +1,247 @@
|
||||
"""Self-correction event logger.
|
||||
|
||||
Records instances where the agent detected its own errors and the steps
|
||||
it took to correct them. Used by the Self-Correction Dashboard to visualise
|
||||
these events and surface recurring failure patterns.
|
||||
|
||||
Usage::
|
||||
|
||||
from infrastructure.self_correction import log_self_correction, get_corrections, get_patterns
|
||||
|
||||
log_self_correction(
|
||||
source="agentic_loop",
|
||||
original_intent="Execute step 3: deploy service",
|
||||
detected_error="ConnectionRefusedError: port 8080 unavailable",
|
||||
correction_strategy="Retry on alternate port 8081",
|
||||
final_outcome="Success on retry",
|
||||
task_id="abc123",
|
||||
)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
import uuid
|
||||
from collections.abc import Generator
|
||||
from contextlib import closing, contextmanager
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Database
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_DB_PATH: Path | None = None
|
||||
|
||||
|
||||
def _get_db_path() -> Path:
|
||||
global _DB_PATH
|
||||
if _DB_PATH is None:
|
||||
from config import settings
|
||||
|
||||
_DB_PATH = Path(settings.repo_root) / "data" / "self_correction.db"
|
||||
return _DB_PATH
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _get_db() -> Generator[sqlite3.Connection, None, None]:
|
||||
db_path = _get_db_path()
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with closing(sqlite3.connect(str(db_path))) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS self_correction_events (
|
||||
id TEXT PRIMARY KEY,
|
||||
source TEXT NOT NULL,
|
||||
task_id TEXT DEFAULT '',
|
||||
original_intent TEXT NOT NULL,
|
||||
detected_error TEXT NOT NULL,
|
||||
correction_strategy TEXT NOT NULL,
|
||||
final_outcome TEXT NOT NULL,
|
||||
outcome_status TEXT DEFAULT 'success',
|
||||
error_type TEXT DEFAULT '',
|
||||
created_at TEXT DEFAULT (datetime('now'))
|
||||
)
|
||||
""")
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_sc_created ON self_correction_events(created_at)"
|
||||
)
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_sc_error_type ON self_correction_events(error_type)"
|
||||
)
|
||||
conn.commit()
|
||||
yield conn
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Write
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def log_self_correction(
|
||||
*,
|
||||
source: str,
|
||||
original_intent: str,
|
||||
detected_error: str,
|
||||
correction_strategy: str,
|
||||
final_outcome: str,
|
||||
task_id: str = "",
|
||||
outcome_status: str = "success",
|
||||
error_type: str = "",
|
||||
) -> str:
|
||||
"""Record a self-correction event and return its ID.
|
||||
|
||||
Args:
|
||||
source: Module or component that triggered the correction.
|
||||
original_intent: What the agent was trying to do.
|
||||
detected_error: The error or problem that was detected.
|
||||
correction_strategy: How the agent attempted to correct the error.
|
||||
final_outcome: What the result of the correction attempt was.
|
||||
task_id: Optional task/session ID for correlation.
|
||||
outcome_status: 'success', 'partial', or 'failed'.
|
||||
error_type: Short category label for pattern analysis (e.g.
|
||||
'ConnectionError', 'TimeoutError').
|
||||
|
||||
Returns:
|
||||
The ID of the newly created record.
|
||||
"""
|
||||
event_id = str(uuid.uuid4())
|
||||
if not error_type:
|
||||
# Derive a simple type from the first word of the detected error
|
||||
error_type = detected_error.split(":")[0].strip()[:64]
|
||||
|
||||
try:
|
||||
with _get_db() as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO self_correction_events
|
||||
(id, source, task_id, original_intent, detected_error,
|
||||
correction_strategy, final_outcome, outcome_status, error_type)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
event_id,
|
||||
source,
|
||||
task_id,
|
||||
original_intent[:2000],
|
||||
detected_error[:2000],
|
||||
correction_strategy[:2000],
|
||||
final_outcome[:2000],
|
||||
outcome_status,
|
||||
error_type,
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
logger.info(
|
||||
"Self-correction logged [%s] source=%s error_type=%s status=%s",
|
||||
event_id[:8],
|
||||
source,
|
||||
error_type,
|
||||
outcome_status,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to log self-correction event: %s", exc)
|
||||
|
||||
return event_id
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Read
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def get_corrections(limit: int = 50) -> list[dict]:
|
||||
"""Return the most recent self-correction events, newest first."""
|
||||
try:
|
||||
with _get_db() as conn:
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT * FROM self_correction_events
|
||||
ORDER BY created_at DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(limit,),
|
||||
).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to fetch self-correction events: %s", exc)
|
||||
return []
|
||||
|
||||
|
||||
def get_patterns(top_n: int = 10) -> list[dict]:
|
||||
"""Return the most common recurring error types with counts.
|
||||
|
||||
Each entry has:
|
||||
- error_type: category label
|
||||
- count: total occurrences
|
||||
- success_count: corrected successfully
|
||||
- failed_count: correction also failed
|
||||
- last_seen: ISO timestamp of most recent occurrence
|
||||
"""
|
||||
try:
|
||||
with _get_db() as conn:
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT
|
||||
error_type,
|
||||
COUNT(*) AS count,
|
||||
SUM(CASE WHEN outcome_status = 'success' THEN 1 ELSE 0 END) AS success_count,
|
||||
SUM(CASE WHEN outcome_status = 'failed' THEN 1 ELSE 0 END) AS failed_count,
|
||||
MAX(created_at) AS last_seen
|
||||
FROM self_correction_events
|
||||
GROUP BY error_type
|
||||
ORDER BY count DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(top_n,),
|
||||
).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to fetch self-correction patterns: %s", exc)
|
||||
return []
|
||||
|
||||
|
||||
def get_stats() -> dict:
|
||||
"""Return aggregate statistics for the summary panel."""
|
||||
try:
|
||||
with _get_db() as conn:
|
||||
row = conn.execute(
|
||||
"""
|
||||
SELECT
|
||||
COUNT(*) AS total,
|
||||
SUM(CASE WHEN outcome_status = 'success' THEN 1 ELSE 0 END) AS success_count,
|
||||
SUM(CASE WHEN outcome_status = 'partial' THEN 1 ELSE 0 END) AS partial_count,
|
||||
SUM(CASE WHEN outcome_status = 'failed' THEN 1 ELSE 0 END) AS failed_count,
|
||||
COUNT(DISTINCT error_type) AS unique_error_types,
|
||||
COUNT(DISTINCT source) AS sources
|
||||
FROM self_correction_events
|
||||
"""
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return _empty_stats()
|
||||
d = dict(row)
|
||||
total = d.get("total") or 0
|
||||
if total:
|
||||
d["success_rate"] = round((d.get("success_count") or 0) / total * 100)
|
||||
else:
|
||||
d["success_rate"] = 0
|
||||
return d
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to fetch self-correction stats: %s", exc)
|
||||
return _empty_stats()
|
||||
|
||||
|
||||
def _empty_stats() -> dict:
|
||||
return {
|
||||
"total": 0,
|
||||
"success_count": 0,
|
||||
"partial_count": 0,
|
||||
"failed_count": 0,
|
||||
"unique_error_types": 0,
|
||||
"sources": 0,
|
||||
"success_rate": 0,
|
||||
}
|
||||
7
src/self_coding/__init__.py
Normal file
7
src/self_coding/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
"""Self-coding package — Timmy's self-modification capability.
|
||||
|
||||
Provides the branch→edit→test→commit/revert loop that allows Timmy
|
||||
to propose and apply code changes autonomously, gated by the test suite.
|
||||
|
||||
Main entry point: ``self_coding.self_modify.loop``
|
||||
"""
|
||||
129
src/self_coding/gitea_client.py
Normal file
129
src/self_coding/gitea_client.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""Gitea REST client — thin wrapper for PR creation and issue commenting.
|
||||
|
||||
Uses ``settings.gitea_url``, ``settings.gitea_token``, and
|
||||
``settings.gitea_repo`` (owner/repo) from config. Degrades gracefully
|
||||
when the token is absent or the server is unreachable.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class PullRequest:
|
||||
"""Minimal representation of a created pull request."""
|
||||
|
||||
number: int
|
||||
title: str
|
||||
html_url: str
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
"""HTTP client for Gitea's REST API v1.
|
||||
|
||||
All methods return structured results and never raise — errors are
|
||||
logged at WARNING level and indicated via return value.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str | None = None,
|
||||
token: str | None = None,
|
||||
repo: str | None = None,
|
||||
) -> None:
|
||||
from config import settings
|
||||
|
||||
self._base_url = (base_url or settings.gitea_url).rstrip("/")
|
||||
self._token = token or settings.gitea_token
|
||||
self._repo = repo or settings.gitea_repo
|
||||
|
||||
# ── internal ────────────────────────────────────────────────────────────
|
||||
|
||||
def _headers(self) -> dict[str, str]:
|
||||
return {
|
||||
"Authorization": f"token {self._token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
def _api(self, path: str) -> str:
|
||||
return f"{self._base_url}/api/v1/{path.lstrip('/')}"
|
||||
|
||||
# ── public API ───────────────────────────────────────────────────────────
|
||||
|
||||
def create_pull_request(
|
||||
self,
|
||||
title: str,
|
||||
body: str,
|
||||
head: str,
|
||||
base: str = "main",
|
||||
) -> PullRequest | None:
|
||||
"""Open a pull request.
|
||||
|
||||
Args:
|
||||
title: PR title (keep under 70 chars).
|
||||
body: PR body in markdown.
|
||||
head: Source branch (e.g. ``self-modify/issue-983``).
|
||||
base: Target branch (default ``main``).
|
||||
|
||||
Returns:
|
||||
A ``PullRequest`` dataclass on success, ``None`` on failure.
|
||||
"""
|
||||
if not self._token:
|
||||
logger.warning("Gitea token not configured — skipping PR creation")
|
||||
return None
|
||||
|
||||
try:
|
||||
import requests as _requests
|
||||
|
||||
resp = _requests.post(
|
||||
self._api(f"repos/{self._repo}/pulls"),
|
||||
headers=self._headers(),
|
||||
json={"title": title, "body": body, "head": head, "base": base},
|
||||
timeout=15,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
pr = PullRequest(
|
||||
number=data["number"],
|
||||
title=data["title"],
|
||||
html_url=data["html_url"],
|
||||
)
|
||||
logger.info("PR #%d created: %s", pr.number, pr.html_url)
|
||||
return pr
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to create PR: %s", exc)
|
||||
return None
|
||||
|
||||
def add_issue_comment(self, issue_number: int, body: str) -> bool:
|
||||
"""Post a comment on an issue or PR.
|
||||
|
||||
Returns:
|
||||
True on success, False on failure.
|
||||
"""
|
||||
if not self._token:
|
||||
logger.warning("Gitea token not configured — skipping issue comment")
|
||||
return False
|
||||
|
||||
try:
|
||||
import requests as _requests
|
||||
|
||||
resp = _requests.post(
|
||||
self._api(f"repos/{self._repo}/issues/{issue_number}/comments"),
|
||||
headers=self._headers(),
|
||||
json={"body": body},
|
||||
timeout=15,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
logger.info("Comment posted on issue #%d", issue_number)
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to post comment on issue #%d: %s", issue_number, exc)
|
||||
return False
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
gitea_client = GiteaClient()
|
||||
1
src/self_coding/self_modify/__init__.py
Normal file
1
src/self_coding/self_modify/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Self-modification loop sub-package."""
|
||||
301
src/self_coding/self_modify/loop.py
Normal file
301
src/self_coding/self_modify/loop.py
Normal file
@@ -0,0 +1,301 @@
|
||||
"""Self-modification loop — branch → edit → test → commit/revert.
|
||||
|
||||
Timmy's self-coding capability, restored after deletion in
|
||||
Operation Darling Purge (commit 584eeb679e88).
|
||||
|
||||
## Cycle
|
||||
1. **Branch** — create ``self-modify/<slug>`` from ``main``
|
||||
2. **Edit** — apply the proposed change (patch string or callable)
|
||||
3. **Test** — run ``pytest tests/ -x -q``; never commit on failure
|
||||
4. **Commit** — stage and commit on green; revert branch on red
|
||||
5. **PR** — open a Gitea pull request (requires no direct push to main)
|
||||
|
||||
## Guards
|
||||
- Never push directly to ``main`` or ``master``
|
||||
- All changes land via PR (enforced by ``_guard_branch``)
|
||||
- Test gate is mandatory; ``skip_tests=True`` is for unit-test use only
|
||||
- Commits only happen when ``pytest tests/ -x -q`` exits 0
|
||||
|
||||
## Usage::
|
||||
|
||||
from self_coding.self_modify.loop import SelfModifyLoop
|
||||
|
||||
loop = SelfModifyLoop()
|
||||
result = await loop.run(
|
||||
slug="add-hello-tool",
|
||||
description="Add hello() convenience tool",
|
||||
edit_fn=my_edit_function, # callable(repo_root: str) -> None
|
||||
)
|
||||
if result.success:
|
||||
print(f"PR: {result.pr_url}")
|
||||
else:
|
||||
print(f"Failed: {result.error}")
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import subprocess
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Branches that must never receive direct commits
|
||||
_PROTECTED_BRANCHES = frozenset({"main", "master", "develop"})
|
||||
|
||||
# Test command used as the commit gate
|
||||
_TEST_COMMAND = ["pytest", "tests/", "-x", "-q", "--tb=short"]
|
||||
|
||||
# Max time (seconds) to wait for the test suite
|
||||
_TEST_TIMEOUT = 300
|
||||
|
||||
|
||||
@dataclass
|
||||
class LoopResult:
|
||||
"""Result from one self-modification cycle."""
|
||||
|
||||
success: bool
|
||||
branch: str = ""
|
||||
commit_sha: str = ""
|
||||
pr_url: str = ""
|
||||
pr_number: int = 0
|
||||
test_output: str = ""
|
||||
error: str = ""
|
||||
elapsed_ms: float = 0.0
|
||||
metadata: dict = field(default_factory=dict)
|
||||
|
||||
|
||||
class SelfModifyLoop:
|
||||
"""Orchestrate branch → edit → test → commit/revert → PR.
|
||||
|
||||
Args:
|
||||
repo_root: Absolute path to the git repository (defaults to
|
||||
``settings.repo_root``).
|
||||
remote: Git remote name (default ``origin``).
|
||||
base_branch: Branch to fork from and target for the PR
|
||||
(default ``main``).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
repo_root: str | None = None,
|
||||
remote: str = "origin",
|
||||
base_branch: str = "main",
|
||||
) -> None:
|
||||
self._repo_root = Path(repo_root or settings.repo_root)
|
||||
self._remote = remote
|
||||
self._base_branch = base_branch
|
||||
|
||||
# ── public ──────────────────────────────────────────────────────────────
|
||||
|
||||
async def run(
|
||||
self,
|
||||
slug: str,
|
||||
description: str,
|
||||
edit_fn: Callable[[str], None],
|
||||
issue_number: int | None = None,
|
||||
skip_tests: bool = False,
|
||||
) -> LoopResult:
|
||||
"""Execute one full self-modification cycle.
|
||||
|
||||
Args:
|
||||
slug: Short identifier used for the branch name
|
||||
(e.g. ``"add-hello-tool"``).
|
||||
description: Human-readable description for commit message
|
||||
and PR body.
|
||||
edit_fn: Callable that receives the repo root path (str)
|
||||
and applies the desired code changes in-place.
|
||||
issue_number: Optional Gitea issue number to reference in PR.
|
||||
skip_tests: If ``True``, skip the test gate (unit-test use
|
||||
only — never use in production).
|
||||
|
||||
Returns:
|
||||
:class:`LoopResult` describing the outcome.
|
||||
"""
|
||||
start = time.time()
|
||||
branch = f"self-modify/{slug}"
|
||||
|
||||
try:
|
||||
self._guard_branch(branch)
|
||||
self._checkout_base()
|
||||
self._create_branch(branch)
|
||||
|
||||
try:
|
||||
edit_fn(str(self._repo_root))
|
||||
except Exception as exc:
|
||||
self._revert_branch(branch)
|
||||
return LoopResult(
|
||||
success=False,
|
||||
branch=branch,
|
||||
error=f"edit_fn raised: {exc}",
|
||||
elapsed_ms=self._elapsed(start),
|
||||
)
|
||||
|
||||
if not skip_tests:
|
||||
test_output, passed = self._run_tests()
|
||||
if not passed:
|
||||
self._revert_branch(branch)
|
||||
return LoopResult(
|
||||
success=False,
|
||||
branch=branch,
|
||||
test_output=test_output,
|
||||
error="Tests failed — branch reverted",
|
||||
elapsed_ms=self._elapsed(start),
|
||||
)
|
||||
else:
|
||||
test_output = "(tests skipped)"
|
||||
|
||||
sha = self._commit_all(description)
|
||||
self._push_branch(branch)
|
||||
|
||||
pr = self._create_pr(
|
||||
branch=branch,
|
||||
description=description,
|
||||
test_output=test_output,
|
||||
issue_number=issue_number,
|
||||
)
|
||||
|
||||
return LoopResult(
|
||||
success=True,
|
||||
branch=branch,
|
||||
commit_sha=sha,
|
||||
pr_url=pr.html_url if pr else "",
|
||||
pr_number=pr.number if pr else 0,
|
||||
test_output=test_output,
|
||||
elapsed_ms=self._elapsed(start),
|
||||
)
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("Self-modify loop failed: %s", exc)
|
||||
return LoopResult(
|
||||
success=False,
|
||||
branch=branch,
|
||||
error=str(exc),
|
||||
elapsed_ms=self._elapsed(start),
|
||||
)
|
||||
|
||||
# ── private helpers ──────────────────────────────────────────────────────
|
||||
|
||||
@staticmethod
|
||||
def _elapsed(start: float) -> float:
|
||||
return (time.time() - start) * 1000
|
||||
|
||||
def _git(self, *args: str, check: bool = True) -> subprocess.CompletedProcess:
|
||||
"""Run a git command in the repo root."""
|
||||
cmd = ["git", *args]
|
||||
logger.debug("git %s", " ".join(args))
|
||||
return subprocess.run(
|
||||
cmd,
|
||||
cwd=str(self._repo_root),
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=check,
|
||||
)
|
||||
|
||||
def _guard_branch(self, branch: str) -> None:
|
||||
"""Raise if the target branch is a protected branch name."""
|
||||
if branch in _PROTECTED_BRANCHES:
|
||||
raise ValueError(
|
||||
f"Refusing to operate on protected branch '{branch}'. "
|
||||
"All self-modifications must go via PR."
|
||||
)
|
||||
|
||||
def _checkout_base(self) -> None:
|
||||
"""Checkout the base branch and pull latest."""
|
||||
self._git("checkout", self._base_branch)
|
||||
# Best-effort pull; ignore failures (e.g. no remote configured)
|
||||
self._git("pull", self._remote, self._base_branch, check=False)
|
||||
|
||||
def _create_branch(self, branch: str) -> None:
|
||||
"""Create and checkout a new branch, deleting an old one if needed."""
|
||||
# Delete local branch if it already exists (stale prior attempt)
|
||||
self._git("branch", "-D", branch, check=False)
|
||||
self._git("checkout", "-b", branch)
|
||||
logger.info("Created branch: %s", branch)
|
||||
|
||||
def _revert_branch(self, branch: str) -> None:
|
||||
"""Checkout base and delete the failed branch."""
|
||||
try:
|
||||
self._git("checkout", self._base_branch, check=False)
|
||||
self._git("branch", "-D", branch, check=False)
|
||||
logger.info("Reverted and deleted branch: %s", branch)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to revert branch %s: %s", branch, exc)
|
||||
|
||||
def _run_tests(self) -> tuple[str, bool]:
|
||||
"""Run the test suite. Returns (output, passed)."""
|
||||
logger.info("Running test suite: %s", " ".join(_TEST_COMMAND))
|
||||
try:
|
||||
result = subprocess.run(
|
||||
_TEST_COMMAND,
|
||||
cwd=str(self._repo_root),
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=_TEST_TIMEOUT,
|
||||
)
|
||||
output = (result.stdout + "\n" + result.stderr).strip()
|
||||
passed = result.returncode == 0
|
||||
logger.info(
|
||||
"Test suite %s (exit %d)", "PASSED" if passed else "FAILED", result.returncode
|
||||
)
|
||||
return output, passed
|
||||
except subprocess.TimeoutExpired:
|
||||
msg = f"Test suite timed out after {_TEST_TIMEOUT}s"
|
||||
logger.warning(msg)
|
||||
return msg, False
|
||||
except FileNotFoundError:
|
||||
msg = "pytest not found on PATH"
|
||||
logger.warning(msg)
|
||||
return msg, False
|
||||
|
||||
def _commit_all(self, message: str) -> str:
|
||||
"""Stage all changes and create a commit. Returns the new SHA."""
|
||||
self._git("add", "-A")
|
||||
self._git("commit", "-m", message)
|
||||
result = self._git("rev-parse", "HEAD")
|
||||
sha = result.stdout.strip()
|
||||
logger.info("Committed: %s sha=%s", message[:60], sha[:12])
|
||||
return sha
|
||||
|
||||
def _push_branch(self, branch: str) -> None:
|
||||
"""Push the branch to the remote."""
|
||||
self._git("push", "-u", self._remote, branch)
|
||||
logger.info("Pushed branch: %s -> %s", branch, self._remote)
|
||||
|
||||
def _create_pr(
|
||||
self,
|
||||
branch: str,
|
||||
description: str,
|
||||
test_output: str,
|
||||
issue_number: int | None,
|
||||
):
|
||||
"""Open a Gitea PR. Returns PullRequest or None on failure."""
|
||||
from self_coding.gitea_client import GiteaClient
|
||||
|
||||
client = GiteaClient()
|
||||
|
||||
issue_ref = f"\n\nFixes #{issue_number}" if issue_number else ""
|
||||
test_section = (
|
||||
f"\n\n## Test results\n```\n{test_output[:2000]}\n```"
|
||||
if test_output and test_output != "(tests skipped)"
|
||||
else ""
|
||||
)
|
||||
|
||||
body = (
|
||||
f"## Summary\n{description}"
|
||||
f"{issue_ref}"
|
||||
f"{test_section}"
|
||||
"\n\n🤖 Generated by Timmy's self-modification loop"
|
||||
)
|
||||
|
||||
return client.create_pull_request(
|
||||
title=f"[self-modify] {description[:60]}",
|
||||
body=body,
|
||||
head=branch,
|
||||
base=self._base_branch,
|
||||
)
|
||||
@@ -312,6 +312,13 @@ async def _handle_step_failure(
|
||||
"adaptation": step.result[:200],
|
||||
},
|
||||
)
|
||||
_log_self_correction(
|
||||
task_id=task_id,
|
||||
step_desc=step_desc,
|
||||
exc=exc,
|
||||
outcome=step.result,
|
||||
outcome_status="success",
|
||||
)
|
||||
if on_progress:
|
||||
await on_progress(f"[Adapted] {step_desc}", step_num, total_steps)
|
||||
except Exception as adapt_exc: # broad catch intentional
|
||||
@@ -325,9 +332,42 @@ async def _handle_step_failure(
|
||||
duration_ms=int((time.monotonic() - step_start) * 1000),
|
||||
)
|
||||
)
|
||||
_log_self_correction(
|
||||
task_id=task_id,
|
||||
step_desc=step_desc,
|
||||
exc=exc,
|
||||
outcome=f"Adaptation also failed: {adapt_exc}",
|
||||
outcome_status="failed",
|
||||
)
|
||||
completed_results.append(f"Step {step_num}: FAILED")
|
||||
|
||||
|
||||
def _log_self_correction(
|
||||
*,
|
||||
task_id: str,
|
||||
step_desc: str,
|
||||
exc: Exception,
|
||||
outcome: str,
|
||||
outcome_status: str,
|
||||
) -> None:
|
||||
"""Best-effort: log a self-correction event (never raises)."""
|
||||
try:
|
||||
from infrastructure.self_correction import log_self_correction
|
||||
|
||||
log_self_correction(
|
||||
source="agentic_loop",
|
||||
original_intent=step_desc,
|
||||
detected_error=f"{type(exc).__name__}: {exc}",
|
||||
correction_strategy="Adaptive re-plan via LLM",
|
||||
final_outcome=outcome[:500],
|
||||
task_id=task_id,
|
||||
outcome_status=outcome_status,
|
||||
error_type=type(exc).__name__,
|
||||
)
|
||||
except Exception as log_exc:
|
||||
logger.debug("Self-correction log failed: %s", log_exc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core loop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -1,435 +0,0 @@
|
||||
"""Dreaming Mode — idle-time session replay and counterfactual simulation.
|
||||
|
||||
When the dashboard has been idle for a configurable period, this engine
|
||||
selects a past chat session, identifies key agent response points, and
|
||||
asks the LLM to simulate alternative approaches. Insights are stored as
|
||||
proposed rules that can feed the auto-crystallizer or memory system.
|
||||
|
||||
Usage::
|
||||
|
||||
from timmy.dreaming import dreaming_engine
|
||||
|
||||
# Run one dream cycle (called by the background scheduler)
|
||||
await dreaming_engine.dream_once()
|
||||
|
||||
# Query recent dreams
|
||||
dreams = dreaming_engine.get_recent_dreams(limit=10)
|
||||
|
||||
# Get current status dict for API/dashboard
|
||||
status = dreaming_engine.get_status()
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import sqlite3
|
||||
import uuid
|
||||
from collections.abc import Generator
|
||||
from contextlib import closing, contextmanager
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_DEFAULT_DB = Path("data/dreams.db")
|
||||
|
||||
# Strip <think> tags from reasoning model output
|
||||
_THINK_TAG_RE = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
|
||||
|
||||
# Minimum messages in a session to be worth replaying
|
||||
_MIN_SESSION_MESSAGES = 3
|
||||
|
||||
# Gap in seconds between messages that signals a new session
|
||||
_SESSION_GAP_SECONDS = 1800 # 30 minutes
|
||||
|
||||
|
||||
@dataclass
|
||||
class DreamRecord:
|
||||
"""A single completed dream cycle."""
|
||||
|
||||
id: str
|
||||
session_excerpt: str # Short excerpt from the replayed session
|
||||
decision_point: str # The agent message that was re-simulated
|
||||
simulation: str # The alternative response generated
|
||||
proposed_rule: str # Rule extracted from the simulation
|
||||
created_at: str
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _get_conn(db_path: Path = _DEFAULT_DB) -> Generator[sqlite3.Connection, None, None]:
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with closing(sqlite3.connect(str(db_path))) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS dreams (
|
||||
id TEXT PRIMARY KEY,
|
||||
session_excerpt TEXT NOT NULL,
|
||||
decision_point TEXT NOT NULL,
|
||||
simulation TEXT NOT NULL,
|
||||
proposed_rule TEXT NOT NULL DEFAULT '',
|
||||
created_at TEXT NOT NULL
|
||||
)
|
||||
""")
|
||||
conn.execute("CREATE INDEX IF NOT EXISTS idx_dreams_time ON dreams(created_at)")
|
||||
conn.commit()
|
||||
yield conn
|
||||
|
||||
|
||||
def _row_to_dream(row: sqlite3.Row) -> DreamRecord:
|
||||
return DreamRecord(
|
||||
id=row["id"],
|
||||
session_excerpt=row["session_excerpt"],
|
||||
decision_point=row["decision_point"],
|
||||
simulation=row["simulation"],
|
||||
proposed_rule=row["proposed_rule"],
|
||||
created_at=row["created_at"],
|
||||
)
|
||||
|
||||
|
||||
class DreamingEngine:
|
||||
"""Idle-time dreaming engine — replays sessions and simulates alternatives."""
|
||||
|
||||
def __init__(self, db_path: Path = _DEFAULT_DB) -> None:
|
||||
self._db_path = db_path
|
||||
self._last_activity_time: datetime = datetime.now(UTC)
|
||||
self._is_dreaming: bool = False
|
||||
self._current_dream_summary: str = ""
|
||||
self._dreaming_agent = None # Lazy-initialised
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────
|
||||
|
||||
def record_activity(self) -> None:
|
||||
"""Reset the idle timer — call this on every user/agent interaction."""
|
||||
self._last_activity_time = datetime.now(UTC)
|
||||
|
||||
def is_idle(self) -> bool:
|
||||
"""Return True if the system has been idle long enough to start dreaming."""
|
||||
threshold = settings.dreaming_idle_threshold_minutes
|
||||
if threshold <= 0:
|
||||
return False
|
||||
return datetime.now(UTC) - self._last_activity_time > timedelta(minutes=threshold)
|
||||
|
||||
def get_status(self) -> dict[str, Any]:
|
||||
"""Return a status dict suitable for API/dashboard consumption."""
|
||||
return {
|
||||
"enabled": settings.dreaming_enabled,
|
||||
"dreaming": self._is_dreaming,
|
||||
"idle": self.is_idle(),
|
||||
"current_summary": self._current_dream_summary,
|
||||
"idle_minutes": int(
|
||||
(datetime.now(UTC) - self._last_activity_time).total_seconds() / 60
|
||||
),
|
||||
"idle_threshold_minutes": settings.dreaming_idle_threshold_minutes,
|
||||
"dream_count": self.count_dreams(),
|
||||
}
|
||||
|
||||
async def dream_once(self) -> DreamRecord | None:
|
||||
"""Execute one dream cycle.
|
||||
|
||||
Returns the stored DreamRecord, or None if the cycle was skipped
|
||||
(not idle, dreaming disabled, no suitable session, or LLM error).
|
||||
"""
|
||||
if not settings.dreaming_enabled:
|
||||
return None
|
||||
|
||||
if not self.is_idle():
|
||||
logger.debug(
|
||||
"Dreaming skipped — system active (idle for %d min, threshold %d min)",
|
||||
int((datetime.now(UTC) - self._last_activity_time).total_seconds() / 60),
|
||||
settings.dreaming_idle_threshold_minutes,
|
||||
)
|
||||
return None
|
||||
|
||||
if self._is_dreaming:
|
||||
logger.debug("Dreaming skipped — cycle already in progress")
|
||||
return None
|
||||
|
||||
self._is_dreaming = True
|
||||
self._current_dream_summary = "Selecting a past session…"
|
||||
await self._broadcast_status()
|
||||
|
||||
try:
|
||||
return await self._run_dream_cycle()
|
||||
except Exception as exc:
|
||||
logger.warning("Dream cycle failed: %s", exc)
|
||||
return None
|
||||
finally:
|
||||
self._is_dreaming = False
|
||||
self._current_dream_summary = ""
|
||||
await self._broadcast_status()
|
||||
|
||||
def get_recent_dreams(self, limit: int = 20) -> list[DreamRecord]:
|
||||
"""Retrieve the most recent dream records."""
|
||||
with _get_conn(self._db_path) as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT * FROM dreams ORDER BY created_at DESC LIMIT ?",
|
||||
(limit,),
|
||||
).fetchall()
|
||||
return [_row_to_dream(r) for r in rows]
|
||||
|
||||
def count_dreams(self) -> int:
|
||||
"""Return total number of stored dream records."""
|
||||
with _get_conn(self._db_path) as conn:
|
||||
row = conn.execute("SELECT COUNT(*) AS c FROM dreams").fetchone()
|
||||
return row["c"] if row else 0
|
||||
|
||||
# ── Private helpers ───────────────────────────────────────────────────
|
||||
|
||||
async def _run_dream_cycle(self) -> DreamRecord | None:
|
||||
"""Core dream logic: select → simulate → store."""
|
||||
# 1. Select a past session from the chat log
|
||||
session = await self._select_session()
|
||||
if not session:
|
||||
logger.debug("No suitable chat session found for dreaming")
|
||||
self._current_dream_summary = "No past sessions to replay"
|
||||
return None
|
||||
|
||||
decision_point, session_excerpt = session
|
||||
|
||||
self._current_dream_summary = f"Simulating alternative for: {decision_point[:60]}…"
|
||||
await self._broadcast_status()
|
||||
|
||||
# 2. Simulate an alternative response
|
||||
simulation = await self._simulate_alternative(decision_point, session_excerpt)
|
||||
if not simulation:
|
||||
logger.debug("Dream simulation produced no output")
|
||||
return None
|
||||
|
||||
# 3. Extract a proposed rule
|
||||
proposed_rule = await self._extract_rule(decision_point, simulation)
|
||||
|
||||
# 4. Store and broadcast
|
||||
dream = self._store_dream(
|
||||
session_excerpt=session_excerpt,
|
||||
decision_point=decision_point,
|
||||
simulation=simulation,
|
||||
proposed_rule=proposed_rule,
|
||||
)
|
||||
|
||||
self._current_dream_summary = f"Dream complete: {proposed_rule[:80]}" if proposed_rule else "Dream complete"
|
||||
|
||||
logger.info(
|
||||
"Dream [%s]: replayed session, proposed rule: %s",
|
||||
dream.id[:8],
|
||||
proposed_rule[:80] if proposed_rule else "(none)",
|
||||
)
|
||||
|
||||
await self._broadcast_status()
|
||||
await self._broadcast_dream(dream)
|
||||
return dream
|
||||
|
||||
async def _select_session(self) -> tuple[str, str] | None:
|
||||
"""Select a past chat session and return (decision_point, session_excerpt).
|
||||
|
||||
Uses the SQLite chat store. Groups messages into sessions by time
|
||||
gap. Picks a random session with enough messages, then selects one
|
||||
agent response as the decision point.
|
||||
"""
|
||||
try:
|
||||
from infrastructure.chat_store import DB_PATH
|
||||
|
||||
if not DB_PATH.exists():
|
||||
return None
|
||||
|
||||
import asyncio
|
||||
rows = await asyncio.to_thread(self._load_chat_rows)
|
||||
if not rows:
|
||||
return None
|
||||
|
||||
sessions = self._group_into_sessions(rows)
|
||||
if not sessions:
|
||||
return None
|
||||
|
||||
# Filter sessions with enough messages
|
||||
valid = [s for s in sessions if len(s) >= _MIN_SESSION_MESSAGES]
|
||||
if not valid:
|
||||
return None
|
||||
|
||||
import random
|
||||
session = random.choice(valid) # noqa: S311 (not cryptographic)
|
||||
|
||||
# Build a short text excerpt (last N messages)
|
||||
excerpt_msgs = session[-6:]
|
||||
excerpt = "\n".join(
|
||||
f"{m['role'].upper()}: {m['content'][:200]}" for m in excerpt_msgs
|
||||
)
|
||||
|
||||
# Find agent responses as candidate decision points
|
||||
agent_msgs = [m for m in session if m["role"] in ("agent", "assistant")]
|
||||
if not agent_msgs:
|
||||
return None
|
||||
|
||||
decision = random.choice(agent_msgs) # noqa: S311
|
||||
return decision["content"], excerpt
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("Session selection failed: %s", exc)
|
||||
return None
|
||||
|
||||
def _load_chat_rows(self) -> list[dict]:
|
||||
"""Synchronously load chat messages from SQLite."""
|
||||
from infrastructure.chat_store import DB_PATH
|
||||
|
||||
with closing(sqlite3.connect(str(DB_PATH))) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
rows = conn.execute(
|
||||
"SELECT role, content, timestamp FROM chat_messages "
|
||||
"ORDER BY timestamp ASC"
|
||||
).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
def _group_into_sessions(self, rows: list[dict]) -> list[list[dict]]:
|
||||
"""Group chat rows into sessions based on time gaps."""
|
||||
if not rows:
|
||||
return []
|
||||
|
||||
sessions: list[list[dict]] = []
|
||||
current: list[dict] = [rows[0]]
|
||||
|
||||
for prev, curr in zip(rows, rows[1:]):
|
||||
try:
|
||||
t_prev = datetime.fromisoformat(prev["timestamp"].replace("Z", "+00:00"))
|
||||
t_curr = datetime.fromisoformat(curr["timestamp"].replace("Z", "+00:00"))
|
||||
gap = (t_curr - t_prev).total_seconds()
|
||||
except Exception:
|
||||
gap = 0
|
||||
|
||||
if gap > _SESSION_GAP_SECONDS:
|
||||
sessions.append(current)
|
||||
current = [curr]
|
||||
else:
|
||||
current.append(curr)
|
||||
|
||||
sessions.append(current)
|
||||
return sessions
|
||||
|
||||
async def _simulate_alternative(
|
||||
self, decision_point: str, session_excerpt: str
|
||||
) -> str:
|
||||
"""Ask the LLM to simulate an alternative response."""
|
||||
prompt = (
|
||||
"You are Timmy, a sovereign AI agent in a dreaming state.\n"
|
||||
"You are replaying a past conversation and exploring what you could "
|
||||
"have done differently at a key decision point.\n\n"
|
||||
"PAST SESSION EXCERPT:\n"
|
||||
f"{session_excerpt}\n\n"
|
||||
"KEY DECISION POINT (your past response):\n"
|
||||
f"{decision_point[:500]}\n\n"
|
||||
"TASK: In 2-3 sentences, describe ONE concrete alternative approach "
|
||||
"you could have taken at this decision point that would have been "
|
||||
"more helpful, more accurate, or more efficient.\n"
|
||||
"Be specific — reference the actual content of the conversation.\n"
|
||||
"Do NOT include meta-commentary about dreaming or this exercise.\n\n"
|
||||
"Alternative approach:"
|
||||
)
|
||||
|
||||
raw = await self._call_agent(prompt)
|
||||
return _THINK_TAG_RE.sub("", raw).strip() if raw else ""
|
||||
|
||||
async def _extract_rule(self, decision_point: str, simulation: str) -> str:
|
||||
"""Extract a proposed behaviour rule from the simulation."""
|
||||
prompt = (
|
||||
"Given this pair of agent responses:\n\n"
|
||||
f"ORIGINAL: {decision_point[:300]}\n\n"
|
||||
f"IMPROVED ALTERNATIVE: {simulation[:400]}\n\n"
|
||||
"Extract ONE concise rule (max 20 words) that captures what to do "
|
||||
"differently next time. Format: 'When X, do Y instead of Z.'\n"
|
||||
"Rule:"
|
||||
)
|
||||
|
||||
raw = await self._call_agent(prompt)
|
||||
rule = _THINK_TAG_RE.sub("", raw).strip() if raw else ""
|
||||
# Keep only the first sentence/line
|
||||
rule = rule.split("\n")[0].strip().rstrip(".")
|
||||
return rule[:200] # Safety cap
|
||||
|
||||
async def _call_agent(self, prompt: str) -> str:
|
||||
"""Call the Timmy agent for a dreaming prompt (skip MCP, 60 s timeout)."""
|
||||
import asyncio
|
||||
|
||||
if self._dreaming_agent is None:
|
||||
from timmy.agent import create_timmy
|
||||
|
||||
self._dreaming_agent = create_timmy(skip_mcp=True)
|
||||
|
||||
try:
|
||||
async with asyncio.timeout(settings.dreaming_timeout_seconds):
|
||||
run = await self._dreaming_agent.arun(prompt, stream=False)
|
||||
except TimeoutError:
|
||||
logger.warning("Dreaming LLM call timed out after %ds", settings.dreaming_timeout_seconds)
|
||||
return ""
|
||||
except Exception as exc:
|
||||
logger.warning("Dreaming LLM call failed: %s", exc)
|
||||
return ""
|
||||
|
||||
raw = run.content if hasattr(run, "content") else str(run)
|
||||
return raw or ""
|
||||
|
||||
def _store_dream(
|
||||
self,
|
||||
*,
|
||||
session_excerpt: str,
|
||||
decision_point: str,
|
||||
simulation: str,
|
||||
proposed_rule: str,
|
||||
) -> DreamRecord:
|
||||
dream = DreamRecord(
|
||||
id=str(uuid.uuid4()),
|
||||
session_excerpt=session_excerpt,
|
||||
decision_point=decision_point,
|
||||
simulation=simulation,
|
||||
proposed_rule=proposed_rule,
|
||||
created_at=datetime.now(UTC).isoformat(),
|
||||
)
|
||||
with _get_conn(self._db_path) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO dreams
|
||||
(id, session_excerpt, decision_point, simulation, proposed_rule, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
dream.id,
|
||||
dream.session_excerpt,
|
||||
dream.decision_point,
|
||||
dream.simulation,
|
||||
dream.proposed_rule,
|
||||
dream.created_at,
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
return dream
|
||||
|
||||
async def _broadcast_status(self) -> None:
|
||||
"""Push current dreaming status via WebSocket."""
|
||||
try:
|
||||
from infrastructure.ws_manager.handler import ws_manager
|
||||
|
||||
await ws_manager.broadcast("dreaming_state", self.get_status())
|
||||
except Exception as exc:
|
||||
logger.debug("Dreaming status broadcast failed: %s", exc)
|
||||
|
||||
async def _broadcast_dream(self, dream: DreamRecord) -> None:
|
||||
"""Push a completed dream record via WebSocket."""
|
||||
try:
|
||||
from infrastructure.ws_manager.handler import ws_manager
|
||||
|
||||
await ws_manager.broadcast(
|
||||
"dreaming_complete",
|
||||
{
|
||||
"id": dream.id,
|
||||
"proposed_rule": dream.proposed_rule,
|
||||
"simulation": dream.simulation[:200],
|
||||
"created_at": dream.created_at,
|
||||
},
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("Dreaming complete broadcast failed: %s", exc)
|
||||
|
||||
|
||||
# Module-level singleton
|
||||
dreaming_engine = DreamingEngine()
|
||||
@@ -8,4 +8,23 @@ Refs: #954, #953
|
||||
Three-strike detector and automation enforcement.
|
||||
|
||||
Refs: #962
|
||||
|
||||
Session reporting: auto-generates markdown scorecards at session end
|
||||
and commits them to the Gitea repo for institutional memory.
|
||||
|
||||
Refs: #957 (Session Sovereignty Report Generator)
|
||||
"""
|
||||
|
||||
from timmy.sovereignty.session_report import (
|
||||
commit_report,
|
||||
generate_and_commit_report,
|
||||
generate_report,
|
||||
mark_session_start,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"generate_report",
|
||||
"commit_report",
|
||||
"generate_and_commit_report",
|
||||
"mark_session_start",
|
||||
]
|
||||
|
||||
442
src/timmy/sovereignty/session_report.py
Normal file
442
src/timmy/sovereignty/session_report.py
Normal file
@@ -0,0 +1,442 @@
|
||||
"""Session Sovereignty Report Generator.
|
||||
|
||||
Auto-generates a sovereignty scorecard at the end of each play session
|
||||
and commits it as a markdown file to the Gitea repo under
|
||||
``reports/sovereignty/``.
|
||||
|
||||
Report contents (per issue #957):
|
||||
- Session duration + game played
|
||||
- Total model calls by type (VLM, LLM, TTS, API)
|
||||
- Total cache/rule hits by type
|
||||
- New skills crystallized (placeholder — pending skill-tracking impl)
|
||||
- Sovereignty delta (change from session start → end)
|
||||
- Cost breakdown (actual API spend)
|
||||
- Per-layer sovereignty %: perception, decision, narration
|
||||
- Trend comparison vs previous session
|
||||
|
||||
Refs: #957 (Sovereignty P0) · #953 (The Sovereignty Loop)
|
||||
"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
|
||||
# Optional module-level imports — degrade gracefully if unavailable at import time
|
||||
try:
|
||||
from timmy.session_logger import get_session_logger
|
||||
except Exception: # ImportError or circular import during early startup
|
||||
get_session_logger = None # type: ignore[assignment]
|
||||
|
||||
try:
|
||||
from infrastructure.sovereignty_metrics import GRADUATION_TARGETS, get_sovereignty_store
|
||||
except Exception:
|
||||
GRADUATION_TARGETS: dict = {} # type: ignore[assignment]
|
||||
get_sovereignty_store = None # type: ignore[assignment]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Module-level session start time; set by mark_session_start()
|
||||
_SESSION_START: datetime | None = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def mark_session_start() -> None:
|
||||
"""Record the session start wall-clock time.
|
||||
|
||||
Call once during application startup so ``generate_report()`` can
|
||||
compute accurate session durations.
|
||||
"""
|
||||
global _SESSION_START
|
||||
_SESSION_START = datetime.now(UTC)
|
||||
logger.debug("Sovereignty: session start recorded at %s", _SESSION_START.isoformat())
|
||||
|
||||
|
||||
def generate_report(session_id: str = "dashboard") -> str:
|
||||
"""Render a sovereignty scorecard as a markdown string.
|
||||
|
||||
Pulls from:
|
||||
- ``timmy.session_logger`` — message/tool-call/error counts
|
||||
- ``infrastructure.sovereignty_metrics`` — cache hit rate, API cost,
|
||||
graduation phase, and trend data
|
||||
|
||||
Args:
|
||||
session_id: The session identifier (default: "dashboard").
|
||||
|
||||
Returns:
|
||||
Markdown-formatted sovereignty report string.
|
||||
"""
|
||||
now = datetime.now(UTC)
|
||||
session_start = _SESSION_START or now
|
||||
duration_secs = (now - session_start).total_seconds()
|
||||
|
||||
session_data = _gather_session_data()
|
||||
sov_data = _gather_sovereignty_data()
|
||||
|
||||
return _render_markdown(now, session_id, duration_secs, session_data, sov_data)
|
||||
|
||||
|
||||
def commit_report(report_md: str, session_id: str = "dashboard") -> bool:
|
||||
"""Commit a sovereignty report to the Gitea repo.
|
||||
|
||||
Creates or updates ``reports/sovereignty/{date}_{session_id}.md``
|
||||
via the Gitea Contents API. Degrades gracefully: logs a warning
|
||||
and returns ``False`` if Gitea is unreachable or misconfigured.
|
||||
|
||||
Args:
|
||||
report_md: Markdown content to commit.
|
||||
session_id: Session identifier used in the filename.
|
||||
|
||||
Returns:
|
||||
``True`` on success, ``False`` on failure.
|
||||
"""
|
||||
if not settings.gitea_enabled:
|
||||
logger.info("Sovereignty: Gitea disabled — skipping report commit")
|
||||
return False
|
||||
|
||||
if not settings.gitea_token:
|
||||
logger.warning("Sovereignty: no Gitea token — skipping report commit")
|
||||
return False
|
||||
|
||||
date_str = datetime.now(UTC).strftime("%Y-%m-%d")
|
||||
file_path = f"reports/sovereignty/{date_str}_{session_id}.md"
|
||||
url = f"{settings.gitea_url}/api/v1/repos/{settings.gitea_repo}/contents/{file_path}"
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
encoded_content = base64.b64encode(report_md.encode()).decode()
|
||||
commit_message = (
|
||||
f"report: sovereignty session {session_id} ({date_str})\n\n"
|
||||
f"Auto-generated by Timmy. Refs #957"
|
||||
)
|
||||
payload: dict[str, Any] = {
|
||||
"message": commit_message,
|
||||
"content": encoded_content,
|
||||
}
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=10.0) as client:
|
||||
# Fetch existing file SHA so we can update rather than create
|
||||
check = client.get(url, headers=headers)
|
||||
if check.status_code == 200:
|
||||
existing = check.json()
|
||||
payload["sha"] = existing.get("sha", "")
|
||||
|
||||
resp = client.put(url, headers=headers, json=payload)
|
||||
resp.raise_for_status()
|
||||
|
||||
logger.info("Sovereignty: report committed to %s", file_path)
|
||||
return True
|
||||
|
||||
except httpx.HTTPStatusError as exc:
|
||||
logger.warning(
|
||||
"Sovereignty: commit failed (HTTP %s): %s",
|
||||
exc.response.status_code,
|
||||
exc,
|
||||
)
|
||||
return False
|
||||
except Exception as exc:
|
||||
logger.warning("Sovereignty: commit failed: %s", exc)
|
||||
return False
|
||||
|
||||
|
||||
async def generate_and_commit_report(session_id: str = "dashboard") -> bool:
|
||||
"""Generate and commit a sovereignty report for the current session.
|
||||
|
||||
Primary entry point — call at session end / application shutdown.
|
||||
Wraps the synchronous ``commit_report`` call in ``asyncio.to_thread``
|
||||
so it does not block the event loop.
|
||||
|
||||
Args:
|
||||
session_id: The session identifier.
|
||||
|
||||
Returns:
|
||||
``True`` if the report was generated and committed successfully.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
try:
|
||||
report_md = generate_report(session_id)
|
||||
logger.info("Sovereignty: report generated (%d chars)", len(report_md))
|
||||
committed = await asyncio.to_thread(commit_report, report_md, session_id)
|
||||
return committed
|
||||
except Exception as exc:
|
||||
logger.warning("Sovereignty: report generation failed: %s", exc)
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _format_duration(seconds: float) -> str:
|
||||
"""Format a duration in seconds as a human-readable string."""
|
||||
total = int(seconds)
|
||||
hours, remainder = divmod(total, 3600)
|
||||
minutes, secs = divmod(remainder, 60)
|
||||
if hours:
|
||||
return f"{hours}h {minutes}m {secs}s"
|
||||
if minutes:
|
||||
return f"{minutes}m {secs}s"
|
||||
return f"{secs}s"
|
||||
|
||||
|
||||
def _gather_session_data() -> dict[str, Any]:
|
||||
"""Pull session statistics from the session logger.
|
||||
|
||||
Returns a dict with:
|
||||
- ``user_messages``, ``timmy_messages``, ``tool_calls``, ``errors``
|
||||
- ``tool_call_breakdown``: dict[tool_name, count]
|
||||
"""
|
||||
default: dict[str, Any] = {
|
||||
"user_messages": 0,
|
||||
"timmy_messages": 0,
|
||||
"tool_calls": 0,
|
||||
"errors": 0,
|
||||
"tool_call_breakdown": {},
|
||||
}
|
||||
|
||||
try:
|
||||
if get_session_logger is None:
|
||||
return default
|
||||
sl = get_session_logger()
|
||||
sl.flush()
|
||||
|
||||
# Read today's session file directly for accurate counts
|
||||
if not sl.session_file.exists():
|
||||
return default
|
||||
|
||||
entries: list[dict] = []
|
||||
with open(sl.session_file) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
entries.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
tool_breakdown: dict[str, int] = {}
|
||||
user_msgs = timmy_msgs = tool_calls = errors = 0
|
||||
|
||||
for entry in entries:
|
||||
etype = entry.get("type")
|
||||
if etype == "message":
|
||||
if entry.get("role") == "user":
|
||||
user_msgs += 1
|
||||
elif entry.get("role") == "timmy":
|
||||
timmy_msgs += 1
|
||||
elif etype == "tool_call":
|
||||
tool_calls += 1
|
||||
tool_name = entry.get("tool", "unknown")
|
||||
tool_breakdown[tool_name] = tool_breakdown.get(tool_name, 0) + 1
|
||||
elif etype == "error":
|
||||
errors += 1
|
||||
|
||||
return {
|
||||
"user_messages": user_msgs,
|
||||
"timmy_messages": timmy_msgs,
|
||||
"tool_calls": tool_calls,
|
||||
"errors": errors,
|
||||
"tool_call_breakdown": tool_breakdown,
|
||||
}
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("Sovereignty: failed to gather session data: %s", exc)
|
||||
return default
|
||||
|
||||
|
||||
def _gather_sovereignty_data() -> dict[str, Any]:
|
||||
"""Pull sovereignty metrics from the SQLite store.
|
||||
|
||||
Returns a dict with:
|
||||
- ``metrics``: summary from ``SovereigntyMetricsStore.get_summary()``
|
||||
- ``deltas``: per-metric start/end values within recent history window
|
||||
- ``previous_session``: most recent prior value for each metric
|
||||
"""
|
||||
try:
|
||||
if get_sovereignty_store is None:
|
||||
return {"metrics": {}, "deltas": {}, "previous_session": {}}
|
||||
store = get_sovereignty_store()
|
||||
summary = store.get_summary()
|
||||
|
||||
deltas: dict[str, dict[str, Any]] = {}
|
||||
previous_session: dict[str, float | None] = {}
|
||||
|
||||
for metric_type in GRADUATION_TARGETS:
|
||||
history = store.get_latest(metric_type, limit=10)
|
||||
if len(history) >= 2:
|
||||
deltas[metric_type] = {
|
||||
"start": history[-1]["value"],
|
||||
"end": history[0]["value"],
|
||||
}
|
||||
previous_session[metric_type] = history[1]["value"]
|
||||
elif len(history) == 1:
|
||||
deltas[metric_type] = {"start": history[0]["value"], "end": history[0]["value"]}
|
||||
previous_session[metric_type] = None
|
||||
else:
|
||||
deltas[metric_type] = {"start": None, "end": None}
|
||||
previous_session[metric_type] = None
|
||||
|
||||
return {
|
||||
"metrics": summary,
|
||||
"deltas": deltas,
|
||||
"previous_session": previous_session,
|
||||
}
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("Sovereignty: failed to gather sovereignty data: %s", exc)
|
||||
return {"metrics": {}, "deltas": {}, "previous_session": {}}
|
||||
|
||||
|
||||
def _render_markdown(
|
||||
now: datetime,
|
||||
session_id: str,
|
||||
duration_secs: float,
|
||||
session_data: dict[str, Any],
|
||||
sov_data: dict[str, Any],
|
||||
) -> str:
|
||||
"""Assemble the full sovereignty report in markdown."""
|
||||
lines: list[str] = []
|
||||
|
||||
# Header
|
||||
lines += [
|
||||
"# Sovereignty Session Report",
|
||||
"",
|
||||
f"**Session ID:** `{session_id}` ",
|
||||
f"**Date:** {now.strftime('%Y-%m-%d')} ",
|
||||
f"**Duration:** {_format_duration(duration_secs)} ",
|
||||
f"**Generated:** {now.isoformat()}",
|
||||
"",
|
||||
"---",
|
||||
"",
|
||||
]
|
||||
|
||||
# Session activity
|
||||
lines += [
|
||||
"## Session Activity",
|
||||
"",
|
||||
"| Metric | Count |",
|
||||
"|--------|-------|",
|
||||
f"| User messages | {session_data['user_messages']} |",
|
||||
f"| Timmy responses | {session_data['timmy_messages']} |",
|
||||
f"| Tool calls | {session_data['tool_calls']} |",
|
||||
f"| Errors | {session_data['errors']} |",
|
||||
"",
|
||||
]
|
||||
|
||||
tool_breakdown = session_data.get("tool_call_breakdown", {})
|
||||
if tool_breakdown:
|
||||
lines += ["### Model Calls by Tool", ""]
|
||||
for tool_name, count in sorted(tool_breakdown.items(), key=lambda x: -x[1]):
|
||||
lines.append(f"- `{tool_name}`: {count}")
|
||||
lines.append("")
|
||||
|
||||
# Sovereignty scorecard
|
||||
|
||||
lines += [
|
||||
"## Sovereignty Scorecard",
|
||||
"",
|
||||
"| Metric | Current | Target (graduation) | Phase |",
|
||||
"|--------|---------|---------------------|-------|",
|
||||
]
|
||||
|
||||
for metric_type, data in sov_data["metrics"].items():
|
||||
current = data.get("current")
|
||||
current_str = f"{current:.4f}" if current is not None else "N/A"
|
||||
grad_target = GRADUATION_TARGETS.get(metric_type, {}).get("graduation")
|
||||
grad_str = f"{grad_target:.4f}" if isinstance(grad_target, (int, float)) else "N/A"
|
||||
phase = data.get("phase", "unknown")
|
||||
lines.append(f"| {metric_type} | {current_str} | {grad_str} | {phase} |")
|
||||
|
||||
lines += ["", "### Sovereignty Delta (This Session)", ""]
|
||||
|
||||
for metric_type, delta_info in sov_data.get("deltas", {}).items():
|
||||
start_val = delta_info.get("start")
|
||||
end_val = delta_info.get("end")
|
||||
if start_val is not None and end_val is not None:
|
||||
diff = end_val - start_val
|
||||
sign = "+" if diff >= 0 else ""
|
||||
lines.append(
|
||||
f"- **{metric_type}**: {start_val:.4f} → {end_val:.4f} ({sign}{diff:.4f})"
|
||||
)
|
||||
else:
|
||||
lines.append(f"- **{metric_type}**: N/A (no data recorded)")
|
||||
|
||||
# Cost breakdown
|
||||
lines += ["", "## Cost Breakdown", ""]
|
||||
api_cost_data = sov_data["metrics"].get("api_cost", {})
|
||||
current_cost = api_cost_data.get("current")
|
||||
if current_cost is not None:
|
||||
lines.append(f"- **Total API spend (latest recorded):** ${current_cost:.4f}")
|
||||
else:
|
||||
lines.append("- **Total API spend:** N/A (no data recorded)")
|
||||
lines.append("")
|
||||
|
||||
# Per-layer sovereignty
|
||||
lines += [
|
||||
"## Per-Layer Sovereignty",
|
||||
"",
|
||||
"| Layer | Sovereignty % |",
|
||||
"|-------|--------------|",
|
||||
"| Perception (VLM) | N/A |",
|
||||
"| Decision (LLM) | N/A |",
|
||||
"| Narration (TTS) | N/A |",
|
||||
"",
|
||||
"> Per-layer tracking requires instrumented inference calls. See #957.",
|
||||
"",
|
||||
]
|
||||
|
||||
# Skills crystallized
|
||||
lines += [
|
||||
"## Skills Crystallized",
|
||||
"",
|
||||
"_Skill crystallization tracking not yet implemented. See #957._",
|
||||
"",
|
||||
]
|
||||
|
||||
# Trend vs previous session
|
||||
lines += ["## Trend vs Previous Session", ""]
|
||||
prev_data = sov_data.get("previous_session", {})
|
||||
has_prev = any(v is not None for v in prev_data.values())
|
||||
|
||||
if has_prev:
|
||||
lines += [
|
||||
"| Metric | Previous | Current | Change |",
|
||||
"|--------|----------|---------|--------|",
|
||||
]
|
||||
for metric_type, curr_info in sov_data["metrics"].items():
|
||||
curr_val = curr_info.get("current")
|
||||
prev_val = prev_data.get(metric_type)
|
||||
curr_str = f"{curr_val:.4f}" if curr_val is not None else "N/A"
|
||||
prev_str = f"{prev_val:.4f}" if prev_val is not None else "N/A"
|
||||
if curr_val is not None and prev_val is not None:
|
||||
diff = curr_val - prev_val
|
||||
sign = "+" if diff >= 0 else ""
|
||||
change_str = f"{sign}{diff:.4f}"
|
||||
else:
|
||||
change_str = "N/A"
|
||||
lines.append(f"| {metric_type} | {prev_str} | {curr_str} | {change_str} |")
|
||||
lines.append("")
|
||||
else:
|
||||
lines += ["_No previous session data available for comparison._", ""]
|
||||
|
||||
# Footer
|
||||
lines += [
|
||||
"---",
|
||||
"_Auto-generated by Timmy · Session Sovereignty Report · Refs: #957_",
|
||||
]
|
||||
|
||||
return "\n".join(lines)
|
||||
@@ -2549,7 +2549,6 @@
|
||||
.tower-adv-action { font-size: 0.75rem; color: var(--green); margin-top: 4px; font-style: italic; }
|
||||
|
||||
|
||||
|
||||
/* ── Voice settings ───────────────────────────────────────── */
|
||||
.voice-settings-page { max-width: 600px; margin: 0 auto; }
|
||||
|
||||
@@ -2716,44 +2715,73 @@
|
||||
margin-bottom: 0.5rem;
|
||||
}
|
||||
|
||||
|
||||
/* ═══════════════════════════════════════════════════════════════
|
||||
Dreaming Mode
|
||||
═══════════════════════════════════════════════════════════════ */
|
||||
|
||||
.dream-active {
|
||||
display: flex; align-items: center; gap: 8px;
|
||||
padding: 6px 0;
|
||||
/* ── Self-Correction Dashboard ─────────────────────────────── */
|
||||
.sc-event {
|
||||
border-left: 3px solid var(--border);
|
||||
padding: 0.6rem 0.8rem;
|
||||
margin-bottom: 0.75rem;
|
||||
background: rgba(255,255,255,0.02);
|
||||
border-radius: 0 4px 4px 0;
|
||||
font-size: 0.82rem;
|
||||
}
|
||||
.dream-label { font-size: 0.75rem; font-weight: 700; color: var(--purple); letter-spacing: 0.12em; }
|
||||
.dream-summary { font-size: 0.75rem; color: var(--text-dim); font-style: italic; flex: 1; }
|
||||
.sc-event.sc-status-success { border-left-color: var(--green); }
|
||||
.sc-event.sc-status-partial { border-left-color: var(--amber); }
|
||||
.sc-event.sc-status-failed { border-left-color: var(--red); }
|
||||
|
||||
.dream-pulse {
|
||||
display: inline-block; width: 8px; height: 8px; border-radius: 50%;
|
||||
background: var(--purple);
|
||||
animation: dream-pulse 1.8s ease-in-out infinite;
|
||||
.sc-event-header {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: 0.5rem;
|
||||
margin-bottom: 0.4rem;
|
||||
flex-wrap: wrap;
|
||||
}
|
||||
@keyframes dream-pulse {
|
||||
0%, 100% { opacity: 1; transform: scale(1); }
|
||||
50% { opacity: 0.4; transform: scale(0.7); }
|
||||
.sc-status-badge {
|
||||
font-size: 0.68rem;
|
||||
font-weight: 700;
|
||||
letter-spacing: 0.06em;
|
||||
padding: 0.15rem 0.45rem;
|
||||
border-radius: 3px;
|
||||
}
|
||||
.sc-status-badge.sc-status-success { color: var(--green); background: rgba(0,255,136,0.08); }
|
||||
.sc-status-badge.sc-status-partial { color: var(--amber); background: rgba(255,179,0,0.08); }
|
||||
.sc-status-badge.sc-status-failed { color: var(--red); background: rgba(255,59,59,0.08); }
|
||||
|
||||
.dream-dot {
|
||||
display: inline-block; width: 7px; height: 7px; border-radius: 50%;
|
||||
.sc-source-badge {
|
||||
font-size: 0.68rem;
|
||||
color: var(--purple);
|
||||
background: rgba(168,85,247,0.1);
|
||||
padding: 0.1rem 0.4rem;
|
||||
border-radius: 3px;
|
||||
}
|
||||
.dream-dot-idle { background: var(--amber); }
|
||||
.dream-dot-standby { background: var(--text-dim); }
|
||||
|
||||
.dream-idle, .dream-standby {
|
||||
display: flex; align-items: center; gap: 6px; padding: 4px 0;
|
||||
.sc-event-time { font-size: 0.68rem; color: var(--text-dim); margin-left: auto; }
|
||||
.sc-event-error-type {
|
||||
font-size: 0.72rem;
|
||||
color: var(--amber);
|
||||
font-weight: 600;
|
||||
margin-bottom: 0.3rem;
|
||||
letter-spacing: 0.04em;
|
||||
}
|
||||
.dream-label-idle { font-size: 0.7rem; font-weight: 700; color: var(--amber); letter-spacing: 0.1em; }
|
||||
.dream-label-standby { font-size: 0.7rem; font-weight: 700; color: var(--text-dim); letter-spacing: 0.1em; }
|
||||
.dream-idle-meta { font-size: 0.7rem; color: var(--text-dim); }
|
||||
|
||||
.dream-history { border-top: 1px solid var(--border); padding-top: 6px; }
|
||||
.dream-record { padding: 4px 0; border-bottom: 1px solid var(--border); }
|
||||
.dream-record:last-child { border-bottom: none; }
|
||||
.dream-rule { font-size: 0.75rem; color: var(--text); font-style: italic; }
|
||||
.dream-meta { font-size: 0.65rem; color: var(--text-dim); margin-top: 2px; }
|
||||
.sc-label {
|
||||
font-size: 0.65rem;
|
||||
font-weight: 700;
|
||||
letter-spacing: 0.06em;
|
||||
color: var(--text-dim);
|
||||
margin-right: 0.3rem;
|
||||
}
|
||||
.sc-event-intent, .sc-event-error, .sc-event-strategy, .sc-event-outcome {
|
||||
color: var(--text);
|
||||
margin-bottom: 0.2rem;
|
||||
line-height: 1.4;
|
||||
word-break: break-word;
|
||||
}
|
||||
.sc-event-error { color: var(--red); }
|
||||
.sc-event-strategy { color: var(--text-dim); font-style: italic; }
|
||||
.sc-event-outcome { color: var(--text-bright); }
|
||||
.sc-event-meta { font-size: 0.68rem; color: var(--text-dim); margin-top: 0.3rem; }
|
||||
|
||||
.sc-pattern-type {
|
||||
font-family: var(--font);
|
||||
font-size: 0.8rem;
|
||||
color: var(--text-bright);
|
||||
word-break: break-all;
|
||||
}
|
||||
|
||||
@@ -7,6 +7,8 @@ from unittest.mock import patch
|
||||
import pytest
|
||||
|
||||
import infrastructure.events.bus as bus_module
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
from infrastructure.events.bus import (
|
||||
Event,
|
||||
EventBus,
|
||||
@@ -352,6 +354,14 @@ class TestEventBusPersistence:
|
||||
events = bus.replay()
|
||||
assert events == []
|
||||
|
||||
def test_init_persistence_db_noop_when_path_is_none(self):
|
||||
"""_init_persistence_db() is a no-op when _persistence_db_path is None."""
|
||||
bus = EventBus()
|
||||
# _persistence_db_path is None by default; calling _init_persistence_db
|
||||
# should silently return without touching the filesystem.
|
||||
bus._init_persistence_db() # must not raise
|
||||
assert bus._persistence_db_path is None
|
||||
|
||||
async def test_wal_mode_on_persistence_db(self, persistent_bus):
|
||||
"""Persistence database should use WAL mode."""
|
||||
conn = sqlite3.connect(str(persistent_bus._persistence_db_path))
|
||||
|
||||
0
tests/self_coding/__init__.py
Normal file
0
tests/self_coding/__init__.py
Normal file
363
tests/self_coding/test_loop.py
Normal file
363
tests/self_coding/test_loop.py
Normal file
@@ -0,0 +1,363 @@
|
||||
"""Unit tests for the self-modification loop.
|
||||
|
||||
Covers:
|
||||
- Protected branch guard
|
||||
- Successful cycle (mocked git + tests)
|
||||
- Edit function failure → branch reverted, no commit
|
||||
- Test failure → branch reverted, no commit
|
||||
- Gitea PR creation plumbing
|
||||
- GiteaClient graceful degradation (no token, network error)
|
||||
|
||||
All git and subprocess calls are mocked so these run offline without
|
||||
a real repo or test suite.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_loop(repo_root="/tmp/fake-repo"):
|
||||
"""Construct a SelfModifyLoop with a fake repo root."""
|
||||
from self_coding.self_modify.loop import SelfModifyLoop
|
||||
|
||||
return SelfModifyLoop(repo_root=repo_root, remote="origin", base_branch="main")
|
||||
|
||||
|
||||
def _noop_edit(repo_root: str) -> None:
|
||||
"""Edit function that does nothing."""
|
||||
|
||||
|
||||
def _failing_edit(repo_root: str) -> None:
|
||||
"""Edit function that raises."""
|
||||
raise RuntimeError("edit exploded")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Guard tests (sync — no git calls needed)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_guard_blocks_main():
|
||||
loop = _make_loop()
|
||||
with pytest.raises(ValueError, match="protected branch"):
|
||||
loop._guard_branch("main")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_guard_blocks_master():
|
||||
loop = _make_loop()
|
||||
with pytest.raises(ValueError, match="protected branch"):
|
||||
loop._guard_branch("master")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_guard_allows_feature_branch():
|
||||
loop = _make_loop()
|
||||
# Should not raise
|
||||
loop._guard_branch("self-modify/some-feature")
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_guard_allows_self_modify_prefix():
|
||||
loop = _make_loop()
|
||||
loop._guard_branch("self-modify/issue-983")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Full cycle — success path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_success():
|
||||
"""Happy path: edit succeeds, tests pass, PR created."""
|
||||
loop = _make_loop()
|
||||
|
||||
fake_completed = MagicMock()
|
||||
fake_completed.stdout = "abc1234\n"
|
||||
fake_completed.returncode = 0
|
||||
|
||||
fake_test_result = MagicMock()
|
||||
fake_test_result.stdout = "3 passed"
|
||||
fake_test_result.stderr = ""
|
||||
fake_test_result.returncode = 0
|
||||
|
||||
from self_coding.gitea_client import PullRequest as _PR
|
||||
|
||||
fake_pr = _PR(number=42, title="test PR", html_url="http://gitea/pr/42")
|
||||
|
||||
with (
|
||||
patch.object(loop, "_git", return_value=fake_completed),
|
||||
patch("subprocess.run", return_value=fake_test_result),
|
||||
patch.object(loop, "_create_pr", return_value=fake_pr),
|
||||
):
|
||||
result = await loop.run(
|
||||
slug="test-feature",
|
||||
description="Add test feature",
|
||||
edit_fn=_noop_edit,
|
||||
issue_number=983,
|
||||
)
|
||||
|
||||
assert result.success is True
|
||||
assert result.branch == "self-modify/test-feature"
|
||||
assert result.pr_url == "http://gitea/pr/42"
|
||||
assert result.pr_number == 42
|
||||
assert "3 passed" in result.test_output
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_skips_tests_when_flag_set():
|
||||
"""skip_tests=True should bypass the test gate."""
|
||||
loop = _make_loop()
|
||||
|
||||
fake_completed = MagicMock()
|
||||
fake_completed.stdout = "deadbeef\n"
|
||||
fake_completed.returncode = 0
|
||||
|
||||
with (
|
||||
patch.object(loop, "_git", return_value=fake_completed),
|
||||
patch.object(loop, "_create_pr", return_value=None),
|
||||
patch("subprocess.run") as mock_run,
|
||||
):
|
||||
result = await loop.run(
|
||||
slug="skip-test-feature",
|
||||
description="Skip test feature",
|
||||
edit_fn=_noop_edit,
|
||||
skip_tests=True,
|
||||
)
|
||||
|
||||
# subprocess.run should NOT be called for tests
|
||||
mock_run.assert_not_called()
|
||||
assert result.success is True
|
||||
assert "(tests skipped)" in result.test_output
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Failure paths
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_reverts_on_edit_failure():
|
||||
"""If edit_fn raises, the branch should be reverted and no commit made."""
|
||||
loop = _make_loop()
|
||||
|
||||
fake_completed = MagicMock()
|
||||
fake_completed.stdout = ""
|
||||
fake_completed.returncode = 0
|
||||
|
||||
revert_called = []
|
||||
|
||||
def _fake_revert(branch):
|
||||
revert_called.append(branch)
|
||||
|
||||
with (
|
||||
patch.object(loop, "_git", return_value=fake_completed),
|
||||
patch.object(loop, "_revert_branch", side_effect=_fake_revert),
|
||||
patch.object(loop, "_commit_all") as mock_commit,
|
||||
):
|
||||
result = await loop.run(
|
||||
slug="broken-edit",
|
||||
description="This will fail",
|
||||
edit_fn=_failing_edit,
|
||||
skip_tests=True,
|
||||
)
|
||||
|
||||
assert result.success is False
|
||||
assert "edit exploded" in result.error
|
||||
assert "self-modify/broken-edit" in revert_called
|
||||
mock_commit.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_reverts_on_test_failure():
|
||||
"""If tests fail, branch should be reverted and no commit made."""
|
||||
loop = _make_loop()
|
||||
|
||||
fake_completed = MagicMock()
|
||||
fake_completed.stdout = ""
|
||||
fake_completed.returncode = 0
|
||||
|
||||
fake_test_result = MagicMock()
|
||||
fake_test_result.stdout = "FAILED test_foo"
|
||||
fake_test_result.stderr = "1 failed"
|
||||
fake_test_result.returncode = 1
|
||||
|
||||
revert_called = []
|
||||
|
||||
def _fake_revert(branch):
|
||||
revert_called.append(branch)
|
||||
|
||||
with (
|
||||
patch.object(loop, "_git", return_value=fake_completed),
|
||||
patch("subprocess.run", return_value=fake_test_result),
|
||||
patch.object(loop, "_revert_branch", side_effect=_fake_revert),
|
||||
patch.object(loop, "_commit_all") as mock_commit,
|
||||
):
|
||||
result = await loop.run(
|
||||
slug="tests-will-fail",
|
||||
description="This will fail tests",
|
||||
edit_fn=_noop_edit,
|
||||
)
|
||||
|
||||
assert result.success is False
|
||||
assert "Tests failed" in result.error
|
||||
assert "self-modify/tests-will-fail" in revert_called
|
||||
mock_commit.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_slug_with_main_creates_safe_branch():
|
||||
"""A slug of 'main' produces branch 'self-modify/main', which is not protected."""
|
||||
|
||||
loop = _make_loop()
|
||||
|
||||
fake_completed = MagicMock()
|
||||
fake_completed.stdout = "deadbeef\n"
|
||||
fake_completed.returncode = 0
|
||||
|
||||
# 'self-modify/main' is NOT in _PROTECTED_BRANCHES so the run should succeed
|
||||
with (
|
||||
patch.object(loop, "_git", return_value=fake_completed),
|
||||
patch.object(loop, "_create_pr", return_value=None),
|
||||
):
|
||||
result = await loop.run(
|
||||
slug="main",
|
||||
description="try to write to self-modify/main",
|
||||
edit_fn=_noop_edit,
|
||||
skip_tests=True,
|
||||
)
|
||||
assert result.branch == "self-modify/main"
|
||||
assert result.success is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GiteaClient tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_gitea_client_returns_none_without_token():
|
||||
"""GiteaClient should return None gracefully when no token is set."""
|
||||
from self_coding.gitea_client import GiteaClient
|
||||
|
||||
client = GiteaClient(base_url="http://localhost:3000", token="", repo="owner/repo")
|
||||
pr = client.create_pull_request(
|
||||
title="Test PR",
|
||||
body="body",
|
||||
head="self-modify/test",
|
||||
)
|
||||
assert pr is None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_gitea_client_comment_returns_false_without_token():
|
||||
"""add_issue_comment should return False gracefully when no token is set."""
|
||||
from self_coding.gitea_client import GiteaClient
|
||||
|
||||
client = GiteaClient(base_url="http://localhost:3000", token="", repo="owner/repo")
|
||||
result = client.add_issue_comment(123, "hello")
|
||||
assert result is False
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_gitea_client_create_pr_handles_network_error():
|
||||
"""create_pull_request should return None on network failure."""
|
||||
from self_coding.gitea_client import GiteaClient
|
||||
|
||||
client = GiteaClient(base_url="http://localhost:3000", token="fake-token", repo="owner/repo")
|
||||
|
||||
mock_requests = MagicMock()
|
||||
mock_requests.post.side_effect = Exception("Connection refused")
|
||||
mock_requests.exceptions.ConnectionError = Exception
|
||||
|
||||
with patch.dict("sys.modules", {"requests": mock_requests}):
|
||||
pr = client.create_pull_request(
|
||||
title="Test PR",
|
||||
body="body",
|
||||
head="self-modify/test",
|
||||
)
|
||||
assert pr is None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_gitea_client_comment_handles_network_error():
|
||||
"""add_issue_comment should return False on network failure."""
|
||||
from self_coding.gitea_client import GiteaClient
|
||||
|
||||
client = GiteaClient(base_url="http://localhost:3000", token="fake-token", repo="owner/repo")
|
||||
|
||||
mock_requests = MagicMock()
|
||||
mock_requests.post.side_effect = Exception("Connection refused")
|
||||
|
||||
with patch.dict("sys.modules", {"requests": mock_requests}):
|
||||
result = client.add_issue_comment(456, "hello")
|
||||
assert result is False
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_gitea_client_create_pr_success():
|
||||
"""create_pull_request should return a PullRequest on HTTP 201."""
|
||||
from self_coding.gitea_client import GiteaClient, PullRequest
|
||||
|
||||
client = GiteaClient(base_url="http://localhost:3000", token="tok", repo="owner/repo")
|
||||
|
||||
fake_resp = MagicMock()
|
||||
fake_resp.raise_for_status = MagicMock()
|
||||
fake_resp.json.return_value = {
|
||||
"number": 77,
|
||||
"title": "Test PR",
|
||||
"html_url": "http://localhost:3000/owner/repo/pulls/77",
|
||||
}
|
||||
|
||||
mock_requests = MagicMock()
|
||||
mock_requests.post.return_value = fake_resp
|
||||
|
||||
with patch.dict("sys.modules", {"requests": mock_requests}):
|
||||
pr = client.create_pull_request("Test PR", "body", "self-modify/feat")
|
||||
|
||||
assert isinstance(pr, PullRequest)
|
||||
assert pr.number == 77
|
||||
assert pr.html_url == "http://localhost:3000/owner/repo/pulls/77"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LoopResult dataclass
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_loop_result_defaults():
|
||||
from self_coding.self_modify.loop import LoopResult
|
||||
|
||||
r = LoopResult(success=True)
|
||||
assert r.branch == ""
|
||||
assert r.commit_sha == ""
|
||||
assert r.pr_url == ""
|
||||
assert r.pr_number == 0
|
||||
assert r.test_output == ""
|
||||
assert r.error == ""
|
||||
assert r.elapsed_ms == 0.0
|
||||
assert r.metadata == {}
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_loop_result_failure():
|
||||
from self_coding.self_modify.loop import LoopResult
|
||||
|
||||
r = LoopResult(success=False, error="something broke", branch="self-modify/test")
|
||||
assert r.success is False
|
||||
assert r.error == "something broke"
|
||||
444
tests/timmy/test_session_report.py
Normal file
444
tests/timmy/test_session_report.py
Normal file
@@ -0,0 +1,444 @@
|
||||
"""Tests for timmy.sovereignty.session_report.
|
||||
|
||||
Refs: #957 (Session Sovereignty Report Generator)
|
||||
"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import time
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
from timmy.sovereignty.session_report import (
|
||||
_format_duration,
|
||||
_gather_session_data,
|
||||
_gather_sovereignty_data,
|
||||
_render_markdown,
|
||||
commit_report,
|
||||
generate_and_commit_report,
|
||||
generate_report,
|
||||
mark_session_start,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _format_duration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFormatDuration:
|
||||
def test_seconds_only(self):
|
||||
assert _format_duration(45) == "45s"
|
||||
|
||||
def test_minutes_and_seconds(self):
|
||||
assert _format_duration(125) == "2m 5s"
|
||||
|
||||
def test_hours_minutes_seconds(self):
|
||||
assert _format_duration(3661) == "1h 1m 1s"
|
||||
|
||||
def test_zero(self):
|
||||
assert _format_duration(0) == "0s"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# mark_session_start + generate_report (smoke)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMarkSessionStart:
|
||||
def test_sets_session_start(self):
|
||||
import timmy.sovereignty.session_report as sr
|
||||
|
||||
sr._SESSION_START = None
|
||||
mark_session_start()
|
||||
assert sr._SESSION_START is not None
|
||||
assert sr._SESSION_START.tzinfo == UTC
|
||||
|
||||
def test_idempotent_overwrite(self):
|
||||
import timmy.sovereignty.session_report as sr
|
||||
|
||||
mark_session_start()
|
||||
first = sr._SESSION_START
|
||||
time.sleep(0.01)
|
||||
mark_session_start()
|
||||
second = sr._SESSION_START
|
||||
assert second >= first
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _gather_session_data
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGatherSessionData:
|
||||
def test_returns_defaults_when_no_file(self, tmp_path):
|
||||
mock_logger = MagicMock()
|
||||
mock_logger.flush.return_value = None
|
||||
mock_logger.session_file = tmp_path / "nonexistent.jsonl"
|
||||
|
||||
with patch(
|
||||
"timmy.sovereignty.session_report.get_session_logger",
|
||||
return_value=mock_logger,
|
||||
):
|
||||
data = _gather_session_data()
|
||||
|
||||
assert data["user_messages"] == 0
|
||||
assert data["timmy_messages"] == 0
|
||||
assert data["tool_calls"] == 0
|
||||
assert data["errors"] == 0
|
||||
assert data["tool_call_breakdown"] == {}
|
||||
|
||||
def test_counts_entries_correctly(self, tmp_path):
|
||||
session_file = tmp_path / "session_2026-03-23.jsonl"
|
||||
entries = [
|
||||
{"type": "message", "role": "user", "content": "hello"},
|
||||
{"type": "message", "role": "timmy", "content": "hi"},
|
||||
{"type": "message", "role": "user", "content": "test"},
|
||||
{"type": "tool_call", "tool": "memory_search", "args": {}, "result": "found"},
|
||||
{"type": "tool_call", "tool": "memory_search", "args": {}, "result": "nope"},
|
||||
{"type": "tool_call", "tool": "shell", "args": {}, "result": "ok"},
|
||||
{"type": "error", "error": "boom"},
|
||||
]
|
||||
with open(session_file, "w") as f:
|
||||
for e in entries:
|
||||
f.write(json.dumps(e) + "\n")
|
||||
|
||||
mock_logger = MagicMock()
|
||||
mock_logger.flush.return_value = None
|
||||
mock_logger.session_file = session_file
|
||||
|
||||
with patch(
|
||||
"timmy.sovereignty.session_report.get_session_logger",
|
||||
return_value=mock_logger,
|
||||
):
|
||||
data = _gather_session_data()
|
||||
|
||||
assert data["user_messages"] == 2
|
||||
assert data["timmy_messages"] == 1
|
||||
assert data["tool_calls"] == 3
|
||||
assert data["errors"] == 1
|
||||
assert data["tool_call_breakdown"]["memory_search"] == 2
|
||||
assert data["tool_call_breakdown"]["shell"] == 1
|
||||
|
||||
def test_graceful_on_import_error(self):
|
||||
with patch(
|
||||
"timmy.sovereignty.session_report.get_session_logger",
|
||||
side_effect=ImportError("no session_logger"),
|
||||
):
|
||||
data = _gather_session_data()
|
||||
|
||||
assert data["tool_calls"] == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _gather_sovereignty_data
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGatherSovereigntyData:
|
||||
def test_returns_empty_on_import_error(self):
|
||||
with patch.dict("sys.modules", {"infrastructure.sovereignty_metrics": None}):
|
||||
with patch(
|
||||
"timmy.sovereignty.session_report.get_sovereignty_store",
|
||||
side_effect=ImportError("no store"),
|
||||
):
|
||||
data = _gather_sovereignty_data()
|
||||
|
||||
assert data["metrics"] == {}
|
||||
assert data["deltas"] == {}
|
||||
assert data["previous_session"] == {}
|
||||
|
||||
def test_populates_deltas_from_history(self):
|
||||
mock_store = MagicMock()
|
||||
mock_store.get_summary.return_value = {
|
||||
"cache_hit_rate": {"current": 0.5, "phase": "week1"},
|
||||
}
|
||||
# get_latest returns newest-first
|
||||
mock_store.get_latest.return_value = [
|
||||
{"value": 0.5},
|
||||
{"value": 0.3},
|
||||
{"value": 0.1},
|
||||
]
|
||||
|
||||
with patch(
|
||||
"timmy.sovereignty.session_report.get_sovereignty_store",
|
||||
return_value=mock_store,
|
||||
):
|
||||
with patch(
|
||||
"timmy.sovereignty.session_report.GRADUATION_TARGETS",
|
||||
{"cache_hit_rate": {"graduation": 0.9}},
|
||||
):
|
||||
data = _gather_sovereignty_data()
|
||||
|
||||
delta = data["deltas"].get("cache_hit_rate")
|
||||
assert delta is not None
|
||||
assert delta["start"] == 0.1 # oldest in window
|
||||
assert delta["end"] == 0.5 # most recent
|
||||
assert data["previous_session"]["cache_hit_rate"] == 0.3
|
||||
|
||||
def test_single_data_point_no_delta(self):
|
||||
mock_store = MagicMock()
|
||||
mock_store.get_summary.return_value = {}
|
||||
mock_store.get_latest.return_value = [{"value": 0.4}]
|
||||
|
||||
with patch(
|
||||
"timmy.sovereignty.session_report.get_sovereignty_store",
|
||||
return_value=mock_store,
|
||||
):
|
||||
with patch(
|
||||
"timmy.sovereignty.session_report.GRADUATION_TARGETS",
|
||||
{"api_cost": {"graduation": 0.01}},
|
||||
):
|
||||
data = _gather_sovereignty_data()
|
||||
|
||||
delta = data["deltas"]["api_cost"]
|
||||
assert delta["start"] == 0.4
|
||||
assert delta["end"] == 0.4
|
||||
assert data["previous_session"]["api_cost"] is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# generate_report (integration — smoke test)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGenerateReport:
|
||||
def _minimal_session_data(self):
|
||||
return {
|
||||
"user_messages": 3,
|
||||
"timmy_messages": 3,
|
||||
"tool_calls": 2,
|
||||
"errors": 0,
|
||||
"tool_call_breakdown": {"memory_search": 2},
|
||||
}
|
||||
|
||||
def _minimal_sov_data(self):
|
||||
return {
|
||||
"metrics": {
|
||||
"cache_hit_rate": {"current": 0.45, "phase": "week1"},
|
||||
"api_cost": {"current": 0.12, "phase": "pre-start"},
|
||||
},
|
||||
"deltas": {
|
||||
"cache_hit_rate": {"start": 0.40, "end": 0.45},
|
||||
"api_cost": {"start": 0.10, "end": 0.12},
|
||||
},
|
||||
"previous_session": {
|
||||
"cache_hit_rate": 0.40,
|
||||
"api_cost": 0.10,
|
||||
},
|
||||
}
|
||||
|
||||
def test_smoke_produces_markdown(self):
|
||||
with (
|
||||
patch(
|
||||
"timmy.sovereignty.session_report._gather_session_data",
|
||||
return_value=self._minimal_session_data(),
|
||||
),
|
||||
patch(
|
||||
"timmy.sovereignty.session_report._gather_sovereignty_data",
|
||||
return_value=self._minimal_sov_data(),
|
||||
),
|
||||
):
|
||||
report = generate_report("test-session")
|
||||
|
||||
assert "# Sovereignty Session Report" in report
|
||||
assert "test-session" in report
|
||||
assert "## Session Activity" in report
|
||||
assert "## Sovereignty Scorecard" in report
|
||||
assert "## Cost Breakdown" in report
|
||||
assert "## Trend vs Previous Session" in report
|
||||
|
||||
def test_report_contains_session_stats(self):
|
||||
with (
|
||||
patch(
|
||||
"timmy.sovereignty.session_report._gather_session_data",
|
||||
return_value=self._minimal_session_data(),
|
||||
),
|
||||
patch(
|
||||
"timmy.sovereignty.session_report._gather_sovereignty_data",
|
||||
return_value=self._minimal_sov_data(),
|
||||
),
|
||||
):
|
||||
report = generate_report()
|
||||
|
||||
assert "| User messages | 3 |" in report
|
||||
assert "memory_search" in report
|
||||
|
||||
def test_report_no_previous_session(self):
|
||||
sov = self._minimal_sov_data()
|
||||
sov["previous_session"] = {"cache_hit_rate": None, "api_cost": None}
|
||||
|
||||
with (
|
||||
patch(
|
||||
"timmy.sovereignty.session_report._gather_session_data",
|
||||
return_value=self._minimal_session_data(),
|
||||
),
|
||||
patch(
|
||||
"timmy.sovereignty.session_report._gather_sovereignty_data",
|
||||
return_value=sov,
|
||||
),
|
||||
):
|
||||
report = generate_report()
|
||||
|
||||
assert "No previous session data" in report
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# commit_report
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCommitReport:
|
||||
def test_returns_false_when_gitea_disabled(self):
|
||||
with patch("timmy.sovereignty.session_report.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = False
|
||||
result = commit_report("# test", "dashboard")
|
||||
|
||||
assert result is False
|
||||
|
||||
def test_returns_false_when_no_token(self):
|
||||
with patch("timmy.sovereignty.session_report.settings") as mock_settings:
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = ""
|
||||
result = commit_report("# test", "dashboard")
|
||||
|
||||
assert result is False
|
||||
|
||||
def test_creates_file_via_put(self):
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 201
|
||||
mock_response.raise_for_status.return_value = None
|
||||
|
||||
mock_check = MagicMock()
|
||||
mock_check.status_code = 404 # file does not exist yet
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
||||
mock_client.__exit__ = MagicMock(return_value=False)
|
||||
mock_client.get.return_value = mock_check
|
||||
mock_client.put.return_value = mock_response
|
||||
|
||||
with (
|
||||
patch("timmy.sovereignty.session_report.settings") as mock_settings,
|
||||
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
|
||||
):
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "fake-token"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
|
||||
result = commit_report("# report content", "dashboard")
|
||||
|
||||
assert result is True
|
||||
mock_client.put.assert_called_once()
|
||||
call_kwargs = mock_client.put.call_args
|
||||
payload = call_kwargs.kwargs.get("json", call_kwargs.args[1] if len(call_kwargs.args) > 1 else {})
|
||||
decoded = base64.b64decode(payload["content"]).decode()
|
||||
assert "# report content" in decoded
|
||||
|
||||
def test_updates_existing_file_with_sha(self):
|
||||
mock_check = MagicMock()
|
||||
mock_check.status_code = 200
|
||||
mock_check.json.return_value = {"sha": "abc123"}
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.raise_for_status.return_value = None
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
||||
mock_client.__exit__ = MagicMock(return_value=False)
|
||||
mock_client.get.return_value = mock_check
|
||||
mock_client.put.return_value = mock_response
|
||||
|
||||
with (
|
||||
patch("timmy.sovereignty.session_report.settings") as mock_settings,
|
||||
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
|
||||
):
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "fake-token"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
|
||||
result = commit_report("# updated", "dashboard")
|
||||
|
||||
assert result is True
|
||||
payload = mock_client.put.call_args.kwargs.get("json", {})
|
||||
assert payload.get("sha") == "abc123"
|
||||
|
||||
def test_returns_false_on_http_error(self):
|
||||
import httpx
|
||||
|
||||
mock_check = MagicMock()
|
||||
mock_check.status_code = 404
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
||||
mock_client.__exit__ = MagicMock(return_value=False)
|
||||
mock_client.get.return_value = mock_check
|
||||
mock_client.put.side_effect = httpx.HTTPStatusError(
|
||||
"403", request=MagicMock(), response=MagicMock(status_code=403)
|
||||
)
|
||||
|
||||
with (
|
||||
patch("timmy.sovereignty.session_report.settings") as mock_settings,
|
||||
patch("timmy.sovereignty.session_report.httpx.Client", return_value=mock_client),
|
||||
):
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "fake-token"
|
||||
mock_settings.gitea_url = "http://localhost:3000"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
|
||||
result = commit_report("# test", "dashboard")
|
||||
|
||||
assert result is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# generate_and_commit_report (async)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGenerateAndCommitReport:
|
||||
async def test_returns_true_on_success(self):
|
||||
with (
|
||||
patch(
|
||||
"timmy.sovereignty.session_report.generate_report",
|
||||
return_value="# mock report",
|
||||
),
|
||||
patch(
|
||||
"timmy.sovereignty.session_report.commit_report",
|
||||
return_value=True,
|
||||
),
|
||||
):
|
||||
result = await generate_and_commit_report("test")
|
||||
|
||||
assert result is True
|
||||
|
||||
async def test_returns_false_when_commit_fails(self):
|
||||
with (
|
||||
patch(
|
||||
"timmy.sovereignty.session_report.generate_report",
|
||||
return_value="# mock report",
|
||||
),
|
||||
patch(
|
||||
"timmy.sovereignty.session_report.commit_report",
|
||||
return_value=False,
|
||||
),
|
||||
):
|
||||
result = await generate_and_commit_report()
|
||||
|
||||
assert result is False
|
||||
|
||||
async def test_graceful_on_exception(self):
|
||||
with patch(
|
||||
"timmy.sovereignty.session_report.generate_report",
|
||||
side_effect=RuntimeError("explode"),
|
||||
):
|
||||
result = await generate_and_commit_report()
|
||||
|
||||
assert result is False
|
||||
@@ -1,217 +0,0 @@
|
||||
"""Unit tests for the Dreaming mode engine."""
|
||||
|
||||
import sqlite3
|
||||
from contextlib import closing
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.dreaming import DreamingEngine, DreamRecord, _SESSION_GAP_SECONDS
|
||||
|
||||
|
||||
# ── Fixtures ──────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def tmp_dreams_db(tmp_path):
|
||||
"""Return a temporary path for the dreams database."""
|
||||
return tmp_path / "dreams.db"
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def engine(tmp_dreams_db):
|
||||
"""DreamingEngine backed by a temp database."""
|
||||
return DreamingEngine(db_path=tmp_dreams_db)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def chat_db(tmp_path):
|
||||
"""Create a minimal chat database with some messages."""
|
||||
db_path = tmp_path / "chat.db"
|
||||
with closing(sqlite3.connect(str(db_path))) as conn:
|
||||
conn.execute("""
|
||||
CREATE TABLE chat_messages (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
role TEXT NOT NULL,
|
||||
content TEXT NOT NULL,
|
||||
timestamp TEXT NOT NULL,
|
||||
source TEXT NOT NULL DEFAULT 'browser'
|
||||
)
|
||||
""")
|
||||
now = datetime.now(UTC)
|
||||
messages = [
|
||||
("user", "Hello, can you help me?", (now - timedelta(hours=2)).isoformat()),
|
||||
("agent", "Of course! What do you need?", (now - timedelta(hours=2, seconds=-5)).isoformat()),
|
||||
("user", "How does Python handle errors?", (now - timedelta(hours=2, seconds=-60)).isoformat()),
|
||||
("agent", "Python uses try/except blocks.", (now - timedelta(hours=2, seconds=-120)).isoformat()),
|
||||
("user", "Thanks!", (now - timedelta(hours=2, seconds=-180)).isoformat()),
|
||||
]
|
||||
conn.executemany(
|
||||
"INSERT INTO chat_messages (role, content, timestamp) VALUES (?, ?, ?)",
|
||||
messages,
|
||||
)
|
||||
conn.commit()
|
||||
return db_path
|
||||
|
||||
|
||||
# ── Idle detection ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestIdleDetection:
|
||||
def test_not_idle_immediately(self, engine):
|
||||
assert engine.is_idle() is False
|
||||
|
||||
def test_idle_after_threshold(self, engine):
|
||||
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=20)
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_idle_threshold_minutes = 10
|
||||
assert engine.is_idle() is True
|
||||
|
||||
def test_not_idle_when_threshold_zero(self, engine):
|
||||
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=99)
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_idle_threshold_minutes = 0
|
||||
assert engine.is_idle() is False
|
||||
|
||||
def test_record_activity_resets_timer(self, engine):
|
||||
engine._last_activity_time = datetime.now(UTC) - timedelta(minutes=30)
|
||||
engine.record_activity()
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_idle_threshold_minutes = 10
|
||||
assert engine.is_idle() is False
|
||||
|
||||
|
||||
# ── Status dict ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestGetStatus:
|
||||
def test_status_shape(self, engine):
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_enabled = True
|
||||
mock_settings.dreaming_idle_threshold_minutes = 10
|
||||
status = engine.get_status()
|
||||
assert "enabled" in status
|
||||
assert "dreaming" in status
|
||||
assert "idle" in status
|
||||
assert "dream_count" in status
|
||||
assert "idle_minutes" in status
|
||||
|
||||
def test_dream_count_starts_at_zero(self, engine):
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_enabled = True
|
||||
mock_settings.dreaming_idle_threshold_minutes = 10
|
||||
assert engine.get_status()["dream_count"] == 0
|
||||
|
||||
|
||||
# ── Session grouping ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestGroupIntoSessions:
|
||||
def test_single_session(self, engine):
|
||||
now = datetime.now(UTC)
|
||||
rows = [
|
||||
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
|
||||
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=10)).isoformat()},
|
||||
]
|
||||
sessions = engine._group_into_sessions(rows)
|
||||
assert len(sessions) == 1
|
||||
assert len(sessions[0]) == 2
|
||||
|
||||
def test_splits_on_large_gap(self, engine):
|
||||
now = datetime.now(UTC)
|
||||
gap = _SESSION_GAP_SECONDS + 100
|
||||
rows = [
|
||||
{"role": "user", "content": "hi", "timestamp": now.isoformat()},
|
||||
{"role": "agent", "content": "hello", "timestamp": (now + timedelta(seconds=gap)).isoformat()},
|
||||
]
|
||||
sessions = engine._group_into_sessions(rows)
|
||||
assert len(sessions) == 2
|
||||
|
||||
def test_empty_input(self, engine):
|
||||
assert engine._group_into_sessions([]) == []
|
||||
|
||||
|
||||
# ── Dream storage ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestDreamStorage:
|
||||
def test_store_and_retrieve(self, engine):
|
||||
dream = engine._store_dream(
|
||||
session_excerpt="User asked about Python.",
|
||||
decision_point="Python uses try/except blocks.",
|
||||
simulation="I could have given a code example.",
|
||||
proposed_rule="When explaining errors, include a code snippet.",
|
||||
)
|
||||
assert dream.id
|
||||
assert dream.proposed_rule == "When explaining errors, include a code snippet."
|
||||
|
||||
retrieved = engine.get_recent_dreams(limit=1)
|
||||
assert len(retrieved) == 1
|
||||
assert retrieved[0].id == dream.id
|
||||
|
||||
def test_count_increments(self, engine):
|
||||
assert engine.count_dreams() == 0
|
||||
engine._store_dream(
|
||||
session_excerpt="test", decision_point="test", simulation="test", proposed_rule="test"
|
||||
)
|
||||
assert engine.count_dreams() == 1
|
||||
|
||||
|
||||
# ── dream_once integration ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestDreamOnce:
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_when_disabled(self, engine):
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_enabled = False
|
||||
result = await engine.dream_once()
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_when_not_idle(self, engine):
|
||||
engine._last_activity_time = datetime.now(UTC)
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_enabled = True
|
||||
mock_settings.dreaming_idle_threshold_minutes = 60
|
||||
result = await engine.dream_once()
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_when_already_dreaming(self, engine):
|
||||
engine._is_dreaming = True
|
||||
with patch("timmy.dreaming.settings") as mock_settings:
|
||||
mock_settings.dreaming_enabled = True
|
||||
mock_settings.dreaming_idle_threshold_minutes = 0
|
||||
result = await engine.dream_once()
|
||||
# Reset for cleanliness
|
||||
engine._is_dreaming = False
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dream_produces_record_when_idle(self, engine, chat_db):
|
||||
"""Full cycle: idle + chat data + mocked LLM → produces DreamRecord."""
|
||||
engine._last_activity_time = datetime.now(UTC) - timedelta(hours=1)
|
||||
|
||||
with (
|
||||
patch("timmy.dreaming.settings") as mock_settings,
|
||||
patch("timmy.dreaming.DreamingEngine._call_agent", new_callable=AsyncMock) as mock_agent,
|
||||
patch("infrastructure.chat_store.DB_PATH", chat_db),
|
||||
):
|
||||
mock_settings.dreaming_enabled = True
|
||||
mock_settings.dreaming_idle_threshold_minutes = 10
|
||||
mock_settings.dreaming_timeout_seconds = 30
|
||||
mock_agent.side_effect = [
|
||||
"I could have provided a concrete try/except example.", # simulation
|
||||
"When explaining errors, always include a runnable code snippet.", # rule
|
||||
]
|
||||
|
||||
result = await engine.dream_once()
|
||||
|
||||
assert result is not None
|
||||
assert isinstance(result, DreamRecord)
|
||||
assert result.simulation
|
||||
assert result.proposed_rule
|
||||
assert engine.count_dreams() == 1
|
||||
297
tests/unit/test_energy_monitor.py
Normal file
297
tests/unit/test_energy_monitor.py
Normal file
@@ -0,0 +1,297 @@
|
||||
"""Unit tests for the Energy Budget Monitor.
|
||||
|
||||
Tests power estimation strategies, inference recording, efficiency scoring,
|
||||
and low power mode logic — all without real subprocesses.
|
||||
|
||||
Refs: #1009
|
||||
"""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.energy.monitor import (
|
||||
EnergyBudgetMonitor,
|
||||
InferenceSample,
|
||||
_DEFAULT_MODEL_SIZE_GB,
|
||||
_EFFICIENCY_SCORE_CEILING,
|
||||
_WATTS_PER_GB_HEURISTIC,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def monitor():
|
||||
return EnergyBudgetMonitor()
|
||||
|
||||
|
||||
# ── Model size lookup ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_model_size_exact_match(monitor):
|
||||
assert monitor._model_size_gb("qwen3:8b") == 5.5
|
||||
|
||||
|
||||
def test_model_size_substring_match(monitor):
|
||||
assert monitor._model_size_gb("some-qwen3:14b-custom") == 9.0
|
||||
|
||||
|
||||
def test_model_size_unknown_returns_default(monitor):
|
||||
assert monitor._model_size_gb("unknownmodel:99b") == _DEFAULT_MODEL_SIZE_GB
|
||||
|
||||
|
||||
# ── Battery power reading ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_read_battery_watts_on_battery(monitor):
|
||||
ioreg_output = (
|
||||
"{\n"
|
||||
' "InstantAmperage" = 2500\n'
|
||||
' "Voltage" = 12000\n'
|
||||
' "ExternalConnected" = No\n'
|
||||
"}"
|
||||
)
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = ioreg_output
|
||||
|
||||
with patch("subprocess.run", return_value=mock_result):
|
||||
watts = monitor._read_battery_watts()
|
||||
|
||||
# 2500 mA * 12000 mV / 1_000_000 = 30 W
|
||||
assert watts == pytest.approx(30.0, abs=0.01)
|
||||
|
||||
|
||||
def test_read_battery_watts_plugged_in_returns_zero(monitor):
|
||||
ioreg_output = (
|
||||
"{\n"
|
||||
' "InstantAmperage" = 1000\n'
|
||||
' "Voltage" = 12000\n'
|
||||
' "ExternalConnected" = Yes\n'
|
||||
"}"
|
||||
)
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = ioreg_output
|
||||
|
||||
with patch("subprocess.run", return_value=mock_result):
|
||||
watts = monitor._read_battery_watts()
|
||||
|
||||
assert watts == 0.0
|
||||
|
||||
|
||||
def test_read_battery_watts_subprocess_failure_raises(monitor):
|
||||
with patch("subprocess.run", side_effect=OSError("no ioreg")):
|
||||
with pytest.raises(OSError):
|
||||
monitor._read_battery_watts()
|
||||
|
||||
|
||||
# ── CPU proxy reading ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_read_cpu_pct_parses_top(monitor):
|
||||
top_output = (
|
||||
"Processes: 450 total\n"
|
||||
"CPU usage: 15.2% user, 8.8% sys, 76.0% idle\n"
|
||||
)
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = top_output
|
||||
|
||||
with patch("subprocess.run", return_value=mock_result):
|
||||
pct = monitor._read_cpu_pct()
|
||||
|
||||
assert pct == pytest.approx(24.0, abs=0.1)
|
||||
|
||||
|
||||
def test_read_cpu_pct_no_match_returns_negative(monitor):
|
||||
mock_result = MagicMock()
|
||||
mock_result.stdout = "No CPU line here\n"
|
||||
|
||||
with patch("subprocess.run", return_value=mock_result):
|
||||
pct = monitor._read_cpu_pct()
|
||||
|
||||
assert pct == -1.0
|
||||
|
||||
|
||||
# ── Power strategy selection ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_read_power_uses_battery_first(monitor):
|
||||
with patch.object(monitor, "_read_battery_watts", return_value=25.0):
|
||||
watts, strategy = monitor._read_power()
|
||||
|
||||
assert watts == 25.0
|
||||
assert strategy == "battery"
|
||||
|
||||
|
||||
def test_read_power_falls_back_to_cpu_proxy(monitor):
|
||||
with (
|
||||
patch.object(monitor, "_read_battery_watts", return_value=0.0),
|
||||
patch.object(monitor, "_read_cpu_pct", return_value=50.0),
|
||||
):
|
||||
watts, strategy = monitor._read_power()
|
||||
|
||||
assert strategy == "cpu_proxy"
|
||||
assert watts == pytest.approx(20.0, abs=0.1) # 50% of 40W TDP
|
||||
|
||||
|
||||
def test_read_power_unavailable_when_both_fail(monitor):
|
||||
with (
|
||||
patch.object(monitor, "_read_battery_watts", side_effect=OSError),
|
||||
patch.object(monitor, "_read_cpu_pct", return_value=-1.0),
|
||||
):
|
||||
watts, strategy = monitor._read_power()
|
||||
|
||||
assert strategy == "unavailable"
|
||||
assert watts == 0.0
|
||||
|
||||
|
||||
# ── Inference recording ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_record_inference_produces_sample(monitor):
|
||||
monitor._cached_watts = 10.0
|
||||
monitor._cache_ts = 9999999999.0 # far future — cache won't expire
|
||||
|
||||
sample = monitor.record_inference("qwen3:8b", tokens_per_second=40.0)
|
||||
|
||||
assert isinstance(sample, InferenceSample)
|
||||
assert sample.model == "qwen3:8b"
|
||||
assert sample.tokens_per_second == 40.0
|
||||
assert sample.estimated_watts == pytest.approx(10.0)
|
||||
# efficiency = 40 / 10 = 4.0 tok/s per W
|
||||
assert sample.efficiency == pytest.approx(4.0)
|
||||
# score = min(10, (4.0 / 5.0) * 10) = 8.0
|
||||
assert sample.efficiency_score == pytest.approx(8.0)
|
||||
|
||||
|
||||
def test_record_inference_stores_in_history(monitor):
|
||||
monitor._cached_watts = 5.0
|
||||
monitor._cache_ts = 9999999999.0
|
||||
|
||||
monitor.record_inference("qwen3:8b", 30.0)
|
||||
monitor.record_inference("qwen3:14b", 20.0)
|
||||
|
||||
assert len(monitor._samples) == 2
|
||||
|
||||
|
||||
def test_record_inference_auto_activates_low_power(monitor):
|
||||
monitor._cached_watts = 20.0 # above default 15W threshold
|
||||
monitor._cache_ts = 9999999999.0
|
||||
|
||||
assert not monitor.low_power_mode
|
||||
monitor.record_inference("qwen3:30b", 8.0)
|
||||
assert monitor.low_power_mode
|
||||
|
||||
|
||||
def test_record_inference_no_auto_low_power_below_threshold(monitor):
|
||||
monitor._cached_watts = 10.0 # below default 15W threshold
|
||||
monitor._cache_ts = 9999999999.0
|
||||
|
||||
monitor.record_inference("qwen3:8b", 40.0)
|
||||
assert not monitor.low_power_mode
|
||||
|
||||
|
||||
# ── Efficiency score ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_efficiency_score_caps_at_10(monitor):
|
||||
monitor._cached_watts = 1.0
|
||||
monitor._cache_ts = 9999999999.0
|
||||
|
||||
sample = monitor.record_inference("qwen3:1b", tokens_per_second=1000.0)
|
||||
assert sample.efficiency_score == pytest.approx(10.0)
|
||||
|
||||
|
||||
def test_efficiency_score_no_samples_returns_negative_one(monitor):
|
||||
assert monitor._compute_mean_efficiency_score() == -1.0
|
||||
|
||||
|
||||
def test_mean_efficiency_score_averages_last_10(monitor):
|
||||
monitor._cached_watts = 10.0
|
||||
monitor._cache_ts = 9999999999.0
|
||||
|
||||
for _ in range(15):
|
||||
monitor.record_inference("qwen3:8b", tokens_per_second=25.0) # efficiency=2.5 → score=5.0
|
||||
|
||||
score = monitor._compute_mean_efficiency_score()
|
||||
assert score == pytest.approx(5.0, abs=0.01)
|
||||
|
||||
|
||||
# ── Low power mode ────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_set_low_power_mode_toggle(monitor):
|
||||
assert not monitor.low_power_mode
|
||||
monitor.set_low_power_mode(True)
|
||||
assert monitor.low_power_mode
|
||||
monitor.set_low_power_mode(False)
|
||||
assert not monitor.low_power_mode
|
||||
|
||||
|
||||
# ── get_report ────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_report_structure(monitor):
|
||||
with patch.object(monitor, "_read_power", return_value=(8.0, "battery")):
|
||||
report = await monitor.get_report()
|
||||
|
||||
assert report.timestamp
|
||||
assert isinstance(report.low_power_mode, bool)
|
||||
assert isinstance(report.current_watts, float)
|
||||
assert report.strategy in ("battery", "cpu_proxy", "heuristic", "unavailable")
|
||||
assert isinstance(report.recommendation, str)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_report_to_dict(monitor):
|
||||
with patch.object(monitor, "_read_power", return_value=(5.0, "cpu_proxy")):
|
||||
report = await monitor.get_report()
|
||||
|
||||
data = report.to_dict()
|
||||
assert "timestamp" in data
|
||||
assert "low_power_mode" in data
|
||||
assert "current_watts" in data
|
||||
assert "strategy" in data
|
||||
assert "efficiency_score" in data
|
||||
assert "recent_samples" in data
|
||||
assert "recommendation" in data
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_report_caches_power_reading(monitor):
|
||||
call_count = 0
|
||||
|
||||
def counting_read_power():
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
return (10.0, "battery")
|
||||
|
||||
with patch.object(monitor, "_read_power", side_effect=counting_read_power):
|
||||
await monitor.get_report()
|
||||
await monitor.get_report()
|
||||
|
||||
# Cache TTL is 10s — should only call once
|
||||
assert call_count == 1
|
||||
|
||||
|
||||
# ── Recommendation text ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_recommendation_no_data(monitor):
|
||||
rec = monitor._build_recommendation(-1.0)
|
||||
assert "No inference data" in rec
|
||||
|
||||
|
||||
def test_recommendation_low_power_mode(monitor):
|
||||
monitor.set_low_power_mode(True)
|
||||
rec = monitor._build_recommendation(2.0)
|
||||
assert "Low power mode active" in rec
|
||||
|
||||
|
||||
def test_recommendation_low_efficiency(monitor):
|
||||
rec = monitor._build_recommendation(1.5)
|
||||
assert "Low efficiency" in rec
|
||||
|
||||
|
||||
def test_recommendation_good_efficiency(monitor):
|
||||
rec = monitor._build_recommendation(8.0)
|
||||
assert "Good efficiency" in rec
|
||||
269
tests/unit/test_self_correction.py
Normal file
269
tests/unit/test_self_correction.py
Normal file
@@ -0,0 +1,269 @@
|
||||
"""Unit tests for infrastructure.self_correction."""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _isolated_db(tmp_path, monkeypatch):
|
||||
"""Point the self-correction module at a fresh temp database per test."""
|
||||
import infrastructure.self_correction as sc_mod
|
||||
|
||||
# Reset the cached path so each test gets a clean DB
|
||||
sc_mod._DB_PATH = tmp_path / "self_correction.db"
|
||||
yield
|
||||
sc_mod._DB_PATH = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# log_self_correction
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestLogSelfCorrection:
|
||||
def test_returns_event_id(self):
|
||||
from infrastructure.self_correction import log_self_correction
|
||||
|
||||
eid = log_self_correction(
|
||||
source="test",
|
||||
original_intent="Do X",
|
||||
detected_error="ValueError: bad input",
|
||||
correction_strategy="Try Y instead",
|
||||
final_outcome="Y succeeded",
|
||||
)
|
||||
assert isinstance(eid, str)
|
||||
assert len(eid) == 36 # UUID format
|
||||
|
||||
def test_derives_error_type_from_error_string(self):
|
||||
from infrastructure.self_correction import get_corrections, log_self_correction
|
||||
|
||||
log_self_correction(
|
||||
source="test",
|
||||
original_intent="Connect",
|
||||
detected_error="ConnectionRefusedError: port 80",
|
||||
correction_strategy="Use port 8080",
|
||||
final_outcome="ok",
|
||||
)
|
||||
rows = get_corrections(limit=1)
|
||||
assert rows[0]["error_type"] == "ConnectionRefusedError"
|
||||
|
||||
def test_explicit_error_type_preserved(self):
|
||||
from infrastructure.self_correction import get_corrections, log_self_correction
|
||||
|
||||
log_self_correction(
|
||||
source="test",
|
||||
original_intent="Run task",
|
||||
detected_error="Some weird error",
|
||||
correction_strategy="Fix it",
|
||||
final_outcome="done",
|
||||
error_type="CustomError",
|
||||
)
|
||||
rows = get_corrections(limit=1)
|
||||
assert rows[0]["error_type"] == "CustomError"
|
||||
|
||||
def test_task_id_stored(self):
|
||||
from infrastructure.self_correction import get_corrections, log_self_correction
|
||||
|
||||
log_self_correction(
|
||||
source="test",
|
||||
original_intent="intent",
|
||||
detected_error="err",
|
||||
correction_strategy="strat",
|
||||
final_outcome="outcome",
|
||||
task_id="task-abc-123",
|
||||
)
|
||||
rows = get_corrections(limit=1)
|
||||
assert rows[0]["task_id"] == "task-abc-123"
|
||||
|
||||
def test_outcome_status_stored(self):
|
||||
from infrastructure.self_correction import get_corrections, log_self_correction
|
||||
|
||||
log_self_correction(
|
||||
source="test",
|
||||
original_intent="i",
|
||||
detected_error="e",
|
||||
correction_strategy="s",
|
||||
final_outcome="o",
|
||||
outcome_status="failed",
|
||||
)
|
||||
rows = get_corrections(limit=1)
|
||||
assert rows[0]["outcome_status"] == "failed"
|
||||
|
||||
def test_long_strings_truncated(self):
|
||||
from infrastructure.self_correction import get_corrections, log_self_correction
|
||||
|
||||
long = "x" * 3000
|
||||
log_self_correction(
|
||||
source="test",
|
||||
original_intent=long,
|
||||
detected_error=long,
|
||||
correction_strategy=long,
|
||||
final_outcome=long,
|
||||
)
|
||||
rows = get_corrections(limit=1)
|
||||
assert len(rows[0]["original_intent"]) <= 2000
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_corrections
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetCorrections:
|
||||
def test_empty_db_returns_empty_list(self):
|
||||
from infrastructure.self_correction import get_corrections
|
||||
|
||||
assert get_corrections() == []
|
||||
|
||||
def test_returns_newest_first(self):
|
||||
from infrastructure.self_correction import get_corrections, log_self_correction
|
||||
|
||||
for i in range(3):
|
||||
log_self_correction(
|
||||
source="test",
|
||||
original_intent=f"intent {i}",
|
||||
detected_error="err",
|
||||
correction_strategy="fix",
|
||||
final_outcome="done",
|
||||
error_type=f"Type{i}",
|
||||
)
|
||||
rows = get_corrections(limit=10)
|
||||
assert len(rows) == 3
|
||||
# Newest first — Type2 should appear before Type0
|
||||
types = [r["error_type"] for r in rows]
|
||||
assert types.index("Type2") < types.index("Type0")
|
||||
|
||||
def test_limit_respected(self):
|
||||
from infrastructure.self_correction import get_corrections, log_self_correction
|
||||
|
||||
for _ in range(5):
|
||||
log_self_correction(
|
||||
source="test",
|
||||
original_intent="i",
|
||||
detected_error="e",
|
||||
correction_strategy="s",
|
||||
final_outcome="o",
|
||||
)
|
||||
rows = get_corrections(limit=3)
|
||||
assert len(rows) == 3
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_patterns
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetPatterns:
|
||||
def test_empty_db_returns_empty_list(self):
|
||||
from infrastructure.self_correction import get_patterns
|
||||
|
||||
assert get_patterns() == []
|
||||
|
||||
def test_counts_by_error_type(self):
|
||||
from infrastructure.self_correction import get_patterns, log_self_correction
|
||||
|
||||
for _ in range(3):
|
||||
log_self_correction(
|
||||
source="test",
|
||||
original_intent="i",
|
||||
detected_error="e",
|
||||
correction_strategy="s",
|
||||
final_outcome="o",
|
||||
error_type="TimeoutError",
|
||||
)
|
||||
log_self_correction(
|
||||
source="test",
|
||||
original_intent="i",
|
||||
detected_error="e",
|
||||
correction_strategy="s",
|
||||
final_outcome="o",
|
||||
error_type="ValueError",
|
||||
)
|
||||
patterns = get_patterns(top_n=10)
|
||||
by_type = {p["error_type"]: p for p in patterns}
|
||||
assert by_type["TimeoutError"]["count"] == 3
|
||||
assert by_type["ValueError"]["count"] == 1
|
||||
|
||||
def test_success_vs_failed_counts(self):
|
||||
from infrastructure.self_correction import get_patterns, log_self_correction
|
||||
|
||||
log_self_correction(
|
||||
source="test", original_intent="i", detected_error="e",
|
||||
correction_strategy="s", final_outcome="o",
|
||||
error_type="Foo", outcome_status="success",
|
||||
)
|
||||
log_self_correction(
|
||||
source="test", original_intent="i", detected_error="e",
|
||||
correction_strategy="s", final_outcome="o",
|
||||
error_type="Foo", outcome_status="failed",
|
||||
)
|
||||
patterns = get_patterns(top_n=5)
|
||||
foo = next(p for p in patterns if p["error_type"] == "Foo")
|
||||
assert foo["success_count"] == 1
|
||||
assert foo["failed_count"] == 1
|
||||
|
||||
def test_ordered_by_count_desc(self):
|
||||
from infrastructure.self_correction import get_patterns, log_self_correction
|
||||
|
||||
for _ in range(2):
|
||||
log_self_correction(
|
||||
source="t", original_intent="i", detected_error="e",
|
||||
correction_strategy="s", final_outcome="o", error_type="Rare",
|
||||
)
|
||||
for _ in range(5):
|
||||
log_self_correction(
|
||||
source="t", original_intent="i", detected_error="e",
|
||||
correction_strategy="s", final_outcome="o", error_type="Common",
|
||||
)
|
||||
patterns = get_patterns(top_n=5)
|
||||
assert patterns[0]["error_type"] == "Common"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_stats
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetStats:
|
||||
def test_empty_db_returns_zeroes(self):
|
||||
from infrastructure.self_correction import get_stats
|
||||
|
||||
stats = get_stats()
|
||||
assert stats["total"] == 0
|
||||
assert stats["success_rate"] == 0
|
||||
|
||||
def test_counts_outcomes(self):
|
||||
from infrastructure.self_correction import get_stats, log_self_correction
|
||||
|
||||
log_self_correction(
|
||||
source="t", original_intent="i", detected_error="e",
|
||||
correction_strategy="s", final_outcome="o", outcome_status="success",
|
||||
)
|
||||
log_self_correction(
|
||||
source="t", original_intent="i", detected_error="e",
|
||||
correction_strategy="s", final_outcome="o", outcome_status="failed",
|
||||
)
|
||||
stats = get_stats()
|
||||
assert stats["total"] == 2
|
||||
assert stats["success_count"] == 1
|
||||
assert stats["failed_count"] == 1
|
||||
assert stats["success_rate"] == 50
|
||||
|
||||
def test_success_rate_100_when_all_succeed(self):
|
||||
from infrastructure.self_correction import get_stats, log_self_correction
|
||||
|
||||
for _ in range(4):
|
||||
log_self_correction(
|
||||
source="t", original_intent="i", detected_error="e",
|
||||
correction_strategy="s", final_outcome="o", outcome_status="success",
|
||||
)
|
||||
stats = get_stats()
|
||||
assert stats["success_rate"] == 100
|
||||
Reference in New Issue
Block a user