Operation Darling Purge: slim to wealth core (-33,783 lines) (#121)

This commit is contained in:
Alexander Whitestone
2026-03-02 13:17:38 -05:00
committed by GitHub
parent f694eff0a4
commit 584eeb679e
183 changed files with 154 additions and 33807 deletions

View File

@@ -1,23 +0,0 @@
# self_coding/ — Module Guide
Self-modification infrastructure with safety constraints.
## Structure
- `git_safety.py` — Atomic git operations with rollback
- `codebase_indexer.py` — Live mental model of the codebase
- `modification_journal.py` — Persistent log of modification attempts
- `reflection.py` — Generate lessons learned
- `self_modify/` — Runtime self-modification loop (LLM-driven)
- `self_tdd/` — Continuous test watchdog
- `upgrades/` — Self-upgrade approval queue
## Entry points
```toml
self-tdd = "self_coding.self_tdd.watchdog:main"
self-modify = "self_coding.self_modify.cli:main"
```
## Testing
```bash
pytest tests/self_coding/ -q
```

View File

@@ -1,50 +0,0 @@
"""Self-Coding Layer — Timmy's ability to modify its own source code safely.
This module provides the foundational infrastructure for self-modification:
- GitSafety: Atomic git operations with rollback capability
- CodebaseIndexer: Live mental model of the codebase
- ModificationJournal: Persistent log of modification attempts
- ReflectionService: Generate lessons learned from attempts
Usage:
from self_coding import GitSafety, CodebaseIndexer, ModificationJournal
from self_coding import ModificationAttempt, Outcome, Snapshot
# Initialize services
git = GitSafety(repo_path="/path/to/repo")
indexer = CodebaseIndexer(repo_path="/path/to/repo")
journal = ModificationJournal()
# Use in self-modification workflow
snapshot = await git.snapshot()
# ... make changes ...
if tests_pass:
await git.commit("Changes", ["file.py"])
else:
await git.rollback(snapshot)
"""
from self_coding.git_safety import GitSafety, Snapshot
from self_coding.codebase_indexer import CodebaseIndexer, ModuleInfo, FunctionInfo, ClassInfo
from self_coding.modification_journal import (
ModificationJournal,
ModificationAttempt,
Outcome,
)
from self_coding.reflection import ReflectionService
__all__ = [
# Core services
"GitSafety",
"CodebaseIndexer",
"ModificationJournal",
"ReflectionService",
# Data classes
"Snapshot",
"ModuleInfo",
"FunctionInfo",
"ClassInfo",
"ModificationAttempt",
"Outcome",
]

View File

@@ -1,772 +0,0 @@
"""Codebase Indexer — Live mental model of Timmy's own codebase.
Parses Python files using AST to extract classes, functions, imports, and
docstrings. Builds a dependency graph and provides semantic search for
relevant files.
"""
from __future__ import annotations
import ast
import hashlib
import json
import logging
import sqlite3
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
logger = logging.getLogger(__name__)

# Default database location, relative to the process working directory.
DEFAULT_DB_PATH = Path("data/self_coding.db")
@dataclass
class FunctionInfo:
    """Information about a single function or method extracted via AST."""

    name: str
    # Rendered parameter strings, e.g. "x: int" (annotation included when present).
    args: list[str]
    # Rendered return annotation via ast.unparse, or None if unannotated.
    returns: Optional[str] = None
    docstring: Optional[str] = None
    line_number: int = 0
    is_async: bool = False
    # True when the function was found inside a class body.
    is_method: bool = False
@dataclass
class ClassInfo:
    """Information about a class definition extracted via AST."""

    name: str
    methods: list[FunctionInfo] = field(default_factory=list)
    docstring: Optional[str] = None
    line_number: int = 0
    # Rendered base-class expressions via ast.unparse.
    bases: list[str] = field(default_factory=list)
@dataclass
class ModuleInfo:
    """Information about a Python module as stored in the index."""

    # Path relative to the repository root.
    file_path: str
    # Dotted module name derived from the relative path.
    module_name: str
    classes: list[ClassInfo] = field(default_factory=list)
    functions: list[FunctionInfo] = field(default_factory=list)
    # Flat list of imported names ("module" or "module.name" forms).
    imports: list[str] = field(default_factory=list)
    docstring: Optional[str] = None
    # Relative path of the matching test file, or None if none was found.
    test_coverage: Optional[str] = None
class CodebaseIndexer:
    """Indexes Python codebase for self-modification workflows.

    Parses all Python files using AST to extract:
    - Module names and structure
    - Class definitions with methods
    - Function signatures with args and return types
    - Import relationships
    - Test coverage mapping

    Stores everything in SQLite for fast querying.

    Usage:
        indexer = CodebaseIndexer(repo_path="/path/to/repo")
        # Full reindex
        await indexer.index_all()
        # Incremental update
        await indexer.index_changed()
        # Get LLM context summary
        summary = await indexer.get_summary()
        # Find relevant files for a task
        files = await indexer.get_relevant_files("Add error handling to health endpoint")
        # Get dependency chain
        deps = await indexer.get_dependency_chain("src/timmy/agent.py")
    """

    def __init__(
        self,
        repo_path: Optional[str | Path] = None,
        db_path: Optional[str | Path] = None,
        src_dirs: Optional[list[str]] = None,
    ) -> None:
        """Initialize CodebaseIndexer.

        Args:
            repo_path: Root of repository to index. Defaults to current directory.
            db_path: SQLite database path. Defaults to data/self_coding.db
            src_dirs: Source directories to index. Defaults to ["src", "tests"]
        """
        self.repo_path = Path(repo_path).resolve() if repo_path else Path.cwd()
        self.db_path = Path(db_path) if db_path else DEFAULT_DB_PATH
        self.src_dirs = src_dirs or ["src", "tests"]
        self._ensure_schema()
        logger.info("CodebaseIndexer initialized for %s", self.repo_path)

    def _get_conn(self) -> sqlite3.Connection:
        """Get database connection with schema ensured.

        NOTE(review): callers use `with self._get_conn() as conn`, which in
        sqlite3 scopes a transaction, not the connection itself — connections
        are never explicitly closed. Confirm this is acceptable (relies on GC).
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(self.db_path))
        # Row factory gives dict-like access (row["column"]) throughout.
        conn.row_factory = sqlite3.Row
        return conn

    def _ensure_schema(self) -> None:
        """Create database tables if they don't exist."""
        with self._get_conn() as conn:
            # Main codebase index table
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS codebase_index (
                    file_path TEXT PRIMARY KEY,
                    module_name TEXT NOT NULL,
                    classes JSON,
                    functions JSON,
                    imports JSON,
                    test_coverage TEXT,
                    last_indexed TIMESTAMP NOT NULL,
                    content_hash TEXT NOT NULL,
                    docstring TEXT,
                    embedding BLOB
                )
                """
            )
            # Dependency graph table
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS dependency_graph (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    source_file TEXT NOT NULL,
                    target_file TEXT NOT NULL,
                    import_type TEXT NOT NULL,
                    UNIQUE(source_file, target_file)
                )
                """
            )
            # Create indexes
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_module_name ON codebase_index(module_name)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_test_coverage ON codebase_index(test_coverage)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_deps_source ON dependency_graph(source_file)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_deps_target ON dependency_graph(target_file)"
            )
            conn.commit()

    def _compute_hash(self, content: str) -> str:
        """Compute SHA-256 hash of file content (used for change detection)."""
        return hashlib.sha256(content.encode("utf-8")).hexdigest()

    def _find_python_files(self) -> list[Path]:
        """Find all Python files in source directories.

        Returns a sorted list so indexing order is deterministic.
        """
        files = []
        for src_dir in self.src_dirs:
            src_path = self.repo_path / src_dir
            if src_path.exists():
                files.extend(src_path.rglob("*.py"))
        return sorted(files)

    def _find_test_file(self, source_file: Path) -> Optional[str]:
        """Find corresponding test file for a source file.

        Uses conventions:
        - src/x/y.py -> tests/test_x_y.py
        - src/x/y.py -> tests/x/test_y.py
        - src/x/y.py -> tests/test_y.py

        Returns the first existing candidate (relative path) or None.
        """
        rel_path = source_file.relative_to(self.repo_path)
        # Only look for tests for files in src/
        # NOTE(review): string prefix check assumes "/" separators; on Windows
        # str(Path) uses "\\" — confirm POSIX-only deployment.
        if not str(rel_path).startswith("src/"):
            return None
        # Try various test file naming conventions
        possible_tests = [
            # tests/test_module.py
            self.repo_path / "tests" / f"test_{source_file.stem}.py",
            # tests/test_path_module.py (flat)
            self.repo_path / "tests" / f"test_{'_'.join(rel_path.with_suffix('').parts[1:])}.py",
        ]
        # Try mirroring src structure in tests (tests/x/test_y.py)
        try:
            src_relative = rel_path.relative_to("src")
            possible_tests.append(
                self.repo_path / "tests" / src_relative.parent / f"test_{source_file.stem}.py"
            )
        except ValueError:
            pass
        for test_path in possible_tests:
            if test_path.exists():
                return str(test_path.relative_to(self.repo_path))
        return None

    def _parse_function(self, node: ast.FunctionDef | ast.AsyncFunctionDef, is_method: bool = False) -> FunctionInfo:
        """Parse a function definition node into a FunctionInfo.

        Args:
            node: The AST function node (sync or async).
            is_method: Whether the node was found inside a class body.
        """
        args = []
        # Handle different Python versions' AST structures
        func_args = node.args
        # Positional args (annotation rendered inline when present)
        for arg in func_args.args:
            arg_str = arg.arg
            if arg.annotation:
                arg_str += f": {ast.unparse(arg.annotation)}"
            args.append(arg_str)
        # Keyword-only args
        for arg in func_args.kwonlyargs:
            arg_str = arg.arg
            if arg.annotation:
                arg_str += f": {ast.unparse(arg.annotation)}"
            args.append(arg_str)
        # Return type
        returns = None
        if node.returns:
            returns = ast.unparse(node.returns)
        # Docstring
        docstring = ast.get_docstring(node)
        return FunctionInfo(
            name=node.name,
            args=args,
            returns=returns,
            docstring=docstring,
            line_number=node.lineno,
            is_async=isinstance(node, ast.AsyncFunctionDef),
            is_method=is_method,
        )

    def _parse_class(self, node: ast.ClassDef) -> ClassInfo:
        """Parse a class definition node into a ClassInfo (methods + bases)."""
        methods = []
        for item in node.body:
            if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                methods.append(self._parse_function(item, is_method=True))
        # Get bases
        bases = [ast.unparse(base) for base in node.bases]
        return ClassInfo(
            name=node.name,
            methods=methods,
            docstring=ast.get_docstring(node),
            line_number=node.lineno,
            bases=bases,
        )

    def _parse_module(self, file_path: Path) -> Optional[ModuleInfo]:
        """Parse a Python module file.

        Args:
            file_path: Path to Python file

        Returns:
            ModuleInfo or None if parsing fails (syntax or I/O error — the
            failure is logged, never raised).
        """
        try:
            content = file_path.read_text(encoding="utf-8")
            tree = ast.parse(content)
            # Compute module name from file path
            # NOTE(review): replace("/", ".") assumes POSIX path separators.
            rel_path = file_path.relative_to(self.repo_path)
            module_name = str(rel_path.with_suffix("")).replace("/", ".")
            classes = []
            functions = []
            imports = []
            # Collect imports from anywhere in the module (ast.walk visits
            # nested scopes too, so function-local imports are included).
            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        imports.append(alias.name)
                elif isinstance(node, ast.ImportFrom):
                    module = node.module or ""
                    for alias in node.names:
                        imports.append(f"{module}.{alias.name}")
            # Get top-level definitions (not in classes)
            for node in tree.body:
                if isinstance(node, ast.ClassDef):
                    classes.append(self._parse_class(node))
                elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    functions.append(self._parse_function(node))
            # Get module docstring
            docstring = ast.get_docstring(tree)
            # Find test coverage
            test_coverage = self._find_test_file(file_path)
            return ModuleInfo(
                file_path=str(rel_path),
                module_name=module_name,
                classes=classes,
                functions=functions,
                imports=imports,
                docstring=docstring,
                test_coverage=test_coverage,
            )
        except SyntaxError as e:
            logger.warning("Syntax error in %s: %s", file_path, e)
            return None
        except Exception as e:
            logger.error("Failed to parse %s: %s", file_path, e)
            return None

    def _store_module(self, conn: sqlite3.Connection, module: ModuleInfo, content_hash: str) -> None:
        """Store module info in database (upsert keyed on file_path)."""
        conn.execute(
            """
            INSERT OR REPLACE INTO codebase_index
            (file_path, module_name, classes, functions, imports, test_coverage,
             last_indexed, content_hash, docstring)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                module.file_path,
                module.module_name,
                json.dumps([asdict(c) for c in module.classes]),
                json.dumps([asdict(f) for f in module.functions]),
                json.dumps(module.imports),
                module.test_coverage,
                datetime.now(timezone.utc).isoformat(),
                content_hash,
                module.docstring,
            ),
        )

    def _build_dependency_graph(self, conn: sqlite3.Connection) -> None:
        """Build and store dependency graph from imports.

        Rebuilds the whole graph from scratch: maps each stored import string
        back to an indexed file via exact match, a src.-stripped alias, or the
        longest dotted prefix.
        """
        # Clear existing graph
        conn.execute("DELETE FROM dependency_graph")
        # Get all modules
        rows = conn.execute("SELECT file_path, module_name, imports FROM codebase_index").fetchall()
        # Map module names to file paths
        module_to_file = {row["module_name"]: row["file_path"] for row in rows}
        # Also map without src/ prefix for package imports like myproject.utils
        module_to_file_alt = {}
        for row in rows:
            module_name = row["module_name"]
            if module_name.startswith("src."):
                alt_name = module_name[4:]  # Remove "src." prefix
                module_to_file_alt[alt_name] = row["file_path"]
        # Build dependencies
        for row in rows:
            source_file = row["file_path"]
            imports = json.loads(row["imports"])
            for imp in imports:
                # Try to resolve import to a file
                # Handle both "module.name" and "module.name.Class" forms
                # First try exact match
                if imp in module_to_file:
                    conn.execute(
                        """
                        INSERT OR IGNORE INTO dependency_graph
                        (source_file, target_file, import_type)
                        VALUES (?, ?, ?)
                        """,
                        (source_file, module_to_file[imp], "import"),
                    )
                    continue
                # Try alternative name (without src/ prefix)
                if imp in module_to_file_alt:
                    conn.execute(
                        """
                        INSERT OR IGNORE INTO dependency_graph
                        (source_file, target_file, import_type)
                        VALUES (?, ?, ?)
                        """,
                        (source_file, module_to_file_alt[imp], "import"),
                    )
                    continue
                # Try prefix match (import myproject.utils.Helper -> myproject.utils)
                # Longest prefix wins: iterate from full length down to 1 part.
                imp_parts = imp.split(".")
                for i in range(len(imp_parts), 0, -1):
                    prefix = ".".join(imp_parts[:i])
                    # Try original module name
                    if prefix in module_to_file:
                        conn.execute(
                            """
                            INSERT OR IGNORE INTO dependency_graph
                            (source_file, target_file, import_type)
                            VALUES (?, ?, ?)
                            """,
                            (source_file, module_to_file[prefix], "import"),
                        )
                        break
                    # Try alternative name (without src/ prefix)
                    if prefix in module_to_file_alt:
                        conn.execute(
                            """
                            INSERT OR IGNORE INTO dependency_graph
                            (source_file, target_file, import_type)
                            VALUES (?, ?, ?)
                            """,
                            (source_file, module_to_file_alt[prefix], "import"),
                        )
                        break
        conn.commit()

    async def index_all(self) -> dict[str, int]:
        """Perform full reindex of all Python files.

        Files whose content hash is unchanged are skipped, so this is safe to
        call repeatedly.

        Returns:
            Dict with stats: {"indexed": int, "failed": int, "skipped": int}
        """
        logger.info("Starting full codebase index")
        files = self._find_python_files()
        stats = {"indexed": 0, "failed": 0, "skipped": 0}
        with self._get_conn() as conn:
            for file_path in files:
                try:
                    content = file_path.read_text(encoding="utf-8")
                    content_hash = self._compute_hash(content)
                    # Check if file needs reindexing
                    existing = conn.execute(
                        "SELECT content_hash FROM codebase_index WHERE file_path = ?",
                        (str(file_path.relative_to(self.repo_path)),),
                    ).fetchone()
                    if existing and existing["content_hash"] == content_hash:
                        stats["skipped"] += 1
                        continue
                    module = self._parse_module(file_path)
                    if module:
                        self._store_module(conn, module, content_hash)
                        stats["indexed"] += 1
                    else:
                        stats["failed"] += 1
                except Exception as e:
                    logger.error("Failed to index %s: %s", file_path, e)
                    stats["failed"] += 1
            # Build dependency graph
            self._build_dependency_graph(conn)
            conn.commit()
        logger.info(
            "Indexing complete: %(indexed)d indexed, %(failed)d failed, %(skipped)d skipped",
            stats,
        )
        return stats

    async def index_changed(self) -> dict[str, int]:
        """Perform incremental index of only changed files.

        Compares content hashes to detect changes.
        NOTE(review): logic is currently identical to index_all (which also
        skips unchanged hashes) — confirm whether these were meant to diverge.

        Returns:
            Dict with stats: {"indexed": int, "failed": int, "skipped": int}
        """
        logger.info("Starting incremental codebase index")
        files = self._find_python_files()
        stats = {"indexed": 0, "failed": 0, "skipped": 0}
        with self._get_conn() as conn:
            for file_path in files:
                try:
                    rel_path = str(file_path.relative_to(self.repo_path))
                    content = file_path.read_text(encoding="utf-8")
                    content_hash = self._compute_hash(content)
                    # Check if changed
                    existing = conn.execute(
                        "SELECT content_hash FROM codebase_index WHERE file_path = ?",
                        (rel_path,),
                    ).fetchone()
                    if existing and existing["content_hash"] == content_hash:
                        stats["skipped"] += 1
                        continue
                    module = self._parse_module(file_path)
                    if module:
                        self._store_module(conn, module, content_hash)
                        stats["indexed"] += 1
                    else:
                        stats["failed"] += 1
                except Exception as e:
                    logger.error("Failed to index %s: %s", file_path, e)
                    stats["failed"] += 1
            # Rebuild dependency graph (some imports may have changed)
            self._build_dependency_graph(conn)
            conn.commit()
        logger.info(
            "Incremental indexing complete: %(indexed)d indexed, %(failed)d failed, %(skipped)d skipped",
            stats,
        )
        return stats

    async def get_summary(self, max_tokens: int = 4000) -> str:
        """Generate compressed codebase summary for LLM context.

        Lists modules, their purposes, key classes/functions, and test coverage.
        Keeps output under max_tokens (approximate).

        Args:
            max_tokens: Maximum approximate tokens for summary

        Returns:
            Summary string suitable for LLM context
        """
        with self._get_conn() as conn:
            rows = conn.execute(
                """
                SELECT file_path, module_name, classes, functions, test_coverage, docstring
                FROM codebase_index
                ORDER BY module_name
                """
            ).fetchall()
        lines = ["# Codebase Summary\n"]
        lines.append(f"Total modules: {len(rows)}\n")
        lines.append("---\n")
        for row in rows:
            module_name = row["module_name"]
            file_path = row["file_path"]
            docstring = row["docstring"]
            test_coverage = row["test_coverage"]
            lines.append(f"\n## {module_name}")
            lines.append(f"File: `{file_path}`")
            if test_coverage:
                lines.append(f"Tests: `{test_coverage}`")
            else:
                lines.append("Tests: None")
            if docstring:
                # Take first line of docstring
                first_line = docstring.split("\n")[0][:100]
                lines.append(f"Purpose: {first_line}")
            # Classes
            classes = json.loads(row["classes"])
            if classes:
                lines.append("Classes:")
                for cls in classes[:5]:  # Limit to 5 classes
                    methods = [m["name"] for m in cls["methods"][:3]]
                    method_str = ", ".join(methods) + ("..." if len(cls["methods"]) > 3 else "")
                    lines.append(f"  - {cls['name']}({method_str})")
                if len(classes) > 5:
                    lines.append(f"  ... and {len(classes) - 5} more")
            # Functions
            functions = json.loads(row["functions"])
            if functions:
                func_names = [f["name"] for f in functions[:5]]
                func_str = ", ".join(func_names)
                if len(functions) > 5:
                    func_str += f"... and {len(functions) - 5} more"
                lines.append(f"Functions: {func_str}")
            lines.append("")
        summary = "\n".join(lines)
        # Rough token estimation (1 token ≈ 4 characters)
        if len(summary) > max_tokens * 4:
            # Truncate with note
            summary = summary[:max_tokens * 4]
            summary += "\n\n[Summary truncated due to length]"
        return summary

    async def get_relevant_files(self, task_description: str, limit: int = 5) -> list[str]:
        """Find files relevant to a task description.

        Uses keyword matching and import relationships. In Phase 2,
        this will use semantic search with vector embeddings.

        Args:
            task_description: Natural language description of the task
            limit: Maximum number of files to return

        Returns:
            List of file paths sorted by relevance
        """
        # Simple keyword extraction for now
        keywords = set(task_description.lower().split())
        # Remove common words (stop words)
        keywords -= {"the", "a", "an", "to", "in", "on", "at", "for", "with", "and", "or", "of", "is", "are"}
        with self._get_conn() as conn:
            rows = conn.execute(
                """
                SELECT file_path, module_name, classes, functions, docstring, test_coverage
                FROM codebase_index
                """
            ).fetchall()
        scored_files = []
        for row in rows:
            score = 0
            file_path = row["file_path"].lower()
            module_name = row["module_name"].lower()
            docstring = (row["docstring"] or "").lower()
            classes = json.loads(row["classes"])
            functions = json.loads(row["functions"])
            # Score based on keyword matches (weights: path 3, module/doc 2,
            # class name 2, method/function name 1)
            for keyword in keywords:
                if keyword in file_path:
                    score += 3
                if keyword in module_name:
                    score += 2
                if keyword in docstring:
                    score += 2
                # Check class/function names
                for cls in classes:
                    if keyword in cls["name"].lower():
                        score += 2
                    for method in cls["methods"]:
                        if keyword in method["name"].lower():
                            score += 1
                for func in functions:
                    if keyword in func["name"].lower():
                        score += 1
            # Boost files with test coverage (only if already matched)
            if score > 0 and row["test_coverage"]:
                score += 1
            if score > 0:
                scored_files.append((score, row["file_path"]))
        # Sort by score descending, return top N
        scored_files.sort(reverse=True, key=lambda x: x[0])
        return [f[1] for f in scored_files[:limit]]

    async def get_dependency_chain(self, file_path: str) -> list[str]:
        """Get all files that import the given file.

        Useful for understanding blast radius of changes.

        Args:
            file_path: Path to file (relative to repo root)

        Returns:
            List of file paths that import this file
        """
        with self._get_conn() as conn:
            rows = conn.execute(
                """
                SELECT source_file FROM dependency_graph
                WHERE target_file = ?
                """,
                (file_path,),
            ).fetchall()
        return [row["source_file"] for row in rows]

    async def has_test_coverage(self, file_path: str) -> bool:
        """Check if a file has corresponding test coverage.

        Args:
            file_path: Path to file (relative to repo root)

        Returns:
            True if test file exists, False otherwise
        """
        with self._get_conn() as conn:
            row = conn.execute(
                "SELECT test_coverage FROM codebase_index WHERE file_path = ?",
                (file_path,),
            ).fetchone()
        return row is not None and row["test_coverage"] is not None

    async def get_module_info(self, file_path: str) -> Optional[ModuleInfo]:
        """Get detailed info for a specific module.

        Args:
            file_path: Path to file (relative to repo root)

        Returns:
            ModuleInfo or None if not indexed
        """
        with self._get_conn() as conn:
            row = conn.execute(
                """
                SELECT file_path, module_name, classes, functions, imports,
                       test_coverage, docstring
                FROM codebase_index
                WHERE file_path = ?
                """,
                (file_path,),
            ).fetchone()
        if not row:
            return None
        # Parse classes - convert dict methods to FunctionInfo objects
        classes_data = json.loads(row["classes"])
        classes = []
        for cls_data in classes_data:
            methods = [FunctionInfo(**m) for m in cls_data.get("methods", [])]
            cls_info = ClassInfo(
                name=cls_data["name"],
                methods=methods,
                docstring=cls_data.get("docstring"),
                line_number=cls_data.get("line_number", 0),
                bases=cls_data.get("bases", []),
            )
            classes.append(cls_info)
        # Parse functions
        functions_data = json.loads(row["functions"])
        functions = [FunctionInfo(**f) for f in functions_data]
        return ModuleInfo(
            file_path=row["file_path"],
            module_name=row["module_name"],
            classes=classes,
            functions=functions,
            imports=json.loads(row["imports"]),
            docstring=row["docstring"],
            test_coverage=row["test_coverage"],
        )

View File

@@ -1,505 +0,0 @@
"""Git Safety Layer — Atomic git operations with rollback capability.
All self-modifications happen on feature branches. Only merge to main after
full test suite passes. Snapshots enable rollback on failure.
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class Snapshot:
    """Immutable snapshot of repository state before modification.

    Attributes:
        commit_hash: Git commit hash at snapshot time
        branch: Current branch name
        timestamp: When snapshot was taken
        test_status: Whether tests were passing at snapshot time
        test_output: Pytest output from test run
        clean: Whether working directory was clean
    """

    commit_hash: str
    branch: str
    timestamp: datetime
    test_status: bool
    test_output: str
    clean: bool
class GitSafetyError(Exception):
    """Common base class for all errors raised by the git safety layer."""
class GitNotRepositoryError(GitSafetyError):
    """The target path does not contain a git repository."""
class GitDirtyWorkingDirectoryError(GitSafetyError):
    """The working tree has uncommitted changes but a clean tree was required."""
class GitOperationError(GitSafetyError):
    """A git command exited with a non-zero status or timed out."""
class GitSafety:
    """Safe git operations for self-modification workflows.

    All operations are atomic and support rollback. Self-modifications happen
    on feature branches named 'timmy/self-edit/{timestamp}'. Only merged to
    main after tests pass.

    Usage:
        safety = GitSafety(repo_path="/path/to/repo")
        # Take snapshot before changes
        snapshot = await safety.snapshot()
        # Create feature branch
        branch = await safety.create_branch(f"timmy/self-edit/{timestamp}")
        # Make changes, commit them
        await safety.commit("Add error handling", ["src/file.py"])
        # Run tests, merge if pass
        if tests_pass:
            await safety.merge_to_main(branch)
        else:
            await safety.rollback(snapshot)
    """

    def __init__(
        self,
        repo_path: Optional[str | Path] = None,
        main_branch: str = "main",
        test_command: str = "python -m pytest --tb=short -q",
    ) -> None:
        """Initialize GitSafety with repository path.

        Args:
            repo_path: Path to git repository. Defaults to current working directory.
            main_branch: Name of main branch (main, master, etc.)
            test_command: Command to run tests for snapshot validation

        Raises:
            GitNotRepositoryError: If repo_path is not a git repository.
        """
        self.repo_path = Path(repo_path).resolve() if repo_path else Path.cwd()
        self.main_branch = main_branch
        self.test_command = test_command
        self._verify_git_repo()
        logger.info("GitSafety initialized for %s", self.repo_path)

    def _verify_git_repo(self) -> None:
        """Verify that repo_path is a git repository.

        NOTE(review): checks for a `.git` directory entry; in git worktrees
        `.git` is a file, which `exists()` also accepts — confirm intent.
        """
        git_dir = self.repo_path / ".git"
        if not git_dir.exists():
            raise GitNotRepositoryError(
                f"{self.repo_path} is not a git repository"
            )

    async def _run_git(
        self,
        *args: str,
        check: bool = True,
        capture_output: bool = True,
        timeout: float = 30.0,
    ) -> subprocess.CompletedProcess:
        """Run a git command asynchronously.

        Args:
            *args: Git command arguments
            check: Whether to raise on non-zero exit
            capture_output: Whether to capture stdout/stderr
            timeout: Maximum time to wait for command

        Returns:
            CompletedProcess with returncode, stdout, stderr

        Raises:
            GitOperationError: If git command fails and check=True, or times out
        """
        cmd = ["git", *args]
        logger.debug("Running: %s", " ".join(cmd))
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                cwd=self.repo_path,
                stdout=asyncio.subprocess.PIPE if capture_output else None,
                stderr=asyncio.subprocess.PIPE if capture_output else None,
            )
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(),
                timeout=timeout,
            )
            result = subprocess.CompletedProcess(
                args=cmd,
                returncode=proc.returncode or 0,
                stdout=stdout.decode() if stdout else "",
                stderr=stderr.decode() if stderr else "",
            )
            if check and result.returncode != 0:
                raise GitOperationError(
                    f"Git command failed: {' '.join(args)}\n"
                    f"stdout: {result.stdout}\nstderr: {result.stderr}"
                )
            return result
        except asyncio.TimeoutError as e:
            # NOTE(review): the killed process is not awaited afterwards, so it
            # may linger as a zombie until GC — confirm acceptable.
            proc.kill()
            raise GitOperationError(f"Git command timed out after {timeout}s: {' '.join(args)}") from e

    async def _run_shell(
        self,
        command: str,
        timeout: float = 120.0,
    ) -> subprocess.CompletedProcess:
        """Run a shell command asynchronously.

        Args:
            command: Shell command to run
            timeout: Maximum time to wait

        Returns:
            CompletedProcess with returncode, stdout, stderr

        NOTE(review): unlike _run_git, a timeout here propagates
        asyncio.TimeoutError without killing the child process — confirm
        whether that is intended.
        """
        logger.debug("Running shell: %s", command)
        proc = await asyncio.create_subprocess_shell(
            command,
            cwd=self.repo_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=timeout,
        )
        return subprocess.CompletedProcess(
            args=command,
            returncode=proc.returncode or 0,
            stdout=stdout.decode(),
            stderr=stderr.decode(),
        )

    async def is_clean(self) -> bool:
        """Check if working directory is clean (no uncommitted changes).

        Returns:
            True if clean, False if there are uncommitted changes
        """
        result = await self._run_git("status", "--porcelain", check=False)
        return result.stdout.strip() == ""

    async def get_current_branch(self) -> str:
        """Get current git branch name.

        Returns:
            Current branch name (empty string on detached HEAD)
        """
        result = await self._run_git("branch", "--show-current")
        return result.stdout.strip()

    async def get_current_commit(self) -> str:
        """Get current commit hash.

        Returns:
            Full commit hash
        """
        result = await self._run_git("rev-parse", "HEAD")
        return result.stdout.strip()

    async def _run_tests(self) -> tuple[bool, str]:
        """Run test suite and return results.

        Returns:
            Tuple of (all_passed, test_output) — passed means exit code 0.
        """
        logger.info("Running tests: %s", self.test_command)
        result = await self._run_shell(self.test_command, timeout=300.0)
        passed = result.returncode == 0
        output = result.stdout + "\n" + result.stderr
        if passed:
            logger.info("Tests passed")
        else:
            logger.warning("Tests failed with returncode %d", result.returncode)
        return passed, output

    async def snapshot(self, run_tests: bool = True) -> Snapshot:
        """Take a snapshot of current repository state.

        Captures commit hash, branch, test status. Used for rollback if
        modifications fail.

        Args:
            run_tests: Whether to run tests as part of snapshot

        Returns:
            Snapshot object with current state

        Raises:
            GitOperationError: If git operations fail
        """
        logger.info("Taking snapshot of repository state")
        commit_hash = await self.get_current_commit()
        branch = await self.get_current_branch()
        clean = await self.is_clean()
        timestamp = datetime.now(timezone.utc)
        test_status = False
        test_output = ""
        if run_tests:
            test_status, test_output = await self._run_tests()
        else:
            test_status = True  # Assume OK if not running tests
            test_output = "Tests skipped"
        snapshot = Snapshot(
            commit_hash=commit_hash,
            branch=branch,
            timestamp=timestamp,
            test_status=test_status,
            test_output=test_output,
            clean=clean,
        )
        logger.info(
            "Snapshot taken: %s@%s (clean=%s, tests=%s)",
            branch,
            commit_hash[:8],
            clean,
            test_status,
        )
        return snapshot

    async def create_branch(self, name: str, base: Optional[str] = None) -> str:
        """Create and checkout a new feature branch.

        Args:
            name: Branch name (e.g., 'timmy/self-edit/20260226-143022')
            base: Base branch to create from (defaults to main_branch)

        Returns:
            Name of created branch

        Raises:
            GitOperationError: If branch creation fails
        """
        base = base or self.main_branch
        # Ensure we're on base branch and it's up to date
        await self._run_git("checkout", base)
        await self._run_git("pull", "origin", base, check=False)  # May fail if no remote
        # Create and checkout new branch
        await self._run_git("checkout", "-b", name)
        logger.info("Created branch %s from %s", name, base)
        return name

    async def commit(
        self,
        message: str,
        files: Optional[list[str | Path]] = None,
        allow_empty: bool = False,
    ) -> str:
        """Commit changes to current branch.

        Args:
            message: Commit message
            files: Specific files to commit (None = all changes)
            allow_empty: Whether to allow empty commits

        Returns:
            Commit hash of new commit (unchanged HEAD if nothing to commit)

        Raises:
            GitOperationError: If commit fails
        """
        # Add files
        if files:
            for file_path in files:
                full_path = self.repo_path / file_path
                if not full_path.exists():
                    # Missing files are still passed to `git add` (this stages
                    # deletions); we only warn.
                    logger.warning("File does not exist: %s", file_path)
                await self._run_git("add", str(file_path))
        else:
            await self._run_git("add", "-A")
        # Check if there's anything to commit
        if not allow_empty:
            # `diff --cached --quiet` exits 0 when the index matches HEAD.
            diff_result = await self._run_git(
                "diff", "--cached", "--quiet", check=False
            )
            if diff_result.returncode == 0:
                logger.warning("No changes to commit")
                return await self.get_current_commit()
        # Commit
        commit_args = ["commit", "-m", message]
        if allow_empty:
            commit_args.append("--allow-empty")
        await self._run_git(*commit_args)
        commit_hash = await self.get_current_commit()
        logger.info("Committed %s: %s", commit_hash[:8], message)
        return commit_hash

    async def get_diff(self, from_hash: str, to_hash: Optional[str] = None) -> str:
        """Get diff between commits.

        Args:
            from_hash: Starting commit hash (or Snapshot object hash)
            to_hash: Ending commit hash (None = current working tree)

        Returns:
            Git diff as string
        """
        args = ["diff", from_hash]
        if to_hash:
            args.append(to_hash)
        result = await self._run_git(*args)
        return result.stdout

    async def rollback(self, snapshot: Snapshot | str) -> str:
        """Rollback to a previous snapshot.

        Hard resets to the snapshot commit and deletes any uncommitted changes.
        Use with caution — this is destructive (also removes untracked files).

        Args:
            snapshot: Snapshot object or commit hash to rollback to

        Returns:
            Commit hash after rollback

        Raises:
            GitOperationError: If rollback fails
        """
        if isinstance(snapshot, Snapshot):
            target_hash = snapshot.commit_hash
            target_branch = snapshot.branch
        else:
            target_hash = snapshot
            target_branch = None
        logger.warning("Rolling back to %s", target_hash[:8])
        # Reset to target commit
        await self._run_git("reset", "--hard", target_hash)
        # Clean any untracked files
        await self._run_git("clean", "-fd")
        # If we know the original branch, switch back to it
        if target_branch:
            branch_exists = await self._run_git(
                "branch", "--list", target_branch, check=False
            )
            if branch_exists.stdout.strip():
                await self._run_git("checkout", target_branch)
                logger.info("Switched back to branch %s", target_branch)
        current = await self.get_current_commit()
        logger.info("Rolled back to %s", current[:8])
        return current

    async def merge_to_main(
        self,
        branch: str,
        require_tests: bool = True,
    ) -> str:
        """Merge a feature branch into main after tests pass.

        Args:
            branch: Feature branch to merge
            require_tests: Whether to require tests to pass before merging

        Returns:
            Merge commit hash

        Raises:
            GitOperationError: If merge fails or tests don't pass
        """
        logger.info("Preparing to merge %s into %s", branch, self.main_branch)
        # Checkout the feature branch and run tests
        await self._run_git("checkout", branch)
        if require_tests:
            passed, output = await self._run_tests()
            if not passed:
                raise GitOperationError(
                    f"Cannot merge {branch}: tests failed\n{output}"
                )
        # Checkout main and merge (--no-ff keeps an explicit merge commit)
        await self._run_git("checkout", self.main_branch)
        await self._run_git("merge", "--no-ff", "-m", f"Merge {branch}", branch)
        # Optionally delete the feature branch (best-effort)
        await self._run_git("branch", "-d", branch, check=False)
        merge_hash = await self.get_current_commit()
        logger.info("Merged %s into %s: %s", branch, self.main_branch, merge_hash[:8])
        return merge_hash

    async def get_modified_files(self, since_hash: Optional[str] = None) -> list[str]:
        """Get list of files modified since a commit.

        Args:
            since_hash: Commit to compare against (None = uncommitted changes)

        Returns:
            List of modified file paths
        """
        if since_hash:
            result = await self._run_git(
                "diff", "--name-only", since_hash, "HEAD"
            )
        else:
            result = await self._run_git(
                "diff", "--name-only", "HEAD"
            )
        files = [f.strip() for f in result.stdout.split("\n") if f.strip()]
        return files

    async def stage_file(self, file_path: str | Path) -> None:
        """Stage a single file for commit.

        Args:
            file_path: Path to file relative to repo root
        """
        await self._run_git("add", str(file_path))
        logger.debug("Staged %s", file_path)

View File

@@ -1,425 +0,0 @@
"""Modification Journal — Persistent log of self-modification attempts.
Tracks successes and failures so Timmy can learn from experience.
Supports semantic search for similar past attempts.
"""
from __future__ import annotations

import json
import logging
import sqlite3
from contextlib import closing
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# Default database location
DEFAULT_DB_PATH = Path("data/self_coding.db")
class Outcome(str, Enum):
    """Possible outcomes of a modification attempt.

    The ``str`` mixin keeps the values directly comparable to the plain
    strings stored in the SQLite ``outcome`` column.
    """

    SUCCESS = "success"    # change applied and tests passed
    FAILURE = "failure"    # change attempted but did not land
    ROLLBACK = "rollback"  # change was reverted after being applied
@dataclass
class ModificationAttempt:
    """A single self-modification attempt.

    Attributes:
        id: Unique identifier (auto-generated by database)
        timestamp: When the attempt was made
        task_description: What was Timmy trying to do
        approach: Strategy/approach planned
        files_modified: List of file paths that were modified
        diff: The actual git diff of changes
        test_results: Pytest output
        outcome: success, failure, or rollback
        failure_analysis: LLM-generated analysis of why it failed
        reflection: LLM-generated lessons learned
        retry_count: Number of retry attempts
        embedding: Vector embedding of task_description (for semantic search)
    """
    task_description: str
    approach: str = ""
    # default_factory avoids the shared-mutable-default pitfall.
    files_modified: list[str] = field(default_factory=list)
    diff: str = ""
    test_results: str = ""
    outcome: Outcome = Outcome.FAILURE
    failure_analysis: str = ""
    reflection: str = ""
    retry_count: int = 0
    # Database-assigned metadata is declared last so task_description stays
    # the first positional argument when constructing attempts by hand.
    id: Optional[int] = None
    timestamp: Optional[datetime] = None
    embedding: Optional[bytes] = None
class ModificationJournal:
    """Persistent log of self-modification attempts.

    Before any self-modification, Timmy should query the journal for
    similar past attempts and include relevant ones in the LLM context.

    NOTE: methods are declared ``async`` to match the rest of the
    self-coding layer, but the sqlite3 calls themselves are blocking.

    Usage:
        journal = ModificationJournal()

        # Log an attempt
        attempt = ModificationAttempt(
            task_description="Add error handling",
            files_modified=["src/app.py"],
            outcome=Outcome.SUCCESS,
        )
        await journal.log_attempt(attempt)

        # Find similar past attempts
        similar = await journal.find_similar("Add error handling to endpoints")

        # Get success metrics
        metrics = await journal.get_success_rate()
    """

    def __init__(
        self,
        db_path: Optional[str | Path] = None,
    ) -> None:
        """Initialize ModificationJournal.

        Args:
            db_path: SQLite database path. Defaults to data/self_coding.db
        """
        self.db_path = Path(db_path) if db_path else DEFAULT_DB_PATH
        self._ensure_schema()
        logger.info("ModificationJournal initialized at %s", self.db_path)

    def _get_conn(self) -> sqlite3.Connection:
        """Open a new database connection.

        Callers are responsible for closing it — see the ``closing(...)``
        pattern used throughout this class.

        Returns:
            sqlite3 connection with Row factory enabled.
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        return conn

    def _ensure_schema(self) -> None:
        """Create database tables if they don't exist."""
        # sqlite3's connection context manager only scopes a transaction and
        # never closes the handle, so every call here previously leaked an
        # open connection. closing() releases it; the second "conn" context
        # preserves the original commit/rollback transaction semantics.
        with closing(self._get_conn()) as conn, conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS modification_journal (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    task_description TEXT NOT NULL,
                    approach TEXT,
                    files_modified JSON,
                    diff TEXT,
                    test_results TEXT,
                    outcome TEXT CHECK(outcome IN ('success', 'failure', 'rollback')),
                    failure_analysis TEXT,
                    reflection TEXT,
                    retry_count INTEGER DEFAULT 0,
                    embedding BLOB
                )
                """
            )
            # Create indexes for common queries
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_journal_outcome ON modification_journal(outcome)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_journal_timestamp ON modification_journal(timestamp)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_journal_task ON modification_journal(task_description)"
            )
            conn.commit()

    async def log_attempt(self, attempt: ModificationAttempt) -> int:
        """Log a modification attempt to the journal.

        Args:
            attempt: The modification attempt to log

        Returns:
            ID of the logged entry
        """
        with closing(self._get_conn()) as conn, conn:
            cursor = conn.execute(
                """
                INSERT INTO modification_journal
                (task_description, approach, files_modified, diff, test_results,
                 outcome, failure_analysis, reflection, retry_count, embedding)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    attempt.task_description,
                    attempt.approach,
                    json.dumps(attempt.files_modified),
                    attempt.diff,
                    attempt.test_results,
                    attempt.outcome.value,
                    attempt.failure_analysis,
                    attempt.reflection,
                    attempt.retry_count,
                    attempt.embedding,
                ),
            )
            conn.commit()
            attempt_id = cursor.lastrowid
            logger.info(
                "Logged modification attempt %d: %s (%s)",
                attempt_id,
                attempt.task_description[:50],
                attempt.outcome.value,
            )
            return attempt_id

    async def find_similar(
        self,
        task_description: str,
        limit: int = 5,
        include_outcomes: Optional[list[Outcome]] = None,
    ) -> list[ModificationAttempt]:
        """Find similar past modification attempts.

        Uses keyword matching for now. In Phase 2, will use vector embeddings
        for semantic search.

        Args:
            task_description: Task to find similar attempts for
            limit: Maximum number of results
            include_outcomes: Filter by outcomes (None = all)

        Returns:
            List of similar modification attempts
        """
        # Extract keywords from task description, dropping common stopwords.
        keywords = set(task_description.lower().split())
        keywords -= {"the", "a", "an", "to", "in", "on", "at", "for", "with", "and", "or", "of", "is", "are"}
        with closing(self._get_conn()) as conn, conn:
            # Build query
            if include_outcomes:
                outcome_filter = "AND outcome IN ({})".format(
                    ",".join("?" * len(include_outcomes))
                )
                outcome_values = [o.value for o in include_outcomes]
            else:
                outcome_filter = ""
                outcome_values = []
            rows = conn.execute(
                f"""
                SELECT id, timestamp, task_description, approach, files_modified,
                       diff, test_results, outcome, failure_analysis, reflection,
                       retry_count
                FROM modification_journal
                WHERE 1=1 {outcome_filter}
                ORDER BY timestamp DESC
                LIMIT ?
                """,
                outcome_values + [limit * 3],  # Get more for scoring
            ).fetchall()
        # Score by keyword match (done outside the connection scope — the
        # rows are already fully materialized by fetchall()).
        scored = []
        for row in rows:
            score = 0
            task = row["task_description"].lower()
            approach = (row["approach"] or "").lower()
            for kw in keywords:
                if kw in task:
                    score += 3
                if kw in approach:
                    score += 1
            # Boost recent attempts (only if already matched)
            if score > 0:
                timestamp = datetime.fromisoformat(row["timestamp"])
                if timestamp.tzinfo is None:
                    timestamp = timestamp.replace(tzinfo=timezone.utc)
                age_days = (datetime.now(timezone.utc) - timestamp).days
                if age_days < 7:
                    score += 2
                elif age_days < 30:
                    score += 1
            if score > 0:
                scored.append((score, row))
        # Sort by score, take top N
        scored.sort(reverse=True, key=lambda x: x[0])
        top_rows = scored[:limit]
        # Convert to ModificationAttempt objects
        return [self._row_to_attempt(row) for _, row in top_rows]

    async def get_success_rate(self) -> dict[str, float]:
        """Get success rate metrics.

        Returns:
            Dict with overall and per-category success rates:
            {
                "overall": float,   # 0.0 to 1.0
                "success": int,     # count
                "failure": int,     # count
                "rollback": int,    # count
                "total": int,       # total attempts
            }
        """
        with closing(self._get_conn()) as conn, conn:
            rows = conn.execute(
                """
                SELECT outcome, COUNT(*) as count
                FROM modification_journal
                GROUP BY outcome
                """
            ).fetchall()
        counts = {row["outcome"]: row["count"] for row in rows}
        success = counts.get("success", 0)
        failure = counts.get("failure", 0)
        rollback = counts.get("rollback", 0)
        total = success + failure + rollback
        # Guard against division by zero on an empty journal.
        overall = success / total if total > 0 else 0.0
        return {
            "overall": overall,
            "success": success,
            "failure": failure,
            "rollback": rollback,
            "total": total,
        }

    async def get_recent_failures(self, limit: int = 10) -> list[ModificationAttempt]:
        """Get recent failed attempts with their analyses.

        Args:
            limit: Maximum number of failures to return

        Returns:
            List of failed modification attempts
        """
        with closing(self._get_conn()) as conn, conn:
            rows = conn.execute(
                """
                SELECT id, timestamp, task_description, approach, files_modified,
                       diff, test_results, outcome, failure_analysis, reflection,
                       retry_count
                FROM modification_journal
                WHERE outcome IN ('failure', 'rollback')
                ORDER BY timestamp DESC
                LIMIT ?
                """,
                (limit,),
            ).fetchall()
        return [self._row_to_attempt(row) for row in rows]

    async def get_by_id(self, attempt_id: int) -> Optional[ModificationAttempt]:
        """Get a specific modification attempt by ID.

        Args:
            attempt_id: ID of the attempt

        Returns:
            ModificationAttempt or None if not found
        """
        with closing(self._get_conn()) as conn, conn:
            row = conn.execute(
                """
                SELECT id, timestamp, task_description, approach, files_modified,
                       diff, test_results, outcome, failure_analysis, reflection,
                       retry_count
                FROM modification_journal
                WHERE id = ?
                """,
                (attempt_id,),
            ).fetchone()
        if not row:
            return None
        return self._row_to_attempt(row)

    async def update_reflection(self, attempt_id: int, reflection: str) -> bool:
        """Update the reflection for a modification attempt.

        Args:
            attempt_id: ID of the attempt
            reflection: New reflection text

        Returns:
            True if updated, False if not found
        """
        with closing(self._get_conn()) as conn, conn:
            cursor = conn.execute(
                """
                UPDATE modification_journal
                SET reflection = ?
                WHERE id = ?
                """,
                (reflection, attempt_id),
            )
            conn.commit()
            if cursor.rowcount > 0:
                logger.info("Updated reflection for attempt %d", attempt_id)
                return True
            return False

    async def get_attempts_for_file(
        self,
        file_path: str,
        limit: int = 10,
    ) -> list[ModificationAttempt]:
        """Get all attempts that modified a specific file.

        Args:
            file_path: Path to file (relative to repo root)
            limit: Maximum number of attempts

        Returns:
            List of modification attempts affecting this file
        """
        with closing(self._get_conn()) as conn, conn:
            # Try exact JSON-quoted match first, then loose substring match.
            # NOTE(review): a file_path containing % or _ acts as a LIKE
            # wildcard here — acceptable for repo-relative paths.
            rows = conn.execute(
                """
                SELECT id, timestamp, task_description, approach, files_modified,
                       diff, test_results, outcome, failure_analysis, reflection,
                       retry_count
                FROM modification_journal
                WHERE files_modified LIKE ? OR files_modified LIKE ?
                ORDER BY timestamp DESC
                LIMIT ?
                """,
                (f'%"{file_path}"%', f'%{file_path}%', limit),
            ).fetchall()
        return [self._row_to_attempt(row) for row in rows]

    def _row_to_attempt(self, row: sqlite3.Row) -> ModificationAttempt:
        """Convert a database row to ModificationAttempt.

        The timestamp is parsed as stored (naive); find_similar() normalizes
        naive timestamps to UTC when comparing ages.
        """
        return ModificationAttempt(
            id=row["id"],
            timestamp=datetime.fromisoformat(row["timestamp"]),
            task_description=row["task_description"],
            approach=row["approach"] or "",
            files_modified=json.loads(row["files_modified"] or "[]"),
            diff=row["diff"] or "",
            test_results=row["test_results"] or "",
            outcome=Outcome(row["outcome"]),
            failure_analysis=row["failure_analysis"] or "",
            reflection=row["reflection"] or "",
            retry_count=row["retry_count"] or 0,
        )

View File

@@ -1,259 +0,0 @@
"""Reflection Service — Generate lessons learned from modification attempts.
After every self-modification (success or failure), the Reflection Service
prompts an LLM to analyze the attempt and extract actionable insights.
"""
from __future__ import annotations
import logging
from typing import Optional
from self_coding.modification_journal import ModificationAttempt, Outcome
logger = logging.getLogger(__name__)
# System prompt framing the LLM as a mentor; kept short so most of the
# context budget goes to the attempt details in the user message.
REFLECTION_SYSTEM_PROMPT = """You are a software engineering mentor analyzing a self-modification attempt.
Your goal is to provide constructive, specific feedback that helps improve future attempts.
Focus on patterns and principles rather than one-off issues.
Be concise but insightful. Maximum 300 words."""
# User-message template filled via str.format() in
# ReflectionService.reflect_on_attempt(). Placeholders: task_description,
# approach, files_modified, outcome, test_results, failure_section.
REFLECTION_PROMPT_TEMPLATE = """A software agent just attempted to modify its own source code.
Task: {task_description}
Approach: {approach}
Files modified: {files_modified}
Outcome: {outcome}
Test results: {test_results}
{failure_section}
Reflect on this attempt:
1. What went well? (Be specific about techniques or strategies)
2. What could be improved? (Focus on process, not just the code)
3. What would you do differently next time?
4. What general lesson can be extracted for future similar tasks?
Provide your reflection in a structured format:
**What went well:**
[Your analysis]
**What could be improved:**
[Your analysis]
**Next time:**
[Specific actionable change]
**General lesson:**
[Extracted principle for similar tasks]"""
class ReflectionService:
    """Generates reflections on self-modification attempts.
    Uses an LLM to analyze attempts and extract lessons learned.
    Stores reflections in the Modification Journal for future reference.
    Usage:
        from self_coding.reflection import ReflectionService
        from timmy.cascade_adapter import TimmyCascadeAdapter
        adapter = TimmyCascadeAdapter()
        reflection_service = ReflectionService(llm_adapter=adapter)
        # After a modification attempt
        reflection_text = await reflection_service.reflect_on_attempt(attempt)
        # Store in journal
        await journal.update_reflection(attempt_id, reflection_text)
    """
    def __init__(
        self,
        llm_adapter: Optional[object] = None,
        model_preference: str = "fast",  # "fast" or "quality"
    ) -> None:
        """Initialize ReflectionService.
        Args:
            llm_adapter: LLM adapter (e.g., TimmyCascadeAdapter); must expose
                an async ``chat(message=..., context=...)`` method returning an
                object with ``content`` and ``provider_used`` attributes.
                When None, a static template fallback is used instead.
            model_preference: "fast" for quick reflections, "quality" for deeper analysis
        """
        self.llm_adapter = llm_adapter
        # NOTE(review): model_preference is stored but not read anywhere in
        # this class — presumably consumed by the adapter elsewhere; verify.
        self.model_preference = model_preference
        logger.info("ReflectionService initialized")
    async def reflect_on_attempt(self, attempt: ModificationAttempt) -> str:
        """Generate a reflection on a modification attempt.
        Args:
            attempt: The modification attempt to reflect on
        Returns:
            Reflection text (structured markdown). Falls back to a static
            template when no LLM adapter is set or the LLM call fails.
        """
        # Build the prompt; the failure section is only added when there is
        # an analysis to include.
        failure_section = ""
        if attempt.outcome == Outcome.FAILURE and attempt.failure_analysis:
            failure_section = f"\nFailure analysis: {attempt.failure_analysis}"
        prompt = REFLECTION_PROMPT_TEMPLATE.format(
            task_description=attempt.task_description,
            approach=attempt.approach or "(No approach documented)",
            files_modified=", ".join(attempt.files_modified) if attempt.files_modified else "(No files modified)",
            outcome=attempt.outcome.value.upper(),
            # Truncated to 500 chars to keep the prompt bounded.
            test_results=attempt.test_results[:500] if attempt.test_results else "(No test results)",
            failure_section=failure_section,
        )
        # Call LLM if available
        if self.llm_adapter:
            try:
                response = await self.llm_adapter.chat(
                    message=prompt,
                    context=REFLECTION_SYSTEM_PROMPT,
                )
                reflection = response.content.strip()
                logger.info("Generated reflection for attempt (via %s)",
                            response.provider_used)
                return reflection
            except Exception as e:
                # Any adapter failure degrades to the template fallback
                # rather than propagating — reflections are best-effort.
                logger.error("LLM reflection failed: %s", e)
                return self._generate_fallback_reflection(attempt)
        else:
            # No LLM available, use fallback
            return self._generate_fallback_reflection(attempt)
    def _generate_fallback_reflection(self, attempt: ModificationAttempt) -> str:
        """Generate a basic reflection without LLM.
        Used when no LLM adapter is available or LLM call fails.
        Args:
            attempt: The modification attempt
        Returns:
            Basic reflection text (same structured-markdown shape the LLM
            is instructed to produce)
        """
        if attempt.outcome == Outcome.SUCCESS:
            return f"""**What went well:**
Successfully completed: {attempt.task_description}
Files modified: {', '.join(attempt.files_modified) if attempt.files_modified else 'N/A'}
**What could be improved:**
Document the approach taken for future reference.
**Next time:**
Use the same pattern for similar tasks.
**General lesson:**
Modifications to {', '.join(attempt.files_modified) if attempt.files_modified else 'these files'} should include proper test coverage."""
        elif attempt.outcome == Outcome.FAILURE:
            return f"""**What went well:**
Attempted: {attempt.task_description}
**What could be improved:**
The modification failed after {attempt.retry_count} retries.
{attempt.failure_analysis if attempt.failure_analysis else 'Failure reason not documented.'}
**Next time:**
Consider breaking the task into smaller steps.
Validate approach with simpler test case first.
**General lesson:**
Changes affecting {', '.join(attempt.files_modified) if attempt.files_modified else 'multiple files'} require careful dependency analysis."""
        else:  # ROLLBACK
            return f"""**What went well:**
Recognized failure and rolled back to maintain stability.
**What could be improved:**
Early detection of issues before full implementation.
**Next time:**
Run tests more frequently during development.
Use smaller incremental commits.
**General lesson:**
Rollback is preferable to shipping broken code."""
    async def reflect_with_context(
        self,
        attempt: ModificationAttempt,
        similar_attempts: list[ModificationAttempt],
    ) -> str:
        """Generate reflection with context from similar past attempts.
        Includes relevant past reflections to build cumulative learning.
        Args:
            attempt: The current modification attempt
            similar_attempts: Similar past attempts (with reflections)
        Returns:
            Reflection text incorporating past learnings. Falls back to
            reflect_on_attempt() when the LLM is unavailable or fails.
        """
        # Build context from similar attempts (past reflections truncated
        # to 200 chars each to bound prompt size).
        context_parts = []
        for past in similar_attempts[:3]:  # Top 3 similar
            if past.reflection:
                context_parts.append(
                    f"Past similar task ({past.outcome.value}):\n"
                    f"Task: {past.task_description}\n"
                    f"Lesson: {past.reflection[:200]}..."
                )
        context = "\n\n".join(context_parts)
        # Build enhanced prompt
        failure_section = ""
        if attempt.outcome == Outcome.FAILURE and attempt.failure_analysis:
            failure_section = f"\nFailure analysis: {attempt.failure_analysis}"
        enhanced_prompt = f"""A software agent just attempted to modify its own source code.
Task: {attempt.task_description}
Approach: {attempt.approach or "(No approach documented)"}
Files modified: {', '.join(attempt.files_modified) if attempt.files_modified else "(No files modified)"}
Outcome: {attempt.outcome.value.upper()}
Test results: {attempt.test_results[:500] if attempt.test_results else "(No test results)"}
{failure_section}
---
Relevant past attempts:
{context if context else "(No similar past attempts)"}
---
Given this history, reflect on the current attempt:
1. What went well?
2. What could be improved?
3. How does this compare to past similar attempts?
4. What pattern or principle should guide future similar tasks?
Provide your reflection in a structured format:
**What went well:**
**What could be improved:**
**Comparison to past attempts:**
**Guiding principle:**"""
        if self.llm_adapter:
            try:
                response = await self.llm_adapter.chat(
                    message=enhanced_prompt,
                    context=REFLECTION_SYSTEM_PROMPT,
                )
                return response.content.strip()
            except Exception as e:
                # Degrade to the context-free reflection path on any failure.
                logger.error("LLM reflection with context failed: %s", e)
                return await self.reflect_on_attempt(attempt)
        else:
            return await self.reflect_on_attempt(attempt)

View File

@@ -1 +0,0 @@
"""Self-Modify — Runtime self-modification with safety constraints."""

View File

@@ -1,134 +0,0 @@
"""CLI for self-modification — run from the terminal.
Usage:
self-modify run "Add a docstring to src/timmy/prompts.py" --file src/timmy/prompts.py
self-modify run "Fix the bug in config" --dry-run
self-modify run "Add logging" --backend anthropic --autonomous
self-modify status
"""
import logging
import os
from typing import Optional
import typer
from rich.console import Console
from rich.panel import Panel
console = Console()
app = typer.Typer(help="Timmy self-modify — edit code, run tests, commit")
@app.command()
def run(
    instruction: str = typer.Argument(..., help="What to change (natural language)"),
    file: Optional[list[str]] = typer.Option(None, "--file", "-f", help="Target file(s) to modify"),
    dry_run: bool = typer.Option(False, "--dry-run", "-n", help="Generate edits but don't write"),
    retries: int = typer.Option(2, "--retries", "-r", help="Max retry attempts on test failure"),
    backend: Optional[str] = typer.Option(None, "--backend", "-b", help="LLM backend: ollama, anthropic, auto"),
    autonomous: bool = typer.Option(False, "--autonomous", "-a", help="Enable autonomous self-correction"),
    max_cycles: int = typer.Option(3, "--max-cycles", help="Max autonomous correction cycles"),
    branch: bool = typer.Option(False, "--branch", help="Create a git branch (off by default to avoid container restarts)"),
    speak: bool = typer.Option(False, "--speak", "-s", help="Speak the result via TTS"),
):
    """Run the self-modification loop.

    Builds a ModifyRequest from the CLI options, executes SelfModifyLoop,
    prints a summary panel, and exits with status 1 on failure.
    """
    # Force enable for CLI usage
    os.environ["SELF_MODIFY_ENABLED"] = "true"
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)-8s %(name)s -- %(message)s",
        datefmt="%H:%M:%S",
    )
    # Skip branch creation unless explicitly requested
    if not branch:
        os.environ["SELF_MODIFY_SKIP_BRANCH"] = "1"
    # Imported after the env vars above are set — the loop reads them at
    # run time (and presumably its config reads env at import; verify).
    from self_coding.self_modify.loop import SelfModifyLoop, ModifyRequest
    target_files = list(file) if file else []
    # CLI flag wins; fall back to SELF_MODIFY_BACKEND env, then "auto".
    effective_backend = backend or os.environ.get("SELF_MODIFY_BACKEND", "auto")
    console.print(Panel(
        f"[bold]Instruction:[/bold] {instruction}\n"
        f"[bold]Files:[/bold] {', '.join(target_files) or '(auto-detect)'}\n"
        f"[bold]Backend:[/bold] {effective_backend}\n"
        f"[bold]Autonomous:[/bold] {autonomous}\n"
        f"[bold]Dry run:[/bold] {dry_run}\n"
        f"[bold]Max retries:[/bold] {retries}",
        title="Self-Modify",
        border_style="cyan",
    ))
    loop = SelfModifyLoop(
        max_retries=retries,
        backend=effective_backend,
        autonomous=autonomous,
        max_autonomous_cycles=max_cycles,
    )
    request = ModifyRequest(
        instruction=instruction,
        target_files=target_files,
        dry_run=dry_run,
    )
    with console.status("[bold cyan]Running self-modification loop..."):
        result = loop.run(request)
    if result.report_path:
        console.print(f"\n[dim]Report saved: {result.report_path}[/dim]\n")
    if result.success:
        console.print(Panel(
            f"[green bold]SUCCESS[/green bold]\n\n"
            f"Files changed: {', '.join(result.files_changed)}\n"
            f"Tests passed: {result.test_passed}\n"
            f"Commit: {result.commit_sha or 'none (dry run)'}\n"
            f"Branch: {result.branch_name or 'current'}\n"
            f"Attempts: {result.attempts}\n"
            f"Autonomous cycles: {result.autonomous_cycles}",
            title="Result",
            border_style="green",
        ))
    else:
        console.print(Panel(
            f"[red bold]FAILED[/red bold]\n\n"
            f"Error: {result.error}\n"
            f"Attempts: {result.attempts}\n"
            f"Autonomous cycles: {result.autonomous_cycles}",
            title="Result",
            border_style="red",
        ))
        # Non-zero exit so callers/scripts can detect failure.
        raise typer.Exit(1)
    if speak and result.success:
        try:
            from timmy_serve.voice_tts import voice_tts
            if voice_tts.available:
                voice_tts.speak_sync(
                    f"Code modification complete. "
                    f"{len(result.files_changed)} files changed. Tests passing."
                )
        except Exception:
            # Speech is best-effort; never fail the command over TTS.
            pass
@app.command()
def status():
    """Show whether self-modification is enabled."""
    # Imported lazily so the CLI can at least render --help without a
    # fully configured environment.
    from config import settings
    if settings.self_modify_enabled:
        state, color = "ENABLED", "green"
    else:
        state, color = "DISABLED", "red"
    console.print(f"Self-modification: [{color}]{state}[/{color}]")
    console.print(f"Max retries: {settings.self_modify_max_retries}")
    console.print(f"Backend: {settings.self_modify_backend}")
    console.print(f"Allowed dirs: {settings.self_modify_allowed_dirs}")
def main():
    """Entry point for the `self-modify` console script (see pyproject)."""
    app()
if __name__ == "__main__":
    main()

View File

@@ -1,741 +0,0 @@
"""Self-modification loop — read source, generate edits, test, commit.
Orchestrates the full cycle for Timmy to modify its own codebase:
1. Create a working git branch
2. Read target source files
3. Send instruction + source to the LLM
4. Validate syntax before writing
5. Write edits to disk
6. Run pytest
7. On success -> git add + commit; on failure -> revert
8. On total failure -> diagnose from report, restart autonomously
Supports multiple LLM backends:
- "ollama" — local Ollama (default, sovereign)
- "anthropic" — Claude API via Anthropic SDK
- "auto" — try anthropic first (if key set), fall back to ollama
Reports are saved to data/self_modify_reports/ for debugging.
"""
from __future__ import annotations
import logging
import os
import re
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from config import settings
logger = logging.getLogger(__name__)
# Project root — use settings.repo_root (works in Docker and local dev)
PROJECT_ROOT = Path(settings.repo_root)
# Reports directory
REPORTS_DIR = PROJECT_ROOT / "data" / "self_modify_reports"
# Only one self-modification at a time
_LOCK = threading.Lock()
# Maximum file size we'll send to the LLM (bytes)
_MAX_FILE_SIZE = 50_000
# Delimiter format the LLM is instructed to use.
# Non-greedy body match + DOTALL lets several FILE blocks — each spanning
# many lines — be parsed out of a single LLM response.
_FILE_BLOCK_RE = re.compile(
    r"---\s*FILE:\s*(.+?)\s*---\n(.*?)---\s*END\s*FILE\s*---",
    re.DOTALL,
)
# Backend type literal
BACKENDS = ("ollama", "anthropic", "auto")
@dataclass
class ModifyRequest:
    """A request to modify code.

    Attributes:
        instruction: Natural-language description of the desired change.
        target_files: Files to modify; empty means infer from the instruction.
        dry_run: Generate edits without writing or committing them.
    """
    instruction: str
    target_files: list[str] = field(default_factory=list)
    dry_run: bool = False
@dataclass
class ModifyResult:
    """Result of a self-modification attempt.

    Attributes:
        success: Whether the modification completed successfully.
        files_changed: Paths that were written during the run.
        test_passed: Whether the test run passed after the edits.
        commit_sha: SHA of the resulting commit, if one was made.
        branch_name: Working branch used, if one was created.
        error: Human-readable failure reason when success is False.
        llm_response: Raw text of the last LLM response.
        attempts: Number of edit/test attempts performed.
        report_path: Path of the saved report, if one was written.
        autonomous_cycles: Autonomous correction cycles consumed.
    """
    success: bool
    files_changed: list[str] = field(default_factory=list)
    test_passed: bool = False
    commit_sha: Optional[str] = None
    branch_name: Optional[str] = None
    error: Optional[str] = None
    llm_response: str = ""
    attempts: int = 0
    report_path: Optional[str] = None
    autonomous_cycles: int = 0
class SelfModifyLoop:
"""Orchestrates the read -> edit -> test -> commit cycle.
Supports autonomous self-correction: when all retries fail, reads its own
failure report, diagnoses the root cause, and restarts with a corrected
instruction.
"""
def __init__(
self,
repo_path: Optional[Path] = None,
max_retries: Optional[int] = None,
backend: Optional[str] = None,
autonomous: bool = False,
max_autonomous_cycles: int = 3,
) -> None:
self._repo_path = repo_path or PROJECT_ROOT
self._max_retries = (
max_retries if max_retries is not None else settings.self_modify_max_retries
)
self._allowed_dirs = [
d.strip() for d in settings.self_modify_allowed_dirs.split(",") if d.strip()
]
self._run_id = f"{int(time.time())}"
self._attempt_reports: list[dict] = []
self._backend = backend or settings.self_modify_backend
self._autonomous = autonomous
self._max_autonomous_cycles = max_autonomous_cycles
# ── Public API ────────────────────────────────────────────────────────────
def run(self, request: ModifyRequest) -> ModifyResult:
"""Execute the full self-modification loop."""
if not settings.self_modify_enabled:
return ModifyResult(
success=False,
error="Self-modification is disabled. Set SELF_MODIFY_ENABLED=true.",
)
if not _LOCK.acquire(blocking=False):
return ModifyResult(
success=False,
error="Another self-modification is already running.",
)
try:
result = self._run_locked(request)
report_path = self._save_report(request, result)
result.report_path = str(report_path)
# Autonomous mode: if failed, diagnose and restart
if self._autonomous and not result.success and not request.dry_run:
result = self._autonomous_loop(request, result, report_path)
return result
finally:
_LOCK.release()
# ── Autonomous self-correction ─────────────────────────────────────────
def _autonomous_loop(
self, original_request: ModifyRequest, last_result: ModifyResult, last_report: Path
) -> ModifyResult:
"""Read the failure report, diagnose, and restart with a fix."""
for cycle in range(1, self._max_autonomous_cycles + 1):
logger.info("Autonomous cycle %d/%d", cycle, self._max_autonomous_cycles)
# Diagnose what went wrong
diagnosis = self._diagnose_failure(last_report)
if not diagnosis:
logger.warning("Could not diagnose failure, stopping autonomous loop")
last_result.autonomous_cycles = cycle
return last_result
logger.info("Diagnosis: %s", diagnosis[:200])
# Build a corrected instruction
corrected_instruction = (
f"{original_request.instruction}\n\n"
f"IMPORTANT CORRECTION from previous failure:\n{diagnosis}"
)
# Reset attempt reports for this cycle
self._attempt_reports = []
corrected_request = ModifyRequest(
instruction=corrected_instruction,
target_files=original_request.target_files,
dry_run=original_request.dry_run,
)
result = self._run_locked(corrected_request)
report_path = self._save_report(corrected_request, result)
result.report_path = str(report_path)
result.autonomous_cycles = cycle
if result.success:
logger.info("Autonomous cycle %d succeeded!", cycle)
return result
last_result = result
last_report = report_path
logger.warning("Autonomous loop exhausted after %d cycles", self._max_autonomous_cycles)
return last_result
def _diagnose_failure(self, report_path: Path) -> Optional[str]:
"""Read a failure report and produce a diagnosis + fix instruction.
Uses the best available LLM to analyze the report. This is the
'meta-reasoning' step — the agent reasoning about its own failures.
"""
try:
report_text = report_path.read_text(encoding="utf-8")
except Exception as exc:
logger.error("Could not read report %s: %s", report_path, exc)
return None
# Truncate to keep within context limits
if len(report_text) > 8000:
report_text = report_text[:8000] + "\n... (truncated)"
diagnosis_prompt = f"""You are a code debugging expert. Analyze this self-modification failure report and provide a concise diagnosis.
FAILURE REPORT:
{report_text}
Analyze the report and provide:
1. ROOT CAUSE: What specifically went wrong (syntax error, logic error, missing import, etc.)
2. FIX INSTRUCTIONS: Exact instructions for a code-generation LLM to avoid this mistake.
Be very specific — e.g. "Do NOT start the file with triple-quotes" or
"The em-dash character U+2014 must stay INSIDE a string literal, never outside one."
Keep your response under 500 words. Focus on actionable fix instructions."""
try:
raw = self._call_llm(diagnosis_prompt)
return raw.strip() if raw else None
except Exception as exc:
logger.error("Diagnosis LLM call failed: %s", exc)
return None
# ── Internal orchestration ────────────────────────────────────────────────
    def _run_locked(self, request: ModifyRequest) -> ModifyResult:
        """Run one full modification cycle (assumes the loop's lock is held).

        Flow: optionally create a work branch, resolve + validate target
        files, then up to ``self._max_retries + 1`` attempts of:
        read files -> LLM edit generation -> syntax check -> write -> test.
        One dict per attempt is appended to ``self._attempt_reports`` for
        later report rendering. On test failure the written files are
        reverted from git HEAD before the next attempt; if all attempts
        fail, a failure ModifyResult is returned.
        """
        branch_name = None
        attempt = 0
        # Skip branch creation — writing files triggers container restarts
        # which kills the process mid-operation. Work on the current branch.
        if not os.environ.get("SELF_MODIFY_SKIP_BRANCH"):
            try:
                branch_name = self._create_branch()
            except Exception as exc:
                # Branch creation is best-effort; continue on current branch.
                logger.warning("Could not create branch: %s (continuing on current)", exc)
        # Resolve target files
        target_files = request.target_files or self._infer_target_files(
            request.instruction
        )
        if not target_files:
            return ModifyResult(
                success=False,
                error="No target files identified. Specify target_files or use more specific language.",
                branch_name=branch_name,
            )
        # Validate paths
        try:
            self._validate_paths(target_files)
        except ValueError as exc:
            return ModifyResult(success=False, error=str(exc), branch_name=branch_name)
        # Carried across attempts so retry prompts can include prior failures.
        last_test_output = ""
        last_llm_response = ""
        last_syntax_errors: dict[str, str] = {}
        while attempt <= self._max_retries:
            attempt += 1
            logger.info(
                "Self-modify attempt %d/%d: %s",
                attempt,
                self._max_retries + 1,
                request.instruction[:80],
            )
            # Read current contents
            file_contents = self._read_files(target_files)
            if not file_contents:
                return ModifyResult(
                    success=False,
                    error="Could not read any target files.",
                    branch_name=branch_name,
                    attempts=attempt,
                )
            # Generate edits via LLM (feeding back prior failures on retries)
            try:
                edits, llm_response = self._generate_edits(
                    request.instruction, file_contents,
                    prev_test_output=last_test_output if attempt > 1 else None,
                    prev_syntax_errors=last_syntax_errors if attempt > 1 else None,
                )
                last_llm_response = llm_response
            except Exception as exc:
                self._attempt_reports.append({
                    "attempt": attempt,
                    "phase": "llm_generation",
                    "error": str(exc),
                })
                return ModifyResult(
                    success=False,
                    error=f"LLM generation failed: {exc}",
                    branch_name=branch_name,
                    attempts=attempt,
                )
            if not edits:
                self._attempt_reports.append({
                    "attempt": attempt,
                    "phase": "parse_edits",
                    "error": "No file edits parsed from LLM response",
                    "llm_response": llm_response,
                })
                return ModifyResult(
                    success=False,
                    error="LLM produced no file edits.",
                    llm_response=llm_response,
                    branch_name=branch_name,
                    attempts=attempt,
                )
            # Syntax validation — check BEFORE writing to disk
            syntax_errors = self._validate_syntax(edits)
            if syntax_errors:
                last_syntax_errors = syntax_errors
                error_summary = "; ".join(
                    f"{fp}: {err}" for fp, err in syntax_errors.items()
                )
                logger.warning("Syntax errors in LLM output: %s", error_summary)
                self._attempt_reports.append({
                    "attempt": attempt,
                    "phase": "syntax_validation",
                    "error": error_summary,
                    "edits_content": {fp: content for fp, content in edits.items()},
                    "llm_response": llm_response,
                })
                # Don't write — go straight to retry
                continue
            last_syntax_errors = {}
            if request.dry_run:
                # Dry run: record truncated previews and report success
                # without touching disk, running tests, or committing.
                self._attempt_reports.append({
                    "attempt": attempt,
                    "phase": "dry_run",
                    "edits": {fp: content[:500] + "..." if len(content) > 500 else content
                              for fp, content in edits.items()},
                    "llm_response": llm_response,
                })
                return ModifyResult(
                    success=True,
                    files_changed=list(edits.keys()),
                    llm_response=llm_response,
                    branch_name=branch_name,
                    attempts=attempt,
                )
            # Write edits
            written = self._write_files(edits)
            # Run tests
            test_passed, test_output = self._run_tests()
            last_test_output = test_output
            # Save per-attempt report
            self._attempt_reports.append({
                "attempt": attempt,
                "phase": "complete",
                "files_written": written,
                "edits_content": {fp: content for fp, content in edits.items()},
                "test_passed": test_passed,
                "test_output": test_output,
                "llm_response": llm_response,
            })
            if test_passed:
                sha = self._git_commit(
                    f"self-modify: {request.instruction[:72]}", written
                )
                return ModifyResult(
                    success=True,
                    files_changed=written,
                    test_passed=True,
                    commit_sha=sha,
                    branch_name=branch_name,
                    llm_response=llm_response,
                    attempts=attempt,
                )
            # Tests failed — revert and maybe retry
            logger.warning(
                "Tests failed on attempt %d: %s", attempt, test_output[:200]
            )
            self._revert_files(written)
        # All attempts exhausted with failing tests.
        return ModifyResult(
            success=False,
            files_changed=[],
            test_passed=False,
            error=f"Tests failed after {attempt} attempt(s).",
            llm_response=last_llm_response,
            branch_name=branch_name,
            attempts=attempt,
        )
# ── Syntax validation ──────────────────────────────────────────────────
def _validate_syntax(self, edits: dict[str, str]) -> dict[str, str]:
"""Compile-check each .py file edit. Returns {path: error} for failures."""
errors: dict[str, str] = {}
for fp, content in edits.items():
if not fp.endswith(".py"):
continue
try:
compile(content, fp, "exec")
except SyntaxError as exc:
errors[fp] = f"line {exc.lineno}: {exc.msg}"
return errors
# ── Report saving ─────────────────────────────────────────────────────────
def _save_report(self, request: ModifyRequest, result: ModifyResult) -> Path:
"""Save a detailed report to data/self_modify_reports/."""
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
slug = re.sub(r"[^a-z0-9]+", "_", request.instruction[:40].lower()).strip("_")
report_file = REPORTS_DIR / f"{ts}_{slug}.md"
lines = [
f"# Self-Modify Report: {ts}",
"",
f"**Instruction:** {request.instruction[:200]}",
f"**Target files:** {', '.join(request.target_files) or '(auto-detected)'}",
f"**Dry run:** {request.dry_run}",
f"**Backend:** {self._backend}",
f"**Branch:** {result.branch_name or 'N/A'}",
f"**Result:** {'SUCCESS' if result.success else 'FAILED'}",
f"**Error:** {result.error or 'none'}",
f"**Commit:** {result.commit_sha or 'none'}",
f"**Attempts:** {result.attempts}",
f"**Autonomous cycles:** {result.autonomous_cycles}",
"",
]
for attempt_data in self._attempt_reports:
n = attempt_data.get("attempt", "?")
phase = attempt_data.get("phase", "?")
lines.append(f"## Attempt {n} -- {phase}")
lines.append("")
if "error" in attempt_data and attempt_data.get("phase") != "complete":
lines.append(f"**Error:** {attempt_data['error']}")
lines.append("")
if "llm_response" in attempt_data:
lines.append("### LLM Response")
lines.append("```")
lines.append(attempt_data["llm_response"])
lines.append("```")
lines.append("")
if "edits_content" in attempt_data:
lines.append("### Edits Written")
for fp, content in attempt_data["edits_content"].items():
lines.append(f"#### {fp}")
lines.append("```python")
lines.append(content)
lines.append("```")
lines.append("")
if "test_output" in attempt_data:
lines.append(f"### Test Result: {'PASSED' if attempt_data.get('test_passed') else 'FAILED'}")
lines.append("```")
lines.append(attempt_data["test_output"])
lines.append("```")
lines.append("")
report_text = "\n".join(lines)
report_file.write_text(report_text, encoding="utf-8")
logger.info("Report saved: %s", report_file)
return report_file
# ── Git helpers ───────────────────────────────────────────────────────────
def _create_branch(self) -> str:
"""Create and switch to a working branch."""
from creative.tools.git_tools import git_branch
branch_name = f"timmy/self-modify-{int(time.time())}"
git_branch(self._repo_path, create=branch_name, switch=branch_name)
logger.info("Created branch: %s", branch_name)
return branch_name
def _git_commit(self, message: str, files: list[str]) -> Optional[str]:
"""Stage files and commit."""
from creative.tools.git_tools import git_add, git_commit
try:
git_add(self._repo_path, paths=files)
result = git_commit(self._repo_path, message)
sha = result.get("sha")
logger.info("Committed %s: %s", sha[:8] if sha else "?", message)
return sha
except Exception as exc:
logger.error("Git commit failed: %s", exc)
return None
def _revert_files(self, file_paths: list[str]) -> None:
"""Restore files from git HEAD."""
for fp in file_paths:
try:
subprocess.run(
["git", "checkout", "HEAD", "--", fp],
cwd=self._repo_path,
capture_output=True,
timeout=10,
)
except Exception as exc:
logger.error("Failed to revert %s: %s", fp, exc)
# ── File I/O ──────────────────────────────────────────────────────────────
def _validate_paths(self, file_paths: list[str]) -> None:
"""Ensure all paths are within allowed directories."""
for fp in file_paths:
resolved = (self._repo_path / fp).resolve()
repo_resolved = self._repo_path.resolve()
if not str(resolved).startswith(str(repo_resolved)):
raise ValueError(f"Path escapes repository: {fp}")
rel = str(resolved.relative_to(repo_resolved))
if not any(rel.startswith(d) for d in self._allowed_dirs):
raise ValueError(
f"Path not in allowed directories ({self._allowed_dirs}): {fp}"
)
def _read_files(self, file_paths: list[str]) -> dict[str, str]:
"""Read file contents from disk."""
contents: dict[str, str] = {}
for fp in file_paths:
full = self._repo_path / fp
if not full.is_file():
logger.warning("File not found: %s", full)
continue
if full.stat().st_size > _MAX_FILE_SIZE:
logger.warning("File too large, skipping: %s", fp)
continue
try:
contents[fp] = full.read_text(encoding="utf-8")
except Exception as exc:
logger.warning("Could not read %s: %s", fp, exc)
return contents
def _write_files(self, edits: dict[str, str]) -> list[str]:
"""Write edited content to disk. Returns paths written."""
written: list[str] = []
for fp, content in edits.items():
full = self._repo_path / fp
full.parent.mkdir(parents=True, exist_ok=True)
full.write_text(content, encoding="utf-8")
written.append(fp)
logger.info("Wrote %d bytes to %s", len(content), fp)
return written
def _infer_target_files(self, instruction: str) -> list[str]:
"""Guess which files to modify from the instruction text."""
paths = re.findall(r"[\w/._-]+\.py", instruction)
if paths:
return paths
keyword_files = {
"config": ["src/config.py"],
"health": ["src/dashboard/routes/health.py"],
"swarm": ["src/swarm/coordinator.py"],
"voice": ["src/voice/nlu.py"],
"agent": ["src/timmy/agent.py"],
"tool": ["src/timmy/tools.py"],
"dashboard": ["src/dashboard/app.py"],
"prompt": ["src/timmy/prompts.py"],
}
instruction_lower = instruction.lower()
for keyword, files in keyword_files.items():
if keyword in instruction_lower:
return files
return []
# ── Test runner ───────────────────────────────────────────────────────────
def _run_tests(self) -> tuple[bool, str]:
"""Run the test suite. Returns (passed, output)."""
try:
result = subprocess.run(
[sys.executable, "-m", "pytest", "tests/", "-q", "--tb=short"],
capture_output=True,
text=True,
cwd=self._repo_path,
timeout=120,
)
output = (result.stdout + result.stderr).strip()
return result.returncode == 0, output
except subprocess.TimeoutExpired:
return False, "Tests timed out after 120s"
except Exception as exc:
return False, f"Failed to run tests: {exc}"
# ── Multi-backend LLM ─────────────────────────────────────────────────────
def _resolve_backend(self) -> str:
"""Resolve 'auto' backend to a concrete one."""
if self._backend == "auto":
api_key = os.environ.get("ANTHROPIC_API_KEY", "")
if api_key:
return "anthropic"
return "ollama"
return self._backend
def _call_llm(self, prompt: str) -> str:
"""Route a prompt to the configured LLM backend. Returns raw text."""
backend = self._resolve_backend()
if backend == "anthropic":
return self._call_anthropic(prompt)
else:
return self._call_ollama(prompt)
def _call_anthropic(self, prompt: str) -> str:
"""Call Claude via the Anthropic SDK."""
import anthropic
api_key = os.environ.get("ANTHROPIC_API_KEY", "")
if not api_key:
raise RuntimeError("ANTHROPIC_API_KEY not set — cannot use anthropic backend")
client = anthropic.Anthropic(api_key=api_key)
message = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=[{"role": "user", "content": prompt}],
)
return message.content[0].text
def _call_ollama(self, prompt: str) -> str:
"""Call the local Ollama instance via Agno."""
from agno.agent import Agent
from agno.models.ollama import Ollama
agent = Agent(
name="SelfModify",
model=Ollama(id=settings.ollama_model, host=settings.ollama_url),
markdown=False,
)
run_result = agent.run(prompt, stream=False)
return run_result.content if hasattr(run_result, "content") else str(run_result)
# ── LLM interaction ───────────────────────────────────────────────────────
    def _generate_edits(
        self,
        instruction: str,
        file_contents: dict[str, str],
        prev_test_output: Optional[str] = None,
        prev_syntax_errors: Optional[dict[str, str]] = None,
    ) -> tuple[dict[str, str], str]:
        """Ask the LLM to generate file edits.

        Builds a prompt containing the instruction, the full current file
        contents, and (on retries) the previous test output and/or syntax
        errors, then parses the response into whole-file replacements.

        Returns (edits_dict, raw_llm_response) where edits_dict maps file
        path to the complete new file content; edits_dict is empty when no
        recognizable edit block could be parsed.
        """
        # Build the prompt
        files_block = ""
        for fp, content in file_contents.items():
            files_block += f"\n<FILE path=\"{fp}\">\n{content}\n</FILE>\n"
        retry_context = ""
        if prev_test_output:
            retry_context += f"""
PREVIOUS ATTEMPT FAILED with test errors:
<TEST_OUTPUT>
{prev_test_output[:2000]}
</TEST_OUTPUT>
Fix the issues shown above.
"""
        if prev_syntax_errors:
            errors_text = "\n".join(f"  {fp}: {err}" for fp, err in prev_syntax_errors.items())
            retry_context += f"""
PREVIOUS ATTEMPT HAD SYNTAX ERRORS (code was rejected before writing):
{errors_text}
You MUST produce syntactically valid Python. Run through the code mentally
and make sure all strings are properly terminated, all indentation is correct,
and there are no invalid characters outside of string literals.
"""
        prompt = f"""You are a precise code modification agent. Edit source files according to the instruction.
INSTRUCTION: {instruction}
CURRENT FILES:
{files_block}
{retry_context}
OUTPUT FORMAT — wrap each modified file like this:
<MODIFIED path="filepath">
complete file content here
</MODIFIED>
CRITICAL RULES:
- Output the COMPLETE file content, not just changed lines
- Keep ALL existing functionality unless told to remove it
- The output must be syntactically valid Python — verify mentally before outputting
- Preserve all special characters (unicode, em-dashes, etc.) exactly as they appear in the original
- Do NOT wrap the file content in triple-quotes or markdown code fences
- Do NOT start the file content with \"\"\" — that would turn the code into a string literal
- Follow the existing code style
Generate the modified files now:"""
        raw = self._call_llm(prompt)
        # Parse <MODIFIED path="..."> ... </MODIFIED> blocks
        edits: dict[str, str] = {}
        xml_re = re.compile(
            r'<MODIFIED\s+path=["\'](.+?)["\']\s*>\n?(.*?)</MODIFIED>',
            re.DOTALL,
        )
        for match in xml_re.finditer(raw):
            filepath = match.group(1).strip()
            content = match.group(2)
            # Strip trailing whitespace but keep a final newline
            content = content.rstrip() + "\n"
            edits[filepath] = content
        # Fallback: try the old delimiter format
        if not edits:
            for match in _FILE_BLOCK_RE.finditer(raw):
                filepath = match.group(1).strip()
                content = match.group(2).rstrip() + "\n"
                edits[filepath] = content
        # Last resort: single file + code block — only safe when exactly one
        # target file exists, so the fenced code can be attributed to it.
        if not edits and len(file_contents) == 1:
            only_path = next(iter(file_contents))
            code_match = re.search(r"```(?:python)?\n(.*?)```", raw, re.DOTALL)
            if code_match:
                edits[only_path] = code_match.group(1).rstrip() + "\n"
        return edits, raw

View File

@@ -1 +0,0 @@
"""Self-TDD — Continuous test watchdog with regression alerting."""

View File

@@ -1,71 +0,0 @@
"""Self-TDD Watchdog — polls pytest on a schedule and reports regressions.
Run in a terminal alongside your normal dev work:
self-tdd watch
self-tdd watch --interval 30
The watchdog runs silently while tests pass. When a regression appears it
prints the full short-traceback output so you can see exactly what broke.
No files are modified; no commits are made. Ctrl-C to stop.
"""
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
import typer
# Project root is three levels up from this file (…/self_tdd/watchdog.py);
# used as the cwd for pytest runs. TODO(review): confirm the depth still
# matches if this module is relocated.
PROJECT_ROOT = Path(__file__).parent.parent.parent
# Typer application exposing the `watch` command; invoked via main().
app = typer.Typer(help="Self-TDD watchdog — continuous test runner")
def _run_tests() -> tuple[bool, str]:
    """Run the test suite and return (passed, combined_output).

    A hung test run is reported as a failure instead of letting the
    unhandled ``subprocess.TimeoutExpired`` crash the watch loop.
    """
    try:
        result = subprocess.run(
            [sys.executable, "-m", "pytest", "tests/", "-q", "--tb=short"],
            capture_output=True,
            text=True,
            cwd=PROJECT_ROOT,
            timeout=60,
        )
    except subprocess.TimeoutExpired:
        return False, "Tests timed out after 60s"
    return result.returncode == 0, (result.stdout + result.stderr).strip()
@app.command()
def watch(
    interval: int = typer.Option(60, "--interval", "-i", help="Seconds between test runs"),
) -> None:
    """Poll pytest continuously and print regressions as they appear.

    Silent while green (announces "all passing" only on a red-to-green
    transition); prints the full test output whenever a run fails.
    """
    typer.echo(f"Self-TDD watchdog started — polling every {interval}s. Ctrl-C to stop.")
    previous: bool | None = None
    try:
        while True:
            ok, output = _run_tests()
            stamp = datetime.now().strftime("%H:%M:%S")
            if not ok:
                typer.secho(f"[{stamp}] Regression detected:", fg=typer.colors.RED)
                typer.echo(output)
            elif previous is not True:
                typer.secho(f"[{stamp}] All tests passing.", fg=typer.colors.GREEN)
            previous = ok
            time.sleep(interval)
    except KeyboardInterrupt:
        typer.echo("\nWatchdog stopped.")
def main() -> None:
    """Console-script entry point (`self-tdd`); delegates to the Typer app."""
    app()


if __name__ == "__main__":
    main()

View File

@@ -1 +0,0 @@
"""Upgrades — System upgrade queue and execution pipeline."""

View File

@@ -1,331 +0,0 @@
"""Database models for Self-Upgrade Approval Queue."""
import json
import sqlite3
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Optional
DB_PATH = Path("data/swarm.db")
class UpgradeStatus(str, Enum):
    """Status of an upgrade proposal.

    Subclasses ``str`` so member values compare and serialize as plain
    strings (used directly for the SQLite ``status`` column).
    """

    PROPOSED = "proposed"  # awaiting human review
    APPROVED = "approved"  # approved but not yet applied
    REJECTED = "rejected"  # declined by a reviewer
    APPLIED = "applied"    # merged into main
    FAILED = "failed"      # an apply attempt errored
    EXPIRED = "expired"    # aged out without a decision
@dataclass
class Upgrade:
    """A self-modification upgrade proposal.

    Mirrors one row of the ``upgrades`` SQLite table; all timestamps are
    ISO-8601 UTC strings (or None until the corresponding transition).
    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    status: UpgradeStatus = UpgradeStatus.PROPOSED
    # Timestamps
    proposed_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    approved_at: Optional[str] = None
    applied_at: Optional[str] = None
    rejected_at: Optional[str] = None
    # Proposal details
    branch_name: str = ""  # git branch holding the changes
    description: str = ""
    files_changed: list[str] = field(default_factory=list)
    diff_preview: str = ""  # truncated diff shown in the review UI
    # Test results
    test_passed: bool = False
    test_output: str = ""
    # Execution results
    error_message: Optional[str] = None  # set when an apply attempt fails
    approved_by: Optional[str] = None  # audit trail for approvals
def _get_conn() -> sqlite3.Connection:
    """Open a connection to the swarm DB with the upgrades schema ensured.

    Creates the parent directory, the ``upgrades`` table, and its indexes
    if they do not exist yet; rows are returned as ``sqlite3.Row``.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(DB_PATH))
    conn.row_factory = sqlite3.Row
    schema = """
        CREATE TABLE IF NOT EXISTS upgrades (
            id TEXT PRIMARY KEY,
            status TEXT NOT NULL DEFAULT 'proposed',
            proposed_at TEXT NOT NULL,
            approved_at TEXT,
            applied_at TEXT,
            rejected_at TEXT,
            branch_name TEXT NOT NULL,
            description TEXT NOT NULL,
            files_changed TEXT,  -- JSON array
            diff_preview TEXT,
            test_passed INTEGER DEFAULT 0,
            test_output TEXT,
            error_message TEXT,
            approved_by TEXT
        )
        """
    conn.execute(schema)
    # Indexes
    for stmt in (
        "CREATE INDEX IF NOT EXISTS idx_upgrades_status ON upgrades(status)",
        "CREATE INDEX IF NOT EXISTS idx_upgrades_proposed ON upgrades(proposed_at)",
    ):
        conn.execute(stmt)
    conn.commit()
    return conn
def create_upgrade(
    branch_name: str,
    description: str,
    files_changed: list[str],
    diff_preview: str,
    test_passed: bool = False,
    test_output: str = "",
) -> Upgrade:
    """Insert and return a new upgrade proposal (status 'proposed').

    Args:
        branch_name: Git branch name for the upgrade.
        description: Human-readable description.
        files_changed: List of files that would be modified.
        diff_preview: Short diff preview for review.
        test_passed: Whether tests passed on the branch.
        test_output: Test output text.

    Returns:
        The persisted Upgrade record.
    """
    proposal = Upgrade(
        branch_name=branch_name,
        description=description,
        files_changed=files_changed,
        diff_preview=diff_preview,
        test_passed=test_passed,
        test_output=test_output,
    )
    row = (
        proposal.id,
        proposal.status.value,
        proposal.proposed_at,
        proposal.branch_name,
        proposal.description,
        json.dumps(files_changed),
        proposal.diff_preview,
        int(test_passed),
        test_output,
    )
    conn = _get_conn()
    conn.execute(
        """
        INSERT INTO upgrades (id, status, proposed_at, branch_name, description,
                              files_changed, diff_preview, test_passed, test_output)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        row,
    )
    conn.commit()
    conn.close()
    return proposal
def get_upgrade(upgrade_id: str) -> Optional[Upgrade]:
    """Fetch a single upgrade by primary key; None when no such row exists."""
    conn = _get_conn()
    row = conn.execute(
        "SELECT * FROM upgrades WHERE id = ?", (upgrade_id,)
    ).fetchone()
    conn.close()
    if row is None:
        return None
    data = dict(row)
    return Upgrade(
        id=data["id"],
        status=UpgradeStatus(data["status"]),
        proposed_at=data["proposed_at"],
        approved_at=data["approved_at"],
        applied_at=data["applied_at"],
        rejected_at=data["rejected_at"],
        branch_name=data["branch_name"],
        description=data["description"],
        files_changed=json.loads(data["files_changed"]) if data["files_changed"] else [],
        diff_preview=data["diff_preview"] or "",
        test_passed=bool(data["test_passed"]),
        test_output=data["test_output"] or "",
        error_message=data["error_message"],
        approved_by=data["approved_by"],
    )
def list_upgrades(
    status: Optional[UpgradeStatus] = None,
    limit: int = 100,
) -> list[Upgrade]:
    """Return upgrades newest-first, optionally restricted to one status."""
    query = "SELECT * FROM upgrades"
    params: tuple = ()
    if status:
        query += " WHERE status = ?"
        params += (status.value,)
    query += " ORDER BY proposed_at DESC LIMIT ?"
    params += (limit,)
    conn = _get_conn()
    rows = conn.execute(query, params).fetchall()
    conn.close()
    results: list[Upgrade] = []
    for r in rows:
        results.append(
            Upgrade(
                id=r["id"],
                status=UpgradeStatus(r["status"]),
                proposed_at=r["proposed_at"],
                approved_at=r["approved_at"],
                applied_at=r["applied_at"],
                rejected_at=r["rejected_at"],
                branch_name=r["branch_name"],
                description=r["description"],
                files_changed=json.loads(r["files_changed"]) if r["files_changed"] else [],
                diff_preview=r["diff_preview"] or "",
                test_passed=bool(r["test_passed"]),
                test_output=r["test_output"] or "",
                error_message=r["error_message"],
                approved_by=r["approved_by"],
            )
        )
    return results
def approve_upgrade(upgrade_id: str, approved_by: str = "dashboard") -> Optional[Upgrade]:
    """Transition a 'proposed' upgrade to 'approved'.

    Returns the updated record, or None if the row does not exist or is
    not currently in the 'proposed' state.
    """
    now = datetime.now(timezone.utc).isoformat()
    conn = _get_conn()
    cursor = conn.execute(
        """
        UPDATE upgrades
        SET status = ?, approved_at = ?, approved_by = ?
        WHERE id = ? AND status = ?
        """,
        (UpgradeStatus.APPROVED.value, now, approved_by, upgrade_id, UpgradeStatus.PROPOSED.value),
    )
    conn.commit()
    changed = cursor.rowcount
    conn.close()
    return get_upgrade(upgrade_id) if changed > 0 else None
def reject_upgrade(upgrade_id: str) -> Optional[Upgrade]:
    """Transition a 'proposed' upgrade to 'rejected'.

    Returns the updated record, or None if the row does not exist or is
    not currently in the 'proposed' state.
    """
    now = datetime.now(timezone.utc).isoformat()
    conn = _get_conn()
    cursor = conn.execute(
        """
        UPDATE upgrades
        SET status = ?, rejected_at = ?
        WHERE id = ? AND status = ?
        """,
        (UpgradeStatus.REJECTED.value, now, upgrade_id, UpgradeStatus.PROPOSED.value),
    )
    conn.commit()
    changed = cursor.rowcount
    conn.close()
    return get_upgrade(upgrade_id) if changed > 0 else None
def mark_applied(upgrade_id: str) -> Optional[Upgrade]:
    """Mark an 'approved' upgrade as successfully applied.

    Returns the updated record, or None if the row does not exist or is
    not currently in the 'approved' state.
    """
    now = datetime.now(timezone.utc).isoformat()
    conn = _get_conn()
    cursor = conn.execute(
        """
        UPDATE upgrades
        SET status = ?, applied_at = ?
        WHERE id = ? AND status = ?
        """,
        (UpgradeStatus.APPLIED.value, now, upgrade_id, UpgradeStatus.APPROVED.value),
    )
    conn.commit()
    changed = cursor.rowcount
    conn.close()
    return get_upgrade(upgrade_id) if changed > 0 else None
def mark_failed(upgrade_id: str, error_message: str) -> Optional[Upgrade]:
    """Mark an 'approved' upgrade as failed, recording *error_message*.

    Returns the updated record, or None if the row does not exist or is
    not currently in the 'approved' state.
    """
    conn = _get_conn()
    cursor = conn.execute(
        """
        UPDATE upgrades
        SET status = ?, error_message = ?
        WHERE id = ? AND status = ?
        """,
        (UpgradeStatus.FAILED.value, error_message, upgrade_id, UpgradeStatus.APPROVED.value),
    )
    conn.commit()
    changed = cursor.rowcount
    conn.close()
    return get_upgrade(upgrade_id) if changed > 0 else None
def get_pending_count() -> int:
    """Count upgrades still awaiting review (status 'proposed')."""
    conn = _get_conn()
    pending = conn.execute(
        "SELECT COUNT(*) as count FROM upgrades WHERE status = ?",
        (UpgradeStatus.PROPOSED.value,),
    ).fetchone()["count"]
    conn.close()
    return pending
def prune_old_upgrades(older_than_days: int = 30) -> int:
    """Delete applied/rejected/failed upgrades proposed before the cutoff.

    Pending ('proposed') and 'approved' rows are never pruned.

    Returns:
        The number of rows removed.
    """
    from datetime import timedelta

    cutoff = (datetime.now(timezone.utc) - timedelta(days=older_than_days)).isoformat()
    conn = _get_conn()
    cursor = conn.execute(
        """
        DELETE FROM upgrades
        WHERE proposed_at < ? AND status IN ('applied', 'rejected', 'failed')
        """,
        (cutoff,),
    )
    removed = cursor.rowcount
    conn.commit()
    conn.close()
    return removed

View File

@@ -1,285 +0,0 @@
"""Upgrade Queue management - bridges self-modify loop with approval workflow."""
import logging
import subprocess
from pathlib import Path
from typing import Optional
from self_coding.upgrades.models import (
Upgrade,
UpgradeStatus,
create_upgrade,
get_upgrade,
approve_upgrade,
reject_upgrade,
mark_applied,
mark_failed,
)
logger = logging.getLogger(__name__)
PROJECT_ROOT = Path(__file__).parent.parent.parent
class UpgradeQueue:
    """Manages the upgrade approval and application workflow.

    Thin stateless facade over the ``self_coding.upgrades.models`` CRUD
    helpers plus the git subprocess operations needed to merge or discard
    an upgrade branch. All methods are static; there is nothing to construct.
    """
    @staticmethod
    def propose(
        branch_name: str,
        description: str,
        files_changed: list[str],
        diff_preview: str,
        test_passed: bool = False,
        test_output: str = "",
    ) -> Upgrade:
        """Propose a new upgrade for approval.

        This is called by the self-modify loop when it generates changes.
        The upgrade is created in 'proposed' state and waits for human approval.

        Args:
            branch_name: Git branch with the changes
            description: What the upgrade does
            files_changed: List of modified files
            diff_preview: Short diff for review
            test_passed: Whether tests passed
            test_output: Test output

        Returns:
            The created Upgrade proposal
        """
        upgrade = create_upgrade(
            branch_name=branch_name,
            description=description,
            files_changed=files_changed,
            diff_preview=diff_preview,
            test_passed=test_passed,
            test_output=test_output,
        )
        logger.info(
            "Upgrade proposed: %s (%s) - %d files",
            upgrade.id[:8],
            branch_name,
            len(files_changed),
        )
        # Log to event log — best-effort: a proposal must never fail just
        # because the event log is unavailable.
        try:
            from swarm.event_log import log_event, EventType
            log_event(
                EventType.SYSTEM_INFO,
                source="upgrade_queue",
                data={
                    "upgrade_id": upgrade.id,
                    "branch": branch_name,
                    "description": description,
                    "test_passed": test_passed,
                },
            )
        except Exception:
            pass
        return upgrade
    @staticmethod
    def approve(upgrade_id: str, approved_by: str = "dashboard") -> Optional[Upgrade]:
        """Approve an upgrade proposal.

        Called from dashboard when user clicks "Approve".
        Does NOT apply the upgrade - that happens separately.

        Args:
            upgrade_id: The upgrade to approve
            approved_by: Who approved it (for audit)

        Returns:
            Updated Upgrade or None if not found/not in proposed state
        """
        upgrade = approve_upgrade(upgrade_id, approved_by)
        if upgrade:
            logger.info("Upgrade approved: %s by %s", upgrade_id[:8], approved_by)
        return upgrade
    @staticmethod
    def reject(upgrade_id: str) -> Optional[Upgrade]:
        """Reject an upgrade proposal.

        Called from dashboard when user clicks "Reject".
        Cleans up the branch.

        Args:
            upgrade_id: The upgrade to reject

        Returns:
            Updated Upgrade or None
        """
        upgrade = reject_upgrade(upgrade_id)
        if upgrade:
            logger.info("Upgrade rejected: %s", upgrade_id[:8])
            # Clean up branch (force-delete; the work is intentionally discarded)
            try:
                subprocess.run(
                    ["git", "branch", "-D", upgrade.branch_name],
                    cwd=PROJECT_ROOT,
                    capture_output=True,
                    check=False,
                )
            except Exception as exc:
                logger.warning("Failed to delete branch %s: %s", upgrade.branch_name, exc)
        return upgrade
    @staticmethod
    def apply(upgrade_id: str) -> tuple[bool, str]:
        """Apply an approved upgrade.

        This is the critical operation that actually modifies the codebase:
        1. Checks out the branch
        2. Runs tests
        3. If tests pass: merges to main
        4. Updates upgrade status

        On any failure the upgrade is marked failed and the working tree is
        switched back to main before returning.

        Args:
            upgrade_id: The approved upgrade to apply

        Returns:
            (success, message) tuple
        """
        upgrade = get_upgrade(upgrade_id)
        if not upgrade:
            return False, "Upgrade not found"
        if upgrade.status != UpgradeStatus.APPROVED:
            return False, f"Upgrade not approved (status: {upgrade.status.value})"
        logger.info("Applying upgrade: %s (%s)", upgrade_id[:8], upgrade.branch_name)
        try:
            # 1. Checkout branch
            result = subprocess.run(
                ["git", "checkout", upgrade.branch_name],
                cwd=PROJECT_ROOT,
                capture_output=True,
                text=True,
            )
            if result.returncode != 0:
                mark_failed(upgrade_id, f"Checkout failed: {result.stderr}")
                return False, f"Failed to checkout branch: {result.stderr}"
            # 2. Run tests (re-run even if the proposal claimed green:
            # main may have moved since the branch was proposed)
            result = subprocess.run(
                ["python", "-m", "pytest", "tests/", "-x", "-q"],
                cwd=PROJECT_ROOT,
                capture_output=True,
                text=True,
                timeout=120,
            )
            if result.returncode != 0:
                mark_failed(upgrade_id, f"Tests failed: {result.stdout}\n{result.stderr}")
                # Switch back to main
                subprocess.run(["git", "checkout", "main"], cwd=PROJECT_ROOT, check=False)
                return False, "Tests failed"
            # 3. Merge to main (--no-ff keeps an explicit merge commit for audit)
            result = subprocess.run(
                ["git", "checkout", "main"],
                cwd=PROJECT_ROOT,
                capture_output=True,
                text=True,
            )
            if result.returncode != 0:
                mark_failed(upgrade_id, f"Failed to checkout main: {result.stderr}")
                return False, "Failed to checkout main"
            result = subprocess.run(
                ["git", "merge", "--no-ff", upgrade.branch_name, "-m", f"Apply upgrade: {upgrade.description}"],
                cwd=PROJECT_ROOT,
                capture_output=True,
                text=True,
            )
            if result.returncode != 0:
                mark_failed(upgrade_id, f"Merge failed: {result.stderr}")
                return False, "Merge failed"
            # 4. Mark as applied
            mark_applied(upgrade_id)
            # 5. Clean up branch (safe -d: the branch is now merged)
            subprocess.run(
                ["git", "branch", "-d", upgrade.branch_name],
                cwd=PROJECT_ROOT,
                capture_output=True,
                check=False,
            )
            logger.info("Upgrade applied successfully: %s", upgrade_id[:8])
            return True, "Upgrade applied successfully"
        except subprocess.TimeoutExpired:
            mark_failed(upgrade_id, "Tests timed out")
            subprocess.run(["git", "checkout", "main"], cwd=PROJECT_ROOT, check=False)
            return False, "Tests timed out"
        except Exception as exc:
            error_msg = str(exc)
            mark_failed(upgrade_id, error_msg)
            subprocess.run(["git", "checkout", "main"], cwd=PROJECT_ROOT, check=False)
            return False, f"Error: {error_msg}"
    @staticmethod
    def get_full_diff(upgrade_id: str) -> str:
        """Get full git diff for an upgrade.

        Args:
            upgrade_id: The upgrade to get diff for

        Returns:
            Git diff output (or an error description string)
        """
        upgrade = get_upgrade(upgrade_id)
        if not upgrade:
            return "Upgrade not found"
        try:
            result = subprocess.run(
                ["git", "diff", "main..." + upgrade.branch_name],
                cwd=PROJECT_ROOT,
                capture_output=True,
                text=True,
            )
            return result.stdout if result.returncode == 0 else result.stderr
        except Exception as exc:
            return f"Error getting diff: {exc}"
# Convenience functions for self-modify loop
def _tests_look_passed(test_output: str) -> bool:
    """Heuristically decide whether a test run succeeded from its output.

    The previous check (`"passed" in output`) returned True for mixed
    pytest summaries such as "1 failed, 3 passed"; any failure or error
    marker now vetoes success.
    """
    lowered = test_output.lower()
    if "failed" in lowered or "error" in lowered:
        return False
    return "passed" in lowered


def propose_upgrade_from_loop(
    branch_name: str,
    description: str,
    files_changed: list[str],
    diff: str,
    test_output: str = "",
) -> Upgrade:
    """Called by self-modify loop to propose an upgrade.

    Tests are expected to have been run by the loop before calling this;
    the pass/fail flag is inferred from *test_output* (see
    ``_tests_look_passed``).
    """
    return UpgradeQueue.propose(
        branch_name=branch_name,
        description=description,
        files_changed=files_changed,
        diff_preview=diff[:2000],  # First 2000 chars; full diff via get_full_diff
        test_passed=_tests_look_passed(test_output),
        test_output=test_output,
    )