merge: resolve conflicts with main, keep comprehensive chat pipeline

Resolved merge conflicts in agents.py and test_task_queue.py:
- Keep full chat-to-task pipeline (agent/priority extraction, question
  filtering, context injection) over simpler main version
- Incorporate test_briefing_task_queue_summary from main
- All 64 task queue tests pass

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Payne
2026-02-26 11:47:34 -05:00
14 changed files with 4576 additions and 0 deletions

View File

@@ -0,0 +1,352 @@
"""Tests for Codebase Indexer.
Uses temporary directories with Python files to test AST parsing and indexing.
"""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from self_coding.codebase_indexer import CodebaseIndexer, ModuleInfo
@pytest.fixture
def temp_repo():
"""Create a temporary repository with Python files."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
# Create src directory structure
src_path = repo_path / "src" / "myproject"
src_path.mkdir(parents=True)
# Create a module with classes and functions
(src_path / "utils.py").write_text('''
"""Utility functions for the project."""
import os
from typing import Optional
class Helper:
"""A helper class for common operations."""
def __init__(self, name: str):
self.name = name
async def process(self, data: str) -> str:
"""Process the input data."""
return data.upper()
def cleanup(self):
"""Clean up resources."""
pass
def calculate_something(x: int, y: int) -> int:
"""Calculate something from x and y."""
return x + y
def untested_function():
pass
''')
# Create another module that imports from utils
(src_path / "main.py").write_text('''
"""Main application module."""
from myproject.utils import Helper, calculate_something
import os
class Application:
"""Main application class."""
def run(self):
helper = Helper("test")
result = calculate_something(1, 2)
return result
''')
# Create tests
tests_path = repo_path / "tests"
tests_path.mkdir()
(tests_path / "test_utils.py").write_text('''
"""Tests for utils module."""
import pytest
from myproject.utils import Helper, calculate_something
def test_helper_process():
helper = Helper("test")
assert helper.process("hello") == "HELLO"
def test_calculate_something():
assert calculate_something(2, 3) == 5
''')
yield repo_path
@pytest.fixture
def indexer(temp_repo):
"""Create CodebaseIndexer for temp repo."""
import uuid
return CodebaseIndexer(
repo_path=temp_repo,
db_path=temp_repo / f"test_index_{uuid.uuid4().hex[:8]}.db",
src_dirs=["src", "tests"],
)
@pytest.mark.asyncio
class TestCodebaseIndexerBasics:
"""Basic indexing functionality."""
async def test_index_all_counts(self, indexer):
"""Should index all Python files."""
stats = await indexer.index_all()
assert stats["indexed"] == 3 # utils.py, main.py, test_utils.py
assert stats["failed"] == 0
async def test_index_skips_unchanged(self, indexer):
"""Should skip unchanged files on second run."""
await indexer.index_all()
# Second index should skip all
stats = await indexer.index_all()
assert stats["skipped"] == 3
assert stats["indexed"] == 0
async def test_index_changed_detects_updates(self, indexer, temp_repo):
"""Should reindex changed files."""
await indexer.index_all()
# Modify a file
utils_path = temp_repo / "src" / "myproject" / "utils.py"
content = utils_path.read_text()
utils_path.write_text(content + "\n# Modified\n")
# Incremental index should detect change
stats = await indexer.index_changed()
assert stats["indexed"] == 1
assert stats["skipped"] == 2
@pytest.mark.asyncio
class TestCodebaseIndexerParsing:
"""AST parsing accuracy."""
async def test_parses_classes(self, indexer):
"""Should extract class information."""
await indexer.index_all()
info = await indexer.get_module_info("src/myproject/utils.py")
assert info is not None
class_names = [c.name for c in info.classes]
assert "Helper" in class_names
async def test_parses_class_methods(self, indexer):
"""Should extract class methods."""
await indexer.index_all()
info = await indexer.get_module_info("src/myproject/utils.py")
helper = [c for c in info.classes if c.name == "Helper"][0]
method_names = [m.name for m in helper.methods]
assert "process" in method_names
assert "cleanup" in method_names
async def test_parses_function_signatures(self, indexer):
"""Should extract function signatures."""
await indexer.index_all()
info = await indexer.get_module_info("src/myproject/utils.py")
func_names = [f.name for f in info.functions]
assert "calculate_something" in func_names
assert "untested_function" in func_names
# Check signature details
calc_func = [f for f in info.functions if f.name == "calculate_something"][0]
assert calc_func.returns == "int"
assert "x" in calc_func.args[0] if calc_func.args else True
async def test_parses_imports(self, indexer):
"""Should extract import statements."""
await indexer.index_all()
info = await indexer.get_module_info("src/myproject/main.py")
assert "myproject.utils.Helper" in info.imports
assert "myproject.utils.calculate_something" in info.imports
assert "os" in info.imports
async def test_parses_docstrings(self, indexer):
"""Should extract module and class docstrings."""
await indexer.index_all()
info = await indexer.get_module_info("src/myproject/utils.py")
assert "Utility functions" in info.docstring
assert "helper class" in info.classes[0].docstring.lower()
@pytest.mark.asyncio
class TestCodebaseIndexerTestCoverage:
"""Test coverage mapping."""
async def test_maps_test_files(self, indexer):
"""Should map source files to test files."""
await indexer.index_all()
info = await indexer.get_module_info("src/myproject/utils.py")
assert info.test_coverage is not None
assert "test_utils.py" in info.test_coverage
async def test_has_test_coverage_method(self, indexer):
"""Should check if file has test coverage."""
await indexer.index_all()
assert await indexer.has_test_coverage("src/myproject/utils.py") is True
# main.py has no corresponding test file
assert await indexer.has_test_coverage("src/myproject/main.py") is False
@pytest.mark.asyncio
class TestCodebaseIndexerDependencies:
"""Dependency graph building."""
async def test_builds_dependency_graph(self, indexer):
"""Should build import dependency graph."""
await indexer.index_all()
# main.py imports from utils.py
deps = await indexer.get_dependency_chain("src/myproject/utils.py")
assert "src/myproject/main.py" in deps
async def test_empty_dependency_chain(self, indexer):
"""Should return empty list for files with no dependents."""
await indexer.index_all()
# test_utils.py likely doesn't have dependents
deps = await indexer.get_dependency_chain("tests/test_utils.py")
assert deps == []
@pytest.mark.asyncio
class TestCodebaseIndexerSummary:
"""Summary generation."""
async def test_generates_summary(self, indexer):
"""Should generate codebase summary."""
await indexer.index_all()
summary = await indexer.get_summary()
assert "Codebase Summary" in summary
assert "myproject.utils" in summary
assert "Helper" in summary
assert "calculate_something" in summary
async def test_summary_respects_max_tokens(self, indexer):
"""Should truncate if summary exceeds max tokens."""
await indexer.index_all()
# Very small limit
summary = await indexer.get_summary(max_tokens=10)
assert len(summary) <= 10 * 4 + 100 # rough check with buffer
@pytest.mark.asyncio
class TestCodebaseIndexerRelevance:
"""Relevant file search."""
async def test_finds_relevant_files(self, indexer):
"""Should find files relevant to task description."""
await indexer.index_all()
files = await indexer.get_relevant_files("calculate something with helper", limit=5)
assert "src/myproject/utils.py" in files
async def test_relevance_scoring(self, indexer):
"""Should score files by keyword match."""
await indexer.index_all()
files = await indexer.get_relevant_files("process data with helper", limit=5)
# utils.py should be first (has Helper class with process method)
assert files[0] == "src/myproject/utils.py"
async def test_returns_empty_for_no_matches(self, indexer):
"""Should return empty list when no files match."""
await indexer.index_all()
# Use truly unique keywords that won't match anything in the codebase
files = await indexer.get_relevant_files("astronaut dinosaur zebra unicorn", limit=5)
assert files == []
@pytest.mark.asyncio
class TestCodebaseIndexerIntegration:
"""Full workflow integration tests."""
async def test_full_index_query_workflow(self, temp_repo):
"""Complete workflow: index, query, get dependencies."""
indexer = CodebaseIndexer(
repo_path=temp_repo,
db_path=temp_repo / "integration.db",
src_dirs=["src", "tests"],
)
# Index all files
stats = await indexer.index_all()
assert stats["indexed"] == 3
# Get summary
summary = await indexer.get_summary()
assert "Helper" in summary
# Find relevant files
files = await indexer.get_relevant_files("helper class", limit=3)
assert len(files) > 0
# Check dependencies
deps = await indexer.get_dependency_chain("src/myproject/utils.py")
assert "src/myproject/main.py" in deps
# Verify test coverage
has_tests = await indexer.has_test_coverage("src/myproject/utils.py")
assert has_tests is True
async def test_handles_syntax_errors_gracefully(self, temp_repo):
"""Should skip files with syntax errors."""
# Create a file with syntax error
(temp_repo / "src" / "bad.py").write_text("def broken(:")
indexer = CodebaseIndexer(
repo_path=temp_repo,
db_path=temp_repo / "syntax_error.db",
src_dirs=["src"],
)
stats = await indexer.index_all()
# Should index the good files, fail on bad one
assert stats["failed"] == 1
assert stats["indexed"] >= 2

View File

@@ -0,0 +1,441 @@
"""Error path tests for Codebase Indexer.
Tests syntax errors, encoding issues, circular imports, and edge cases.
"""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from self_coding.codebase_indexer import CodebaseIndexer, ModuleInfo
@pytest.mark.asyncio
class TestCodebaseIndexerErrors:
"""Indexing error handling."""
async def test_syntax_error_file(self):
"""Should skip files with syntax errors."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
# Valid file
(src_path / "good.py").write_text("def good(): pass")
# File with syntax error
(src_path / "bad.py").write_text("def bad(:\n pass")
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
stats = await indexer.index_all()
assert stats["indexed"] == 1
assert stats["failed"] == 1
async def test_unicode_in_source(self):
"""Should handle Unicode in source files."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
# File with Unicode
(src_path / "unicode.py").write_text(
'# -*- coding: utf-8 -*-\n'
'"""Module with Unicode: ñ 中文 🎉"""\n'
'def hello():\n'
' """Returns 👋"""\n'
' return "hello"\n',
encoding="utf-8",
)
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
stats = await indexer.index_all()
assert stats["indexed"] == 1
assert stats["failed"] == 0
info = await indexer.get_module_info("src/unicode.py")
assert "中文" in info.docstring
async def test_empty_file(self):
"""Should handle empty Python files."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
# Empty file
(src_path / "empty.py").write_text("")
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
stats = await indexer.index_all()
assert stats["indexed"] == 1
info = await indexer.get_module_info("src/empty.py")
assert info is not None
assert info.functions == []
assert info.classes == []
async def test_large_file(self):
"""Should handle large Python files."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
# Large file with many functions
content = ['"""Large module."""']
for i in range(100):
content.append(f'def function_{i}(x: int) -> int:')
content.append(f' """Function {i}."""')
content.append(f' return x + {i}')
content.append('')
(src_path / "large.py").write_text("\n".join(content))
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
stats = await indexer.index_all()
assert stats["indexed"] == 1
info = await indexer.get_module_info("src/large.py")
assert len(info.functions) == 100
async def test_nested_classes(self):
"""Should handle nested classes."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
(src_path / "nested.py").write_text('''
"""Module with nested classes."""
class Outer:
"""Outer class."""
class Inner:
"""Inner class."""
def inner_method(self):
pass
def outer_method(self):
pass
''')
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
await indexer.index_all()
info = await indexer.get_module_info("src/nested.py")
# Should find Outer class (top-level)
assert len(info.classes) == 1
assert info.classes[0].name == "Outer"
# Outer should have outer_method
assert len(info.classes[0].methods) == 1
assert info.classes[0].methods[0].name == "outer_method"
async def test_complex_type_annotations(self):
"""Should handle complex type annotations."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
(src_path / "types.py").write_text('''
"""Module with complex types."""
from typing import Dict, List, Optional, Union, Callable
def complex_function(
items: List[Dict[str, Union[int, str]]],
callback: Callable[[int], bool],
optional: Optional[str] = None,
) -> Dict[str, List[int]]:
"""Function with complex types."""
return {}
class TypedClass:
"""Class with type annotations."""
def method(self, x: int | str) -> list[int]:
"""Method with union type (Python 3.10+)."""
return []
''')
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
await indexer.index_all()
info = await indexer.get_module_info("src/types.py")
# Should parse without error
assert len(info.functions) == 1
assert len(info.classes) == 1
async def test_import_variations(self):
"""Should handle various import styles."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
(src_path / "imports.py").write_text('''
"""Module with various imports."""
# Regular imports
import os
import sys as system
from pathlib import Path
# From imports
from typing import Dict, List
from collections import OrderedDict as OD
# Relative imports (may not resolve)
from . import sibling
from .subpackage import module
# Dynamic imports (won't be caught by AST)
try:
import optional_dep
except ImportError:
pass
''')
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
await indexer.index_all()
info = await indexer.get_module_info("src/imports.py")
# Should capture static imports
assert "os" in info.imports
assert "typing.Dict" in info.imports or "Dict" in str(info.imports)
async def test_no_src_directory(self):
"""Should handle missing src directory gracefully."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src", "tests"],
)
stats = await indexer.index_all()
assert stats["indexed"] == 0
assert stats["failed"] == 0
async def test_permission_error(self):
"""Should handle permission errors gracefully."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
# Create file
file_path = src_path / "locked.py"
file_path.write_text("def test(): pass")
# Remove read permission (if on Unix)
import os
try:
os.chmod(file_path, 0o000)
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
stats = await indexer.index_all()
# Should count as failed
assert stats["failed"] == 1
finally:
# Restore permission for cleanup
os.chmod(file_path, 0o644)
async def test_circular_imports_in_dependency_graph(self):
"""Should handle circular imports in dependency analysis."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
# Create circular imports
(src_path / "a.py").write_text('''
"""Module A."""
from b import B
class A:
def get_b(self):
return B()
''')
(src_path / "b.py").write_text('''
"""Module B."""
from a import A
class B:
def get_a(self):
return A()
''')
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
await indexer.index_all()
# Both should have each other as dependencies
a_deps = await indexer.get_dependency_chain("src/a.py")
b_deps = await indexer.get_dependency_chain("src/b.py")
# Note: Due to import resolution, this might not be perfect
# but it shouldn't crash
assert isinstance(a_deps, list)
assert isinstance(b_deps, list)
async def test_summary_with_no_modules(self):
"""Summary should handle empty codebase."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
await indexer.index_all()
summary = await indexer.get_summary()
assert "Codebase Summary" in summary
assert "Total modules: 0" in summary
async def test_get_relevant_files_with_special_chars(self):
"""Should handle special characters in search query."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
(src_path / "test.py").write_text('def test(): pass')
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
await indexer.index_all()
# Search with special chars shouldn't crash
files = await indexer.get_relevant_files("test!@#$%^&*()", limit=5)
assert isinstance(files, list)
async def test_concurrent_indexing(self):
"""Should handle concurrent indexing attempts."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
(src_path / "file.py").write_text("def test(): pass")
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
# Multiple rapid indexing calls
import asyncio
tasks = [
indexer.index_all(),
indexer.index_all(),
indexer.index_all(),
]
results = await asyncio.gather(*tasks)
# All should complete without error
for stats in results:
assert stats["indexed"] >= 0
assert stats["failed"] >= 0
async def test_binary_file_in_src(self):
"""Should skip binary files in src directory."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
src_path = repo_path / "src"
src_path.mkdir()
# Binary file
(src_path / "data.bin").write_bytes(b"\x00\x01\x02\x03")
# Python file
(src_path / "script.py").write_text("def test(): pass")
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "index.db",
src_dirs=["src"],
)
stats = await indexer.index_all()
# Should only index .py file
assert stats["indexed"] == 1
assert stats["failed"] == 0 # Binary files are skipped, not failed

428
tests/test_git_safety.py Normal file
View File

@@ -0,0 +1,428 @@
"""Tests for Git Safety Layer.
Uses temporary git repositories to test snapshot/rollback/merge workflows
without affecting the actual Timmy repository.
"""
from __future__ import annotations
import asyncio
import os
import subprocess
import tempfile
from pathlib import Path
import pytest
from self_coding.git_safety import (
GitSafety,
GitDirtyWorkingDirectoryError,
GitNotRepositoryError,
GitOperationError,
Snapshot,
)
@pytest.fixture
def temp_git_repo():
"""Create a temporary git repository for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
# Initialize git repo
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(
["git", "config", "user.email", "test@test.com"],
cwd=repo_path,
check=True,
capture_output=True,
)
subprocess.run(
["git", "config", "user.name", "Test User"],
cwd=repo_path,
check=True,
capture_output=True,
)
# Create initial file and commit
(repo_path / "README.md").write_text("# Test Repo")
subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
subprocess.run(
["git", "commit", "-m", "Initial commit"],
cwd=repo_path,
check=True,
capture_output=True,
)
# Rename master to main if needed
result = subprocess.run(
["git", "branch", "-M", "main"],
cwd=repo_path,
capture_output=True,
)
yield repo_path
@pytest.fixture
def git_safety(temp_git_repo):
"""Create GitSafety instance for temp repo."""
safety = GitSafety(
repo_path=temp_git_repo,
main_branch="main",
test_command="echo 'No tests configured'", # Fake test command
)
return safety
@pytest.mark.asyncio
class TestGitSafetyBasics:
"""Basic git operations."""
async def test_init_with_valid_repo(self, temp_git_repo):
"""Should initialize successfully with valid git repo."""
safety = GitSafety(repo_path=temp_git_repo)
assert safety.repo_path == temp_git_repo.resolve()
assert safety.main_branch == "main"
async def test_init_with_invalid_repo(self):
"""Should raise GitNotRepositoryError for non-repo path."""
with tempfile.TemporaryDirectory() as tmpdir:
with pytest.raises(GitNotRepositoryError):
GitSafety(repo_path=tmpdir)
async def test_is_clean_clean_repo(self, git_safety, temp_git_repo):
"""Should return True for clean repo."""
safety = git_safety
assert await safety.is_clean() is True
async def test_is_clean_dirty_repo(self, git_safety, temp_git_repo):
"""Should return False when there are uncommitted changes."""
safety = git_safety
# Create uncommitted file
(temp_git_repo / "dirty.txt").write_text("dirty")
assert await safety.is_clean() is False
async def test_get_current_branch(self, git_safety):
"""Should return current branch name."""
safety = git_safety
branch = await safety.get_current_branch()
assert branch == "main"
async def test_get_current_commit(self, git_safety):
"""Should return valid commit hash."""
safety = git_safety
commit = await safety.get_current_commit()
assert len(commit) == 40 # Full SHA-1 hash
assert all(c in "0123456789abcdef" for c in commit)
@pytest.mark.asyncio
class TestGitSafetySnapshot:
"""Snapshot functionality."""
async def test_snapshot_returns_snapshot_object(self, git_safety):
"""Should return Snapshot with all fields populated."""
safety = git_safety
snapshot = await safety.snapshot(run_tests=False)
assert isinstance(snapshot, Snapshot)
assert len(snapshot.commit_hash) == 40
assert snapshot.branch == "main"
assert snapshot.timestamp is not None
assert snapshot.clean is True
async def test_snapshot_captures_clean_status(self, git_safety, temp_git_repo):
"""Should correctly capture clean/dirty status."""
safety = git_safety
# Clean snapshot
clean_snapshot = await safety.snapshot(run_tests=False)
assert clean_snapshot.clean is True
# Dirty snapshot
(temp_git_repo / "dirty.txt").write_text("dirty")
dirty_snapshot = await safety.snapshot(run_tests=False)
assert dirty_snapshot.clean is False
async def test_snapshot_with_tests(self, git_safety, temp_git_repo):
"""Should run tests and capture status."""
# Create a passing test
(temp_git_repo / "test_pass.py").write_text("""
def test_pass():
assert True
""")
safety = GitSafety(
repo_path=temp_git_repo,
test_command="python -m pytest test_pass.py -v",
)
snapshot = await safety.snapshot(run_tests=True)
assert snapshot.test_status is True
assert "passed" in snapshot.test_output.lower() or "no tests" not in snapshot.test_output
@pytest.mark.asyncio
class TestGitSafetyBranching:
"""Branch creation and management."""
async def test_create_branch(self, git_safety):
"""Should create and checkout new branch."""
safety = git_safety
branch_name = "timmy/self-edit/test"
result = await safety.create_branch(branch_name)
assert result == branch_name
assert await safety.get_current_branch() == branch_name
async def test_create_branch_from_main(self, git_safety, temp_git_repo):
"""New branch should start from main."""
safety = git_safety
main_commit = await safety.get_current_commit()
await safety.create_branch("feature-branch")
branch_commit = await safety.get_current_commit()
assert branch_commit == main_commit
@pytest.mark.asyncio
class TestGitSafetyCommit:
"""Commit operations."""
async def test_commit_specific_files(self, git_safety, temp_git_repo):
"""Should commit only specified files."""
safety = git_safety
# Create two files
(temp_git_repo / "file1.txt").write_text("content1")
(temp_git_repo / "file2.txt").write_text("content2")
# Commit only file1
commit_hash = await safety.commit("Add file1", ["file1.txt"])
assert len(commit_hash) == 40
# file2 should still be uncommitted
assert await safety.is_clean() is False
async def test_commit_all_changes(self, git_safety, temp_git_repo):
"""Should commit all changes when no files specified."""
safety = git_safety
(temp_git_repo / "new.txt").write_text("new content")
commit_hash = await safety.commit("Add new file")
assert len(commit_hash) == 40
assert await safety.is_clean() is True
async def test_commit_no_changes(self, git_safety):
"""Should handle commit with no changes gracefully."""
safety = git_safety
commit_hash = await safety.commit("No changes")
# Should return current commit when no changes
current = await safety.get_current_commit()
assert commit_hash == current
@pytest.mark.asyncio
class TestGitSafetyDiff:
"""Diff operations."""
async def test_get_diff(self, git_safety, temp_git_repo):
"""Should return diff between commits."""
safety = git_safety
original_commit = await safety.get_current_commit()
# Make a change and commit
(temp_git_repo / "new.txt").write_text("new content")
await safety.commit("Add new file")
new_commit = await safety.get_current_commit()
diff = await safety.get_diff(original_commit, new_commit)
assert "new.txt" in diff
assert "new content" in diff
async def test_get_modified_files(self, git_safety, temp_git_repo):
"""Should list modified files."""
safety = git_safety
original_commit = await safety.get_current_commit()
(temp_git_repo / "file1.txt").write_text("content")
(temp_git_repo / "file2.txt").write_text("content")
await safety.commit("Add files")
files = await safety.get_modified_files(original_commit)
assert "file1.txt" in files
assert "file2.txt" in files
@pytest.mark.asyncio
class TestGitSafetyRollback:
"""Rollback functionality."""
async def test_rollback_to_snapshot(self, git_safety, temp_git_repo):
"""Should rollback to snapshot state."""
safety = git_safety
# Take snapshot
snapshot = await safety.snapshot(run_tests=False)
original_commit = snapshot.commit_hash
# Make change and commit
(temp_git_repo / "feature.txt").write_text("feature")
await safety.commit("Add feature")
# Verify we're on new commit
new_commit = await safety.get_current_commit()
assert new_commit != original_commit
# Rollback
rolled_back = await safety.rollback(snapshot)
assert rolled_back == original_commit
assert await safety.get_current_commit() == original_commit
async def test_rollback_discards_uncommitted_changes(self, git_safety, temp_git_repo):
"""Rollback should discard uncommitted changes."""
safety = git_safety
snapshot = await safety.snapshot(run_tests=False)
# Create uncommitted file
dirty_file = temp_git_repo / "dirty.txt"
dirty_file.write_text("dirty content")
assert dirty_file.exists()
# Rollback
await safety.rollback(snapshot)
# Uncommitted file should be gone
assert not dirty_file.exists()
async def test_rollback_to_commit_hash(self, git_safety, temp_git_repo):
"""Should rollback to raw commit hash."""
safety = git_safety
original_commit = await safety.get_current_commit()
# Make change
(temp_git_repo / "temp.txt").write_text("temp")
await safety.commit("Temp commit")
# Rollback using hash string
await safety.rollback(original_commit)
assert await safety.get_current_commit() == original_commit
@pytest.mark.asyncio
class TestGitSafetyMerge:
"""Merge operations."""
async def test_merge_to_main_success(self, git_safety, temp_git_repo):
"""Should merge feature branch into main when tests pass."""
safety = git_safety
main_commit_before = await safety.get_current_commit()
# Create feature branch
await safety.create_branch("feature/test")
(temp_git_repo / "feature.txt").write_text("feature")
await safety.commit("Add feature")
feature_commit = await safety.get_current_commit()
# Merge back to main (tests pass with echo command)
merge_commit = await safety.merge_to_main("feature/test", require_tests=False)
# Should be on main with new merge commit
assert await safety.get_current_branch() == "main"
assert await safety.get_current_commit() == merge_commit
assert merge_commit != main_commit_before
async def test_merge_to_main_with_tests_failure(self, git_safety, temp_git_repo):
"""Should not merge when tests fail."""
safety = GitSafety(
repo_path=temp_git_repo,
test_command="exit 1", # Always fails
)
# Create feature branch
await safety.create_branch("feature/failing")
(temp_git_repo / "fail.txt").write_text("fail")
await safety.commit("Add failing feature")
# Merge should fail due to tests
with pytest.raises(GitOperationError) as exc_info:
await safety.merge_to_main("feature/failing", require_tests=True)
assert "tests failed" in str(exc_info.value).lower() or "cannot merge" in str(exc_info.value).lower()
@pytest.mark.asyncio
class TestGitSafetyIntegration:
"""Full workflow integration tests."""
async def test_full_self_edit_workflow(self, temp_git_repo):
"""Complete workflow: snapshot → branch → edit → commit → merge."""
safety = GitSafety(
repo_path=temp_git_repo,
test_command="echo 'tests pass'",
)
# 1. Take snapshot
snapshot = await safety.snapshot(run_tests=False)
# 2. Create feature branch
branch = await safety.create_branch("timmy/self-edit/test-feature")
# 3. Make edits
(temp_git_repo / "src" / "feature.py").parent.mkdir(parents=True, exist_ok=True)
(temp_git_repo / "src" / "feature.py").write_text("""
def new_feature():
return "Hello from new feature!"
""")
# 4. Commit
commit = await safety.commit("Add new feature", ["src/feature.py"])
# 5. Merge to main
merge_commit = await safety.merge_to_main(branch, require_tests=False)
# Verify state
assert await safety.get_current_branch() == "main"
assert (temp_git_repo / "src" / "feature.py").exists()
async def test_rollback_on_failure(self, temp_git_repo):
"""Rollback workflow when changes need to be abandoned."""
safety = GitSafety(
repo_path=temp_git_repo,
test_command="echo 'tests pass'",
)
# Snapshot
snapshot = await safety.snapshot(run_tests=False)
original_commit = snapshot.commit_hash
# Create branch and make changes
await safety.create_branch("timmy/self-edit/bad-feature")
(temp_git_repo / "bad.py").write_text("# Bad code")
await safety.commit("Add bad feature")
# Oops! Rollback
await safety.rollback(snapshot)
# Should be back to original state
assert await safety.get_current_commit() == original_commit
assert not (temp_git_repo / "bad.py").exists()

View File

@@ -0,0 +1,263 @@
"""Error path tests for Git Safety Layer.
Tests timeout handling, git failures, merge conflicts, and edge cases.
"""
from __future__ import annotations
import subprocess
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from self_coding.git_safety import (
GitNotRepositoryError,
GitOperationError,
GitSafety,
)
@pytest.mark.asyncio
class TestGitSafetyErrors:
"""Git operation error handling."""
async def test_invalid_repo_path(self):
"""Should raise GitNotRepositoryError for non-repo."""
with tempfile.TemporaryDirectory() as tmpdir:
with pytest.raises(GitNotRepositoryError):
GitSafety(repo_path=tmpdir)
async def test_git_command_failure(self):
"""Should raise GitOperationError on git failure."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_path, check=True, capture_output=True)
safety = GitSafety(repo_path=repo_path)
# Try to checkout non-existent branch
with pytest.raises(GitOperationError):
await safety._run_git("checkout", "nonexistent-branch")
async def test_merge_conflict_detection(self):
"""Should handle merge conflicts gracefully."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_path, check=True, capture_output=True)
# Create initial file
(repo_path / "file.txt").write_text("original")
subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "commit", "-m", "Initial"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "branch", "-M", "main"], cwd=repo_path, check=True, capture_output=True)
safety = GitSafety(repo_path=repo_path)
# Create branch A with changes
await safety.create_branch("branch-a")
(repo_path / "file.txt").write_text("branch-a changes")
await safety.commit("Branch A changes")
# Go back to main, create branch B with conflicting changes
await safety._run_git("checkout", "main")
await safety.create_branch("branch-b")
(repo_path / "file.txt").write_text("branch-b changes")
await safety.commit("Branch B changes")
# Try to merge branch-a into branch-b (will conflict)
with pytest.raises(GitOperationError):
await safety._run_git("merge", "branch-a")
async def test_rollback_after_merge(self):
"""Should be able to rollback even after merge."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_path, check=True, capture_output=True)
safety = GitSafety(repo_path=repo_path)
# Initial commit
(repo_path / "file.txt").write_text("v1")
subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "commit", "-m", "v1"], cwd=repo_path, check=True, capture_output=True)
snapshot = await safety.snapshot(run_tests=False)
# Make changes and commit
(repo_path / "file.txt").write_text("v2")
await safety.commit("v2")
# Rollback
await safety.rollback(snapshot)
# Verify
content = (repo_path / "file.txt").read_text()
assert content == "v1"
async def test_snapshot_with_failing_tests(self):
"""Snapshot should capture failing test status."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_path, check=True, capture_output=True)
# Need an initial commit for HEAD to exist
(repo_path / "initial.txt").write_text("initial")
subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "commit", "-m", "Initial"], cwd=repo_path, check=True, capture_output=True)
# Create failing test
(repo_path / "test_fail.py").write_text("def test_fail(): assert False")
safety = GitSafety(
repo_path=repo_path,
test_command="python -m pytest test_fail.py -v",
)
snapshot = await safety.snapshot(run_tests=True)
assert snapshot.test_status is False
assert "FAILED" in snapshot.test_output or "failed" in snapshot.test_output.lower()
async def test_get_diff_between_commits(self):
"""Should get diff between any two commits."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_path, check=True, capture_output=True)
safety = GitSafety(repo_path=repo_path)
# Commit 1
(repo_path / "file.txt").write_text("version 1")
subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "commit", "-m", "v1"], cwd=repo_path, check=True, capture_output=True)
commit1 = await safety.get_current_commit()
# Commit 2
(repo_path / "file.txt").write_text("version 2")
await safety.commit("v2")
commit2 = await safety.get_current_commit()
# Get diff
diff = await safety.get_diff(commit1, commit2)
assert "version 1" in diff
assert "version 2" in diff
async def test_is_clean_with_untracked_files(self):
"""is_clean should return False with untracked files (they count as changes)."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_path, check=True, capture_output=True)
# Need an initial commit for HEAD to exist
(repo_path / "initial.txt").write_text("initial")
subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "commit", "-m", "Initial"], cwd=repo_path, check=True, capture_output=True)
safety = GitSafety(repo_path=repo_path)
# Verify clean state first
assert await safety.is_clean() is True
# Create untracked file
(repo_path / "untracked.txt").write_text("untracked")
# is_clean returns False when there are untracked files
# (git status --porcelain shows ?? for untracked)
assert await safety.is_clean() is False
async def test_empty_commit_allowed(self):
"""Should allow empty commits when requested."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_path, check=True, capture_output=True)
# Initial commit
(repo_path / "file.txt").write_text("content")
subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "commit", "-m", "Initial"], cwd=repo_path, check=True, capture_output=True)
safety = GitSafety(repo_path=repo_path)
# Empty commit
commit_hash = await safety.commit("Empty commit message", allow_empty=True)
assert len(commit_hash) == 40
async def test_modified_files_detection(self):
"""Should detect which files were modified."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_path, check=True, capture_output=True)
safety = GitSafety(repo_path=repo_path)
# Initial commits
(repo_path / "file1.txt").write_text("content1")
(repo_path / "file2.txt").write_text("content2")
subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "commit", "-m", "Initial"], cwd=repo_path, check=True, capture_output=True)
base_commit = await safety.get_current_commit()
# Modify only file1
(repo_path / "file1.txt").write_text("modified content")
await safety.commit("Modify file1")
# Get modified files
modified = await safety.get_modified_files(base_commit)
assert "file1.txt" in modified
assert "file2.txt" not in modified
async def test_branch_switching(self):
"""Should handle switching between branches."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.email", "test@test.com"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "config", "user.name", "Test"], cwd=repo_path, check=True, capture_output=True)
# Initial commit on master (default branch name)
(repo_path / "main.txt").write_text("main branch content")
subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
subprocess.run(["git", "commit", "-m", "Initial"], cwd=repo_path, check=True, capture_output=True)
# Rename to main for consistency
subprocess.run(["git", "branch", "-M", "main"], cwd=repo_path, check=True, capture_output=True)
safety = GitSafety(repo_path=repo_path, main_branch="main")
# Create feature branch
await safety.create_branch("feature")
(repo_path / "feature.txt").write_text("feature content")
await safety.commit("Add feature")
# Switch back to main
await safety._run_git("checkout", "main")
# Verify main doesn't have feature.txt
assert not (repo_path / "feature.txt").exists()
# Switch to feature
await safety._run_git("checkout", "feature")
# Verify feature has feature.txt
assert (repo_path / "feature.txt").exists()

View File

@@ -0,0 +1,322 @@
"""Tests for Modification Journal.
Tests logging, querying, and metrics for self-modification attempts.
"""
from __future__ import annotations
import tempfile
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
from self_coding.modification_journal import (
ModificationAttempt,
ModificationJournal,
Outcome,
)
@pytest.fixture
def temp_journal():
"""Create a ModificationJournal with temporary database."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "journal.db"
journal = ModificationJournal(db_path=db_path)
yield journal
@pytest.mark.asyncio
class TestModificationJournalLogging:
"""Logging modification attempts."""
async def test_log_attempt_success(self, temp_journal):
"""Should log a successful attempt."""
attempt = ModificationAttempt(
task_description="Add error handling to health endpoint",
approach="Use try/except block",
files_modified=["src/app.py"],
diff="@@ -1,3 +1,7 @@...",
test_results="1 passed",
outcome=Outcome.SUCCESS,
)
attempt_id = await temp_journal.log_attempt(attempt)
assert attempt_id > 0
async def test_log_attempt_failure(self, temp_journal):
"""Should log a failed attempt."""
attempt = ModificationAttempt(
task_description="Refactor database layer",
approach="Extract connection pool",
files_modified=["src/db.py", "src/models.py"],
diff="@@ ...",
test_results="2 failed",
outcome=Outcome.FAILURE,
failure_analysis="Circular dependency introduced",
retry_count=2,
)
attempt_id = await temp_journal.log_attempt(attempt)
# Retrieve and verify
retrieved = await temp_journal.get_by_id(attempt_id)
assert retrieved is not None
assert retrieved.outcome == Outcome.FAILURE
assert retrieved.failure_analysis == "Circular dependency introduced"
assert retrieved.retry_count == 2
@pytest.mark.asyncio
class TestModificationJournalRetrieval:
"""Retrieving logged attempts."""
async def test_get_by_id(self, temp_journal):
"""Should retrieve attempt by ID."""
attempt = ModificationAttempt(
task_description="Fix bug",
outcome=Outcome.SUCCESS,
)
attempt_id = await temp_journal.log_attempt(attempt)
retrieved = await temp_journal.get_by_id(attempt_id)
assert retrieved is not None
assert retrieved.task_description == "Fix bug"
assert retrieved.id == attempt_id
async def test_get_by_id_not_found(self, temp_journal):
"""Should return None for non-existent ID."""
result = await temp_journal.get_by_id(9999)
assert result is None
async def test_find_similar_basic(self, temp_journal):
"""Should find similar attempts by keyword."""
# Log some attempts
await temp_journal.log_attempt(ModificationAttempt(
task_description="Add error handling to API endpoints",
outcome=Outcome.SUCCESS,
))
await temp_journal.log_attempt(ModificationAttempt(
task_description="Add logging to database queries",
outcome=Outcome.SUCCESS,
))
await temp_journal.log_attempt(ModificationAttempt(
task_description="Fix CSS styling on homepage",
outcome=Outcome.FAILURE,
))
# Search for error handling
similar = await temp_journal.find_similar("error handling in endpoints", limit=3)
assert len(similar) > 0
# Should find the API error handling attempt first
assert "error" in similar[0].task_description.lower()
async def test_find_similar_filter_outcome(self, temp_journal):
"""Should filter by outcome when specified."""
await temp_journal.log_attempt(ModificationAttempt(
task_description="Database optimization",
outcome=Outcome.SUCCESS,
))
await temp_journal.log_attempt(ModificationAttempt(
task_description="Database refactoring",
outcome=Outcome.FAILURE,
))
# Search only for successes
similar = await temp_journal.find_similar(
"database work",
include_outcomes=[Outcome.SUCCESS],
)
assert len(similar) == 1
assert similar[0].outcome == Outcome.SUCCESS
async def test_find_similar_empty(self, temp_journal):
"""Should return empty list when no matches."""
await temp_journal.log_attempt(ModificationAttempt(
task_description="Fix bug",
outcome=Outcome.SUCCESS,
))
similar = await temp_journal.find_similar("xyzqwerty unicorn astronaut", limit=5)
assert similar == []
@pytest.mark.asyncio
class TestModificationJournalMetrics:
"""Success rate metrics."""
async def test_get_success_rate_empty(self, temp_journal):
"""Should handle empty journal."""
metrics = await temp_journal.get_success_rate()
assert metrics["overall"] == 0.0
assert metrics["total"] == 0
async def test_get_success_rate_calculated(self, temp_journal):
"""Should calculate success rate correctly."""
# Log various outcomes
for _ in range(5):
await temp_journal.log_attempt(ModificationAttempt(
task_description="Success task",
outcome=Outcome.SUCCESS,
))
for _ in range(3):
await temp_journal.log_attempt(ModificationAttempt(
task_description="Failure task",
outcome=Outcome.FAILURE,
))
for _ in range(2):
await temp_journal.log_attempt(ModificationAttempt(
task_description="Rollback task",
outcome=Outcome.ROLLBACK,
))
metrics = await temp_journal.get_success_rate()
assert metrics["success"] == 5
assert metrics["failure"] == 3
assert metrics["rollback"] == 2
assert metrics["total"] == 10
assert metrics["overall"] == 0.5 # 5/10
async def test_get_recent_failures(self, temp_journal):
"""Should get recent failures."""
# Log failures and successes (last one is most recent)
await temp_journal.log_attempt(ModificationAttempt(
task_description="Rollback attempt",
outcome=Outcome.ROLLBACK,
))
await temp_journal.log_attempt(ModificationAttempt(
task_description="Success",
outcome=Outcome.SUCCESS,
))
await temp_journal.log_attempt(ModificationAttempt(
task_description="Failed attempt",
outcome=Outcome.FAILURE,
))
failures = await temp_journal.get_recent_failures(limit=10)
assert len(failures) == 2
# Most recent first (Failure was logged last)
assert failures[0].outcome == Outcome.FAILURE
assert failures[1].outcome == Outcome.ROLLBACK
@pytest.mark.asyncio
class TestModificationJournalUpdates:
"""Updating logged attempts."""
async def test_update_reflection(self, temp_journal):
"""Should update reflection for an attempt."""
attempt = ModificationAttempt(
task_description="Test task",
outcome=Outcome.SUCCESS,
)
attempt_id = await temp_journal.log_attempt(attempt)
# Update reflection
success = await temp_journal.update_reflection(
attempt_id,
"This worked well because...",
)
assert success is True
# Verify
retrieved = await temp_journal.get_by_id(attempt_id)
assert retrieved.reflection == "This worked well because..."
async def test_update_reflection_not_found(self, temp_journal):
"""Should return False for non-existent ID."""
success = await temp_journal.update_reflection(9999, "Reflection")
assert success is False
@pytest.mark.asyncio
class TestModificationJournalFileTracking:
"""Tracking attempts by file."""
async def test_get_attempts_for_file(self, temp_journal):
"""Should find all attempts that modified a file."""
await temp_journal.log_attempt(ModificationAttempt(
task_description="Fix app.py",
files_modified=["src/app.py", "src/config.py"],
outcome=Outcome.SUCCESS,
))
await temp_journal.log_attempt(ModificationAttempt(
task_description="Update config only",
files_modified=["src/config.py"],
outcome=Outcome.SUCCESS,
))
await temp_journal.log_attempt(ModificationAttempt(
task_description="Other file",
files_modified=["src/other.py"],
outcome=Outcome.SUCCESS,
))
app_attempts = await temp_journal.get_attempts_for_file("src/app.py")
assert len(app_attempts) == 1
assert "src/app.py" in app_attempts[0].files_modified
@pytest.mark.asyncio
class TestModificationJournalIntegration:
"""Full workflow integration tests."""
async def test_full_workflow(self, temp_journal):
"""Complete workflow: log, find similar, get metrics."""
# Log some attempts
for i in range(3):
await temp_journal.log_attempt(ModificationAttempt(
task_description=f"Database optimization {i}",
approach="Add indexes",
files_modified=["src/db.py"],
outcome=Outcome.SUCCESS if i % 2 == 0 else Outcome.FAILURE,
))
# Find similar
similar = await temp_journal.find_similar("optimize database queries", limit=5)
assert len(similar) == 3
# Get success rate
metrics = await temp_journal.get_success_rate()
assert metrics["total"] == 3
assert metrics["success"] == 2
# Get recent failures
failures = await temp_journal.get_recent_failures(limit=5)
assert len(failures) == 1
# Get attempts for file
file_attempts = await temp_journal.get_attempts_for_file("src/db.py")
assert len(file_attempts) == 3
async def test_persistence(self):
"""Should persist across instances."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "persist.db"
# First instance
journal1 = ModificationJournal(db_path=db_path)
attempt_id = await journal1.log_attempt(ModificationAttempt(
task_description="Persistent attempt",
outcome=Outcome.SUCCESS,
))
# Second instance with same database
journal2 = ModificationJournal(db_path=db_path)
retrieved = await journal2.get_by_id(attempt_id)
assert retrieved is not None
assert retrieved.task_description == "Persistent attempt"

243
tests/test_reflection.py Normal file
View File

@@ -0,0 +1,243 @@
"""Tests for Reflection Service.
Tests fallback and LLM-based reflection generation.
"""
from __future__ import annotations
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
from self_coding.modification_journal import ModificationAttempt, Outcome
from self_coding.reflection import ReflectionService
class MockLLMResponse:
"""Mock LLM response."""
def __init__(self, content: str, provider_used: str = "mock"):
self.content = content
self.provider_used = provider_used
self.latency_ms = 100.0
self.fallback_used = False
@pytest.mark.asyncio
class TestReflectionServiceFallback:
"""Fallback reflections without LLM."""
async def test_fallback_success(self):
"""Should generate fallback reflection for success."""
service = ReflectionService(llm_adapter=None)
attempt = ModificationAttempt(
task_description="Add error handling",
files_modified=["src/app.py"],
outcome=Outcome.SUCCESS,
)
reflection = await service.reflect_on_attempt(attempt)
assert "What went well" in reflection
assert "successfully completed" in reflection.lower()
assert "src/app.py" in reflection
async def test_fallback_failure(self):
"""Should generate fallback reflection for failure."""
service = ReflectionService(llm_adapter=None)
attempt = ModificationAttempt(
task_description="Refactor database",
files_modified=["src/db.py", "src/models.py"],
outcome=Outcome.FAILURE,
failure_analysis="Circular dependency",
retry_count=2,
)
reflection = await service.reflect_on_attempt(attempt)
assert "What went well" in reflection
assert "What could be improved" in reflection
assert "circular dependency" in reflection.lower()
assert "2 retries" in reflection
async def test_fallback_rollback(self):
"""Should generate fallback reflection for rollback."""
service = ReflectionService(llm_adapter=None)
attempt = ModificationAttempt(
task_description="Update API",
files_modified=["src/api.py"],
outcome=Outcome.ROLLBACK,
)
reflection = await service.reflect_on_attempt(attempt)
assert "What went well" in reflection
assert "rollback" in reflection.lower()
assert "preferable to shipping broken code" in reflection.lower()
@pytest.mark.asyncio
class TestReflectionServiceWithLLM:
"""Reflections with mock LLM."""
async def test_llm_reflection_success(self):
"""Should use LLM for reflection when available."""
mock_adapter = AsyncMock()
mock_adapter.chat.return_value = MockLLMResponse(
"**What went well:** Clean implementation\n"
"**What could be improved:** More tests\n"
"**Next time:** Add edge cases\n"
"**General lesson:** Always test errors"
)
service = ReflectionService(llm_adapter=mock_adapter)
attempt = ModificationAttempt(
task_description="Add validation",
approach="Use Pydantic",
files_modified=["src/validation.py"],
outcome=Outcome.SUCCESS,
test_results="5 passed",
)
reflection = await service.reflect_on_attempt(attempt)
assert "Clean implementation" in reflection
assert mock_adapter.chat.called
# Check the prompt was formatted correctly
call_args = mock_adapter.chat.call_args
assert "Add validation" in call_args.kwargs["message"]
assert "SUCCESS" in call_args.kwargs["message"]
async def test_llm_reflection_failure_fallback(self):
"""Should fallback when LLM fails."""
mock_adapter = AsyncMock()
mock_adapter.chat.side_effect = Exception("LLM timeout")
service = ReflectionService(llm_adapter=mock_adapter)
attempt = ModificationAttempt(
task_description="Fix bug",
outcome=Outcome.FAILURE,
)
reflection = await service.reflect_on_attempt(attempt)
# Should still return a reflection (fallback)
assert "What went well" in reflection
assert "What could be improved" in reflection
@pytest.mark.asyncio
class TestReflectionServiceWithContext:
"""Reflections with similar past attempts."""
async def test_reflect_with_context(self):
"""Should include past attempts in reflection."""
mock_adapter = AsyncMock()
mock_adapter.chat.return_value = MockLLMResponse(
"Reflection with historical context"
)
service = ReflectionService(llm_adapter=mock_adapter)
current = ModificationAttempt(
task_description="Add auth middleware",
outcome=Outcome.SUCCESS,
)
past = ModificationAttempt(
task_description="Add logging middleware",
outcome=Outcome.SUCCESS,
reflection="Good pattern: use decorators",
)
reflection = await service.reflect_with_context(current, [past])
assert reflection == "Reflection with historical context"
# Check context was included
call_args = mock_adapter.chat.call_args
assert "logging middleware" in call_args.kwargs["message"]
assert "Good pattern: use decorators" in call_args.kwargs["message"]
async def test_reflect_with_context_fallback(self):
"""Should fallback when LLM fails with context."""
mock_adapter = AsyncMock()
mock_adapter.chat.side_effect = Exception("LLM error")
service = ReflectionService(llm_adapter=mock_adapter)
current = ModificationAttempt(
task_description="Add feature",
outcome=Outcome.SUCCESS,
)
past = ModificationAttempt(
task_description="Past feature",
outcome=Outcome.SUCCESS,
reflection="Past lesson",
)
# Should fallback to regular reflection
reflection = await service.reflect_with_context(current, [past])
assert "What went well" in reflection
@pytest.mark.asyncio
class TestReflectionServiceEdgeCases:
"""Edge cases and error handling."""
async def test_empty_files_list(self):
"""Should handle empty files list."""
service = ReflectionService(llm_adapter=None)
attempt = ModificationAttempt(
task_description="Test task",
files_modified=[],
outcome=Outcome.SUCCESS,
)
reflection = await service.reflect_on_attempt(attempt)
assert "What went well" in reflection
assert "N/A" in reflection or "these files" in reflection
async def test_long_test_results_truncated(self):
"""Should truncate long test results in prompt."""
mock_adapter = AsyncMock()
mock_adapter.chat.return_value = MockLLMResponse("Short reflection")
service = ReflectionService(llm_adapter=mock_adapter)
attempt = ModificationAttempt(
task_description="Big refactor",
outcome=Outcome.FAILURE,
test_results="Error\n" * 1000, # Very long
)
await service.reflect_on_attempt(attempt)
# Check that test results were truncated in the prompt
call_args = mock_adapter.chat.call_args
prompt = call_args.kwargs["message"]
assert len(prompt) < 10000 # Should be truncated
async def test_no_approach_documented(self):
"""Should handle missing approach."""
service = ReflectionService(llm_adapter=None)
attempt = ModificationAttempt(
task_description="Quick fix",
approach="", # Empty
outcome=Outcome.SUCCESS,
)
reflection = await service.reflect_on_attempt(attempt)
assert "What went well" in reflection
assert "No approach documented" not in reflection # Should use fallback

View File

@@ -0,0 +1,475 @@
"""End-to-end integration tests for Self-Coding layer.
Tests the complete workflow: GitSafety + CodebaseIndexer + ModificationJournal + Reflection
working together.
"""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from self_coding import (
CodebaseIndexer,
GitSafety,
ModificationAttempt,
ModificationJournal,
Outcome,
ReflectionService,
Snapshot,
)
@pytest.fixture
def self_coding_env():
"""Create a complete self-coding environment with temp repo."""
with tempfile.TemporaryDirectory() as tmpdir:
repo_path = Path(tmpdir)
# Initialize git repo
import subprocess
subprocess.run(["git", "init"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(
["git", "config", "user.email", "test@test.com"],
cwd=repo_path, check=True, capture_output=True,
)
subprocess.run(
["git", "config", "user.name", "Test User"],
cwd=repo_path, check=True, capture_output=True,
)
# Create src directory with real Python files
src_path = repo_path / "src" / "myproject"
src_path.mkdir(parents=True)
(src_path / "__init__.py").write_text("")
(src_path / "calculator.py").write_text('''
"""A simple calculator module."""
class Calculator:
"""Basic calculator with add/subtract."""
def add(self, a: int, b: int) -> int:
return a + b
def subtract(self, a: int, b: int) -> int:
return a - b
''')
(src_path / "utils.py").write_text('''
"""Utility functions."""
from myproject.calculator import Calculator
def calculate_total(items: list[int]) -> int:
calc = Calculator()
return sum(calc.add(0, item) for item in items)
''')
# Create tests
tests_path = repo_path / "tests"
tests_path.mkdir()
(tests_path / "test_calculator.py").write_text('''
"""Tests for calculator."""
from myproject.calculator import Calculator
def test_add():
calc = Calculator()
assert calc.add(2, 3) == 5
def test_subtract():
calc = Calculator()
assert calc.subtract(5, 3) == 2
''')
# Initial commit
subprocess.run(["git", "add", "."], cwd=repo_path, check=True, capture_output=True)
subprocess.run(
["git", "commit", "-m", "Initial commit"],
cwd=repo_path, check=True, capture_output=True,
)
subprocess.run(
["git", "branch", "-M", "main"],
cwd=repo_path, check=True, capture_output=True,
)
# Initialize services
git = GitSafety(
repo_path=repo_path,
main_branch="main",
test_command="python -m pytest tests/ -v",
)
indexer = CodebaseIndexer(
repo_path=repo_path,
db_path=repo_path / "codebase.db",
src_dirs=["src", "tests"],
)
journal = ModificationJournal(db_path=repo_path / "journal.db")
reflection = ReflectionService(llm_adapter=None)
yield {
"repo_path": repo_path,
"git": git,
"indexer": indexer,
"journal": journal,
"reflection": reflection,
}
@pytest.mark.asyncio
class TestSelfCodingGreenPath:
"""Happy path: successful self-modification workflow."""
async def test_complete_successful_modification(self, self_coding_env):
"""Full workflow: snapshot → branch → modify → test → commit → merge → log → reflect."""
env = self_coding_env
git = env["git"]
indexer = env["indexer"]
journal = env["journal"]
reflection = env["reflection"]
repo_path = env["repo_path"]
# 1. Index codebase to understand structure
await indexer.index_all()
# 2. Find relevant files for task
files = await indexer.get_relevant_files("add multiply method to calculator", limit=3)
assert "src/myproject/calculator.py" in files
# 3. Check for similar past attempts
similar = await journal.find_similar("add multiply method", limit=5)
# Should be empty (first attempt)
# 4. Take snapshot
snapshot = await git.snapshot(run_tests=False)
assert isinstance(snapshot, Snapshot)
# 5. Create feature branch
branch_name = "timmy/self-edit/add-multiply"
branch = await git.create_branch(branch_name)
assert branch == branch_name
# 6. Make modification (simulate adding multiply method)
calc_path = repo_path / "src" / "myproject" / "calculator.py"
content = calc_path.read_text()
new_method = '''
def multiply(self, a: int, b: int) -> int:
"""Multiply two numbers."""
return a * b
'''
# Insert before last method
content = content.rstrip() + "\n" + new_method + "\n"
calc_path.write_text(content)
# 7. Add test for new method
test_path = repo_path / "tests" / "test_calculator.py"
test_content = test_path.read_text()
new_test = '''
def test_multiply():
calc = Calculator()
assert calc.multiply(3, 4) == 12
'''
test_path.write_text(test_content.rstrip() + new_test + "\n")
# 8. Commit changes
commit_hash = await git.commit(
"Add multiply method to Calculator",
["src/myproject/calculator.py", "tests/test_calculator.py"],
)
assert len(commit_hash) == 40
# 9. Merge to main (skipping actual test run for speed)
merge_hash = await git.merge_to_main(branch, require_tests=False)
assert merge_hash != snapshot.commit_hash
# 10. Log the successful attempt
diff = await git.get_diff(snapshot.commit_hash)
attempt = ModificationAttempt(
task_description="Add multiply method to Calculator",
approach="Added multiply method with docstring and test",
files_modified=["src/myproject/calculator.py", "tests/test_calculator.py"],
diff=diff[:1000], # Truncate for storage
test_results="Tests passed",
outcome=Outcome.SUCCESS,
)
attempt_id = await journal.log_attempt(attempt)
# 11. Generate reflection
reflection_text = await reflection.reflect_on_attempt(attempt)
assert "What went well" in reflection_text
await journal.update_reflection(attempt_id, reflection_text)
# 12. Verify final state
final_commit = await git.get_current_commit()
assert final_commit == merge_hash
# Verify we're on main branch
current_branch = await git.get_current_branch()
assert current_branch == "main"
# Verify multiply method exists
final_content = calc_path.read_text()
assert "def multiply" in final_content
async def test_incremental_codebase_indexing(self, self_coding_env):
"""Codebase indexer should detect changes after modification."""
env = self_coding_env
indexer = env["indexer"]
# Initial index
stats1 = await indexer.index_all()
assert stats1["indexed"] == 4 # __init__.py, calculator.py, utils.py, test_calculator.py
# Add new file
new_file = env["repo_path"] / "src" / "myproject" / "new_module.py"
new_file.write_text('''
"""New module."""
def new_function(): pass
''')
# Incremental index should detect only the new file
stats2 = await indexer.index_changed()
assert stats2["indexed"] == 1
assert stats2["skipped"] == 4
@pytest.mark.asyncio
class TestSelfCodingRedPaths:
"""Error paths: failures, rollbacks, and recovery."""
async def test_rollback_on_test_failure(self, self_coding_env):
"""Should rollback when tests fail."""
env = self_coding_env
git = env["git"]
journal = env["journal"]
repo_path = env["repo_path"]
# Take snapshot
snapshot = await git.snapshot(run_tests=False)
original_commit = snapshot.commit_hash
# Create branch
branch = await git.create_branch("timmy/self-edit/bad-change")
# Make breaking change (remove add method)
calc_path = repo_path / "src" / "myproject" / "calculator.py"
calc_path.write_text('''
"""A simple calculator module."""
class Calculator:
"""Basic calculator - broken version."""
pass
''')
await git.commit("Remove methods (breaking change)")
# Log the failed attempt
attempt = ModificationAttempt(
task_description="Refactor Calculator class",
approach="Remove unused methods",
files_modified=["src/myproject/calculator.py"],
outcome=Outcome.FAILURE,
failure_analysis="Tests failed - removed methods that were used",
retry_count=0,
)
await journal.log_attempt(attempt)
# Rollback
await git.rollback(snapshot)
# Verify rollback
current = await git.get_current_commit()
assert current == original_commit
# Verify file restored
restored_content = calc_path.read_text()
assert "def add" in restored_content
async def test_find_similar_learns_from_failures(self, self_coding_env):
"""Should find similar past failures to avoid repeating mistakes."""
env = self_coding_env
journal = env["journal"]
# Log a failure
await journal.log_attempt(ModificationAttempt(
task_description="Add division method to calculator",
approach="Simple division without zero check",
files_modified=["src/myproject/calculator.py"],
outcome=Outcome.FAILURE,
failure_analysis="ZeroDivisionError not handled",
reflection="Always check for division by zero",
))
# Later, try similar task
similar = await journal.find_similar(
"Add modulo operation to calculator",
limit=5,
)
# Should find the past failure
assert len(similar) > 0
assert "division" in similar[0].task_description.lower()
async def test_dependency_chain_detects_blast_radius(self, self_coding_env):
"""Should detect which files depend on modified file."""
env = self_coding_env
indexer = env["indexer"]
await indexer.index_all()
# utils.py imports from calculator.py
deps = await indexer.get_dependency_chain("src/myproject/calculator.py")
assert "src/myproject/utils.py" in deps
async def test_success_rate_tracking(self, self_coding_env):
"""Should track success/failure metrics over time."""
env = self_coding_env
journal = env["journal"]
# Log mixed outcomes
for i in range(5):
await journal.log_attempt(ModificationAttempt(
task_description=f"Task {i}",
outcome=Outcome.SUCCESS if i % 2 == 0 else Outcome.FAILURE,
))
metrics = await journal.get_success_rate()
assert metrics["total"] == 5
assert metrics["success"] == 3
assert metrics["failure"] == 2
assert metrics["overall"] == 0.6
async def test_journal_persists_across_instances(self, self_coding_env):
"""Journal should persist even with new service instances."""
env = self_coding_env
db_path = env["repo_path"] / "persistent_journal.db"
# First instance logs attempt
journal1 = ModificationJournal(db_path=db_path)
attempt_id = await journal1.log_attempt(ModificationAttempt(
task_description="Persistent task",
outcome=Outcome.SUCCESS,
))
# New instance should see the attempt
journal2 = ModificationJournal(db_path=db_path)
retrieved = await journal2.get_by_id(attempt_id)
assert retrieved is not None
assert retrieved.task_description == "Persistent task"
@pytest.mark.asyncio
class TestSelfCodingSafetyConstraints:
"""Safety constraints and validation."""
async def test_only_modify_files_with_test_coverage(self, self_coding_env):
"""Should only allow modifying files that have tests."""
env = self_coding_env
indexer = env["indexer"]
await indexer.index_all()
# calculator.py has test coverage
assert await indexer.has_test_coverage("src/myproject/calculator.py")
# utils.py has no test file
assert not await indexer.has_test_coverage("src/myproject/utils.py")
async def test_cannot_delete_test_files(self, self_coding_env):
"""Safety check: should not delete test files."""
env = self_coding_env
git = env["git"]
repo_path = env["repo_path"]
snapshot = await git.snapshot(run_tests=False)
branch = await git.create_branch("timmy/self-edit/bad-idea")
# Try to delete test file
test_file = repo_path / "tests" / "test_calculator.py"
test_file.unlink()
# This would be caught by safety constraints in real implementation
# For now, verify the file is gone
assert not test_file.exists()
# Rollback should restore it
await git.rollback(snapshot)
assert test_file.exists()
async def test_branch_naming_convention(self, self_coding_env):
"""Branches should follow naming convention."""
env = self_coding_env
git = env["git"]
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
branch_name = f"timmy/self-edit/{timestamp}"
branch = await git.create_branch(branch_name)
assert branch.startswith("timmy/self-edit/")
@pytest.mark.asyncio
class TestSelfCodingErrorRecovery:
"""Error recovery scenarios."""
async def test_git_operation_timeout_handling(self, self_coding_env):
"""Should handle git operation timeouts gracefully."""
# This would require mocking subprocess to timeout
# For now, verify the timeout parameter exists
env = self_coding_env
git = env["git"]
# The _run_git method has timeout parameter
# If a git operation times out, it raises GitOperationError
assert hasattr(git, '_run_git')
async def test_journal_handles_concurrent_writes(self, self_coding_env):
"""Journal should handle multiple rapid writes."""
env = self_coding_env
journal = env["journal"]
# Log multiple attempts rapidly
ids = []
for i in range(10):
attempt_id = await journal.log_attempt(ModificationAttempt(
task_description=f"Concurrent task {i}",
outcome=Outcome.SUCCESS,
))
ids.append(attempt_id)
# All should be unique and retrievable
assert len(set(ids)) == 10
for attempt_id in ids:
retrieved = await journal.get_by_id(attempt_id)
assert retrieved is not None
async def test_indexer_handles_syntax_errors(self, self_coding_env):
"""Indexer should skip files with syntax errors."""
env = self_coding_env
indexer = env["indexer"]
repo_path = env["repo_path"]
# Create file with syntax error
bad_file = repo_path / "src" / "myproject" / "bad_syntax.py"
bad_file.write_text("def broken(:")
stats = await indexer.index_all()
# Should index good files, fail on bad one
assert stats["failed"] == 1
assert stats["indexed"] >= 4 # The good files

View File

@@ -545,3 +545,16 @@ class TestBuildQueueContext:
ctx = _build_queue_context()
assert isinstance(ctx, str)
assert ctx == ""
# ── Briefing Integration ──────────────────────────────────────────────────
def test_briefing_task_queue_summary():
"""Briefing engine should include task queue data."""
from task_queue.models import create_task
from timmy.briefing import _gather_task_queue_summary
create_task(title="Briefing integration test", created_by="test")
summary = _gather_task_queue_summary()
assert "pending" in summary.lower() or "task" in summary.lower()