diff --git a/pyproject.toml b/pyproject.toml
index 54f4b893..ebac5829 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -76,6 +76,7 @@ creative = [
timmy = "timmy.cli:main"
timmy-serve = "timmy_serve.cli:main"
self-tdd = "self_tdd.watchdog:main"
+self-modify = "self_modify.cli:main"
[tool.hatch.build.targets.wheel]
sources = {"src" = ""}
@@ -97,6 +98,7 @@ include = [
"src/creative",
"src/agent_core",
"src/lightning",
+ "src/self_modify",
]
[tool.pytest.ini_options]
diff --git a/src/config.py b/src/config.py
index bdf8a843..9bd5e6d3 100644
--- a/src/config.py
+++ b/src/config.py
@@ -76,6 +76,14 @@ class Settings(BaseSettings):
# In production, security settings are strictly enforced.
timmy_env: Literal["development", "production"] = "development"
+ # ── Self-Modification ──────────────────────────────────────────────
+ # Enable self-modification capabilities. When enabled, Timmy can
+ # edit its own source code, run tests, and commit changes.
+ self_modify_enabled: bool = False
+ self_modify_max_retries: int = 2
+ self_modify_allowed_dirs: str = "src,tests"
+ self_modify_backend: str = "auto" # "ollama", "anthropic", or "auto"
+
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
diff --git a/src/dashboard/app.py b/src/dashboard/app.py
index da1be36e..3b2788a9 100644
--- a/src/dashboard/app.py
+++ b/src/dashboard/app.py
@@ -26,6 +26,7 @@ from dashboard.routes.tools import router as tools_router
from dashboard.routes.spark import router as spark_router
from dashboard.routes.creative import router as creative_router
from dashboard.routes.discord import router as discord_router
+from dashboard.routes.self_modify import router as self_modify_router
logging.basicConfig(
level=logging.INFO,
@@ -154,6 +155,7 @@ app.include_router(tools_router)
app.include_router(spark_router)
app.include_router(creative_router)
app.include_router(discord_router)
+app.include_router(self_modify_router)
@app.get("/", response_class=HTMLResponse)
diff --git a/src/dashboard/routes/self_modify.py b/src/dashboard/routes/self_modify.py
new file mode 100644
index 00000000..2e0cf74a
--- /dev/null
+++ b/src/dashboard/routes/self_modify.py
@@ -0,0 +1,71 @@
+"""Self-modification routes — /self-modify endpoints.
+
+Exposes the edit-test-commit loop as a REST API. Gated by
+``SELF_MODIFY_ENABLED`` (default False).
+"""
+
+import asyncio
+import logging
+
+from fastapi import APIRouter, Form, HTTPException
+
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/self-modify", tags=["self-modify"])
+
+
+@router.post("/run")
+async def run_self_modify(
+    instruction: str = Form(...),
+    target_files: str = Form(""),
+    dry_run: bool = Form(False),
+    speak_result: bool = Form(False),
+):
+    """Execute a self-modification loop and return its outcome as JSON.
+
+    Args:
+        instruction: Natural-language description of the change to make.
+        target_files: Comma-separated file paths; blank entries are dropped.
+            An empty list is passed through when nothing is given.
+        dry_run: Forwarded unchanged to ``ModifyRequest.dry_run``.
+        speak_result: When true and the run succeeded, announce the result
+            through TTS on a best-effort basis.
+
+    Returns:
+        A dict with the ``success``, ``files_changed``, ``test_passed``,
+        ``commit_sha``, ``branch_name``, ``error`` and ``attempts`` fields of
+        the resulting ``ModifyResult``.
+
+    Raises:
+        HTTPException: 403 when ``settings.self_modify_enabled`` is false.
+    """
+    if not settings.self_modify_enabled:
+        raise HTTPException(403, "Self-modification is disabled")
+
+    # Deferred import: the loop module is only needed once the gate has passed.
+    from self_modify.loop import SelfModifyLoop, ModifyRequest
+
+    files = [f.strip() for f in target_files.split(",") if f.strip()]
+    request = ModifyRequest(
+        instruction=instruction,
+        target_files=files,
+        dry_run=dry_run,
+    )
+
+    # loop.run is synchronous, so it is pushed to a worker thread instead of
+    # being called directly inside this coroutine.
+    loop = SelfModifyLoop()
+    result = await asyncio.to_thread(loop.run, request)
+
+    if speak_result and result.success:
+        try:
+            from timmy_serve.voice_tts import voice_tts
+
+            if voice_tts.available:
+                voice_tts.speak(
+                    f"Code modification complete. "
+                    f"{len(result.files_changed)} files changed. Tests passing."
+                )
+        except Exception:
+            # Speech is optional: a TTS problem must not fail the request,
+            # but it should leave a trace instead of vanishing silently.
+            logger.warning("TTS announcement failed", exc_info=True)
+
+    return {
+        "success": result.success,
+        "files_changed": result.files_changed,
+        "test_passed": result.test_passed,
+        "commit_sha": result.commit_sha,
+        "branch_name": result.branch_name,
+        "error": result.error,
+        "attempts": result.attempts,
+    }
+
+
+@router.get("/status")
+async def self_modify_status():
+    """Report the current value of the self-modification feature flag."""
+    enabled = settings.self_modify_enabled
+    return {"enabled": enabled}
diff --git a/src/dashboard/routes/voice_enhanced.py b/src/dashboard/routes/voice_enhanced.py
index cd9339c8..8a17ec01 100644
--- a/src/dashboard/routes/voice_enhanced.py
+++ b/src/dashboard/routes/voice_enhanced.py
@@ -55,6 +55,39 @@ async def process_voice_input(
elif intent.name == "voice":
response_text = "Voice settings acknowledged. TTS is available for spoken responses."
+ elif intent.name == "code":
+ from config import settings as app_settings
+ if not app_settings.self_modify_enabled:
+ response_text = (
+ "Self-modification is disabled. "
+ "Set SELF_MODIFY_ENABLED=true to enable."
+ )
+ else:
+ import asyncio
+ from self_modify.loop import SelfModifyLoop, ModifyRequest
+
+ target_files = []
+ if "target_file" in intent.entities:
+ target_files = [intent.entities["target_file"]]
+
+ loop = SelfModifyLoop()
+ request = ModifyRequest(
+ instruction=text,
+ target_files=target_files,
+ )
+ result = await asyncio.to_thread(loop.run, request)
+
+ if result.success:
+ sha_short = result.commit_sha[:8] if result.commit_sha else "none"
+ response_text = (
+ f"Code modification complete. "
+ f"Changed {len(result.files_changed)} file(s). "
+ f"Tests passed. Committed as {sha_short} "
+ f"on branch {result.branch_name}."
+ )
+ else:
+ response_text = f"Code modification failed: {result.error}"
+
else:
# Default: chat with Timmy
agent = create_timmy()
diff --git a/src/self_modify/__init__.py b/src/self_modify/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/self_modify/cli.py b/src/self_modify/cli.py
new file mode 100644
index 00000000..9a74fb6f
--- /dev/null
+++ b/src/self_modify/cli.py
@@ -0,0 +1,134 @@
+"""CLI for self-modification — run from the terminal.
+
+Usage:
+ self-modify run "Add a docstring to src/timmy/prompts.py" --file src/timmy/prompts.py
+ self-modify run "Fix the bug in config" --dry-run
+ self-modify run "Add logging" --backend anthropic --autonomous
+ self-modify status
+"""
+
+import logging
+import os
+from typing import Optional
+
+import typer
+from rich.console import Console
+from rich.panel import Panel
+
+console = Console()
+app = typer.Typer(help="Timmy self-modify — edit code, run tests, commit")
+
+
+@app.command()
+def run(
+    instruction: str = typer.Argument(..., help="What to change (natural language)"),
+    file: Optional[list[str]] = typer.Option(None, "--file", "-f", help="Target file(s) to modify"),
+    dry_run: bool = typer.Option(False, "--dry-run", "-n", help="Generate edits but don't write"),
+    retries: int = typer.Option(2, "--retries", "-r", help="Max retry attempts on test failure"),
+    backend: Optional[str] = typer.Option(None, "--backend", "-b", help="LLM backend: ollama, anthropic, auto"),
+    autonomous: bool = typer.Option(False, "--autonomous", "-a", help="Enable autonomous self-correction"),
+    max_cycles: int = typer.Option(3, "--max-cycles", help="Max autonomous correction cycles"),
+    branch: bool = typer.Option(False, "--branch", help="Create a git branch (off by default to avoid container restarts)"),
+    speak: bool = typer.Option(False, "--speak", "-s", help="Speak the result via TTS"),
+):
+    """Run the self-modification loop from the terminal.
+
+    Prints a summary panel of the request, runs ``SelfModifyLoop.run`` and
+    prints a result panel. Exits with status 1 when the run fails.
+
+    Side effects: sets ``SELF_MODIFY_ENABLED`` (always) and
+    ``SELF_MODIFY_SKIP_BRANCH`` (unless ``--branch``) in ``os.environ`` and
+    configures root logging.
+    """
+    # Force enable for CLI usage. This is set before self_modify.loop is
+    # imported below.
+    os.environ["SELF_MODIFY_ENABLED"] = "true"
+
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s %(levelname)-8s %(name)s -- %(message)s",
+        datefmt="%H:%M:%S",
+    )
+
+    # Skip branch creation unless explicitly requested
+    if not branch:
+        os.environ["SELF_MODIFY_SKIP_BRANCH"] = "1"
+
+    from self_modify.loop import SelfModifyLoop, ModifyRequest
+
+    target_files = list(file) if file else []
+    effective_backend = backend or os.environ.get("SELF_MODIFY_BACKEND", "auto")
+
+    console.print(Panel(
+        f"[bold]Instruction:[/bold] {instruction}\n"
+        f"[bold]Files:[/bold] {', '.join(target_files) or '(auto-detect)'}\n"
+        f"[bold]Backend:[/bold] {effective_backend}\n"
+        f"[bold]Autonomous:[/bold] {autonomous}\n"
+        f"[bold]Dry run:[/bold] {dry_run}\n"
+        f"[bold]Max retries:[/bold] {retries}",
+        title="Self-Modify",
+        border_style="cyan",
+    ))
+
+    loop = SelfModifyLoop(
+        max_retries=retries,
+        backend=effective_backend,
+        autonomous=autonomous,
+        max_autonomous_cycles=max_cycles,
+    )
+    request = ModifyRequest(
+        instruction=instruction,
+        target_files=target_files,
+        dry_run=dry_run,
+    )
+
+    with console.status("[bold cyan]Running self-modification loop..."):
+        result = loop.run(request)
+
+    if result.report_path:
+        console.print(f"\n[dim]Report saved: {result.report_path}[/dim]\n")
+
+    if result.success:
+        console.print(Panel(
+            f"[green bold]SUCCESS[/green bold]\n\n"
+            f"Files changed: {', '.join(result.files_changed)}\n"
+            f"Tests passed: {result.test_passed}\n"
+            f"Commit: {result.commit_sha or 'none (dry run)'}\n"
+            f"Branch: {result.branch_name or 'current'}\n"
+            f"Attempts: {result.attempts}\n"
+            f"Autonomous cycles: {result.autonomous_cycles}",
+            title="Result",
+            border_style="green",
+        ))
+    else:
+        console.print(Panel(
+            f"[red bold]FAILED[/red bold]\n\n"
+            f"Error: {result.error}\n"
+            f"Attempts: {result.attempts}\n"
+            f"Autonomous cycles: {result.autonomous_cycles}",
+            title="Result",
+            border_style="red",
+        ))
+        raise typer.Exit(1)
+
+    if speak and result.success:
+        try:
+            from timmy_serve.voice_tts import voice_tts
+            if voice_tts.available:
+                voice_tts.speak_sync(
+                    f"Code modification complete. "
+                    f"{len(result.files_changed)} files changed. Tests passing."
+                )
+        except Exception:
+            # Speech is optional: keep the successful exit status, but record
+            # why nothing was spoken instead of hiding the failure.
+            logging.getLogger(__name__).warning(
+                "TTS announcement failed", exc_info=True
+            )
+
+
+@app.command()
+def status():
+    """Print the self-modification settings currently in effect."""
+    from config import settings
+
+    if settings.self_modify_enabled:
+        color, label = "green", "ENABLED"
+    else:
+        color, label = "red", "DISABLED"
+
+    console.print(f"Self-modification: [{color}]{label}[/{color}]")
+    console.print(f"Max retries: {settings.self_modify_max_retries}")
+    console.print(f"Backend: {settings.self_modify_backend}")
+    console.print(f"Allowed dirs: {settings.self_modify_allowed_dirs}")
+
+
+def main():
+ """Entry point: invoke the Typer ``app`` defined in this module."""
+ app()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/self_modify/loop.py b/src/self_modify/loop.py
new file mode 100644
index 00000000..633c905a
--- /dev/null
+++ b/src/self_modify/loop.py
@@ -0,0 +1,741 @@
+"""Self-modification loop — read source, generate edits, test, commit.
+
+Orchestrates the full cycle for Timmy to modify its own codebase:
+1. Create a working git branch
+2. Read target source files
+3. Send instruction + source to the LLM
+4. Validate syntax before writing
+5. Write edits to disk
+6. Run pytest
+7. On success -> git add + commit; on failure -> revert
+8. On total failure -> diagnose from report, restart autonomously
+
+Supports multiple LLM backends:
+- "ollama" — local Ollama (sovereign)
+- "anthropic" — Claude API via Anthropic SDK
+- "auto" — the configured default: anthropic when ANTHROPIC_API_KEY is set, otherwise ollama
+
+Reports are saved to data/self_modify_reports/ for debugging.
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import re
+import subprocess
+import sys
+import threading
+import time
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Optional
+
+from config import settings
+
+logger = logging.getLogger(__name__)
+
+# Project root — two levels up from src/self_modify/
+PROJECT_ROOT = Path(__file__).parent.parent.parent
+
+# Directory where _save_report writes one Markdown report per run
+REPORTS_DIR = PROJECT_ROOT / "data" / "self_modify_reports"
+
+# Module-level lock: SelfModifyLoop.run acquires it without blocking, so a
+# second run in this process is refused while one is in progress
+_LOCK = threading.Lock()
+
+# Files whose on-disk size exceeds this many bytes are skipped by _read_files
+# and therefore never sent to the LLM
+_MAX_FILE_SIZE = 50_000
+
+# "--- FILE: <path> --- ... --- END FILE ---" delimiter format; _generate_edits
+# uses it as a fallback when its primary parse finds no edits
+_FILE_BLOCK_RE = re.compile(
+ r"---\s*FILE:\s*(.+?)\s*---\n(.*?)---\s*END\s*FILE\s*---",
+ re.DOTALL,
+)
+
+# Names of the supported LLM backends
+BACKENDS = ("ollama", "anthropic", "auto")
+
+
+@dataclass
+class ModifyRequest:
+ """A request to modify code."""
+
+ # Natural-language description of the change; it is embedded in the LLM prompt.
+ instruction: str
+ # Paths relative to the repository root. When empty, the loop tries to
+ # infer targets from the instruction text.
+ target_files: list[str] = field(default_factory=list)
+ # When true, edits are generated and syntax-checked but not written,
+ # tested or committed.
+ dry_run: bool = False
+
+
+@dataclass
+class ModifyResult:
+ """Result of a self-modification attempt."""
+
+ # True when tests passed after writing the edits, or when a dry run
+ # produced syntactically valid edits.
+ success: bool
+ # Paths that were written (or, for a dry run, that would have been).
+ files_changed: list[str] = field(default_factory=list)
+ test_passed: bool = False
+ # SHA returned by the commit helper; None when nothing was committed.
+ commit_sha: Optional[str] = None
+ # Working branch created for the run; None when no branch was created.
+ branch_name: Optional[str] = None
+ # Human-readable failure reason; None on success.
+ error: Optional[str] = None
+ # Raw text of the most recent LLM reply, when one was obtained.
+ llm_response: str = ""
+ # Number of attempts started within the run that produced this result.
+ attempts: int = 0
+ # Path of the saved Markdown report; filled in after the report is written.
+ report_path: Optional[str] = None
+ # Autonomous-correction cycle number recorded by the autonomous loop
+ # (0 when that loop did not run).
+ autonomous_cycles: int = 0
+
+
+class SelfModifyLoop:
+ """Orchestrates the read -> edit -> test -> commit cycle.
+
+ Supports autonomous self-correction: when all retries fail, reads its own
+ failure report, diagnoses the root cause, and restarts with a corrected
+ instruction.
+ """
+
+    def __init__(
+        self,
+        repo_path: Optional[Path] = None,
+        max_retries: Optional[int] = None,
+        backend: Optional[str] = None,
+        autonomous: bool = False,
+        max_autonomous_cycles: int = 3,
+    ) -> None:
+        """Configure a loop, falling back to ``settings`` for omitted values.
+
+        Args:
+            repo_path: Repository root; defaults to ``PROJECT_ROOT``.
+            max_retries: Retries after the first attempt; ``None`` means
+                ``settings.self_modify_max_retries``.
+            backend: LLM backend name; a falsy value means
+                ``settings.self_modify_backend``.
+            autonomous: Enable the diagnose-and-retry cycle after a failed run.
+            max_autonomous_cycles: Upper bound on those cycles.
+        """
+        if max_retries is None:
+            max_retries = settings.self_modify_max_retries
+
+        # Comma-separated setting -> list of non-empty, trimmed directory names.
+        trimmed = (d.strip() for d in settings.self_modify_allowed_dirs.split(","))
+
+        self._repo_path = repo_path or PROJECT_ROOT
+        self._max_retries = max_retries
+        self._allowed_dirs = [d for d in trimmed if d]
+        self._run_id = str(int(time.time()))
+        self._attempt_reports: list[dict] = []
+        self._backend = backend or settings.self_modify_backend
+        self._autonomous = autonomous
+        self._max_autonomous_cycles = max_autonomous_cycles
+
+ # ── Public API ────────────────────────────────────────────────────────────
+
+    def run(self, request: ModifyRequest) -> ModifyResult:
+        """Execute the full self-modification loop.
+
+        Refuses to start when the feature is disabled in ``settings`` or when
+        another run already holds the module-level lock. Otherwise runs one
+        pass, saves its report, and, in autonomous mode, hands a failed
+        non-dry-run result to the autonomous correction loop.
+
+        Args:
+            request: What to change, where, and whether this is a dry run.
+
+        Returns:
+            The final ``ModifyResult`` with ``report_path`` filled in whenever
+            a pass was actually executed.
+        """
+        if not settings.self_modify_enabled:
+            return ModifyResult(
+                success=False,
+                error="Self-modification is disabled. Set SELF_MODIFY_ENABLED=true.",
+            )
+
+        # Non-blocking: a concurrent caller gets an immediate refusal rather
+        # than queueing behind the running modification.
+        if not _LOCK.acquire(blocking=False):
+            return ModifyResult(
+                success=False,
+                error="Another self-modification is already running.",
+            )
+
+        try:
+            # Start from a clean slate so that reusing this instance does not
+            # carry attempts from an earlier run into this run's report.
+            self._attempt_reports = []
+
+            result = self._run_locked(request)
+            report_path = self._save_report(request, result)
+            result.report_path = str(report_path)
+
+            # Autonomous mode: if failed, diagnose and restart
+            if self._autonomous and not result.success and not request.dry_run:
+                result = self._autonomous_loop(request, result, report_path)
+
+            return result
+        finally:
+            _LOCK.release()
+
+ # ── Autonomous self-correction ─────────────────────────────────────────
+
+    def _autonomous_loop(
+        self, original_request: ModifyRequest, last_result: ModifyResult, last_report: Path
+    ) -> ModifyResult:
+        """Retry a failed request, each time guided by a diagnosis of the last report.
+
+        Every cycle asks ``_diagnose_failure`` about the latest report, appends
+        the diagnosis to the original instruction and runs a fresh pass. Stops
+        at the first success, when no diagnosis can be produced, or after
+        ``self._max_autonomous_cycles`` cycles.
+
+        Returns:
+            The successful result, or the most recent failed one.
+        """
+        cycle_limit = self._max_autonomous_cycles
+        cycle = 0
+
+        while cycle < cycle_limit:
+            cycle += 1
+            logger.info("Autonomous cycle %d/%d", cycle, cycle_limit)
+
+            diagnosis = self._diagnose_failure(last_report)
+            if not diagnosis:
+                # Without a diagnosis there is nothing new to tell the LLM.
+                logger.warning("Could not diagnose failure, stopping autonomous loop")
+                last_result.autonomous_cycles = cycle
+                return last_result
+
+            logger.info("Diagnosis: %s", diagnosis[:200])
+
+            # Each cycle gets its own attempt log and therefore its own report.
+            self._attempt_reports = []
+
+            retry_request = ModifyRequest(
+                instruction=(
+                    f"{original_request.instruction}\n\n"
+                    f"IMPORTANT CORRECTION from previous failure:\n{diagnosis}"
+                ),
+                target_files=original_request.target_files,
+                dry_run=original_request.dry_run,
+            )
+
+            outcome = self._run_locked(retry_request)
+            saved_report = self._save_report(retry_request, outcome)
+            outcome.report_path = str(saved_report)
+            outcome.autonomous_cycles = cycle
+
+            if outcome.success:
+                logger.info("Autonomous cycle %d succeeded!", cycle)
+                return outcome
+
+            last_result, last_report = outcome, saved_report
+
+        logger.warning("Autonomous loop exhausted after %d cycles", self._max_autonomous_cycles)
+        return last_result
+
+    def _diagnose_failure(self, report_path: Path) -> Optional[str]:
+        """Ask the LLM what went wrong in a saved failure report.
+
+        The report is read from disk, cut to 8000 characters, and sent with a
+        request for a root cause plus concrete fix instructions.
+
+        Returns:
+            The stripped LLM answer, or ``None`` when the report cannot be
+            read, the LLM call raises, or the LLM returns nothing.
+        """
+        try:
+            report_body = report_path.read_text(encoding="utf-8")
+        except Exception as exc:
+            logger.error("Could not read report %s: %s", report_path, exc)
+            return None
+
+        # Truncate to keep within context limits
+        limit = 8000
+        if len(report_body) > limit:
+            report_body = report_body[:limit] + "\n... (truncated)"
+
+        diagnosis_prompt = f"""You are a code debugging expert. Analyze this self-modification failure report and provide a concise diagnosis.
+
+FAILURE REPORT:
+{report_body}
+
+Analyze the report and provide:
+1. ROOT CAUSE: What specifically went wrong (syntax error, logic error, missing import, etc.)
+2. FIX INSTRUCTIONS: Exact instructions for a code-generation LLM to avoid this mistake.
+ Be very specific — e.g. "Do NOT start the file with triple-quotes" or
+ "The em-dash character U+2014 must stay INSIDE a string literal, never outside one."
+
+Keep your response under 500 words. Focus on actionable fix instructions."""
+
+        try:
+            answer = self._call_llm(diagnosis_prompt)
+        except Exception as exc:
+            logger.error("Diagnosis LLM call failed: %s", exc)
+            return None
+
+        if not answer:
+            return None
+        return answer.strip()
+
+ # ── Internal orchestration ────────────────────────────────────────────────
+
+    def _run_locked(self, request: ModifyRequest) -> ModifyResult:
+        """Run the edit -> validate -> test -> commit attempts for one request.
+
+        Called by ``run`` and ``_autonomous_loop`` while ``_LOCK`` is held.
+        Makes up to ``self._max_retries + 1`` attempts. Each attempt re-reads
+        the target files, asks the LLM for edits, checks the proposed paths
+        and the Python syntax, then (unless ``request.dry_run``) writes the
+        edits and runs the tests. Passing tests are committed; failing tests
+        are reverted and their output is fed into the next attempt.
+
+        Args:
+            request: Instruction, optional target files and dry-run flag.
+
+        Returns:
+            A ``ModifyResult`` describing the outcome; validation, LLM and
+            test failures are reported through ``success`` and ``error``.
+        """
+        branch_name = None
+        attempt = 0
+
+        # Branch creation is skipped when SELF_MODIFY_SKIP_BRANCH is set —
+        # writing files triggers container restarts which kills the process
+        # mid-operation. In that case work happens on the current branch.
+        if not os.environ.get("SELF_MODIFY_SKIP_BRANCH"):
+            try:
+                branch_name = self._create_branch()
+            except Exception as exc:
+                logger.warning("Could not create branch: %s (continuing on current)", exc)
+
+        # Resolve target files
+        target_files = request.target_files or self._infer_target_files(
+            request.instruction
+        )
+        if not target_files:
+            return ModifyResult(
+                success=False,
+                error="No target files identified. Specify target_files or use more specific language.",
+                branch_name=branch_name,
+            )
+
+        # Validate paths
+        try:
+            self._validate_paths(target_files)
+        except ValueError as exc:
+            return ModifyResult(success=False, error=str(exc), branch_name=branch_name)
+
+        last_test_output = ""
+        last_llm_response = ""
+        last_syntax_errors: dict[str, str] = {}
+
+        while attempt <= self._max_retries:
+            attempt += 1
+            logger.info(
+                "Self-modify attempt %d/%d: %s",
+                attempt,
+                self._max_retries + 1,
+                request.instruction[:80],
+            )
+
+            # Read current contents
+            file_contents = self._read_files(target_files)
+            if not file_contents:
+                return ModifyResult(
+                    success=False,
+                    error="Could not read any target files.",
+                    branch_name=branch_name,
+                    attempts=attempt,
+                )
+
+            # Generate edits via LLM; feedback from the previous attempt is
+            # only passed from the second attempt onwards.
+            try:
+                edits, llm_response = self._generate_edits(
+                    request.instruction, file_contents,
+                    prev_test_output=last_test_output if attempt > 1 else None,
+                    prev_syntax_errors=last_syntax_errors if attempt > 1 else None,
+                )
+                last_llm_response = llm_response
+            except Exception as exc:
+                self._attempt_reports.append({
+                    "attempt": attempt,
+                    "phase": "llm_generation",
+                    "error": str(exc),
+                })
+                return ModifyResult(
+                    success=False,
+                    error=f"LLM generation failed: {exc}",
+                    branch_name=branch_name,
+                    attempts=attempt,
+                )
+
+            if not edits:
+                self._attempt_reports.append({
+                    "attempt": attempt,
+                    "phase": "parse_edits",
+                    "error": "No file edits parsed from LLM response",
+                    "llm_response": llm_response,
+                })
+                return ModifyResult(
+                    success=False,
+                    error="LLM produced no file edits.",
+                    llm_response=llm_response,
+                    branch_name=branch_name,
+                    attempts=attempt,
+                )
+
+            # The paths in the reply are chosen by the LLM, not by the caller,
+            # so they get the same allowed-directory check as the requested
+            # targets before anything is written to disk.
+            try:
+                self._validate_paths(list(edits))
+            except ValueError as exc:
+                self._attempt_reports.append({
+                    "attempt": attempt,
+                    "phase": "path_validation",
+                    "error": str(exc),
+                    "llm_response": llm_response,
+                })
+                return ModifyResult(
+                    success=False,
+                    error=f"LLM proposed an edit outside the allowed paths: {exc}",
+                    llm_response=llm_response,
+                    branch_name=branch_name,
+                    attempts=attempt,
+                )
+
+            # Syntax validation — check BEFORE writing to disk
+            syntax_errors = self._validate_syntax(edits)
+            if syntax_errors:
+                last_syntax_errors = syntax_errors
+                error_summary = "; ".join(
+                    f"{fp}: {err}" for fp, err in syntax_errors.items()
+                )
+                logger.warning("Syntax errors in LLM output: %s", error_summary)
+                self._attempt_reports.append({
+                    "attempt": attempt,
+                    "phase": "syntax_validation",
+                    "error": error_summary,
+                    "edits_content": dict(edits),
+                    "llm_response": llm_response,
+                })
+                # Don't write — go straight to retry
+                continue
+
+            last_syntax_errors = {}
+
+            if request.dry_run:
+                # Reports keep only a 500-character preview of each edit.
+                self._attempt_reports.append({
+                    "attempt": attempt,
+                    "phase": "dry_run",
+                    "edits": {fp: content[:500] + "..." if len(content) > 500 else content
+                              for fp, content in edits.items()},
+                    "llm_response": llm_response,
+                })
+                return ModifyResult(
+                    success=True,
+                    files_changed=list(edits.keys()),
+                    llm_response=llm_response,
+                    branch_name=branch_name,
+                    attempts=attempt,
+                )
+
+            # Write edits
+            written = self._write_files(edits)
+
+            # Run tests
+            test_passed, test_output = self._run_tests()
+            last_test_output = test_output
+
+            # Save per-attempt report
+            self._attempt_reports.append({
+                "attempt": attempt,
+                "phase": "complete",
+                "files_written": written,
+                "edits_content": dict(edits),
+                "test_passed": test_passed,
+                "test_output": test_output,
+                "llm_response": llm_response,
+            })
+
+            if test_passed:
+                sha = self._git_commit(
+                    f"self-modify: {request.instruction[:72]}", written
+                )
+                return ModifyResult(
+                    success=True,
+                    files_changed=written,
+                    test_passed=True,
+                    commit_sha=sha,
+                    branch_name=branch_name,
+                    llm_response=llm_response,
+                    attempts=attempt,
+                )
+
+            # Tests failed — revert and maybe retry
+            logger.warning(
+                "Tests failed on attempt %d: %s", attempt, test_output[:200]
+            )
+            self._revert_files(written)
+
+        # Attempts exhausted. last_syntax_errors is non-empty only when the
+        # final attempt was rejected before any test ran, so say so instead
+        # of blaming the tests.
+        if last_syntax_errors:
+            syntax_summary = "; ".join(
+                f"{fp}: {err}" for fp, err in last_syntax_errors.items()
+            )
+            final_error = (
+                f"Syntax validation failed after {attempt} attempt(s): {syntax_summary}"
+            )
+        else:
+            final_error = f"Tests failed after {attempt} attempt(s)."
+
+        return ModifyResult(
+            success=False,
+            files_changed=[],
+            test_passed=False,
+            error=final_error,
+            llm_response=last_llm_response,
+            branch_name=branch_name,
+            attempts=attempt,
+        )
+
+ # ── Syntax validation ──────────────────────────────────────────────────
+
+    def _validate_syntax(self, edits: dict[str, str]) -> dict[str, str]:
+        """Compile-check every ``.py`` edit without executing it.
+
+        Args:
+            edits: Mapping of file path to proposed file content.
+
+        Returns:
+            ``{path: message}`` for each ``.py`` entry that does not compile;
+            empty when everything is fine. Non-``.py`` entries are ignored.
+        """
+        errors: dict[str, str] = {}
+        for fp, content in edits.items():
+            if not fp.endswith(".py"):
+                continue
+            try:
+                compile(content, fp, "exec")
+            except SyntaxError as exc:
+                errors[fp] = f"line {exc.lineno}: {exc.msg}"
+            except ValueError as exc:
+                # Some interpreter versions report source containing NUL bytes
+                # as ValueError rather than SyntaxError; treat it as a
+                # rejected edit instead of letting it abort the whole run.
+                errors[fp] = f"invalid source: {exc}"
+        return errors
+
+ # ── Report saving ─────────────────────────────────────────────────────────
+
+ def _save_report(self, request: ModifyRequest, result: ModifyResult) -> Path:
+ """Write a Markdown report for one pass into ``REPORTS_DIR``.
+
+ The report starts with a summary of the request and result, followed by
+ one section per entry in ``self._attempt_reports``.
+
+ Args:
+ request: The request that was executed.
+ result: The outcome of that request.
+
+ Returns:
+ Path of the report file that was written.
+ """
+ REPORTS_DIR.mkdir(parents=True, exist_ok=True)
+ # File name: UTC timestamp (second resolution) plus a slug made from the
+ # first 40 characters of the instruction, reduced to [a-z0-9_].
+ ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
+ slug = re.sub(r"[^a-z0-9]+", "_", request.instruction[:40].lower()).strip("_")
+ report_file = REPORTS_DIR / f"{ts}_{slug}.md"
+
+ # Summary header; the instruction is capped at 200 characters here.
+ lines = [
+ f"# Self-Modify Report: {ts}",
+ "",
+ f"**Instruction:** {request.instruction[:200]}",
+ f"**Target files:** {', '.join(request.target_files) or '(auto-detected)'}",
+ f"**Dry run:** {request.dry_run}",
+ f"**Backend:** {self._backend}",
+ f"**Branch:** {result.branch_name or 'N/A'}",
+ f"**Result:** {'SUCCESS' if result.success else 'FAILED'}",
+ f"**Error:** {result.error or 'none'}",
+ f"**Commit:** {result.commit_sha or 'none'}",
+ f"**Attempts:** {result.attempts}",
+ f"**Autonomous cycles:** {result.autonomous_cycles}",
+ "",
+ ]
+
+ # One section per recorded attempt; each sub-section is emitted only when
+ # the corresponding key is present in the attempt record.
+ for attempt_data in self._attempt_reports:
+ n = attempt_data.get("attempt", "?")
+ phase = attempt_data.get("phase", "?")
+ lines.append(f"## Attempt {n} -- {phase}")
+ lines.append("")
+
+ # Attempts in the "complete" phase never get an error line.
+ if "error" in attempt_data and attempt_data.get("phase") != "complete":
+ lines.append(f"**Error:** {attempt_data['error']}")
+ lines.append("")
+
+ if "llm_response" in attempt_data:
+ lines.append("### LLM Response")
+ lines.append("```")
+ lines.append(attempt_data["llm_response"])
+ lines.append("```")
+ lines.append("")
+
+ if "edits_content" in attempt_data:
+ lines.append("### Edits Written")
+ for fp, content in attempt_data["edits_content"].items():
+ lines.append(f"#### {fp}")
+ lines.append("```python")
+ lines.append(content)
+ lines.append("```")
+ lines.append("")
+
+ if "test_output" in attempt_data:
+ lines.append(f"### Test Result: {'PASSED' if attempt_data.get('test_passed') else 'FAILED'}")
+ lines.append("```")
+ lines.append(attempt_data["test_output"])
+ lines.append("```")
+ lines.append("")
+
+ report_text = "\n".join(lines)
+ report_file.write_text(report_text, encoding="utf-8")
+ logger.info("Report saved: %s", report_file)
+ return report_file
+
+ # ── Git helpers ───────────────────────────────────────────────────────────
+
+    def _create_branch(self) -> str:
+        """Create a timestamped ``timmy/self-modify-*`` branch and switch to it.
+
+        Returns:
+            The name of the new branch.
+        """
+        from tools.git_tools import git_branch
+
+        stamp = int(time.time())
+        new_branch = f"timmy/self-modify-{stamp}"
+        git_branch(self._repo_path, create=new_branch, switch=new_branch)
+        logger.info("Created branch: %s", new_branch)
+        return new_branch
+
+    def _git_commit(self, message: str, files: list[str]) -> Optional[str]:
+        """Stage ``files`` and commit them with ``message``.
+
+        Returns:
+            The ``"sha"`` entry of the commit helper's result, or ``None``
+            when staging or committing raised.
+        """
+        from tools.git_tools import git_add, git_commit
+
+        try:
+            git_add(self._repo_path, paths=files)
+            commit_info = git_commit(self._repo_path, message)
+            sha = commit_info.get("sha")
+            short_sha = sha[:8] if sha else "?"
+            logger.info("Committed %s: %s", short_sha, message)
+            return sha
+        except Exception as exc:
+            # A failed commit is reported to the caller as "no SHA".
+            logger.error("Git commit failed: %s", exc)
+            return None
+
+    def _revert_files(self, file_paths: list[str]) -> None:
+        """Restore each file from git ``HEAD``, logging any revert that fails.
+
+        Reverting is best effort: a failure for one file is logged and the
+        remaining files are still processed.
+
+        Args:
+            file_paths: Paths relative to the repository root.
+        """
+        for fp in file_paths:
+            try:
+                proc = subprocess.run(
+                    ["git", "checkout", "HEAD", "--", fp],
+                    cwd=self._repo_path,
+                    capture_output=True,
+                    timeout=10,
+                )
+            except Exception as exc:
+                logger.error("Failed to revert %s: %s", fp, exc)
+                continue
+
+            # subprocess.run does not raise on a non-zero exit status, so a
+            # checkout that git refused would otherwise pass unnoticed and
+            # leave the edited file on disk.
+            if proc.returncode != 0:
+                stderr_text = proc.stderr.decode("utf-8", errors="replace").strip()
+                logger.error(
+                    "Failed to revert %s: git exited with %d: %s",
+                    fp,
+                    proc.returncode,
+                    stderr_text,
+                )
+
+ # ── File I/O ──────────────────────────────────────────────────────────────
+
+    def _validate_paths(self, file_paths: list[str]) -> None:
+        """Ensure every path stays inside the repository and an allowed directory.
+
+        Paths are resolved first, so ``..`` segments cannot be used to step
+        outside. The comparison is done on whole path components rather than
+        on string prefixes, so ``src_backup/x.py`` is not treated as being
+        inside ``src``.
+
+        Args:
+            file_paths: Paths relative to the repository root.
+
+        Raises:
+            ValueError: If a path resolves outside the repository or outside
+                every directory in ``self._allowed_dirs``.
+        """
+        repo_resolved = self._repo_path.resolve()
+        # Pre-split the allowed directories once; an entry such as "." has no
+        # components and therefore allows the whole repository.
+        allowed_parts = [Path(d).parts for d in self._allowed_dirs]
+
+        for fp in file_paths:
+            resolved = (self._repo_path / fp).resolve()
+            try:
+                rel_parts = resolved.relative_to(repo_resolved).parts
+            except ValueError:
+                raise ValueError(f"Path escapes repository: {fp}") from None
+
+            inside_allowed = any(
+                rel_parts[: len(parts)] == parts for parts in allowed_parts
+            )
+            if not inside_allowed:
+                raise ValueError(
+                    f"Path not in allowed directories ({self._allowed_dirs}): {fp}"
+                )
+
+    def _read_files(self, file_paths: list[str]) -> dict[str, str]:
+        """Load the given files as UTF-8 text.
+
+        Missing files, files larger than ``_MAX_FILE_SIZE`` and files that
+        cannot be read are skipped with a warning.
+
+        Returns:
+            ``{path: content}`` for every file that was read.
+        """
+        loaded: dict[str, str] = {}
+        for rel_path in file_paths:
+            target = self._repo_path / rel_path
+
+            if not target.is_file():
+                logger.warning("File not found: %s", target)
+                continue
+
+            if target.stat().st_size > _MAX_FILE_SIZE:
+                logger.warning("File too large, skipping: %s", rel_path)
+                continue
+
+            try:
+                text = target.read_text(encoding="utf-8")
+            except Exception as exc:
+                logger.warning("Could not read %s: %s", rel_path, exc)
+            else:
+                loaded[rel_path] = text
+        return loaded
+
+    def _write_files(self, edits: dict[str, str]) -> list[str]:
+        """Write each edit to disk as UTF-8, creating parent directories.
+
+        Returns:
+            The paths written, in the iteration order of ``edits``.
+        """
+        written_paths: list[str] = []
+        for rel_path, text in edits.items():
+            destination = self._repo_path / rel_path
+            destination.parent.mkdir(parents=True, exist_ok=True)
+            destination.write_text(text, encoding="utf-8")
+            written_paths.append(rel_path)
+            logger.info("Wrote %d bytes to %s", len(text), rel_path)
+        return written_paths
+
+    def _infer_target_files(self, instruction: str) -> list[str]:
+        """Guess the files to edit from the wording of the instruction.
+
+        Explicit ``*.py`` paths in the text win. Otherwise the first keyword
+        (in table order) found in the lower-cased instruction selects its
+        files. Returns an empty list when nothing matches.
+        """
+        explicit_paths = re.findall(r"[\w/._-]+\.py", instruction)
+        if explicit_paths:
+            return explicit_paths
+
+        keyword_files = {
+            "config": ["src/config.py"],
+            "health": ["src/dashboard/routes/health.py"],
+            "swarm": ["src/swarm/coordinator.py"],
+            "voice": ["src/voice/nlu.py"],
+            "agent": ["src/timmy/agent.py"],
+            "tool": ["src/timmy/tools.py"],
+            "dashboard": ["src/dashboard/app.py"],
+            "prompt": ["src/timmy/prompts.py"],
+        }
+        lowered = instruction.lower()
+        matches = (files for word, files in keyword_files.items() if word in lowered)
+        return next(matches, [])
+
+ # ── Test runner ───────────────────────────────────────────────────────────
+
+    def _run_tests(self) -> tuple[bool, str]:
+        """Run pytest on ``tests/`` in the repository with a 120 s timeout.
+
+        Returns:
+            ``(passed, output)`` where ``passed`` means exit status 0 and
+            ``output`` is stdout followed by stderr, stripped. A timeout or a
+            failure to launch yields ``False`` with an explanatory message.
+        """
+        command = [sys.executable, "-m", "pytest", "tests/", "-q", "--tb=short"]
+        try:
+            proc = subprocess.run(
+                command,
+                capture_output=True,
+                text=True,
+                cwd=self._repo_path,
+                timeout=120,
+            )
+        except subprocess.TimeoutExpired:
+            return False, "Tests timed out after 120s"
+        except Exception as exc:
+            return False, f"Failed to run tests: {exc}"
+
+        combined = (proc.stdout + proc.stderr).strip()
+        return proc.returncode == 0, combined
+
+ # ── Multi-backend LLM ─────────────────────────────────────────────────────
+
+    def _resolve_backend(self) -> str:
+        """Turn ``"auto"`` into a concrete backend; pass other values through.
+
+        ``"auto"`` becomes ``"anthropic"`` when ``ANTHROPIC_API_KEY`` is set
+        to a non-empty value, and ``"ollama"`` otherwise.
+        """
+        if self._backend != "auto":
+            return self._backend
+
+        has_key = bool(os.environ.get("ANTHROPIC_API_KEY", ""))
+        return "anthropic" if has_key else "ollama"
+
+    def _call_llm(self, prompt: str) -> str:
+        """Send ``prompt`` to the resolved backend and return its raw text.
+
+        Anything other than ``"anthropic"`` is routed to the Ollama path.
+        """
+        if self._resolve_backend() == "anthropic":
+            return self._call_anthropic(prompt)
+        return self._call_ollama(prompt)
+
+    def _call_anthropic(self, prompt: str) -> str:
+        """Send ``prompt`` to Claude through the Anthropic SDK.
+
+        Returns:
+            The ``text`` of the first content block in the reply.
+
+        Raises:
+            RuntimeError: When ``ANTHROPIC_API_KEY`` is unset or empty.
+        """
+        import anthropic
+
+        api_key = os.environ.get("ANTHROPIC_API_KEY", "")
+        if not api_key:
+            raise RuntimeError("ANTHROPIC_API_KEY not set — cannot use anthropic backend")
+
+        reply = anthropic.Anthropic(api_key=api_key).messages.create(
+            model="claude-sonnet-4-20250514",
+            max_tokens=4096,
+            messages=[{"role": "user", "content": prompt}],
+        )
+        first_block = reply.content[0]
+        return first_block.text
+
+    def _call_ollama(self, prompt: str) -> str:
+        """Send ``prompt`` to the configured Ollama model through an Agno agent.
+
+        Returns:
+            The run result's ``content`` attribute when it has one, otherwise
+            the string form of the run result.
+        """
+        from agno.agent import Agent
+        from agno.models.ollama import Ollama
+
+        model = Ollama(id=settings.ollama_model, host=settings.ollama_url)
+        agent = Agent(name="SelfModify", model=model, markdown=False)
+
+        run_result = agent.run(prompt, stream=False)
+        if hasattr(run_result, "content"):
+            return run_result.content
+        return str(run_result)
+
+ # ── LLM interaction ───────────────────────────────────────────────────────
+
+    def _generate_edits(
+        self,
+        instruction: str,
+        file_contents: dict[str, str],
+        prev_test_output: Optional[str] = None,
+        prev_syntax_errors: Optional[dict[str, str]] = None,
+    ) -> tuple[dict[str, str], str]:
+        """Ask the LLM to generate file edits and parse them out of its reply.
+
+        The prompt shows every current file wrapped in a ``<FILE path="...">``
+        tag and asks for complete replacement files in the same wrapping.
+        The reply is parsed with three strategies, in order: the ``<FILE>``
+        tags, the ``--- FILE: path ---`` delimiter format, and, when exactly
+        one file was sent, the first fenced code block.
+
+        Args:
+            instruction: The change to make.
+            file_contents: ``{path: current content}`` of the target files.
+            prev_test_output: Test output of the previous attempt, if any;
+                its first 2000 characters are included in the prompt.
+            prev_syntax_errors: ``{path: error}`` from the previous attempt,
+                if its output was rejected for syntax errors.
+
+        Returns:
+            ``(edits, raw_llm_response)`` where ``edits`` maps path to new
+            content and is empty when nothing could be parsed.
+        """
+        # Each file is labelled with its path so the LLM can echo the path
+        # back in its answer.
+        files_block = "".join(
+            f'<FILE path="{fp}">\n{content}\n</FILE>\n\n'
+            for fp, content in file_contents.items()
+        )
+
+        retry_context = ""
+        if prev_test_output:
+            retry_context += f"""
+PREVIOUS ATTEMPT FAILED with test errors:
+
+{prev_test_output[:2000]}
+
+Fix the issues shown above.
+"""
+        if prev_syntax_errors:
+            errors_text = "\n".join(f" {fp}: {err}" for fp, err in prev_syntax_errors.items())
+            retry_context += f"""
+PREVIOUS ATTEMPT HAD SYNTAX ERRORS (code was rejected before writing):
+{errors_text}
+
+You MUST produce syntactically valid Python. Run through the code mentally
+and make sure all strings are properly terminated, all indentation is correct,
+and there are no invalid characters outside of string literals.
+"""
+
+        prompt = f"""You are a precise code modification agent. Edit source files according to the instruction.
+
+INSTRUCTION: {instruction}
+
+CURRENT FILES:
+{files_block}
+{retry_context}
+OUTPUT FORMAT — wrap each modified file like this:
+
+<FILE path="path/to/file.py">
+complete file content here
+</FILE>
+
+CRITICAL RULES:
+- Output the COMPLETE file content, not just changed lines
+- Keep ALL existing functionality unless told to remove it
+- The output must be syntactically valid Python — verify mentally before outputting
+- Preserve all special characters (unicode, em-dashes, etc.) exactly as they appear in the original
+- Do NOT wrap the file content in triple-quotes or markdown code fences
+- Do NOT start the file content with \"\"\" — that would turn the code into a string literal
+- Follow the existing code style
+
+Generate the modified files now:"""
+
+        raw = self._call_llm(prompt)
+
+        # Primary format: <FILE path="...">...</FILE>. Group 1 is the path,
+        # group 2 the file body.
+        edits: dict[str, str] = {}
+        xml_re = re.compile(
+            r'<FILE path="([^"]+)">\n?(.*?)</FILE>',
+            re.DOTALL,
+        )
+        for match in xml_re.finditer(raw):
+            filepath = match.group(1).strip()
+            # Strip trailing whitespace but keep a final newline
+            edits[filepath] = match.group(2).rstrip() + "\n"
+
+        # Fallback: try the old delimiter format
+        if not edits:
+            for match in _FILE_BLOCK_RE.finditer(raw):
+                filepath = match.group(1).strip()
+                edits[filepath] = match.group(2).rstrip() + "\n"
+
+        # Last resort: single file + code block. Only safe when there is no
+        # ambiguity about which file the code belongs to.
+        if not edits and len(file_contents) == 1:
+            only_path = next(iter(file_contents))
+            code_match = re.search(r"```(?:python)?\n(.*?)```", raw, re.DOTALL)
+            if code_match:
+                edits[only_path] = code_match.group(1).rstrip() + "\n"
+
+        return edits, raw
diff --git a/src/swarm/tool_executor.py b/src/swarm/tool_executor.py
index f0839f76..37fc64c0 100644
--- a/src/swarm/tool_executor.py
+++ b/src/swarm/tool_executor.py
@@ -276,22 +276,55 @@ Response:"""
class DirectToolExecutor(ToolExecutor):
"""Tool executor that actually calls tools directly.
-
- This is a more advanced version that actually executes the tools
- rather than just simulating. Use with caution - it has real side effects.
-
- Currently WIP - for future implementation.
+
+ For code-modification tasks assigned to the Forge persona, dispatches
+ to the SelfModifyLoop for real edit → test → commit execution.
+ Other tasks, and every task while self_modify_enabled is off, fall back to the simulated parent.
"""
-
+
+ _CODE_KEYWORDS = frozenset({
+ "modify", "edit", "fix", "refactor", "implement",
+ "add function", "change code", "update source", "patch",
+ })
+
def execute_with_tools(self, task_description: str) -> dict[str, Any]:
- """Actually execute tools to complete the task.
-
- This would involve:
- 1. Parsing the task into tool calls
- 2. Executing each tool
- 3. Handling results and errors
- 4. Potentially iterating based on results
+ """Execute tools to complete the task.
+
+ Code-modification tasks on the Forge persona are routed through
+ the SelfModifyLoop when self_modify_enabled is set. Everything else delegates to the parent.
"""
- # Future: Implement ReAct pattern or similar
- # For now, just delegate to parent
+ task_lower = task_description.lower()
+ is_code_task = any(kw in task_lower for kw in self._CODE_KEYWORDS)
+
+ if is_code_task and self._persona_id == "forge":
+ try:
+ from config import settings as cfg
+ if not cfg.self_modify_enabled:
+ return self.execute_task(task_description)
+
+ from self_modify.loop import SelfModifyLoop, ModifyRequest
+
+ loop = SelfModifyLoop()
+ result = loop.run(ModifyRequest(instruction=task_description))
+
+ return {
+ "success": result.success,
+ "result": (
+ f"Modified {len(result.files_changed)} file(s). "
+ f"Tests {'passed' if result.test_passed else 'failed'}."
+ ),
+ "tools_used": ["read_file", "write_file", "shell", "git_commit"],
+ "persona_id": self._persona_id,
+ "agent_id": self._agent_id,
+ "commit_sha": result.commit_sha,
+ }
+ except Exception as exc:
+ logger.exception("Direct tool execution failed")
+ return {
+ "success": False,
+ "error": str(exc),
+ "result": None,
+ "tools_used": [],
+ }
+
return self.execute_task(task_description)
diff --git a/src/voice/nlu.py b/src/voice/nlu.py
index 26990dbe..2e9b5354 100644
--- a/src/voice/nlu.py
+++ b/src/voice/nlu.py
@@ -11,6 +11,7 @@ Intents:
- task: Task creation/management
- help: Request help or list commands
- voice: Voice settings (volume, rate, etc.)
+ - code: Code modification / self-modify commands
- unknown: Unrecognized intent
"""
@@ -62,6 +63,14 @@ _PATTERNS: list[tuple[str, re.Pattern, float]] = [
r"\b(voice|speak|volume|rate|speed|louder|quieter|faster|slower|mute|unmute)\b",
re.IGNORECASE,
), 0.85),
+
+ # Code modification / self-modify
+ ("code", re.compile(
+ r"\b(modify|edit|change|update|fix|refactor|implement|patch)\s+(the\s+)?(code|file|function|class|module|source)\b"
+ r"|\bself[- ]?modify\b"
+ r"|\b(update|change|edit)\s+(your|the)\s+(code|source)\b",
+ re.IGNORECASE,
+ ), 0.9),
]
# Keywords for entity extraction
@@ -69,6 +78,7 @@ _ENTITY_PATTERNS = {
"agent_name": re.compile(r"(?:spawn|start)\s+(?:agent\s+)?(\w+)|(?:agent)\s+(\w+)", re.IGNORECASE),
"task_description": re.compile(r"(?:task|assign)[:;]?\s+(.+)", re.IGNORECASE),
"number": re.compile(r"\b(\d+)\b"),
+ "target_file": re.compile(r"(?:in|file|modify)\s+(?:the\s+)?([/\w._-]+\.py)", re.IGNORECASE),
}
diff --git a/tests/test_self_modify.py b/tests/test_self_modify.py
new file mode 100644
index 00000000..177941e3
--- /dev/null
+++ b/tests/test_self_modify.py
@@ -0,0 +1,450 @@
+"""Tests for the self-modification loop (self_modify/loop.py).
+
+All tests are mocked — no Ollama, no git; the only file written directly is a report under tmp_path.
+"""
+
+from unittest.mock import MagicMock, patch
+from pathlib import Path
+
+import pytest
+
+from self_modify.loop import SelfModifyLoop, ModifyRequest, ModifyResult
+
+
+# ── Dataclass tests ───────────────────────────────────────────────────────────
+
+
+class TestModifyRequest:
+ def test_defaults(self):
+ req = ModifyRequest(instruction="Fix the bug")
+ assert req.instruction == "Fix the bug"
+ assert req.target_files == []
+ assert req.dry_run is False
+
+ def test_with_target_files(self):
+ req = ModifyRequest(
+ instruction="Add docstring",
+ target_files=["src/foo.py"],
+ dry_run=True,
+ )
+ assert req.target_files == ["src/foo.py"]
+ assert req.dry_run is True
+
+
+class TestModifyResult:
+ def test_success_result(self):
+ result = ModifyResult(
+ success=True,
+ files_changed=["src/foo.py"],
+ test_passed=True,
+ commit_sha="abc12345",
+ branch_name="timmy/self-modify-123",
+ llm_response="...",
+ attempts=1,
+ )
+ assert result.success
+ assert result.commit_sha == "abc12345"
+ assert result.error is None
+ assert result.autonomous_cycles == 0
+
+ def test_failure_result(self):
+ result = ModifyResult(success=False, error="something broke")
+ assert not result.success
+ assert result.error == "something broke"
+ assert result.files_changed == []
+
+
+# ── SelfModifyLoop unit tests ────────────────────────────────────────────────
+
+
+class TestSelfModifyLoop:
+ def test_init_defaults(self):
+ loop = SelfModifyLoop()
+ assert loop._max_retries == 2
+
+ def test_init_custom_retries(self):
+ loop = SelfModifyLoop(max_retries=5)
+ assert loop._max_retries == 5
+
+ def test_init_backend(self):
+ loop = SelfModifyLoop(backend="anthropic")
+ assert loop._backend == "anthropic"
+
+ def test_init_autonomous(self):
+ loop = SelfModifyLoop(autonomous=True, max_autonomous_cycles=5)
+ assert loop._autonomous is True
+ assert loop._max_autonomous_cycles == 5
+
+ @patch("self_modify.loop.settings")
+ def test_run_disabled(self, mock_settings):
+ mock_settings.self_modify_enabled = False
+ loop = SelfModifyLoop()
+ result = loop.run(ModifyRequest(instruction="test"))
+ assert not result.success
+ assert "disabled" in result.error.lower()
+
+ @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
+ @patch("self_modify.loop.settings")
+ def test_run_no_target_files(self, mock_settings):
+ mock_settings.self_modify_enabled = True
+ mock_settings.self_modify_max_retries = 0
+ mock_settings.self_modify_allowed_dirs = "src,tests"
+ mock_settings.self_modify_backend = "ollama"
+ loop = SelfModifyLoop()
+ loop._infer_target_files = MagicMock(return_value=[])
+ result = loop.run(ModifyRequest(instruction="do something vague"))
+ assert not result.success
+ assert "no target files" in result.error.lower()
+
+ @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
+ @patch("self_modify.loop.settings")
+ def test_run_success_path(self, mock_settings):
+ mock_settings.self_modify_enabled = True
+ mock_settings.self_modify_max_retries = 2
+ mock_settings.self_modify_allowed_dirs = "src,tests"
+ mock_settings.self_modify_backend = "ollama"
+
+ loop = SelfModifyLoop()
+ loop._read_files = MagicMock(return_value={"src/foo.py": "old content"})
+ loop._generate_edits = MagicMock(
+ return_value=({"src/foo.py": "x = 1\n"}, "llm raw")
+ )
+ loop._write_files = MagicMock(return_value=["src/foo.py"])
+ loop._run_tests = MagicMock(return_value=(True, "5 passed"))
+ loop._git_commit = MagicMock(return_value="abc12345")
+ loop._validate_paths = MagicMock()
+
+ result = loop.run(
+ ModifyRequest(instruction="Add docstring", target_files=["src/foo.py"])
+ )
+
+ assert result.success
+ assert result.test_passed
+ assert result.commit_sha == "abc12345"
+ assert result.files_changed == ["src/foo.py"]
+ loop._run_tests.assert_called_once()
+ loop._git_commit.assert_called_once()
+
+ @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
+ @patch("self_modify.loop.settings")
+ def test_run_test_failure_reverts(self, mock_settings):
+ mock_settings.self_modify_enabled = True
+ mock_settings.self_modify_max_retries = 0
+ mock_settings.self_modify_allowed_dirs = "src,tests"
+ mock_settings.self_modify_backend = "ollama"
+
+ loop = SelfModifyLoop(max_retries=0)
+ loop._read_files = MagicMock(return_value={"src/foo.py": "old content"})
+ loop._generate_edits = MagicMock(
+ return_value=({"src/foo.py": "x = 1\n"}, "llm raw")
+ )
+ loop._write_files = MagicMock(return_value=["src/foo.py"])
+ loop._run_tests = MagicMock(return_value=(False, "1 failed"))
+ loop._revert_files = MagicMock()
+ loop._validate_paths = MagicMock()
+
+ result = loop.run(
+ ModifyRequest(instruction="Break it", target_files=["src/foo.py"])
+ )
+
+ assert not result.success
+ assert not result.test_passed
+ loop._revert_files.assert_called()
+
+ @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
+ @patch("self_modify.loop.settings")
+ def test_dry_run(self, mock_settings):
+ mock_settings.self_modify_enabled = True
+ mock_settings.self_modify_max_retries = 2
+ mock_settings.self_modify_allowed_dirs = "src,tests"
+ mock_settings.self_modify_backend = "ollama"
+
+ loop = SelfModifyLoop()
+ loop._read_files = MagicMock(return_value={"src/foo.py": "old content"})
+ loop._generate_edits = MagicMock(
+ return_value=({"src/foo.py": "x = 1\n"}, "llm raw")
+ )
+ loop._validate_paths = MagicMock()
+
+ result = loop.run(
+ ModifyRequest(
+ instruction="Add docstring",
+ target_files=["src/foo.py"],
+ dry_run=True,
+ )
+ )
+
+ assert result.success
+ assert result.files_changed == ["src/foo.py"]
+
+
+# ── Syntax validation tests ─────────────────────────────────────────────────
+
+
+class TestSyntaxValidation:
+ def test_valid_python_passes(self):
+ loop = SelfModifyLoop()
+ errors = loop._validate_syntax({"src/foo.py": "x = 1\nprint(x)\n"})
+ assert errors == {}
+
+ def test_invalid_python_caught(self):
+ loop = SelfModifyLoop()
+ errors = loop._validate_syntax({"src/foo.py": "def foo(\n"})
+ assert "src/foo.py" in errors
+ assert "line" in errors["src/foo.py"]
+
+ def test_unterminated_string_caught(self):
+ loop = SelfModifyLoop()
+ bad_code = '"""\nTIMMY = """\nstuff\n"""\n'
+ errors = loop._validate_syntax({"src/foo.py": bad_code})
+ # NOTE: bad_code is invalid too (its final """ is never closed); only the simpler case below is asserted
+ broken = '"""\nunclosed string\n'
+ errors = loop._validate_syntax({"src/foo.py": broken})
+ assert "src/foo.py" in errors
+
+ def test_non_python_files_skipped(self):
+ loop = SelfModifyLoop()
+ errors = loop._validate_syntax({"README.md": "this is not python {{{}"})
+ assert errors == {}
+
+ @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
+ @patch("self_modify.loop.settings")
+ def test_syntax_error_skips_write(self, mock_settings):
+ """When LLM produces invalid syntax, we skip writing and retry."""
+ mock_settings.self_modify_enabled = True
+ mock_settings.self_modify_max_retries = 1
+ mock_settings.self_modify_allowed_dirs = "src,tests"
+ mock_settings.self_modify_backend = "ollama"
+
+ loop = SelfModifyLoop(max_retries=1)
+ loop._read_files = MagicMock(return_value={"src/foo.py": "x = 1\n"})
+ # First call returns broken syntax, second returns valid
+ loop._generate_edits = MagicMock(side_effect=[
+ ({"src/foo.py": "def foo(\n"}, "bad llm"),
+ ({"src/foo.py": "def foo():\n pass\n"}, "good llm"),
+ ])
+ loop._write_files = MagicMock(return_value=["src/foo.py"])
+ loop._run_tests = MagicMock(return_value=(True, "passed"))
+ loop._git_commit = MagicMock(return_value="abc123")
+ loop._validate_paths = MagicMock()
+
+ result = loop.run(
+ ModifyRequest(instruction="Fix foo", target_files=["src/foo.py"])
+ )
+
+ assert result.success
+ # _write_files should only be called once (for the valid attempt)
+ loop._write_files.assert_called_once()
+
+
+# ── Multi-backend tests ──────────────────────────────────────────────────────
+
+
+class TestBackendResolution:
+ def test_resolve_ollama(self):
+ loop = SelfModifyLoop(backend="ollama")
+ assert loop._resolve_backend() == "ollama"
+
+ def test_resolve_anthropic(self):
+ loop = SelfModifyLoop(backend="anthropic")
+ assert loop._resolve_backend() == "anthropic"
+
+ @patch.dict("os.environ", {"ANTHROPIC_API_KEY": "sk-test-123"})
+ def test_resolve_auto_with_key(self):
+ loop = SelfModifyLoop(backend="auto")
+ assert loop._resolve_backend() == "anthropic"
+
+ @patch.dict("os.environ", {}, clear=True)
+ def test_resolve_auto_without_key(self):
+ loop = SelfModifyLoop(backend="auto")
+ assert loop._resolve_backend() == "ollama"
+
+
+# ── Autonomous loop tests ────────────────────────────────────────────────────
+
+
+class TestAutonomousLoop:
+ @patch("self_modify.loop.os.environ", {"SELF_MODIFY_SKIP_BRANCH": "1"})
+ @patch("self_modify.loop.settings")
+ def test_autonomous_retries_after_failure(self, mock_settings):
+ mock_settings.self_modify_enabled = True
+ mock_settings.self_modify_max_retries = 0
+ mock_settings.self_modify_allowed_dirs = "src,tests"
+ mock_settings.self_modify_backend = "ollama"
+
+ loop = SelfModifyLoop(max_retries=0, autonomous=True, max_autonomous_cycles=2)
+ loop._validate_paths = MagicMock()
+ loop._read_files = MagicMock(return_value={"src/foo.py": "x = 1\n"})
+
+ # First run fails, autonomous cycle 1 succeeds
+ call_count = [0]
+
+ def fake_generate(instruction, contents, prev_test_output=None, prev_syntax_errors=None):
+ call_count[0] += 1
+ return ({"src/foo.py": "x = 2\n"}, "llm raw")
+
+ loop._generate_edits = MagicMock(side_effect=fake_generate)
+ loop._write_files = MagicMock(return_value=["src/foo.py"])
+ loop._revert_files = MagicMock()
+
+ # First call fails tests, second succeeds
+ test_results = [(False, "FAILED"), (True, "PASSED")]
+ loop._run_tests = MagicMock(side_effect=test_results)
+ loop._git_commit = MagicMock(return_value="abc123")
+ loop._diagnose_failure = MagicMock(return_value="Fix: do X instead of Y")
+
+ result = loop.run(
+ ModifyRequest(instruction="Fix foo", target_files=["src/foo.py"])
+ )
+
+ assert result.success
+ assert result.autonomous_cycles == 1
+ loop._diagnose_failure.assert_called_once()
+
+ def test_diagnose_failure_reads_report(self, tmp_path):
+ report = tmp_path / "report.md"
+ report.write_text("# Report\n**Error:** SyntaxError line 5\n")
+
+ loop = SelfModifyLoop(backend="ollama")
+ loop._call_llm = MagicMock(return_value="ROOT CAUSE: Missing closing paren")
+
+ diagnosis = loop._diagnose_failure(report)
+ assert "Missing closing paren" in diagnosis
+ loop._call_llm.assert_called_once()
+
+ def test_diagnose_failure_handles_missing_report(self, tmp_path):
+ loop = SelfModifyLoop(backend="ollama")
+ result = loop._diagnose_failure(tmp_path / "nonexistent.md")
+ assert result is None
+
+
+# ── Path validation tests ─────────────────────────────────────────────────────
+
+
+class TestPathValidation:
+ def test_rejects_path_outside_repo(self):
+ loop = SelfModifyLoop(repo_path=Path("/tmp/test-repo"))
+ with pytest.raises(ValueError, match="escapes repository"):
+ loop._validate_paths(["../../etc/passwd"])
+
+ def test_rejects_path_outside_allowed_dirs(self):
+ loop = SelfModifyLoop(repo_path=Path("/tmp/test-repo"))
+ with pytest.raises(ValueError, match="not in allowed directories"):
+ loop._validate_paths(["docs/secret.py"])
+
+ def test_accepts_src_path(self):
+ loop = SelfModifyLoop(repo_path=Path("/tmp/test-repo"))
+ loop._validate_paths(["src/some_module.py"])
+
+ def test_accepts_tests_path(self):
+ loop = SelfModifyLoop(repo_path=Path("/tmp/test-repo"))
+ loop._validate_paths(["tests/test_something.py"])
+
+
+# ── File inference tests ──────────────────────────────────────────────────────
+
+
+class TestFileInference:
+ def test_infer_explicit_py_path(self):
+ loop = SelfModifyLoop()
+ files = loop._infer_target_files("fix bug in src/dashboard/app.py")
+ assert "src/dashboard/app.py" in files
+
+ def test_infer_from_keyword_config(self):
+ loop = SelfModifyLoop()
+ files = loop._infer_target_files("update the config to add a new setting")
+ assert "src/config.py" in files
+
+ def test_infer_from_keyword_agent(self):
+ loop = SelfModifyLoop()
+ files = loop._infer_target_files("modify the agent prompt")
+ assert "src/timmy/agent.py" in files
+
+ def test_infer_returns_empty_for_vague(self):
+ loop = SelfModifyLoop()
+ files = loop._infer_target_files("do something cool")
+ assert files == []
+
+
+# ── NLU intent tests ──────────────────────────────────────────────────────────
+
+
+class TestCodeIntent:
+ def test_detects_modify_code(self):
+ from voice.nlu import detect_intent
+
+ intent = detect_intent("modify the code in config.py")
+ assert intent.name == "code"
+
+ def test_detects_self_modify(self):
+ from voice.nlu import detect_intent
+
+ intent = detect_intent("self-modify to add a new endpoint")
+ assert intent.name == "code"
+
+ def test_detects_edit_source(self):
+ from voice.nlu import detect_intent
+
+ intent = detect_intent("edit the source to fix the bug")
+ assert intent.name == "code"
+
+ def test_detects_update_your_code(self):
+ from voice.nlu import detect_intent
+
+ intent = detect_intent("update your code to handle errors")
+ assert intent.name == "code"
+
+ def test_detects_fix_function(self):
+ from voice.nlu import detect_intent
+
+ intent = detect_intent("fix the function that calculates totals")
+ assert intent.name == "code"
+
+ def test_does_not_match_general_chat(self):
+ from voice.nlu import detect_intent
+
+ intent = detect_intent("tell me about the weather today")
+ assert intent.name == "chat"
+
+ def test_extracts_target_file_entity(self):
+ from voice.nlu import detect_intent
+
+ intent = detect_intent("modify file src/config.py to add debug flag")
+ assert intent.entities.get("target_file") == "src/config.py"
+
+
+# ── Route tests ───────────────────────────────────────────────────────────────
+
+
+class TestSelfModifyRoutes:
+ def test_status_endpoint(self, client):
+ resp = client.get("/self-modify/status")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert "enabled" in data
+ assert data["enabled"] is False # Default
+
+ def test_run_when_disabled(self, client):
+ resp = client.post("/self-modify/run", data={"instruction": "test"})
+ assert resp.status_code == 403
+
+
+# ── DirectToolExecutor integration ────────────────────────────────────────────
+
+
+class TestDirectToolExecutor:
+ def test_code_task_falls_back_when_disabled(self):
+ from swarm.tool_executor import DirectToolExecutor
+
+ executor = DirectToolExecutor("forge", "forge-test-001")
+ result = executor.execute_with_tools("modify the code to fix bug")
+ # Should fall back to simulated since self_modify_enabled=False
+ assert isinstance(result, dict)
+ assert "result" in result or "success" in result
+
+ def test_non_code_task_delegates_to_parent(self):
+ from swarm.tool_executor import DirectToolExecutor
+
+ executor = DirectToolExecutor("echo", "echo-test-001")
+ result = executor.execute_with_tools("search for information")
+ assert isinstance(result, dict)