[kimi] Refactor autoresearch.py -> SystemExperiment class (#906) (#1244)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled

Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
This commit was merged in pull request #1244.
This commit is contained in:
2026-03-23 23:28:54 +00:00
committed by Alexander Whitestone
parent 6691f4d1f3
commit 261b7be468
3 changed files with 380 additions and 65 deletions

View File

@@ -8,7 +8,7 @@ Flow:
1. prepare_experiment — clone repo + run data prep
2. run_experiment — execute train.py with wall-clock timeout
3. evaluate_result — compare metric against baseline
4. experiment_loop — orchestrate the full cycle
4. SystemExperiment — orchestrate the full cycle via class interface
All subprocess calls are guarded with timeouts for graceful degradation.
"""
@@ -20,6 +20,7 @@ import logging
import re
import subprocess
import time
from collections.abc import Callable
from pathlib import Path
from typing import Any
@@ -29,6 +30,10 @@ DEFAULT_REPO = "https://github.com/karpathy/autoresearch.git"
_METRIC_RE = re.compile(r"val_bpb[:\s]+([0-9]+\.?[0-9]*)")
# ── Higher-is-better metric names ────────────────────────────────────────────
_HIGHER_IS_BETTER = frozenset({"unit_pass_rate", "coverage"})
def prepare_experiment(
workspace: Path,
repo_url: str = DEFAULT_REPO,
@@ -125,7 +130,9 @@ def run_experiment(
"log": output[-2000:], # Keep last 2k chars
"duration_s": duration,
"success": result.returncode == 0,
"error": None if result.returncode == 0 else f"Exit code {result.returncode}",
"error": (
None if result.returncode == 0 else f"Exit code {result.returncode}"
),
}
except subprocess.TimeoutExpired:
duration = int(time.monotonic() - start)
@@ -179,9 +186,13 @@ def evaluate_result(
pct = (delta / baseline) * 100 if baseline != 0 else 0.0
if delta < 0:
return f"Improvement: {metric_name} {baseline:.4f} -> {current:.4f} ({pct:+.2f}%)"
return (
f"Improvement: {metric_name} {baseline:.4f} -> {current:.4f} ({pct:+.2f}%)"
)
elif delta > 0:
return f"Regression: {metric_name} {baseline:.4f} -> {current:.4f} ({pct:+.2f}%)"
return (
f"Regression: {metric_name} {baseline:.4f} -> {current:.4f} ({pct:+.2f}%)"
)
else:
return f"No change: {metric_name} = {current:.4f}"
@@ -237,10 +248,6 @@ def _extract_coverage(output: str) -> float | None:
return None
# ── Higher-is-better metric names ────────────────────────────────────────────
_HIGHER_IS_BETTER = frozenset({"unit_pass_rate", "coverage"})
class SystemExperiment:
"""An autoresearch experiment targeting a specific module with a configurable metric.
@@ -254,6 +261,10 @@ class SystemExperiment:
Any other value is forwarded to :func:`_extract_metric`.
budget_minutes: Wall-clock budget per experiment (default 5 min).
workspace: Working directory for subprocess calls. Defaults to ``cwd``.
revert_on_failure: Whether to revert changes on failed experiments.
hypothesis: Optional natural language hypothesis for the experiment.
metric_fn: Optional callable for custom metric extraction.
If provided, overrides built-in metric extraction.
"""
def __init__(
@@ -262,11 +273,19 @@ class SystemExperiment:
metric: str = "unit_pass_rate",
budget_minutes: int = 5,
workspace: Path | None = None,
revert_on_failure: bool = True,
hypothesis: str = "",
metric_fn: Callable[[str], float | None] | None = None,
) -> None:
self.target = target
self.metric = metric
self.budget_seconds = budget_minutes * 60
self.workspace = Path(workspace) if workspace else Path.cwd()
self.revert_on_failure = revert_on_failure
self.hypothesis = hypothesis
self.metric_fn = metric_fn
self.results: list[dict[str, Any]] = []
self.baseline: float | None = None
# ── Hypothesis generation ─────────────────────────────────────────────────
@@ -341,7 +360,9 @@ class SystemExperiment:
"log": output[-3000:],
"duration_s": duration,
"success": result.returncode == 0,
"error": None if result.returncode == 0 else f"Exit code {result.returncode}",
"error": (
None if result.returncode == 0 else f"Exit code {result.returncode}"
),
}
except subprocess.TimeoutExpired:
duration = int(time.monotonic() - start)
@@ -353,10 +374,24 @@ class SystemExperiment:
"error": f"Budget exceeded after {self.budget_seconds}s",
}
except OSError as exc:
return {"metric": None, "log": "", "duration_s": 0, "success": False, "error": str(exc)}
return {
"metric": None,
"log": "",
"duration_s": 0,
"success": False,
"error": str(exc),
}
def _extract_tox_metric(self, output: str) -> float | None:
"""Dispatch to the correct metric extractor based on *self.metric*."""
# Use custom metric function if provided
if self.metric_fn is not None:
try:
return self.metric_fn(output)
except Exception as exc:
logger.warning("Custom metric_fn failed: %s", exc)
return None
if self.metric == "unit_pass_rate":
return _extract_pass_rate(output)
if self.metric == "coverage":
@@ -391,6 +426,20 @@ class SystemExperiment:
# ── Git phase ─────────────────────────────────────────────────────────────
def create_branch(self, branch_name: str) -> bool:
    """Create and checkout a new git branch in the workspace.

    Args:
        branch_name: Name for the new branch (passed to ``git checkout -b``).

    Returns:
        True if the branch was created and checked out, False otherwise.
    """
    try:
        subprocess.run(
            ["git", "checkout", "-b", branch_name],
            cwd=str(self.workspace),
            check=True,
            timeout=30,
        )
        return True
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as exc:
        # Also guard a hung git process (TimeoutExpired, possible because of
        # timeout=30 above) and a missing/broken git binary (OSError), so this
        # degrades gracefully like run_tox instead of propagating.
        logger.warning("Git branch creation failed: %s", exc)
        return False
def commit_changes(self, message: str) -> bool:
"""Stage and commit all changes. Returns True on success."""
try:
@@ -421,3 +470,130 @@ class SystemExperiment:
except subprocess.CalledProcessError as exc:
logger.warning("Git revert failed: %s", exc)
return False
# ── Full experiment loop ──────────────────────────────────────────────────
def run(
    self,
    tox_env: str = "unit",
    model: str = "qwen3:30b",
    program_content: str = "",
    max_iterations: int = 1,
    dry_run: bool = False,
    create_branch: bool = False,
) -> dict[str, Any]:
    """Run the full experiment loop: hypothesis → edit → tox → evaluate → commit/revert.

    This method encapsulates the complete experiment cycle, running multiple
    iterations until an improvement is found or max_iterations is reached.

    Args:
        tox_env: Tox environment to run (default "unit").
        model: Ollama model for Aider edits (default "qwen3:30b").
        program_content: Research direction for hypothesis generation.
        max_iterations: Maximum number of experiment iterations.
        dry_run: If True, only generate hypotheses without making changes.
        create_branch: If True, create a new git branch for the experiment.

    Returns:
        Dict with keys: ``success`` (bool), ``final_metric`` (float|None),
        ``baseline`` (float|None), ``iterations`` (int, iterations performed
        by *this* call), ``results`` (list, cumulative across calls).
        Each non-dry-run record also carries ``edit_result``, ``edit_failed``,
        ``committed`` and ``reverted`` for downstream display (the CLI
        ``learn`` command reads these keys).
    """
    if create_branch:
        branch_name = (
            f"autoresearch/{self.target.replace('/', '-')}-{int(time.time())}"
        )
        self.create_branch(branch_name)

    baseline: float | None = self.baseline
    final_metric: float | None = None
    success = False
    # Count only this call's iterations; self.results accumulates across
    # calls, so len(self.results) would over-report on a reused instance.
    iterations_this_run = 0

    for iteration in range(1, max_iterations + 1):
        logger.info("Experiment iteration %d/%d", iteration, max_iterations)

        # Generate hypothesis (a fixed self.hypothesis takes precedence)
        hypothesis = self.hypothesis or self.generate_hypothesis(program_content)
        logger.info("Hypothesis: %s", hypothesis)

        # In dry-run mode, just record the hypothesis and continue
        if dry_run:
            self.results.append(
                {
                    "iteration": iteration,
                    "hypothesis": hypothesis,
                    "metric": None,
                    "baseline": baseline,
                    "assessment": "Dry-run: no changes made",
                    "success": True,
                    "duration_s": 0,
                }
            )
            iterations_this_run += 1
            continue

        # Apply edit
        edit_result = self.apply_edit(hypothesis, model=model)
        edit_failed = "not available" in edit_result or edit_result.startswith(
            "Aider error"
        )
        if edit_failed:
            logger.warning("Edit phase failed: %s", edit_result)

        # Run evaluation
        tox_result = self.run_tox(tox_env=tox_env)
        metric = tox_result["metric"]

        # Evaluate result
        assessment = self.evaluate(metric, baseline)
        logger.info("Assessment: %s", assessment)

        # Store result. Outcome flags are filled in below once the
        # commit/revert decision has been made.
        result_record = {
            "iteration": iteration,
            "hypothesis": hypothesis,
            "metric": metric,
            "baseline": baseline,
            "assessment": assessment,
            "success": tox_result["success"],
            "duration_s": tox_result["duration_s"],
            "edit_result": edit_result,
            "edit_failed": edit_failed,
            "committed": False,
            "reverted": False,
        }
        self.results.append(result_record)
        iterations_this_run += 1

        # Set baseline on first successful run (no commit/revert decision:
        # the baseline measurement is not itself an experiment outcome)
        if metric is not None and baseline is None:
            baseline = metric
            self.baseline = baseline
            final_metric = metric
            continue

        # Determine if we should commit or revert
        should_commit = False
        if tox_result["success"] and metric is not None and baseline is not None:
            if self.is_improvement(metric, baseline):
                should_commit = True
                final_metric = metric
                baseline = metric
                self.baseline = baseline
                success = True

        if should_commit:
            commit_msg = f"autoresearch: improve {self.metric} on {self.target}\n\n{hypothesis}"
            if self.commit_changes(commit_msg):
                result_record["committed"] = True
                logger.info("Changes committed")
            else:
                self.revert_changes()
                result_record["reverted"] = True
                logger.warning("Commit failed, changes reverted")
        elif self.revert_on_failure:
            self.revert_changes()
            result_record["reverted"] = True
            logger.info("Changes reverted (no improvement)")

        # Early exit if we found an improvement
        if success:
            break

    return {
        "success": success,
        "final_metric": final_metric,
        "baseline": self.baseline,
        "iterations": iterations_this_run,
        "results": self.results,
    }

View File

@@ -96,10 +96,14 @@ def _decide_autonomous(req, tool_name: str, tool_args: dict) -> None:
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
else:
req.reject(note="Auto-rejected: not in allowlist")
logger.info("AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100])
logger.info(
"AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100]
)
def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False):
def _handle_tool_confirmation(
agent, run_output, session_id: str, *, autonomous: bool = False
):
"""Prompt user to approve/reject dangerous tool calls.
When Agno pauses a run because a tool requires confirmation, this
@@ -173,7 +177,9 @@ def think(
):
"""Ask Timmy to think carefully about a topic."""
timmy = create_timmy(backend=backend, session_id=_CLI_SESSION_ID)
timmy.print_response(f"Think carefully about: {topic}", stream=True, session_id=_CLI_SESSION_ID)
timmy.print_response(
f"Think carefully about: {topic}", stream=True, session_id=_CLI_SESSION_ID
)
def _read_message_input(message: list[str]) -> str:
@@ -246,7 +252,9 @@ def chat(
timmy = create_timmy(backend=backend, session_id=session_id)
run_output = timmy.run(message_str, stream=False, session_id=session_id)
run_output = _handle_tool_confirmation(timmy, run_output, session_id, autonomous=autonomous)
run_output = _handle_tool_confirmation(
timmy, run_output, session_id, autonomous=autonomous
)
content = run_output.content if hasattr(run_output, "content") else str(run_output)
if content:
@@ -300,7 +308,9 @@ def repl(
break
try:
response = loop.run_until_complete(chat(user_input, session_id=session_id))
response = loop.run_until_complete(
chat(user_input, session_id=session_id)
)
if response:
typer.echo(response)
typer.echo()
@@ -347,7 +357,10 @@ def interview(
# Force agent creation by calling chat once with a warm-up prompt
try:
loop.run_until_complete(
chat("Hello, Timmy. We're about to start your interview.", session_id="interview")
chat(
"Hello, Timmy. We're about to start your interview.",
session_id="interview",
)
)
except Exception as exc:
typer.echo(f"Warning: Initialization issue — {exc}", err=True)
@@ -360,7 +373,9 @@ def interview(
typer.echo("Starting interview...\n")
transcript = run_interview(
chat_fn=lambda msg: loop.run_until_complete(chat(msg, session_id="interview")),
chat_fn=lambda msg: loop.run_until_complete(
chat(msg, session_id="interview")
),
on_answer=_on_answer,
)
@@ -381,7 +396,9 @@ def interview(
@app.command()
def up(
dev: bool = typer.Option(False, "--dev", help="Enable hot-reload for development"),
build: bool = typer.Option(True, "--build/--no-build", help="Rebuild images before starting"),
build: bool = typer.Option(
True, "--build/--no-build", help="Rebuild images before starting"
),
):
"""Start Timmy Time in Docker (dashboard + agents)."""
cmd = ["docker", "compose"]
@@ -410,13 +427,23 @@ def down():
@app.command()
def voice(
whisper_model: str = typer.Option(
"base.en", "--whisper", "-w", help="Whisper model: tiny.en, base.en, small.en, medium.en"
"base.en",
"--whisper",
"-w",
help="Whisper model: tiny.en, base.en, small.en, medium.en",
),
use_say: bool = typer.Option(
False, "--say", help="Use macOS `say` instead of Piper TTS"
),
use_say: bool = typer.Option(False, "--say", help="Use macOS `say` instead of Piper TTS"),
threshold: float = typer.Option(
0.015, "--threshold", "-t", help="Mic silence threshold (RMS). Lower = more sensitive."
0.015,
"--threshold",
"-t",
help="Mic silence threshold (RMS). Lower = more sensitive.",
),
silence: float = typer.Option(
1.5, "--silence", help="Seconds of silence to end recording"
),
silence: float = typer.Option(1.5, "--silence", help="Seconds of silence to end recording"),
backend: str | None = _BACKEND_OPTION,
model_size: str | None = _MODEL_SIZE_OPTION,
):
@@ -457,9 +484,12 @@ def route(
@app.command()
def focus(
topic: str | None = typer.Argument(
None, help='Topic to focus on (e.g. "three-phase loop"). Omit to show current focus.'
None,
help='Topic to focus on (e.g. "three-phase loop"). Omit to show current focus.',
),
clear: bool = typer.Option(
False, "--clear", "-c", help="Clear focus and return to broad mode"
),
clear: bool = typer.Option(False, "--clear", "-c", help="Clear focus and return to broad mode"),
):
"""Set deep-focus mode on a single problem.
@@ -495,7 +525,9 @@ def healthcheck(
verbose: bool = typer.Option(
False, "--verbose", "-v", help="Show verbose output including issue details"
),
quiet: bool = typer.Option(False, "--quiet", "-q", help="Only show status line (no details)"),
quiet: bool = typer.Option(
False, "--quiet", "-q", help="Only show status line (no details)"
),
):
"""Quick health snapshot before coding.
@@ -604,7 +636,8 @@ def learn(
if target is None:
typer.echo(
"Error: --target is required. Specify the module or file to optimise.", err=True
"Error: --target is required. Specify the module or file to optimise.",
err=True,
)
raise typer.Exit(1)
@@ -616,59 +649,57 @@ def learn(
typer.echo()
typer.echo(typer.style("Autoresearch", bold=True) + f"{target}")
typer.echo(f" metric={metric} budget={budget}min max={max_experiments} tox={tox_env}")
typer.echo(
f" metric={metric} budget={budget}min max={max_experiments} tox={tox_env}"
)
if dry_run:
typer.echo(" (dry-run — no changes will be made)")
typer.echo()
baseline: float | None = None
def _progress_callback(iteration: int, max_iter: int, message: str) -> None:
"""Print progress updates during experiment iterations."""
if iteration > 0:
prefix = typer.style(f"[{iteration}/{max_iter}]", bold=True)
typer.echo(f"{prefix} {message}")
try:
for i in range(1, max_experiments + 1):
typer.echo(typer.style(f"[{i}/{max_experiments}]", bold=True), nl=False)
# Run the full experiment loop via the SystemExperiment class
result = experiment.run(
tox_env=tox_env,
model=model,
program_content=program_content,
max_iterations=max_experiments,
dry_run=dry_run,
create_branch=False, # CLI mode: work on current branch
)
hypothesis = experiment.generate_hypothesis(program_content)
typer.echo(f" {hypothesis}")
# Display results for each iteration
for i, record in enumerate(experiment.results, 1):
_progress_callback(i, max_experiments, record["hypothesis"])
if dry_run:
continue
# Edit phase
# Edit phase result
typer.echo(" → editing …", nl=False)
edit_result = experiment.apply_edit(hypothesis, model=model)
if "not available" in edit_result or edit_result.startswith("Aider error"):
typer.echo(f" skipped ({edit_result.split(':')[0]})")
if record.get("edit_failed"):
typer.echo(f" skipped ({record.get('edit_result', 'unknown')})")
else:
typer.echo(" done")
# Evaluate phase
typer.echo(" → running tox …", nl=False)
tox_result = experiment.run_tox(tox_env=tox_env)
typer.echo(f" {tox_result['duration_s']}s")
# Evaluate phase result
duration = record.get("duration_s", 0)
typer.echo(f" → running tox … {duration}s")
assessment = experiment.evaluate(tox_result["metric"], baseline)
# Assessment
assessment = record.get("assessment", "No assessment")
typer.echo(f"{assessment}")
if tox_result["metric"] is not None and baseline is None:
baseline = tox_result["metric"]
if tox_result["success"] and tox_result["metric"] is not None and baseline is not None:
if experiment.is_improvement(tox_result["metric"], baseline):
commit_msg = (
f"autoresearch: improve {metric} on {target}{assessment}"
)
if experiment.commit_changes(commit_msg):
typer.echo(" → committed")
baseline = tox_result["metric"]
else:
experiment.revert_changes()
typer.echo(" → commit failed, reverted")
else:
experiment.revert_changes()
typer.echo(" → reverted (no improvement)")
elif not tox_result["success"]:
experiment.revert_changes()
typer.echo(f" → reverted ({tox_result['error']})")
# Outcome
if record.get("committed"):
typer.echo(" → committed")
elif record.get("reverted"):
typer.echo(" → reverted (no improvement)")
typer.echo()
@@ -677,8 +708,8 @@ def learn(
raise typer.Exit(0) from None
typer.echo(typer.style("Autoresearch complete.", bold=True))
if baseline is not None:
typer.echo(f"Final {metric}: {baseline:.4f}")
if result.get("baseline") is not None:
typer.echo(f"Final {metric}: {result['baseline']:.4f}")
def main():

View File

@@ -39,7 +39,9 @@ class TestPrepareExperiment:
from timmy.autoresearch import prepare_experiment
with patch("timmy.autoresearch.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1, stdout="", stderr="auth failed")
mock_run.return_value = MagicMock(
returncode=1, stdout="", stderr="auth failed"
)
result = prepare_experiment(tmp_path)
assert "failed" in result.lower()
@@ -102,7 +104,9 @@ class TestRunExperiment:
(repo_dir / "train.py").write_text("print('done')")
with patch("timmy.autoresearch.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout="no metrics here", stderr="")
mock_run.return_value = MagicMock(
returncode=0, stdout="no metrics here", stderr=""
)
result = run_experiment(tmp_path)
assert result["success"] is True
@@ -349,3 +353,107 @@ class TestSystemExperiment:
success = exp.revert_changes()
assert success is False
def test_create_branch_success(self, tmp_path):
    """create_branch() returns True and issues `git checkout -b <name>`."""
    from timmy.autoresearch import SystemExperiment

    experiment = SystemExperiment(target="x.py", workspace=tmp_path)
    with patch("timmy.autoresearch.subprocess.run") as fake_run:
        fake_run.return_value = MagicMock(returncode=0)
        assert experiment.create_branch("feature/test-branch") is True
        # Exactly one git invocation, carrying the expected arguments.
        fake_run.assert_called_once()
        argv = fake_run.call_args[0][0]
        for token in ("checkout", "-b", "feature/test-branch"):
            assert token in argv
def test_create_branch_failure(self, tmp_path):
    """create_branch() returns False when the git command exits non-zero."""
    import subprocess

    from timmy.autoresearch import SystemExperiment

    experiment = SystemExperiment(target="x.py", workspace=tmp_path)
    with patch("timmy.autoresearch.subprocess.run") as fake_run:
        fake_run.side_effect = subprocess.CalledProcessError(1, "git")
        assert experiment.create_branch("feature/test-branch") is False
def test_run_dry_run_mode(self, tmp_path):
    """run() with dry_run=True only generates hypotheses — no edits, no tox."""
    from timmy.autoresearch import SystemExperiment

    experiment = SystemExperiment(target="x.py", workspace=tmp_path)
    outcome = experiment.run(
        max_iterations=3, dry_run=True, program_content="Test program"
    )

    assert outcome["iterations"] == 3
    # Dry runs never count as a successful experiment.
    assert outcome["success"] is False
    assert len(experiment.results) == 3
    # Every recorded iteration carries a generated hypothesis.
    assert all("hypothesis" in record for record in experiment.results)
def test_run_with_custom_metric_fn(self, tmp_path):
    """Test that a custom metric_fn overrides built-in metric extraction."""
    # Plain local import instead of the original's obscure
    # __import__("re") expression — same module, idiomatic form.
    import re

    from timmy.autoresearch import SystemExperiment

    def custom_metric_fn(output: str) -> float | None:
        # Parse "custom_metric: <float>" out of the captured tox output.
        match = re.search(r"custom_metric:\s*([0-9.]+)", output)
        return float(match.group(1)) if match else None

    exp = SystemExperiment(
        target="x.py",
        workspace=tmp_path,
        metric="custom",
        metric_fn=custom_metric_fn,
    )
    with patch("timmy.autoresearch.subprocess.run") as mock_run:
        mock_run.return_value = MagicMock(
            returncode=0,
            stdout="custom_metric: 42.5\nother output",
            stderr="",
        )
        tox_result = exp.run_tox()
        assert tox_result["metric"] == pytest.approx(42.5)
def test_run_single_iteration_success(self, tmp_path):
    """A single run() iteration records a 100% pass rate from tox output."""
    from timmy.autoresearch import SystemExperiment

    experiment = SystemExperiment(target="x.py", workspace=tmp_path)
    with patch("timmy.autoresearch.subprocess.run") as fake_run:
        # Every subprocess call reports all tests passing.
        fake_run.return_value = MagicMock(
            returncode=0, stdout="10 passed in 1.23s", stderr=""
        )
        outcome = experiment.run(max_iterations=1, tox_env="unit")

    assert outcome["iterations"] == 1
    assert len(experiment.results) == 1
    assert experiment.results[0]["metric"] == pytest.approx(100.0)
def test_run_stores_baseline_on_first_success(self, tmp_path):
    """The first successful iteration seeds the experiment's baseline."""
    from timmy.autoresearch import SystemExperiment

    experiment = SystemExperiment(target="x.py", workspace=tmp_path)
    assert experiment.baseline is None

    with patch("timmy.autoresearch.subprocess.run") as fake_run:
        fake_run.return_value = MagicMock(
            returncode=0, stdout="8 passed in 1.23s", stderr=""
        )
        experiment.run(max_iterations=1)

    assert experiment.baseline == pytest.approx(100.0)
    # The stored record shows the baseline as it was *before* this run.
    assert experiment.results[0]["baseline"] is None