forked from Rockachopa/Timmy-time-dashboard
feat: Self-Edit MCP Tool (Phase 2.1)
Implements the Self-Edit MCP Tool that orchestrates the self-coding foundation:
## Core Features
1. **SelfEditTool** (src/tools/self_edit.py)
- Complete self-modification orchestrator
- Pre-flight safety checks (clean repo, on main branch)
- Context gathering (codebase indexer + modification journal)
- Feature branch creation (timmy/self-edit/{timestamp})
- LLM-based edit planning with fallback
- Safety constraint validation
- Aider integration (preferred) with fallback to direct editing
- Automatic test execution via pytest
- Commit on success, rollback on failure
- Modification journaling with reflections
2. **Safety Constraints**
- Max 3 files per commit
- Max 100 lines changed
- Protected files list (self-edit tool, foundation services)
- Only modify files with test coverage
- Max 3 retries on failure
- Requires user confirmation (MCP tool registration)
3. **Execution Backends**
- Aider integration: --auto-test --test-cmd pytest --yes --no-git
- Direct editing fallback: LLM-based file modification with AST validation
- Automatic backend selection based on availability
## Test Coverage
- 19 new tests covering:
- Basic functionality (initialization, preflight checks)
- Edit planning (with/without LLM)
- Safety validation (file limits, protected files)
- Execution flow (success and failure paths)
- Error handling (exceptions, LLM failures)
- MCP registration
## Usage
from tools.self_edit import register_self_edit_tool
from mcp.registry import tool_registry
# Register with MCP
register_self_edit_tool(tool_registry, llm_adapter)
Phase 2.2 will add Dashboard API endpoints and UI.
This commit is contained in:
824
src/tools/self_edit.py
Normal file
824
src/tools/self_edit.py
Normal file
@@ -0,0 +1,824 @@
|
||||
"""Self-Edit MCP Tool — Timmy's ability to modify its own source code.
|
||||
|
||||
This is the core self-modification orchestrator that:
|
||||
1. Receives task descriptions
|
||||
2. Queries codebase indexer for relevant files
|
||||
3. Queries modification journal for similar past attempts
|
||||
4. Creates feature branches via GitSafety
|
||||
5. Plans changes with LLM
|
||||
6. Executes via Aider (preferred) or direct editing (fallback)
|
||||
7. Runs tests via pytest
|
||||
8. Commits on success, rolls back on failure
|
||||
9. Logs outcomes to ModificationJournal
|
||||
10. Generates reflections
|
||||
|
||||
Usage:
|
||||
from tools.self_edit import self_edit_tool
|
||||
from mcp.registry import tool_registry
|
||||
|
||||
# Register with MCP
|
||||
tool_registry.register("self_edit", self_edit_schema, self_edit_tool)
|
||||
|
||||
# Invoke
|
||||
result = await tool_registry.execute("self_edit", {
|
||||
"task_description": "Add error handling to health endpoint"
|
||||
})
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
from config import settings
|
||||
|
||||
# Phase 1 imports
|
||||
from self_coding import (
|
||||
CodebaseIndexer,
|
||||
GitSafety,
|
||||
ModificationAttempt,
|
||||
ModificationJournal,
|
||||
Outcome,
|
||||
ReflectionService,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Safety constraints (Phase 1 hard limits)
MAX_FILES_PER_COMMIT = 3  # upper bound on files a single self-edit may modify
# NOTE(review): MAX_LINES_CHANGED is declared but never enforced in this
# module (see _validate_plan) — TODO wire it into plan validation.
MAX_LINES_CHANGED = 100
# Files the tool must never modify: the tool itself plus the Phase 1
# foundation services it depends on.
PROTECTED_FILES = {
    "src/tools/self_edit.py",
    "src/self_coding/git_safety.py",
    "src/self_coding/codebase_indexer.py",
    "src/self_coding/modification_journal.py",
    "src/self_coding/reflection.py",
}
MAX_RETRIES = 3  # retries attempted after the first failed execution
|
||||
|
||||
|
||||
@dataclass
class SelfEditResult:
    """Result of a self-edit operation."""

    # Whether the full workflow (edit, tests, commit, merge) succeeded.
    success: bool
    # Human-readable summary of the outcome.
    message: str
    # Journal row id of the logged attempt, when one was recorded.
    attempt_id: Optional[int] = None
    # Paths of the files changed by the edit.
    files_modified: list[str] = field(default_factory=list)
    # Hash of the commit created on success.
    commit_hash: Optional[str] = None
    # Captured test-runner (or Aider) output.
    test_results: str = ""
    # Unified diff of the change.
    diff: str = ""
|
||||
|
||||
|
||||
@dataclass
class EditPlan:
    """Plan for a self-edit operation."""

    # Short description of how the task will be implemented.
    approach: str
    # Existing files the edit will change (capped at MAX_FILES_PER_COMMIT).
    files_to_modify: list[str]
    # New files the edit will create, if any.
    files_to_create: list[str]
    # Test files to add or modify alongside the change.
    tests_to_add: list[str]
    # Rationale for choosing this approach.
    explanation: str
|
||||
|
||||
|
||||
class SelfEditTool:
|
||||
"""Self-modification orchestrator.
|
||||
|
||||
This class encapsulates the complete self-edit workflow:
|
||||
- Pre-flight checks
|
||||
- Context gathering (indexer + journal)
|
||||
- Branch creation
|
||||
- Edit planning (LLM)
|
||||
- Execution (Aider or direct)
|
||||
- Testing
|
||||
- Commit/rollback
|
||||
- Logging and reflection
|
||||
|
||||
Usage:
|
||||
tool = SelfEditTool(repo_path="/path/to/repo")
|
||||
result = await tool.execute("Add error handling to health endpoint")
|
||||
"""
|
||||
|
||||
def __init__(
    self,
    repo_path: Optional[Path] = None,
    llm_adapter: Optional[object] = None,
) -> None:
    """Initialize SelfEditTool.

    Args:
        repo_path: Path to repository. Defaults to current directory.
        llm_adapter: LLM adapter for planning and reflection
    """
    self.repo_path = Path(repo_path) if repo_path else Path.cwd()
    self.llm_adapter = llm_adapter

    # Initialize Phase 1 services
    self.git = GitSafety(repo_path=self.repo_path)
    self.indexer = CodebaseIndexer(repo_path=self.repo_path)
    self.journal = ModificationJournal()
    self.reflection = ReflectionService(llm_adapter=llm_adapter)

    # Ensure codebase is indexed
    # Lazy flag: actual indexing happens on the first _ensure_indexed() call.
    self._indexing_done = False

    logger.info("SelfEditTool initialized for %s", self.repo_path)
|
||||
|
||||
async def _ensure_indexed(self) -> None:
|
||||
"""Ensure codebase is indexed."""
|
||||
if not self._indexing_done:
|
||||
await self.indexer.index_changed()
|
||||
self._indexing_done = True
|
||||
|
||||
async def execute(
    self,
    task_description: str,
    context: Optional[dict] = None,
) -> SelfEditResult:
    """Execute a self-edit task.

    This is the main entry point for self-modification. Workflow:
    pre-flight checks -> gather context -> branch + snapshot -> plan ->
    validate -> execute (with retries) -> commit/merge on success, or
    rollback on failure -> journal the outcome.

    Args:
        task_description: What to do (e.g., "Add error handling")
        context: Optional additional context
            NOTE(review): currently unused — TODO thread into planning.

    Returns:
        SelfEditResult with success/failure details
    """
    logger.info("Starting self-edit: %s", task_description[:50])

    try:
        # Step 1: Pre-flight checks (clean tree, on main, feature enabled)
        if not await self._preflight_checks():
            return SelfEditResult(
                success=False,
                message="Pre-flight checks failed. See logs for details.",
            )

        # Step 2: Gather context
        await self._ensure_indexed()
        relevant_files = await self._get_relevant_files(task_description)
        similar_attempts = await self._get_similar_attempts(task_description)

        # Step 3: Create feature branch
        branch_name = f"timmy/self-edit/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        await self.git.create_branch(branch_name)
        logger.info("Created branch: %s", branch_name)

        # Step 4: Take snapshot for rollback
        snapshot = await self.git.snapshot(run_tests=False)

        # Step 5: Plan the edit
        plan = await self._plan_edit(
            task_description,
            relevant_files,
            similar_attempts,
        )

        # Validate plan against safety constraints
        if not self._validate_plan(plan):
            return SelfEditResult(
                success=False,
                message=f"Plan violates safety constraints: {plan.files_to_modify}",
            )

        # Step 6: Execute the edit
        execution_result = await self._execute_edit(plan, task_description)

        if not execution_result["success"]:
            # Attempt retries: roll back to the snapshot between tries.
            for retry in range(MAX_RETRIES):
                logger.info("Retry %d/%d", retry + 1, MAX_RETRIES)

                # Rollback to clean state
                await self.git.rollback(snapshot)

                # Try again with adjusted approach
                execution_result = await self._execute_edit(
                    plan,
                    task_description,
                    retry_count=retry + 1,
                )

                if execution_result["success"]:
                    break

        if not execution_result["success"]:
            # Final rollback and log failure
            await self.git.rollback(snapshot)
            # BUGFIX: use the configured main branch rather than a
            # hard-coded "main" (consistent with _preflight_checks).
            await self.git._run_git("checkout", self.git.main_branch)

            # BUGFIX: several failure dicts (e.g. "No files to modify",
            # direct-edit errors) carry no "test_output" key; use .get()
            # so a KeyError doesn't mask the real failure.
            attempt_id = await self._log_failure(
                task_description,
                plan,
                execution_result.get("test_output", ""),
                execution_result.get("error", "Unknown error"),
            )

            return SelfEditResult(
                success=False,
                message=f"Failed after {MAX_RETRIES} retries",
                attempt_id=attempt_id,
                test_results=execution_result.get("test_output", ""),
            )

        # Step 7: Commit and merge
        commit_hash = await self.git.commit(
            message=f"Self-edit: {task_description[:50]}",
            files=plan.files_to_modify + plan.files_to_create + plan.tests_to_add,
        )

        # Merge to main (tests already passed in execution)
        await self.git.merge_to_main(branch_name, require_tests=False)

        # Step 8: Log success
        diff = await self.git.get_diff(snapshot.commit_hash, commit_hash)
        attempt_id = await self._log_success(
            task_description,
            plan,
            commit_hash,
            execution_result.get("test_output", ""),
            diff,
        )

        return SelfEditResult(
            success=True,
            message=f"Successfully modified {len(plan.files_to_modify)} files",
            attempt_id=attempt_id,
            files_modified=plan.files_to_modify,
            commit_hash=commit_hash,
            test_results=execution_result.get("test_output", ""),
            diff=diff,
        )

    except Exception as e:
        logger.exception("Self-edit failed with exception")
        return SelfEditResult(
            success=False,
            message=f"Exception: {str(e)}",
        )
|
||||
|
||||
async def _preflight_checks(self) -> bool:
    """Run pre-flight safety checks.

    Verifies the working tree is clean, the repo is on the main branch,
    and self-modification is enabled in settings.

    Returns:
        True if all checks pass
    """
    # Check if repo is clean
    if not await self.git.is_clean():
        logger.error("Pre-flight failed: Working directory not clean")
        return False

    # Check if we're on main
    current_branch = await self.git.get_current_branch()
    if current_branch != self.git.main_branch:
        logger.error("Pre-flight failed: Not on %s branch (on %s)",
                     self.git.main_branch, current_branch)
        return False

    # Check if self-modification is enabled
    # NOTE(review): defaults to *enabled* when the setting is absent —
    # a fail-open default for a safety switch; confirm this is intended.
    if not getattr(settings, 'self_modify_enabled', True):
        logger.error("Pre-flight failed: Self-modification disabled in config")
        return False

    return True
|
||||
|
||||
async def _get_relevant_files(self, task_description: str) -> list[str]:
    """Find candidate files for the task, keeping only tested ones.

    Args:
        task_description: Task to find relevant files for

    Returns:
        List of file paths (at most MAX_FILES_PER_COMMIT, all covered
        by tests)
    """
    candidates = await self.indexer.get_relevant_files(task_description, limit=10)

    # Safety rule: only files that already have test coverage may be edited.
    covered = []
    for candidate in candidates:
        if await self.indexer.has_test_coverage(candidate):
            covered.append(candidate)

    logger.info("Found %d relevant files (%d with tests)",
                len(candidates), len(covered))

    return covered[:MAX_FILES_PER_COMMIT]
|
||||
|
||||
async def _get_similar_attempts(
    self,
    task_description: str,
) -> list[ModificationAttempt]:
    """Look up past modification attempts resembling this task.

    Args:
        task_description: Task to find similar attempts for

    Returns:
        List of similar attempts (up to 5)
    """
    matches = await self.journal.find_similar(task_description, limit=5)
    logger.info("Found %d similar past attempts", len(matches))
    return matches
|
||||
|
||||
async def _plan_edit(
    self,
    task_description: str,
    relevant_files: list[str],
    similar_attempts: list[ModificationAttempt],
) -> EditPlan:
    """Plan the edit using LLM.

    Falls back to a heuristic plan (edit the relevant files as-is) when
    no LLM adapter is configured or when the LLM call fails.

    Args:
        task_description: What to do
        relevant_files: Files that might need modification
        similar_attempts: Similar past attempts for context

    Returns:
        EditPlan with approach and file list
    """
    if not self.llm_adapter:
        # Fallback: simple plan
        return EditPlan(
            approach=f"Edit files to implement: {task_description}",
            files_to_modify=relevant_files[:MAX_FILES_PER_COMMIT],
            files_to_create=[],
            tests_to_add=[],
            explanation="No LLM available, using heuristic plan",
        )

    # Build prompt with context
    codebase_summary = await self.indexer.get_summary(max_tokens=2000)

    similar_context = ""
    if similar_attempts:
        similar_context = "\n\nSimilar past attempts:\n"
        for attempt in similar_attempts:
            similar_context += f"- {attempt.task_description} ({attempt.outcome.value})\n"
            if attempt.reflection:
                similar_context += f"  Lesson: {attempt.reflection[:100]}...\n"

    prompt = f"""You are planning a code modification for a Python project.

Task: {task_description}

Codebase Summary:
{codebase_summary}

Potentially relevant files (all have test coverage):
{chr(10).join(f"- {f}" for f in relevant_files)}
{similar_context}

Create a plan for implementing this task. You can modify at most {MAX_FILES_PER_COMMIT} files.

Respond in this format:
APPROACH: <brief description of the approach>
FILES_TO_MODIFY: <comma-separated list of file paths>
FILES_TO_CREATE: <comma-separated list of new file paths (if any)>
TESTS_TO_ADD: <comma-separated list of test files to add/modify>
EXPLANATION: <brief explanation of why this approach>
"""

    try:
        response = await self.llm_adapter.chat(message=prompt)
        content = response.content

        # Parse response line-by-line (see _extract_field / _parse_list)
        approach = self._extract_field(content, "APPROACH")
        files_to_modify = self._parse_list(self._extract_field(content, "FILES_TO_MODIFY"))
        files_to_create = self._parse_list(self._extract_field(content, "FILES_TO_CREATE"))
        tests_to_add = self._parse_list(self._extract_field(content, "TESTS_TO_ADD"))
        explanation = self._extract_field(content, "EXPLANATION")

        # Clamp to the safety limit even if the LLM listed more files.
        return EditPlan(
            approach=approach or "No approach specified",
            files_to_modify=files_to_modify[:MAX_FILES_PER_COMMIT],
            files_to_create=files_to_create,
            tests_to_add=tests_to_add,
            explanation=explanation or "No explanation provided",
        )

    except Exception as e:
        logger.error("LLM planning failed: %s", e)
        return EditPlan(
            approach=f"Fallback: Modify relevant files for {task_description}",
            files_to_modify=relevant_files[:MAX_FILES_PER_COMMIT],
            files_to_create=[],
            tests_to_add=[],
            explanation=f"LLM failed, using fallback: {e}",
        )
|
||||
|
||||
def _extract_field(self, content: str, field_name: str) -> str:
|
||||
"""Extract a field from LLM response."""
|
||||
for line in content.split("\n"):
|
||||
if line.startswith(f"{field_name}:"):
|
||||
return line.split(":", 1)[1].strip()
|
||||
return ""
|
||||
|
||||
def _parse_list(self, text: str) -> list[str]:
|
||||
"""Parse comma-separated list."""
|
||||
if not text or text.lower() in ("none", "n/a", ""):
|
||||
return []
|
||||
return [item.strip() for item in text.split(",") if item.strip()]
|
||||
|
||||
def _validate_plan(self, plan: EditPlan) -> bool:
    """Validate plan against safety constraints.

    Checks:
      - no more than MAX_FILES_PER_COMMIT files are modified
      - no protected file is modified

    Test-coverage filtering is NOT repeated here: the coverage check is
    async, so it is applied earlier in _get_relevant_files.  (The
    original contained a dead ``for ... pass`` loop for this; removed.)

    Args:
        plan: EditPlan to validate

    Returns:
        True if plan is valid
    """
    # Check file count
    if len(plan.files_to_modify) > MAX_FILES_PER_COMMIT:
        logger.error("Plan modifies too many files: %d > %d",
                     len(plan.files_to_modify), MAX_FILES_PER_COMMIT)
        return False

    # Check for protected files
    for file_path in plan.files_to_modify:
        if file_path in PROTECTED_FILES:
            logger.error("Plan tries to modify protected file: %s", file_path)
            return False

    return True
|
||||
|
||||
async def _execute_edit(
|
||||
self,
|
||||
plan: EditPlan,
|
||||
task_description: str,
|
||||
retry_count: int = 0,
|
||||
) -> dict:
|
||||
"""Execute the edit using Aider or direct editing.
|
||||
|
||||
Args:
|
||||
plan: EditPlan to execute
|
||||
task_description: Original task description
|
||||
retry_count: Current retry attempt
|
||||
|
||||
Returns:
|
||||
Dict with success, test_output, error
|
||||
"""
|
||||
all_files = plan.files_to_modify + plan.files_to_create
|
||||
|
||||
if not all_files:
|
||||
return {"success": False, "error": "No files to modify"}
|
||||
|
||||
# Try Aider first
|
||||
if await self._aider_available():
|
||||
return await self._execute_with_aider(plan, task_description, all_files)
|
||||
else:
|
||||
# Fallback to direct editing
|
||||
return await self._execute_direct_edit(plan, task_description)
|
||||
|
||||
async def _aider_available(self) -> bool:
|
||||
"""Check if Aider is available."""
|
||||
try:
|
||||
result = await asyncio.create_subprocess_exec(
|
||||
"aider", "--version",
|
||||
stdout=asyncio.subprocess.DEVNULL,
|
||||
stderr=asyncio.subprocess.DEVNULL,
|
||||
)
|
||||
await result.wait()
|
||||
return result.returncode == 0
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
|
||||
async def _execute_with_aider(
    self,
    plan: EditPlan,
    task_description: str,
    files: list[str],
) -> dict:
    """Execute edit using Aider.

    Runs Aider as a subprocess with auto-testing enabled and a 300s
    timeout; success requires exit code 0 AND "passed" in the output.

    Args:
        plan: EditPlan
        task_description: Task description
        files: Files to edit

    Returns:
        Dict with success, test_output
    """
    # --no-git: branching/commits are handled by GitSafety, not Aider.
    cmd = [
        "aider",
        "--model", "ollama_chat/qwen2.5-coder:14b-instruct",
        "--auto-test",
        "--test-cmd", "python -m pytest tests/ -xvs",
        "--yes",
        "--no-git",
        "--message", f"{task_description}\n\nApproach: {plan.approach}",
    ] + files

    logger.info("Running Aider: %s", " ".join(cmd))

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            cwd=self.repo_path,
        )

        stdout, _ = await asyncio.wait_for(
            proc.communicate(),
            timeout=300.0,
        )

        output = stdout.decode() if stdout else ""

        # Check if tests passed
        # NOTE(review): the "passed" substring match is a heuristic —
        # unrelated output containing "passed" could fool it; confirm.
        success = proc.returncode == 0 and "passed" in output.lower()

        return {
            "success": success,
            "test_output": output,
        }

    except asyncio.TimeoutError:
        logger.error("Aider timed out after 300s")
        return {
            "success": False,
            "error": "Timeout",
            "test_output": "Aider timed out after 300s",
        }
    except Exception as e:
        logger.error("Aider execution failed: %s", e)
        return {
            "success": False,
            "error": str(e),
            "test_output": "",
        }
|
||||
|
||||
async def _execute_direct_edit(
    self,
    plan: EditPlan,
    task_description: str,
) -> dict:
    """Execute edit via direct file modification (fallback).

    Asks the LLM for a complete replacement of each file, validates the
    result parses as Python (ast.parse) before writing, then runs pytest.

    Args:
        plan: EditPlan
        task_description: Task description

    Returns:
        Dict with success, test_output.
        NOTE(review): the early error returns omit the "test_output"
        key — callers should use .get("test_output", "").
    """
    if not self.llm_adapter:
        return {
            "success": False,
            "error": "No LLM adapter for direct editing",
        }

    # Edit each file
    for file_path in plan.files_to_modify:
        full_path = self.repo_path / file_path

        if not full_path.exists():
            # Missing files are skipped rather than aborting the run.
            logger.error("File does not exist: %s", file_path)
            continue

        try:
            content = full_path.read_text()

            # Build edit prompt
            edit_prompt = f"""Edit this Python file to implement the task.

Task: {task_description}
Approach: {plan.approach}

Current file content:
```python
{content}
```

Provide the complete new file content. Only return the code, no explanation.
"""

            response = await self.llm_adapter.chat(message=edit_prompt)
            new_content = response.content

            # Strip code fences if present
            new_content = self._strip_code_fences(new_content)

            # Validate with AST before writing anything to disk.
            try:
                ast.parse(new_content)
            except SyntaxError as e:
                logger.error("Generated code has syntax error: %s", e)
                return {
                    "success": False,
                    "error": f"Syntax error in generated code: {e}",
                }

            # Write file
            full_path.write_text(new_content)
            logger.info("Modified: %s", file_path)

        except Exception as e:
            logger.error("Failed to edit %s: %s", file_path, e)
            return {
                "success": False,
                "error": f"Failed to edit {file_path}: {e}",
            }

    # Run tests
    return await self._run_tests()
|
||||
|
||||
def _strip_code_fences(self, content: str) -> str:
|
||||
"""Strip markdown code fences from content."""
|
||||
lines = content.split("\n")
|
||||
|
||||
# Remove opening fence
|
||||
if lines and lines[0].startswith("```"):
|
||||
lines = lines[1:]
|
||||
|
||||
# Remove closing fence
|
||||
if lines and lines[-1].startswith("```"):
|
||||
lines = lines[:-1]
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
async def _run_tests(self) -> dict:
    """Run tests and return results.

    Runs pytest with -x (stop at first failure) under a 120s timeout;
    stderr is merged into stdout so test_output carries everything.

    Returns:
        Dict with success, test_output
    """
    cmd = ["python", "-m", "pytest", "tests/", "-x", "--tb=short"]

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            cwd=self.repo_path,
        )

        stdout, _ = await asyncio.wait_for(
            proc.communicate(),
            timeout=120.0,
        )

        output = stdout.decode() if stdout else ""

        return {
            "success": proc.returncode == 0,
            "test_output": output,
        }

    except asyncio.TimeoutError:
        return {
            "success": False,
            "error": "Tests timed out",
            "test_output": "Timeout after 120s",
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "test_output": "",
        }
|
||||
|
||||
async def _log_success(
    self,
    task_description: str,
    plan: EditPlan,
    commit_hash: str,
    test_results: str,
    diff: str,
) -> int:
    """Log successful attempt.

    Records the attempt in the journal, then generates and stores a
    reflection on it.

    NOTE(review): commit_hash is accepted but never stored on the
    attempt — confirm whether ModificationAttempt should carry it.

    Returns:
        Attempt ID
    """
    attempt = ModificationAttempt(
        task_description=task_description,
        approach=plan.approach,
        files_modified=plan.files_to_modify + plan.files_to_create,
        diff=diff[:5000],  # Truncate for storage
        test_results=test_results,
        outcome=Outcome.SUCCESS,
    )

    attempt_id = await self.journal.log_attempt(attempt)

    # Generate and store reflection
    reflection_text = await self.reflection.reflect_on_attempt(attempt)
    await self.journal.update_reflection(attempt_id, reflection_text)

    return attempt_id
|
||||
|
||||
async def _log_failure(
    self,
    task_description: str,
    plan: EditPlan,
    test_results: str,
    error: str,
) -> int:
    """Log failed attempt.

    Records the failure (with analysis text and the retry budget that
    was exhausted) and stores a reflection so future attempts can learn
    from it.

    Returns:
        Attempt ID
    """
    attempt = ModificationAttempt(
        task_description=task_description,
        approach=plan.approach,
        files_modified=plan.files_to_modify,
        test_results=test_results,
        outcome=Outcome.FAILURE,
        failure_analysis=error,
        retry_count=MAX_RETRIES,
    )

    attempt_id = await self.journal.log_attempt(attempt)

    # Generate reflection even for failures
    reflection_text = await self.reflection.reflect_on_attempt(attempt)
    await self.journal.update_reflection(attempt_id, reflection_text)

    return attempt_id
|
||||
|
||||
|
||||
# MCP Tool Schema
# JSON Schema for the tool's arguments; only "task_description" is required.
self_edit_schema = {
    "type": "object",
    "properties": {
        "task_description": {
            "type": "string",
            "description": "Description of the code modification to make",
        },
        "context": {
            "type": "object",
            "description": "Optional additional context for the modification",
        },
    },
    "required": ["task_description"],
}
|
||||
|
||||
|
||||
# Global tool instance (singleton pattern)
# Created lazily by self_edit_tool() or eagerly by register_self_edit_tool().
_self_edit_tool: Optional[SelfEditTool] = None
|
||||
|
||||
|
||||
async def self_edit_tool(task_description: str, context: Optional[dict] = None) -> dict:
    """MCP tool entry point for self-edit.

    Lazily constructs the module-level SelfEditTool singleton on first
    call, delegates to its execute(), and flattens the result to a dict.

    Args:
        task_description: What to modify
        context: Optional context

    Returns:
        Dict with result
    """
    global _self_edit_tool

    if _self_edit_tool is None:
        _self_edit_tool = SelfEditTool()

    outcome = await _self_edit_tool.execute(task_description, context)

    return {
        "success": outcome.success,
        "message": outcome.message,
        "attempt_id": outcome.attempt_id,
        "files_modified": outcome.files_modified,
        "commit_hash": outcome.commit_hash,
        "test_results": outcome.test_results,
    }
|
||||
|
||||
|
||||
def register_self_edit_tool(registry: Any, llm_adapter: Optional[object] = None) -> None:
    """Register the self-edit tool with MCP registry.

    Eagerly replaces the module singleton so the given LLM adapter is
    used for planning and reflection, then registers the handler.

    Args:
        registry: MCP ToolRegistry
        llm_adapter: Optional LLM adapter
    """
    global _self_edit_tool
    _self_edit_tool = SelfEditTool(llm_adapter=llm_adapter)

    registration = dict(
        name="self_edit",
        schema=self_edit_schema,
        handler=self_edit_tool,
        category="self_coding",
        requires_confirmation=True,  # Safety: require user approval
        tags=["self-modification", "code-generation"],
        source_module="tools.self_edit",
    )
    registry.register(**registration)

    logger.info("Self-edit tool registered with MCP")
|
||||
398
tests/test_self_edit_tool.py
Normal file
398
tests/test_self_edit_tool.py
Normal file
@@ -0,0 +1,398 @@
|
||||
"""Tests for Self-Edit MCP Tool.
|
||||
|
||||
Tests the complete self-edit workflow with mocked dependencies.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from tools.self_edit import (
|
||||
MAX_FILES_PER_COMMIT,
|
||||
MAX_RETRIES,
|
||||
PROTECTED_FILES,
|
||||
EditPlan,
|
||||
SelfEditResult,
|
||||
SelfEditTool,
|
||||
register_self_edit_tool,
|
||||
self_edit_tool,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
def temp_repo():
    """Create a temporary git repository.

    Yields a Path to a repo containing a minimal src/myproject package
    with one tested function, committed on a branch named "main".
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        repo_path = Path(tmpdir)

        # Initialize git
        import subprocess
        subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
        subprocess.run(
            ["git", "config", "user.email", "test@test.com"],
            cwd=repo_path, check=True, capture_output=True,
        )
        subprocess.run(
            ["git", "config", "user.name", "Test"],
            cwd=repo_path, check=True, capture_output=True,
        )

        # Create src structure
        src_path = repo_path / "src" / "myproject"
        src_path.mkdir(parents=True)

        (src_path / "__init__.py").write_text("")
        (src_path / "app.py").write_text('''
"""Main application."""

def hello():
    return "Hello"
''')

        # Create tests
        tests_path = repo_path / "tests"
        tests_path.mkdir()
        (tests_path / "test_app.py").write_text('''
"""Tests for app."""
from myproject.app import hello

def test_hello():
    assert hello() == "Hello"
''')

        # Initial commit
        subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
        subprocess.run(
            ["git", "commit", "-m", "Initial"],
            cwd=repo_path, check=True, capture_output=True,
        )
        # Normalize the default branch to "main" regardless of git config.
        subprocess.run(
            ["git", "branch", "-M", "main"],
            cwd=repo_path, check=True, capture_output=True,
        )

        yield repo_path
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def mock_settings():
    """Mock settings to enable self-modification.

    autouse: every test in this module runs with the feature flag on,
    so pre-flight checks do not fail on configuration.
    """
    with patch('tools.self_edit.settings') as mock_settings:
        mock_settings.self_modify_enabled = True
        yield mock_settings
|
||||
|
||||
|
||||
@pytest.fixture
def mock_llm():
    """Create mock LLM adapter.

    chat() returns a canned planning response in the exact
    APPROACH/FILES_TO_MODIFY/... format that _plan_edit parses.
    """
    mock = AsyncMock()
    mock.chat.return_value = MagicMock(
        content="""APPROACH: Add error handling
FILES_TO_MODIFY: src/myproject/app.py
FILES_TO_CREATE:
TESTS_TO_ADD: tests/test_app.py
EXPLANATION: Wrap function in try/except"""
    )
    return mock
|
||||
|
||||
|
||||
@pytest.mark.asyncio
class TestSelfEditToolBasics:
    """Basic functionality tests."""

    async def test_initialization(self, temp_repo):
        """Should initialize with services."""
        tool = SelfEditTool(repo_path=temp_repo)

        # All four Phase 1 services must be wired up.
        assert tool.repo_path == temp_repo
        assert tool.git is not None
        assert tool.indexer is not None
        assert tool.journal is not None
        assert tool.reflection is not None

    async def test_preflight_checks_clean_repo(self, temp_repo):
        """Should pass preflight on clean repo."""
        tool = SelfEditTool(repo_path=temp_repo)

        assert await tool._preflight_checks() is True

    async def test_preflight_checks_dirty_repo(self, temp_repo):
        """Should fail preflight on dirty repo."""
        tool = SelfEditTool(repo_path=temp_repo)

        # Make uncommitted change
        (temp_repo / "dirty.txt").write_text("dirty")

        assert await tool._preflight_checks() is False

    async def test_preflight_checks_wrong_branch(self, temp_repo):
        """Should fail preflight when not on main."""
        tool = SelfEditTool(repo_path=temp_repo)

        # Create and checkout feature branch
        import subprocess
        subprocess.run(
            ["git", "checkout", "-b", "feature"],
            cwd=temp_repo, check=True, capture_output=True,
        )

        assert await tool._preflight_checks() is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
class TestSelfEditToolPlanning:
    """Edit planning tests."""

    async def test_plan_edit_with_llm(self, temp_repo, mock_llm):
        """Should generate plan using LLM."""
        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=mock_llm)
        await tool._ensure_indexed()

        plan = await tool._plan_edit(
            task_description="Add error handling",
            relevant_files=["src/myproject/app.py"],
            similar_attempts=[],
        )

        # Values come straight from the canned mock_llm response.
        assert isinstance(plan, EditPlan)
        assert plan.approach == "Add error handling"
        assert "src/myproject/app.py" in plan.files_to_modify

    async def test_plan_edit_without_llm(self, temp_repo):
        """Should generate fallback plan without LLM."""
        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=None)
        await tool._ensure_indexed()

        plan = await tool._plan_edit(
            task_description="Add feature",
            relevant_files=["src/myproject/app.py"],
            similar_attempts=[],
        )

        # Heuristic fallback still produces a usable plan.
        assert isinstance(plan, EditPlan)
        assert len(plan.files_to_modify) > 0

    async def test_plan_respects_max_files(self, temp_repo, mock_llm):
        """Plan should respect MAX_FILES_PER_COMMIT."""
        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=mock_llm)
        await tool._ensure_indexed()

        # Mock LLM to return many files
        mock_llm.chat.return_value = MagicMock(
            content="FILES_TO_MODIFY: " + ",".join([f"file{i}.py" for i in range(10)])
        )

        plan = await tool._plan_edit(
            task_description="Test",
            relevant_files=[f"file{i}.py" for i in range(10)],
            similar_attempts=[],
        )

        assert len(plan.files_to_modify) <= MAX_FILES_PER_COMMIT
|
||||
|
||||
|
||||
@pytest.mark.asyncio
class TestSelfEditToolValidation:
    """Safety constraint validation tests."""

    @staticmethod
    def _plan_touching(files):
        """Build a minimal EditPlan that modifies exactly *files*."""
        return EditPlan(
            approach="Test",
            files_to_modify=files,
            files_to_create=[],
            tests_to_add=[],
            explanation="Test",
        )

    async def test_validate_plan_too_many_files(self, temp_repo):
        """Plans exceeding the per-commit file cap are rejected."""
        tool = SelfEditTool(repo_path=temp_repo)

        too_many = [f"file{i}.py" for i in range(MAX_FILES_PER_COMMIT + 1)]
        assert tool._validate_plan(self._plan_touching(too_many)) is False

    async def test_validate_plan_protected_file(self, temp_repo):
        """Plans that touch a protected file are rejected."""
        tool = SelfEditTool(repo_path=temp_repo)

        plan = self._plan_touching(["src/tools/self_edit.py"])
        assert tool._validate_plan(plan) is False

    async def test_validate_plan_valid(self, temp_repo):
        """A small plan over an ordinary file passes validation."""
        tool = SelfEditTool(repo_path=temp_repo)

        plan = self._plan_touching(["src/myproject/app.py"])
        assert tool._validate_plan(plan) is True
@pytest.mark.asyncio
class TestSelfEditToolExecution:
    """Edit execution tests."""

    async def test_strip_code_fences(self, temp_repo):
        """Markdown code fences should be removed from LLM output."""
        tool = SelfEditTool(repo_path=temp_repo)

        fenced = "```python\ndef test(): pass\n```"
        stripped = tool._strip_code_fences(fenced)

        assert "```" not in stripped
        assert "def test(): pass" in stripped

    async def test_parse_list(self, temp_repo):
        """Comma-separated strings parse to lists; sentinels parse to empty."""
        tool = SelfEditTool(repo_path=temp_repo)

        assert tool._parse_list("a, b, c") == ["a", "b", "c"]
        # "none", the empty string, and "N/A" all mean "no items".
        for empty_marker in ("none", "", "N/A"):
            assert tool._parse_list(empty_marker) == []
@pytest.mark.asyncio
class TestSelfEditToolIntegration:
    """Integration tests with mocked dependencies."""

    async def test_successful_edit_flow(self, temp_repo, mock_llm):
        """A passing direct edit should produce a successful, journaled result."""
        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=mock_llm)

        # Force the direct-edit backend (report Aider as unavailable) and
        # make that backend report success.
        with patch.object(tool, '_aider_available', return_value=False), \
                patch.object(tool, '_execute_direct_edit') as mock_exec:
            mock_exec.return_value = {
                "success": True,
                "test_output": "1 passed",
            }

            result = await tool.execute("Add error handling")

        assert result.success is True
        assert result.attempt_id is not None

    async def test_failed_edit_with_rollback(self, temp_repo, mock_llm):
        """A consistently failing edit should roll back and report failure."""
        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=mock_llm)

        # Every execution attempt fails its tests.
        with patch.object(tool, '_execute_edit') as mock_exec:
            mock_exec.return_value = {
                "success": False,
                "error": "Tests failed",
                "test_output": "1 failed",
            }

            result = await tool.execute("Add broken feature")

        assert result.success is False
        assert result.attempt_id is not None
        message = result.message.lower()
        assert "failed" in message or "retry" in message

    async def test_preflight_failure(self, temp_repo):
        """Execution should bail out early when pre-flight checks fail."""
        tool = SelfEditTool(repo_path=temp_repo)

        # A dirty working tree causes pre-flight to fail.
        (temp_repo / "dirty.txt").write_text("dirty")

        result = await tool.execute("Some task")

        assert result.success is False
        assert "pre-flight" in result.message.lower()
@pytest.mark.asyncio
class TestSelfEditMCPRegistration:
    """MCP tool registration tests."""

    async def test_register_self_edit_tool(self):
        """Registration should publish the tool under the expected metadata."""
        registry = MagicMock()
        llm = AsyncMock()

        register_self_edit_tool(registry, llm)

        registry.register.assert_called_once()
        kwargs = registry.register.call_args.kwargs

        assert kwargs["name"] == "self_edit"
        # Self-modification must always require explicit user confirmation.
        assert kwargs["requires_confirmation"] is True
        assert "self_coding" in kwargs["category"]
@pytest.mark.asyncio
class TestSelfEditGlobalTool:
    """Global tool instance tests."""

    async def test_self_edit_tool_singleton(self, temp_repo):
        """The module-level helper should lazily build exactly one tool.

        Calls `self_edit_tool(...)` once and verifies the underlying
        SelfEditTool was constructed exactly once and its `execute` was
        delegated to exactly once.
        """
        from tools import self_edit as self_edit_module

        # Reset the cached singleton so this test controls its creation.
        self_edit_module._self_edit_tool = None

        # patch.object(..., return_value=None) already configures the mock's
        # return value, so no extra `mock_init.return_value = None` is needed.
        with patch.object(SelfEditTool, '__init__', return_value=None) as mock_init:
            with patch.object(SelfEditTool, 'execute') as mock_execute:
                mock_execute.return_value = SelfEditResult(
                    success=True,
                    message="Test",
                )

                await self_edit_tool("Test task")

                mock_init.assert_called_once()
                mock_execute.assert_called_once()
@pytest.mark.asyncio
class TestSelfEditErrorHandling:
    """Error handling tests."""

    async def test_exception_handling(self, temp_repo):
        """Unexpected exceptions surface as a failed result, not a crash."""
        tool = SelfEditTool(repo_path=temp_repo)

        # Blow up inside pre-flight to exercise the top-level handler.
        with patch.object(tool, '_preflight_checks', side_effect=Exception("Unexpected")):
            result = await tool.execute("Test task")

        assert result.success is False
        assert "exception" in result.message.lower()

    async def test_llm_failure_fallback(self, temp_repo, mock_llm):
        """Planning should degrade to the fallback plan when the LLM errors."""
        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=mock_llm)
        await tool._ensure_indexed()

        # Every chat call raises, as if the model timed out.
        mock_llm.chat.side_effect = Exception("LLM timeout")

        plan = await tool._plan_edit(
            task_description="Test",
            relevant_files=["src/app.py"],
            similar_attempts=[],
        )

        # The fallback path must still return a usable plan.
        assert isinstance(plan, EditPlan)
        assert plan.files_to_modify
Reference in New Issue
Block a user