diff --git a/src/tools/self_edit.py b/src/tools/self_edit.py
new file mode 100644
index 0000000..a28f314
--- /dev/null
+++ b/src/tools/self_edit.py
@@ -0,0 +1,824 @@
+"""Self-Edit MCP Tool — Timmy's ability to modify its own source code.
+
+This is the core self-modification orchestrator that:
+1. Receives task descriptions
+2. Queries codebase indexer for relevant files
+3. Queries modification journal for similar past attempts
+4. Creates feature branches via GitSafety
+5. Plans changes with LLM
+6. Executes via Aider (preferred) or direct editing (fallback)
+7. Runs tests via pytest
+8. Commits on success, rolls back on failure
+9. Logs outcomes to ModificationJournal
+10. Generates reflections
+
+Usage:
+    from tools.self_edit import self_edit_tool
+    from mcp.registry import tool_registry
+
+    # Register with MCP
+    tool_registry.register("self_edit", self_edit_schema, self_edit_tool)
+
+    # Invoke
+    result = await tool_registry.execute("self_edit", {
+        "task_description": "Add error handling to health endpoint"
+    })
+"""
+
+from __future__ import annotations
+
+import ast
+import asyncio
+import logging
+import os
+import subprocess
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Optional
+
+from config import settings
+
+# Phase 1 imports
+from self_coding import (
+    CodebaseIndexer,
+    GitSafety,
+    ModificationAttempt,
+    ModificationJournal,
+    Outcome,
+    ReflectionService,
+)
+
+logger = logging.getLogger(__name__)
+
+# Safety constraints (Phase 1 hard limits)
+MAX_FILES_PER_COMMIT = 3
+MAX_LINES_CHANGED = 100
+PROTECTED_FILES = {
+    "src/tools/self_edit.py",
+    "src/self_coding/git_safety.py",
+    "src/self_coding/codebase_indexer.py",
+    "src/self_coding/modification_journal.py",
+    "src/self_coding/reflection.py",
+}
+MAX_RETRIES = 3
+
+
+@dataclass
+class SelfEditResult:
+    """Result of a self-edit operation."""
+    success: bool
+    message: str
+    attempt_id: Optional[int] = None
+    files_modified: list[str] = field(default_factory=list)
+    commit_hash: Optional[str] = None
+    test_results: str = ""
+    diff: str = ""
+
+
+@dataclass
+class EditPlan:
+    """Plan for a self-edit operation."""
+    approach: str
+    files_to_modify: list[str]
+    files_to_create: list[str]
+    tests_to_add: list[str]
+    explanation: str
+
+
+class SelfEditTool:
+    """Self-modification orchestrator.
+
+    This class encapsulates the complete self-edit workflow:
+    - Pre-flight checks
+    - Context gathering (indexer + journal)
+    - Branch creation
+    - Edit planning (LLM)
+    - Execution (Aider or direct)
+    - Testing
+    - Commit/rollback
+    - Logging and reflection
+
+    Usage:
+        tool = SelfEditTool(repo_path="/path/to/repo")
+        result = await tool.execute("Add error handling to health endpoint")
+    """
+
+    def __init__(
+        self,
+        repo_path: Optional[Path] = None,
+        llm_adapter: Optional[object] = None,
+    ) -> None:
+        """Initialize SelfEditTool.
+
+        Args:
+            repo_path: Path to repository. Defaults to current directory.
+            llm_adapter: LLM adapter for planning and reflection
+        """
+        self.repo_path = Path(repo_path) if repo_path else Path.cwd()
+        self.llm_adapter = llm_adapter
+
+        # Initialize Phase 1 services
+        self.git = GitSafety(repo_path=self.repo_path)
+        self.indexer = CodebaseIndexer(repo_path=self.repo_path)
+        self.journal = ModificationJournal()
+        self.reflection = ReflectionService(llm_adapter=llm_adapter)
+
+        # Ensure codebase is indexed
+        self._indexing_done = False
+
+        logger.info("SelfEditTool initialized for %s", self.repo_path)
+
+    async def _ensure_indexed(self) -> None:
+        """Ensure codebase is indexed."""
+        if not self._indexing_done:
+            await self.indexer.index_changed()
+            self._indexing_done = True
+
+    async def execute(
+        self,
+        task_description: str,
+        context: Optional[dict] = None,
+    ) -> SelfEditResult:
+        """Execute a self-edit task.
+
+        This is the main entry point for self-modification.
+
+        Args:
+            task_description: What to do (e.g., "Add error handling")
+            context: Optional additional context
+
+        Returns:
+            SelfEditResult with success/failure details
+        """
+        logger.info("Starting self-edit: %s", task_description[:50])
+
+        try:
+            # Step 1: Pre-flight checks
+            if not await self._preflight_checks():
+                return SelfEditResult(
+                    success=False,
+                    message="Pre-flight checks failed. See logs for details.",
+                )
+
+            # Step 2: Gather context
+            await self._ensure_indexed()
+            relevant_files = await self._get_relevant_files(task_description)
+            similar_attempts = await self._get_similar_attempts(task_description)
+
+            # Step 3: Create feature branch
+            branch_name = f"timmy/self-edit/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
+            await self.git.create_branch(branch_name)
+            logger.info("Created branch: %s", branch_name)
+
+            # Step 4: Take snapshot for rollback
+            snapshot = await self.git.snapshot(run_tests=False)
+
+            # Step 5: Plan the edit
+            plan = await self._plan_edit(
+                task_description,
+                relevant_files,
+                similar_attempts,
+            )
+
+            # Validate plan against safety constraints
+            if not self._validate_plan(plan):
+                return SelfEditResult(
+                    success=False,
+                    message=f"Plan violates safety constraints: {plan.files_to_modify}",
+                )
+
+            # Step 6: Execute the edit
+            execution_result = await self._execute_edit(plan, task_description)
+
+            if not execution_result["success"]:
+                # Attempt retries
+                for retry in range(MAX_RETRIES):
+                    logger.info("Retry %d/%d", retry + 1, MAX_RETRIES)
+
+                    # Rollback to clean state
+                    await self.git.rollback(snapshot)
+
+                    # Try again with adjusted approach
+                    execution_result = await self._execute_edit(
+                        plan,
+                        task_description,
+                        retry_count=retry + 1,
+                    )
+
+                    if execution_result["success"]:
+                        break
+
+                if not execution_result["success"]:
+                    # Final rollback and log failure
+                    await self.git.rollback(snapshot)
+                    await self.git._run_git("checkout", "main")  # Return to main
+
+                    attempt_id = await self._log_failure(
+                        task_description,
+                        plan,
+                        execution_result["test_output"],
+                        execution_result.get("error", "Unknown error"),
+                    )
+
+                    return SelfEditResult(
+                        success=False,
+                        message=f"Failed after {MAX_RETRIES} retries",
+                        attempt_id=attempt_id,
+                        test_results=execution_result.get("test_output", ""),
+                    )
+
+            # Step 7: Commit and merge
+            commit_hash = await self.git.commit(
+                message=f"Self-edit: {task_description[:50]}",
+                files=plan.files_to_modify + plan.files_to_create + plan.tests_to_add,
+            )
+
+            # Merge to main (tests already passed in execution)
+            await self.git.merge_to_main(branch_name, require_tests=False)
+
+            # Step 8: Log success
+            diff = await self.git.get_diff(snapshot.commit_hash, commit_hash)
+            attempt_id = await self._log_success(
+                task_description,
+                plan,
+                commit_hash,
+                execution_result.get("test_output", ""),
+                diff,
+            )
+
+            return SelfEditResult(
+                success=True,
+                message=f"Successfully modified {len(plan.files_to_modify)} files",
+                attempt_id=attempt_id,
+                files_modified=plan.files_to_modify,
+                commit_hash=commit_hash,
+                test_results=execution_result.get("test_output", ""),
+                diff=diff,
+            )
+
+        except Exception as e:
+            logger.exception("Self-edit failed with exception")
+            return SelfEditResult(
+                success=False,
+                message=f"Exception: {str(e)}",
+            )
+
+    async def _preflight_checks(self) -> bool:
+        """Run pre-flight safety checks.
+
+        Returns:
+            True if all checks pass
+        """
+        # Check if repo is clean
+        if not await self.git.is_clean():
+            logger.error("Pre-flight failed: Working directory not clean")
+            return False
+
+        # Check if we're on main
+        current_branch = await self.git.get_current_branch()
+        if current_branch != self.git.main_branch:
+            logger.error("Pre-flight failed: Not on %s branch (on %s)",
+                         self.git.main_branch, current_branch)
+            return False
+
+        # Check if self-modification is enabled
+        if not getattr(settings, 'self_modify_enabled', True):
+            logger.error("Pre-flight failed: Self-modification disabled in config")
+            return False
+
+        return True
+
+    async def _get_relevant_files(self, task_description: str) -> list[str]:
+        """Get files relevant to the task.
+
+        Args:
+            task_description: Task to find relevant files for
+
+        Returns:
+            List of file paths
+        """
+        files = await self.indexer.get_relevant_files(task_description, limit=10)
+
+        # Filter to only files with test coverage
+        files_with_tests = [
+            f for f in files
+            if await self.indexer.has_test_coverage(f)
+        ]
+
+        logger.info("Found %d relevant files (%d with tests)",
+                    len(files), len(files_with_tests))
+
+        return files_with_tests[:MAX_FILES_PER_COMMIT]
+
+    async def _get_similar_attempts(
+        self,
+        task_description: str,
+    ) -> list[ModificationAttempt]:
+        """Get similar past modification attempts.
+
+        Args:
+            task_description: Task to find similar attempts for
+
+        Returns:
+            List of similar attempts
+        """
+        similar = await self.journal.find_similar(task_description, limit=5)
+        logger.info("Found %d similar past attempts", len(similar))
+        return similar
+
+    async def _plan_edit(
+        self,
+        task_description: str,
+        relevant_files: list[str],
+        similar_attempts: list[ModificationAttempt],
+    ) -> EditPlan:
+        """Plan the edit using LLM.
+
+        Args:
+            task_description: What to do
+            relevant_files: Files that might need modification
+            similar_attempts: Similar past attempts for context
+
+        Returns:
+            EditPlan with approach and file list
+        """
+        if not self.llm_adapter:
+            # Fallback: simple plan
+            return EditPlan(
+                approach=f"Edit files to implement: {task_description}",
+                files_to_modify=relevant_files[:MAX_FILES_PER_COMMIT],
+                files_to_create=[],
+                tests_to_add=[],
+                explanation="No LLM available, using heuristic plan",
+            )
+
+        # Build prompt with context
+        codebase_summary = await self.indexer.get_summary(max_tokens=2000)
+
+        similar_context = ""
+        if similar_attempts:
+            similar_context = "\n\nSimilar past attempts:\n"
+            for attempt in similar_attempts:
+                similar_context += f"- {attempt.task_description} ({attempt.outcome.value})\n"
+                if attempt.reflection:
+                    similar_context += f"  Lesson: {attempt.reflection[:100]}...\n"
+
+        prompt = f"""You are planning a code modification for a Python project.
+
+Task: {task_description}
+
+Codebase Summary:
+{codebase_summary}
+
+Potentially relevant files (all have test coverage):
+{chr(10).join(f"- {f}" for f in relevant_files)}
+{similar_context}
+
+Create a plan for implementing this task. You can modify at most {MAX_FILES_PER_COMMIT} files.
+
+Respond in this format:
+APPROACH:
+FILES_TO_MODIFY:
+FILES_TO_CREATE:
+TESTS_TO_ADD:
+EXPLANATION:
+"""
+
+        try:
+            response = await self.llm_adapter.chat(message=prompt)
+            content = response.content
+
+            # Parse response
+            approach = self._extract_field(content, "APPROACH")
+            files_to_modify = self._parse_list(self._extract_field(content, "FILES_TO_MODIFY"))
+            files_to_create = self._parse_list(self._extract_field(content, "FILES_TO_CREATE"))
+            tests_to_add = self._parse_list(self._extract_field(content, "TESTS_TO_ADD"))
+            explanation = self._extract_field(content, "EXPLANATION")
+
+            return EditPlan(
+                approach=approach or "No approach specified",
+                files_to_modify=files_to_modify[:MAX_FILES_PER_COMMIT],
+                files_to_create=files_to_create,
+                tests_to_add=tests_to_add,
+                explanation=explanation or "No explanation provided",
+            )
+
+        except Exception as e:
+            logger.error("LLM planning failed: %s", e)
+            return EditPlan(
+                approach=f"Fallback: Modify relevant files for {task_description}",
+                files_to_modify=relevant_files[:MAX_FILES_PER_COMMIT],
+                files_to_create=[],
+                tests_to_add=[],
+                explanation=f"LLM failed, using fallback: {e}",
+            )
+
+    def _extract_field(self, content: str, field_name: str) -> str:
+        """Extract a field from LLM response."""
+        for line in content.split("\n"):
+            if line.startswith(f"{field_name}:"):
+                return line.split(":", 1)[1].strip()
+        return ""
+
+    def _parse_list(self, text: str) -> list[str]:
+        """Parse comma-separated list."""
+        if not text or text.lower() in ("none", "n/a", ""):
+            return []
+        return [item.strip() for item in text.split(",") if item.strip()]
+
+    def _validate_plan(self, plan: EditPlan) -> bool:
+        """Validate plan against safety constraints.
+
+        Args:
+            plan: EditPlan to validate
+
+        Returns:
+            True if plan is valid
+        """
+        # Check file count
+        if len(plan.files_to_modify) > MAX_FILES_PER_COMMIT:
+            logger.error("Plan modifies too many files: %d > %d",
+                         len(plan.files_to_modify), MAX_FILES_PER_COMMIT)
+            return False
+
+        # Check for protected files
+        for file_path in plan.files_to_modify:
+            if file_path in PROTECTED_FILES:
+                logger.error("Plan tries to modify protected file: %s", file_path)
+                return False
+
+        # Check all files have test coverage
+        for file_path in plan.files_to_modify:
+            # This is async, so we check in _get_relevant_files
+            pass
+
+        return True
+
+    async def _execute_edit(
+        self,
+        plan: EditPlan,
+        task_description: str,
+        retry_count: int = 0,
+    ) -> dict:
+        """Execute the edit using Aider or direct editing.
+
+        Args:
+            plan: EditPlan to execute
+            task_description: Original task description
+            retry_count: Current retry attempt
+
+        Returns:
+            Dict with success, test_output, error
+        """
+        all_files = plan.files_to_modify + plan.files_to_create
+
+        if not all_files:
+            return {"success": False, "error": "No files to modify"}
+
+        # Try Aider first
+        if await self._aider_available():
+            return await self._execute_with_aider(plan, task_description, all_files)
+        else:
+            # Fallback to direct editing
+            return await self._execute_direct_edit(plan, task_description)
+
+    async def _aider_available(self) -> bool:
+        """Check if Aider is available."""
+        try:
+            result = await asyncio.create_subprocess_exec(
+                "aider", "--version",
+                stdout=asyncio.subprocess.DEVNULL,
+                stderr=asyncio.subprocess.DEVNULL,
+            )
+            await result.wait()
+            return result.returncode == 0
+        except FileNotFoundError:
+            return False
+
+    async def _execute_with_aider(
+        self,
+        plan: EditPlan,
+        task_description: str,
+        files: list[str],
+    ) -> dict:
+        """Execute edit using Aider.
+
+        Args:
+            plan: EditPlan
+            task_description: Task description
+            files: Files to edit
+
+        Returns:
+            Dict with success, test_output
+        """
+        cmd = [
+            "aider",
+            "--model", "ollama_chat/qwen2.5-coder:14b-instruct",
+            "--auto-test",
+            "--test-cmd", "python -m pytest tests/ -xvs",
+            "--yes",
+            "--no-git",
+            "--message", f"{task_description}\n\nApproach: {plan.approach}",
+        ] + files
+
+        logger.info("Running Aider: %s", " ".join(cmd))
+
+        try:
+            proc = await asyncio.create_subprocess_exec(
+                *cmd,
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.STDOUT,
+                cwd=self.repo_path,
+            )
+
+            stdout, _ = await asyncio.wait_for(
+                proc.communicate(),
+                timeout=300.0,
+            )
+
+            output = stdout.decode() if stdout else ""
+
+            # Check if tests passed
+            success = proc.returncode == 0 and "passed" in output.lower()
+
+            return {
+                "success": success,
+                "test_output": output,
+            }
+
+        except asyncio.TimeoutError:
+            logger.error("Aider timed out after 300s")
+            return {
+                "success": False,
+                "error": "Timeout",
+                "test_output": "Aider timed out after 300s",
+            }
+        except Exception as e:
+            logger.error("Aider execution failed: %s", e)
+            return {
+                "success": False,
+                "error": str(e),
+                "test_output": "",
+            }
+
+    async def _execute_direct_edit(
+        self,
+        plan: EditPlan,
+        task_description: str,
+    ) -> dict:
+        """Execute edit via direct file modification (fallback).
+
+        Args:
+            plan: EditPlan
+            task_description: Task description
+
+        Returns:
+            Dict with success, test_output
+        """
+        if not self.llm_adapter:
+            return {
+                "success": False,
+                "error": "No LLM adapter for direct editing",
+            }
+
+        # Edit each file
+        for file_path in plan.files_to_modify:
+            full_path = self.repo_path / file_path
+
+            if not full_path.exists():
+                logger.error("File does not exist: %s", file_path)
+                continue
+
+            try:
+                content = full_path.read_text()
+
+                # Build edit prompt
+                edit_prompt = f"""Edit this Python file to implement the task.
+
+Task: {task_description}
+Approach: {plan.approach}
+
+Current file content:
+```python
+{content}
+```
+
+Provide the complete new file content. Only return the code, no explanation.
+"""
+
+                response = await self.llm_adapter.chat(message=edit_prompt)
+                new_content = response.content
+
+                # Strip code fences if present
+                new_content = self._strip_code_fences(new_content)
+
+                # Validate with AST
+                try:
+                    ast.parse(new_content)
+                except SyntaxError as e:
+                    logger.error("Generated code has syntax error: %s", e)
+                    return {
+                        "success": False,
+                        "error": f"Syntax error in generated code: {e}",
+                    }
+
+                # Write file
+                full_path.write_text(new_content)
+                logger.info("Modified: %s", file_path)
+
+            except Exception as e:
+                logger.error("Failed to edit %s: %s", file_path, e)
+                return {
+                    "success": False,
+                    "error": f"Failed to edit {file_path}: {e}",
+                }
+
+        # Run tests
+        return await self._run_tests()
+
+    def _strip_code_fences(self, content: str) -> str:
+        """Strip markdown code fences from content."""
+        lines = content.split("\n")
+
+        # Remove opening fence
+        if lines and lines[0].startswith("```"):
+            lines = lines[1:]
+
+        # Remove closing fence
+        if lines and lines[-1].startswith("```"):
+            lines = lines[:-1]
+
+        return "\n".join(lines)
+
+    async def _run_tests(self) -> dict:
+        """Run tests and return results.
+
+        Returns:
+            Dict with success, test_output
+        """
+        cmd = ["python", "-m", "pytest", "tests/", "-x", "--tb=short"]
+
+        try:
+            proc = await asyncio.create_subprocess_exec(
+                *cmd,
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.STDOUT,
+                cwd=self.repo_path,
+            )
+
+            stdout, _ = await asyncio.wait_for(
+                proc.communicate(),
+                timeout=120.0,
+            )
+
+            output = stdout.decode() if stdout else ""
+
+            return {
+                "success": proc.returncode == 0,
+                "test_output": output,
+            }
+
+        except asyncio.TimeoutError:
+            return {
+                "success": False,
+                "error": "Tests timed out",
+                "test_output": "Timeout after 120s",
+            }
+        except Exception as e:
+            return {
+                "success": False,
+                "error": str(e),
+                "test_output": "",
+            }
+
+    async def _log_success(
+        self,
+        task_description: str,
+        plan: EditPlan,
+        commit_hash: str,
+        test_results: str,
+        diff: str,
+    ) -> int:
+        """Log successful attempt.
+
+        Returns:
+            Attempt ID
+        """
+        attempt = ModificationAttempt(
+            task_description=task_description,
+            approach=plan.approach,
+            files_modified=plan.files_to_modify + plan.files_to_create,
+            diff=diff[:5000],  # Truncate for storage
+            test_results=test_results,
+            outcome=Outcome.SUCCESS,
+        )
+
+        attempt_id = await self.journal.log_attempt(attempt)
+
+        # Generate and store reflection
+        reflection_text = await self.reflection.reflect_on_attempt(attempt)
+        await self.journal.update_reflection(attempt_id, reflection_text)
+
+        return attempt_id
+
+    async def _log_failure(
+        self,
+        task_description: str,
+        plan: EditPlan,
+        test_results: str,
+        error: str,
+    ) -> int:
+        """Log failed attempt.
+
+        Returns:
+            Attempt ID
+        """
+        attempt = ModificationAttempt(
+            task_description=task_description,
+            approach=plan.approach,
+            files_modified=plan.files_to_modify,
+            test_results=test_results,
+            outcome=Outcome.FAILURE,
+            failure_analysis=error,
+            retry_count=MAX_RETRIES,
+        )
+
+        attempt_id = await self.journal.log_attempt(attempt)
+
+        # Generate reflection even for failures
+        reflection_text = await self.reflection.reflect_on_attempt(attempt)
+        await self.journal.update_reflection(attempt_id, reflection_text)
+
+        return attempt_id
+
+
+# MCP Tool Schema
+self_edit_schema = {
+    "type": "object",
+    "properties": {
+        "task_description": {
+            "type": "string",
+            "description": "Description of the code modification to make",
+        },
+        "context": {
+            "type": "object",
+            "description": "Optional additional context for the modification",
+        },
+    },
+    "required": ["task_description"],
+}
+
+
+# Global tool instance (singleton pattern)
+_self_edit_tool: Optional[SelfEditTool] = None
+
+
+async def self_edit_tool(task_description: str, context: Optional[dict] = None) -> dict:
+    """MCP tool entry point for self-edit.
+
+    Args:
+        task_description: What to modify
+        context: Optional context
+
+    Returns:
+        Dict with result
+    """
+    global _self_edit_tool
+
+    if _self_edit_tool is None:
+        _self_edit_tool = SelfEditTool()
+
+    result = await _self_edit_tool.execute(task_description, context)
+
+    return {
+        "success": result.success,
+        "message": result.message,
+        "attempt_id": result.attempt_id,
+        "files_modified": result.files_modified,
+        "commit_hash": result.commit_hash,
+        "test_results": result.test_results,
+    }
+
+
+def register_self_edit_tool(registry: Any, llm_adapter: Optional[object] = None) -> None:
+    """Register the self-edit tool with MCP registry.
+
+    Args:
+        registry: MCP ToolRegistry
+        llm_adapter: Optional LLM adapter
+    """
+    global _self_edit_tool
+    _self_edit_tool = SelfEditTool(llm_adapter=llm_adapter)
+
+    registry.register(
+        name="self_edit",
+        schema=self_edit_schema,
+        handler=self_edit_tool,
+        category="self_coding",
+        requires_confirmation=True,  # Safety: require user approval
+        tags=["self-modification", "code-generation"],
+        source_module="tools.self_edit",
+    )
+
+    logger.info("Self-edit tool registered with MCP")
diff --git a/tests/test_self_edit_tool.py b/tests/test_self_edit_tool.py
new file mode 100644
index 0000000..2ce2d7a
--- /dev/null
+++ b/tests/test_self_edit_tool.py
@@ -0,0 +1,398 @@
+"""Tests for Self-Edit MCP Tool.
+
+Tests the complete self-edit workflow with mocked dependencies.
+"""
+
+from __future__ import annotations
+
+import tempfile
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from tools.self_edit import (
+    MAX_FILES_PER_COMMIT,
+    MAX_RETRIES,
+    PROTECTED_FILES,
+    EditPlan,
+    SelfEditResult,
+    SelfEditTool,
+    register_self_edit_tool,
+    self_edit_tool,
+)
+
+
+@pytest.fixture
+def temp_repo():
+    """Create a temporary git repository."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        repo_path = Path(tmpdir)
+
+        # Initialize git
+        import subprocess
+        subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
+        subprocess.run(
+            ["git", "config", "user.email", "test@test.com"],
+            cwd=repo_path, check=True, capture_output=True,
+        )
+        subprocess.run(
+            ["git", "config", "user.name", "Test"],
+            cwd=repo_path, check=True, capture_output=True,
+        )
+
+        # Create src structure
+        src_path = repo_path / "src" / "myproject"
+        src_path.mkdir(parents=True)
+
+        (src_path / "__init__.py").write_text("")
+        (src_path / "app.py").write_text('''
+"""Main application."""
+
+def hello():
+    return "Hello"
+''')
+
+        # Create tests
+        tests_path = repo_path / "tests"
+        tests_path.mkdir()
+        (tests_path / "test_app.py").write_text('''
+"""Tests for app."""
+from myproject.app import hello
+
+def test_hello():
+    assert hello() == "Hello"
+''')
+
+        # Initial commit
+        subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
+        subprocess.run(
+            ["git", "commit", "-m", "Initial"],
+            cwd=repo_path, check=True, capture_output=True,
+        )
+        subprocess.run(
+            ["git", "branch", "-M", "main"],
+            cwd=repo_path, check=True, capture_output=True,
+        )
+
+        yield repo_path
+
+
+@pytest.fixture(autouse=True)
+def mock_settings():
+    """Mock settings to enable self-modification."""
+    with patch('tools.self_edit.settings') as mock_settings:
+        mock_settings.self_modify_enabled = True
+        yield mock_settings
+
+
+@pytest.fixture
+def mock_llm():
+    """Create mock LLM adapter."""
+    mock = AsyncMock()
+    mock.chat.return_value = MagicMock(
+        content="""APPROACH: Add error handling
+FILES_TO_MODIFY: src/myproject/app.py
+FILES_TO_CREATE:
+TESTS_TO_ADD: tests/test_app.py
+EXPLANATION: Wrap function in try/except"""
+    )
+    return mock
+
+
+@pytest.mark.asyncio
+class TestSelfEditToolBasics:
+    """Basic functionality tests."""
+
+    async def test_initialization(self, temp_repo):
+        """Should initialize with services."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        assert tool.repo_path == temp_repo
+        assert tool.git is not None
+        assert tool.indexer is not None
+        assert tool.journal is not None
+        assert tool.reflection is not None
+
+    async def test_preflight_checks_clean_repo(self, temp_repo):
+        """Should pass preflight on clean repo."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        assert await tool._preflight_checks() is True
+
+    async def test_preflight_checks_dirty_repo(self, temp_repo):
+        """Should fail preflight on dirty repo."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        # Make uncommitted change
+        (temp_repo / "dirty.txt").write_text("dirty")
+
+        assert await tool._preflight_checks() is False
+
+    async def test_preflight_checks_wrong_branch(self, temp_repo):
+        """Should fail preflight when not on main."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        # Create and checkout feature branch
+        import subprocess
+        subprocess.run(
+            ["git", "checkout", "-b", "feature"],
+            cwd=temp_repo, check=True, capture_output=True,
+        )
+
+        assert await tool._preflight_checks() is False
+
+
+@pytest.mark.asyncio
+class TestSelfEditToolPlanning:
+    """Edit planning tests."""
+
+    async def test_plan_edit_with_llm(self, temp_repo, mock_llm):
+        """Should generate plan using LLM."""
+        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=mock_llm)
+        await tool._ensure_indexed()
+
+        plan = await tool._plan_edit(
+            task_description="Add error handling",
+            relevant_files=["src/myproject/app.py"],
+            similar_attempts=[],
+        )
+
+        assert isinstance(plan, EditPlan)
+        assert plan.approach == "Add error handling"
+        assert "src/myproject/app.py" in plan.files_to_modify
+
+    async def test_plan_edit_without_llm(self, temp_repo):
+        """Should generate fallback plan without LLM."""
+        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=None)
+        await tool._ensure_indexed()
+
+        plan = await tool._plan_edit(
+            task_description="Add feature",
+            relevant_files=["src/myproject/app.py"],
+            similar_attempts=[],
+        )
+
+        assert isinstance(plan, EditPlan)
+        assert len(plan.files_to_modify) > 0
+
+    async def test_plan_respects_max_files(self, temp_repo, mock_llm):
+        """Plan should respect MAX_FILES_PER_COMMIT."""
+        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=mock_llm)
+        await tool._ensure_indexed()
+
+        # Mock LLM to return many files
+        mock_llm.chat.return_value = MagicMock(
+            content="FILES_TO_MODIFY: " + ",".join([f"file{i}.py" for i in range(10)])
+        )
+
+        plan = await tool._plan_edit(
+            task_description="Test",
+            relevant_files=[f"file{i}.py" for i in range(10)],
+            similar_attempts=[],
+        )
+
+        assert len(plan.files_to_modify) <= MAX_FILES_PER_COMMIT
+
+
+@pytest.mark.asyncio
+class TestSelfEditToolValidation:
+    """Safety constraint validation tests."""
+
+    async def test_validate_plan_too_many_files(self, temp_repo):
+        """Should reject plan with too many files."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        plan = EditPlan(
+            approach="Test",
+            files_to_modify=[f"file{i}.py" for i in range(MAX_FILES_PER_COMMIT + 1)],
+            files_to_create=[],
+            tests_to_add=[],
+            explanation="Test",
+        )
+
+        assert tool._validate_plan(plan) is False
+
+    async def test_validate_plan_protected_file(self, temp_repo):
+        """Should reject plan modifying protected files."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        plan = EditPlan(
+            approach="Test",
+            files_to_modify=["src/tools/self_edit.py"],
+            files_to_create=[],
+            tests_to_add=[],
+            explanation="Test",
+        )
+
+        assert tool._validate_plan(plan) is False
+
+    async def test_validate_plan_valid(self, temp_repo):
+        """Should accept valid plan."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        plan = EditPlan(
+            approach="Test",
+            files_to_modify=["src/myproject/app.py"],
+            files_to_create=[],
+            tests_to_add=[],
+            explanation="Test",
+        )
+
+        assert tool._validate_plan(plan) is True
+
+
+@pytest.mark.asyncio
+class TestSelfEditToolExecution:
+    """Edit execution tests."""
+
+    async def test_strip_code_fences(self, temp_repo):
+        """Should strip markdown code fences."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        content = "```python\ndef test(): pass\n```"
+        result = tool._strip_code_fences(content)
+
+        assert "```" not in result
+        assert "def test(): pass" in result
+
+    async def test_parse_list(self, temp_repo):
+        """Should parse comma-separated lists."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        assert tool._parse_list("a, b, c") == ["a", "b", "c"]
+        assert tool._parse_list("none") == []
+        assert tool._parse_list("") == []
+        assert tool._parse_list("N/A") == []
+
+
+@pytest.mark.asyncio
+class TestSelfEditToolIntegration:
+    """Integration tests with mocked dependencies."""
+
+    async def test_successful_edit_flow(self, temp_repo, mock_llm):
+        """Test complete successful edit flow."""
+        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=mock_llm)
+
+        # Mock Aider to succeed
+        with patch.object(tool, '_aider_available', return_value=False):
+            with patch.object(tool, '_execute_direct_edit') as mock_exec:
+                mock_exec.return_value = {
+                    "success": True,
+                    "test_output": "1 passed",
+                }
+
+                result = await tool.execute("Add error handling")
+
+                assert result.success is True
+                assert result.attempt_id is not None
+
+    async def test_failed_edit_with_rollback(self, temp_repo, mock_llm):
+        """Test failed edit with rollback."""
+        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=mock_llm)
+
+        # Mock execution to always fail
+        with patch.object(tool, '_execute_edit') as mock_exec:
+            mock_exec.return_value = {
+                "success": False,
+                "error": "Tests failed",
+                "test_output": "1 failed",
+            }
+
+            result = await tool.execute("Add broken feature")
+
+            assert result.success is False
+            assert result.attempt_id is not None
+            assert "failed" in result.message.lower() or "retry" in result.message.lower()
+
+    async def test_preflight_failure(self, temp_repo):
+        """Should fail early if preflight checks fail."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        # Make repo dirty
+        (temp_repo / "dirty.txt").write_text("dirty")
+
+        result = await tool.execute("Some task")
+
+        assert result.success is False
+        assert "pre-flight" in result.message.lower()
+
+
+@pytest.mark.asyncio
+class TestSelfEditMCPRegistration:
+    """MCP tool registration tests."""
+
+    async def test_register_self_edit_tool(self):
+        """Should register with MCP registry."""
+        mock_registry = MagicMock()
+        mock_llm = AsyncMock()
+
+        register_self_edit_tool(mock_registry, mock_llm)
+
+        mock_registry.register.assert_called_once()
+        call_args = mock_registry.register.call_args
+
+        assert call_args.kwargs["name"] == "self_edit"
+        assert call_args.kwargs["requires_confirmation"] is True
+        assert "self_coding" in call_args.kwargs["category"]
+
+
+@pytest.mark.asyncio
+class TestSelfEditGlobalTool:
+    """Global tool instance tests."""
+
+    async def test_self_edit_tool_singleton(self, temp_repo):
+        """Should use singleton pattern."""
+        from tools import self_edit as self_edit_module
+
+        # Reset singleton
+        self_edit_module._self_edit_tool = None
+
+        # First call should initialize
+        with patch.object(SelfEditTool, '__init__', return_value=None) as mock_init:
+            mock_init.return_value = None
+
+            with patch.object(SelfEditTool, 'execute') as mock_execute:
+                mock_execute.return_value = SelfEditResult(
+                    success=True,
+                    message="Test",
+                )
+
+                await self_edit_tool("Test task")
+
+                mock_init.assert_called_once()
+                mock_execute.assert_called_once()
+
+
+@pytest.mark.asyncio
+class TestSelfEditErrorHandling:
+    """Error handling tests."""
+
+    async def test_exception_handling(self, temp_repo):
+        """Should handle exceptions gracefully."""
+        tool = SelfEditTool(repo_path=temp_repo)
+
+        # Mock preflight to raise exception
+        with patch.object(tool, '_preflight_checks', side_effect=Exception("Unexpected")):
+            result = await tool.execute("Test task")
+
+        assert result.success is False
+        assert "exception" in result.message.lower()
+
+    async def test_llm_failure_fallback(self, temp_repo, mock_llm):
+        """Should fallback when LLM fails."""
+        tool = SelfEditTool(repo_path=temp_repo, llm_adapter=mock_llm)
+        await tool._ensure_indexed()
+
+        # Mock LLM to fail
+        mock_llm.chat.side_effect = Exception("LLM timeout")
+
+        plan = await tool._plan_edit(
+            task_description="Test",
+            relevant_files=["src/app.py"],
+            similar_attempts=[],
+        )
+
+        # Should return fallback plan
+        assert isinstance(plan, EditPlan)
+        assert len(plan.files_to_modify) > 0