[EPIC-999/Phase II] The Forge — claw_runtime scaffold + competing rewrite pipeline #108

Closed
ezra wants to merge 4 commits from epic-999-phase-ii-forge into main
9 changed files with 52508 additions and 0 deletions

158
agent/claw_runtime.py Normal file
View File

@@ -0,0 +1,158 @@
"""
agent/claw_runtime.py — Claw Code runtime decomposition scaffold.
Part of EPIC-999 Phase II — The Forge.
This module introduces the 5-class decomposition of the monolithic AIAgent
to enable competing sub-agent rewrites and future runtime replacement.
Migration rule: each class begins as a thin facade over AIAgent methods.
Logic migrates incrementally from run_agent.py into these classes.
"""
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass
class ModelResponse:
    """
    Normalized model response, independent of provider.

    Attributes (never None — falsy inputs normalize to empty values):
        content: assistant text, "" when absent.
        tool_calls: provider-agnostic tool call list, [] when absent.
        reasoning: reasoning/thinking text, "" when absent.
    """

    def __init__(
        self,
        content: Optional[str] = None,
        tool_calls: Optional[list] = None,
        reasoning: Optional[str] = None,
    ):
        # Coerce None (and other falsy inputs) to stable empty defaults so
        # downstream code never has to None-check these attributes.
        self.content = content or ""
        self.tool_calls = tool_calls or []
        self.reasoning = reasoning or ""
class ToolResult:
    """
    Normalized tool execution result.

    Attributes:
        tool_call_id: id of the originating tool call.
        output: tool output text (may be "" for tools with no output).
        error: error description, or None on success.
    """

    def __init__(self, tool_call_id: str, output: str, error: Optional[str] = None):
        self.tool_call_id = tool_call_id
        self.output = output
        # None signals success; any string signals a failed execution.
        self.error = error
class ConversationLoop:
    """
    Owns the while-loop invariant: iteration budget, termination conditions,
    and the high-level orchestration of turn-taking.

    Migration facade: currently delegates to ``AIAgent.run_conversation``.
    """

    def __init__(self, agent: "AIAgent"):
        self.agent = agent

    def run(
        self,
        messages: List[Dict[str, Any]],
        tools: List[Dict[str, Any]],
        system_message: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Run the conversation until completion or budget exhaustion.

        Args:
            messages: Full message list; the last entry is treated as the
                current user turn, everything before it as history.
            tools: Tool schemas (unused by this facade; the agent resolves
                its own tool set for now).
            system_message: Optional system prompt override.

        Returns:
            The dict produced by ``AIAgent.run_conversation``.

        Invariant: must terminate before reaching max_iterations and before
        the iteration budget drops to <= 0.
        """
        # Facade: delegate to AIAgent.run_conversation for now.
        return self.agent.run_conversation(
            user_message=messages[-1]["content"] if messages else "",
            system_message=system_message,
            conversation_history=messages[:-1] if len(messages) > 1 else None,
        )
class ModelDispatcher:
    """
    Single owner of LLM client interaction: streaming, fallback activation,
    response normalization, and provider-specific quirks.
    """

    def __init__(self, agent: "AIAgent"):
        # Migration facade: hold a handle on the monolithic agent whose
        # streaming logic will eventually move into this class.
        self.agent = agent

    def call(self, model: str, messages: List[Dict], tools: List[Dict], **kwargs) -> ModelResponse:
        """
        Dispatch one API call and hand back a normalized response.

        Invariant: always returns a ModelResponse carrying .content,
        .tool_calls, and .reasoning.
        """
        # Not yet migrated: the implementation will come from
        # AIAgent._interruptible_streaming_api_call plus its response
        # normalization helpers.
        raise NotImplementedError("ModelDispatcher.call() — migrate from AIAgent streaming logic")
class ToolExecutor:
    """
    Single owner of tool execution: sequential vs concurrent dispatch,
    error wrapping, and result formatting.
    """

    def __init__(self, agent: "AIAgent"):
        # Migration facade over the monolithic agent's tool-execution paths.
        self.agent = agent

    def execute(self, tool_calls: List[Any], task_id: str = None) -> List[ToolResult]:
        """
        Execute a batch of tool calls and return normalized results.

        Invariant: every tool_call produces exactly one ToolResult.
        """
        # Guard clause: until migration completes, delegate to the agent's
        # sequential executor whenever the agent provides one.
        if not hasattr(self.agent, "_execute_tool_calls_sequential"):
            raise NotImplementedError("ToolExecutor.execute() — migrate from AIAgent tool execution")
        return self.agent._execute_tool_calls_sequential(tool_calls, task_id=task_id)
class MemoryInterceptor:
    """
    Catches agent-level tools (memory, todo) before they reach the global
    registry, and flushes pending memories on session exit.
    """

    def __init__(self, agent: "AIAgent"):
        # Migration facade over run_agent.py's memory/todo interception.
        self.agent = agent

    def intercept(self, tool_name: str, args: Dict[str, Any], task_id: str = None) -> Optional[str]:
        """
        Handle 'memory'/'todo' directly and return the result string;
        return None to signal pass-through to the ToolExecutor.

        Invariant: must not mutate agent state except through explicit flush().
        """
        # The interception block has not migrated from run_agent.py yet, so
        # every tool — including 'memory' and 'todo' — currently falls
        # through to the executor.
        if tool_name in ("memory", "todo"):
            # Placeholder branch kept to mark where the migrated logic lands.
            return None
        return None

    def flush(self):
        """Flush any pending memories to persistent storage."""
        # Best-effort: delegate only when the agent exposes flush_memories.
        if not hasattr(self.agent, "flush_memories"):
            return
        self.agent.flush_memories()
class PromptBuilder:
    """
    Owns system prompt assembly, skill injection, context compression,
    and prompt caching marker placement.

    Migration facade: delegates system-prompt construction to
    AIAgent._build_system_prompt when the agent provides it.
    """

    def __init__(self, agent: "AIAgent"):
        self.agent = agent

    def build(
        self,
        user_message: str,
        conversation_history: Optional[List[Dict[str, Any]]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Build the full message list for the API call.

        Args:
            user_message: The current user turn.
            conversation_history: Prior messages appended verbatim, if any.

        Returns:
            Messages ordered: system message (when the agent produces one),
            then history, then the user turn.

        Raises:
            NotImplementedError: when the agent has no _build_system_prompt
                to delegate to (i.e. migration has not landed).

        Invariant: output list must start with a system message (or equivalent).
        """
        # Facade: delegate to AIAgent._build_system_prompt and related helpers.
        if hasattr(self.agent, "_build_system_prompt"):
            system_msg = self.agent._build_system_prompt(user_message)
            messages: List[Dict[str, Any]] = []
            if system_msg:
                messages.append({"role": "system", "content": system_msg})
            if conversation_history:
                messages.extend(conversation_history)
            messages.append({"role": "user", "content": user_message})
            return messages
        raise NotImplementedError("PromptBuilder.build() — migrate from AIAgent prompt assembly")

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,74 @@
# AIAgent Decomposition Plan (EPIC-999 Phase II Prep)
## Current State
`run_agent.py` contains `AIAgent` — a ~7,000-SLOC class that is the highest-blast-radius module in Hermes.
## Goal
Decompose `AIAgent` into 5 focused classes with strict interfaces, enabling:
- Parallel rewrites by competing sub-agents (Phase II)
- Independent testing of loop semantics vs. model I/O vs. memory
- Future runtime replacement (Hermes Ω) without touching tool infrastructure
## Proposed Decomposition
### 1. `ConversationLoop`
**Responsibility:** Own the `while` loop invariant, iteration budget, and termination conditions.
**Interface:**
```python
class ConversationLoop:
def run(self, messages: list, tools: list, client) -> dict:
...
```
**Invariant:** Must terminate before the API call count reaches `max_iterations` and before `iteration_budget.remaining` drops to `<= 0`.
### 2. `ModelDispatcher`
**Responsibility:** All interaction with `client.chat.completions.create`, including streaming, fallback activation, and response normalization.
**Interface:**
```python
class ModelDispatcher:
def call(self, model: str, messages: list, tools: list, **kwargs) -> ModelResponse:
...
```
**Invariant:** Must always return a normalized object with `.content`, `.tool_calls`, `.reasoning`.
### 3. `ToolExecutor`
**Responsibility:** Execute tool calls (sequential or concurrent), handle errors, and format results.
**Interface:**
```python
class ToolExecutor:
def execute(self, tool_calls: list, task_id: str = None) -> list[ToolResult]:
...
```
**Invariant:** Every tool_call produces exactly one ToolResult, and errors are JSON-serializable.
### 4. `MemoryInterceptor`
**Responsibility:** Intercept `memory` and `todo` tool calls before they reach the registry, plus flush memories on session end.
**Interface:**
```python
class MemoryInterceptor:
def intercept(self, tool_name: str, args: dict, task_id: str = None) -> str | None:
... # returns result if intercepted, None if pass-through
```
**Invariant:** Must not mutate agent state except through explicit `flush()` calls.
### 5. `PromptBuilder`
**Responsibility:** Assemble system prompt, inject skills, apply context compression, and manage prompt caching markers.
**Interface:**
```python
class PromptBuilder:
def build(self, user_message: str, conversation_history: list) -> list:
...
```
**Invariant:** Output list must start with a system message (or equivalent provider parameter).
## Migration Path
1. Create the 5 classes as thin facades that delegate back to `AIAgent` methods.
2. Move logic incrementally from `AIAgent` into the new classes.
3. Once `AIAgent` is a pure coordinator (~500 SLOC), freeze the interface.
4. Phase II competing agents rewrite one class at a time.
## Acceptance Criteria
- [ ] `AIAgent` reduced to < 1,000 SLOC
- [ ] Each new class has > 80% test coverage
- [ ] Full existing test suite still passes
- [ ] No behavioral regressions in shadow mode

View File

@@ -0,0 +1,263 @@
# Hermes Ω Specification Draft (Ouroboros Phase I)
> Auto-generated by Ezra as part of EPIC-999. This document is a living artifact.
## Scope
This specification covers the core runtime of Hermes agent v0.7.x as found in the `hermes-agent` codebase.
## High-Level Architecture
```
User Message
    ↓
Gateway (gateway/run.py) — platform adapter (Telegram, Discord, CLI, etc.)
    ↓
HermesCLI (cli.py) or AIAgent.chat() (run_agent.py)
    ↓
ModelTools (model_tools.py) — tool discovery, schema assembly, dispatch
    ↓
Tool Registry (tools/registry.py) — handler lookup, availability checks
    ↓
Individual Tool Implementations (tools/*.py)
    ↓
Results returned up the stack
```
## Module Specifications
### `run_agent.py`
**Lines of Code:** 8948
**Classes:**
- `_SafeWriter`
- *Transparent stdio wrapper that catches OSError/ValueError from broken pipes.*
- `__init__(self, inner)`
- `write(self, data)`
- `flush(self)`
- `fileno(self)`
- `isatty(self)`
- ... and 1 more methods
- `IterationBudget`
- *Thread-safe iteration counter for an agent.*
- `__init__(self, max_total)`
- `consume(self)`
- `refund(self)`
- `used(self)`
- `remaining(self)`
- `AIAgent`
- *AI Agent with tool calling capabilities.*
- `base_url(self)`
- `base_url(self, value)`
- `__init__(self, base_url, api_key, provider, api_mode, acp_command, acp_args, command, args, model, max_iterations, tool_delay, enabled_toolsets, disabled_toolsets, save_trajectories, verbose_logging, quiet_mode, ephemeral_system_prompt, log_prefix_chars, log_prefix, providers_allowed, providers_ignored, providers_order, provider_sort, provider_require_parameters, provider_data_collection, session_id, tool_progress_callback, tool_start_callback, tool_complete_callback, thinking_callback, reasoning_callback, clarify_callback, step_callback, stream_delta_callback, tool_gen_callback, status_callback, max_tokens, reasoning_config, prefill_messages, platform, skip_context_files, skip_memory, session_db, iteration_budget, fallback_model, credential_pool, checkpoints_enabled, checkpoint_max_snapshots, pass_session_id, persist_session)`
- `reset_session_state(self)`
- `_safe_print(self)`
- ... and 100 more methods
**Top-Level Functions:**
- `_install_safe_stdio()`
- `_is_destructive_command(cmd)`
- `_should_parallelize_tool_batch(tool_calls)`
- `_extract_parallel_scope_path(tool_name, function_args)`
- `_paths_overlap(left, right)`
- `_sanitize_surrogates(text)`
- `_sanitize_messages_surrogates(messages)`
- `_strip_budget_warnings_from_history(messages)`
- `main(query, model, api_key, base_url, max_turns, enabled_toolsets, disabled_toolsets, list_tools, save_trajectories, save_sample, verbose, log_prefix_chars)`
**Inferred Side Effects & Invariants:**
- Persists state to SQLite database.
- Performs file I/O.
- Makes HTTP network calls.
- Uses global mutable state (risk factor).
### `model_tools.py`
**Lines of Code:** 466
**Top-Level Functions:**
- `_get_tool_loop()`
- `_get_worker_loop()`
- `_run_async(coro)`
- `_discover_tools()`
- `get_tool_definitions(enabled_toolsets, disabled_toolsets, quiet_mode)`
- `handle_function_call(function_name, function_args, task_id, user_task, enabled_tools)`
- `get_all_tool_names()`
- `get_toolset_for_tool(tool_name)`
- `get_available_toolsets()`
- `check_toolset_requirements()`
- ... and 1 more functions
**Inferred Side Effects & Invariants:**
- Uses global mutable state (risk factor).
- Primarily pure Python logic / orchestration.
### `cli.py`
**Lines of Code:** 8280
**Classes:**
- `ChatConsole`
- *Rich Console adapter for prompt_toolkit's patch_stdout context.*
- `__init__(self)`
- `print(self)`
- `HermesCLI`
- *Interactive CLI for the Hermes Agent.*
- `__init__(self, model, toolsets, provider, api_key, base_url, max_turns, verbose, compact, resume, checkpoints, pass_session_id)`
- `_invalidate(self, min_interval)`
- `_status_bar_context_style(self, percent_used)`
- `_build_context_bar(self, percent_used, width)`
- `_get_status_bar_snapshot(self)`
- ... and 106 more methods
**Top-Level Functions:**
- `_load_prefill_messages(file_path)`
- `_parse_reasoning_config(effort)`
- `load_cli_config()`
- `_run_cleanup()`
- `_git_repo_root()`
- `_path_is_within_root(path, root)`
- `_setup_worktree(repo_root)`
- `_cleanup_worktree(info)`
- `_prune_stale_worktrees(repo_root, max_age_hours)`
- `_accent_hex()`
- ... and 9 more functions
**Inferred Side Effects & Invariants:**
- Persists state to SQLite database.
- Performs file I/O.
- Spawns subprocesses / shell commands.
- Uses global mutable state (risk factor).
### `tools/registry.py`
**Lines of Code:** 275
**Classes:**
- `ToolEntry`
- *Metadata for a single registered tool.*
- `__init__(self, name, toolset, schema, handler, check_fn, requires_env, is_async, description, emoji)`
- `ToolRegistry`
- *Singleton registry that collects tool schemas + handlers from tool files.*
- `__init__(self)`
- `register(self, name, toolset, schema, handler, check_fn, requires_env, is_async, description, emoji)`
- `deregister(self, name)`
- `get_definitions(self, tool_names, quiet)`
- `dispatch(self, name, args)`
- ... and 10 more methods
**Inferred Side Effects & Invariants:**
- Primarily pure Python logic / orchestration.
### `gateway/run.py`
**Lines of Code:** 6657
**Classes:**
- `GatewayRunner`
- *Main gateway controller.*
- `__init__(self, config)`
- `_has_setup_skill(self)`
- `_load_voice_modes(self)`
- `_save_voice_modes(self)`
- `_set_adapter_auto_tts_disabled(self, adapter, chat_id, disabled)`
- ... and 78 more methods
**Top-Level Functions:**
- `_ensure_ssl_certs()`
- `_normalize_whatsapp_identifier(value)`
- `_expand_whatsapp_auth_aliases(identifier)`
- `_resolve_runtime_agent_kwargs()`
- `_build_media_placeholder(event)`
- `_dequeue_pending_text(adapter, session_key)`
- `_check_unavailable_skill(command_name)`
- `_platform_config_key(platform)`
- `_load_gateway_config()`
- `_resolve_gateway_model(config)`
- ... and 4 more functions
**Inferred Side Effects & Invariants:**
- Persists state to SQLite database.
- Performs file I/O.
- Spawns subprocesses / shell commands.
- Contains async code paths.
- Uses global mutable state (risk factor).
### `hermes_state.py`
**Lines of Code:** 1270
**Classes:**
- `SessionDB`
- *SQLite-backed session storage with FTS5 search.*
- `__init__(self, db_path)`
- `_execute_write(self, fn)`
- `_try_wal_checkpoint(self)`
- `close(self)`
- `_init_schema(self)`
- ... and 29 more methods
**Inferred Side Effects & Invariants:**
- Persists state to SQLite database.
### `agent/context_compressor.py`
**Lines of Code:** 676
**Classes:**
- `ContextCompressor`
- *Compresses conversation context when approaching the model's context limit.*
- `__init__(self, model, threshold_percent, protect_first_n, protect_last_n, summary_target_ratio, quiet_mode, summary_model_override, base_url, api_key, config_context_length, provider)`
- `update_from_response(self, usage)`
- `should_compress(self, prompt_tokens)`
- `should_compress_preflight(self, messages)`
- `get_status(self)`
- ... and 11 more methods
**Inferred Side Effects & Invariants:**
- Primarily pure Python logic / orchestration.
### `agent/prompt_caching.py`
**Lines of Code:** 72
**Top-Level Functions:**
- `_apply_cache_marker(msg, cache_marker, native_anthropic)`
- `apply_anthropic_cache_control(api_messages, cache_ttl, native_anthropic)`
**Inferred Side Effects & Invariants:**
- Primarily pure Python logic / orchestration.
### `agent/skill_commands.py`
**Lines of Code:** 297
**Top-Level Functions:**
- `build_plan_path(user_instruction)`
- `_load_skill_payload(skill_identifier, task_id)`
- `_build_skill_message(loaded_skill, skill_dir, activation_note, user_instruction, runtime_note)`
- `scan_skill_commands()`
- `get_skill_commands()`
- `build_skill_invocation_message(cmd_key, user_instruction, task_id, runtime_note)`
- `build_preloaded_skills_prompt(skill_identifiers, task_id)`
**Inferred Side Effects & Invariants:**
- Uses global mutable state (risk factor).
- Primarily pure Python logic / orchestration.
## Cross-Module Dependencies
Key data flow:
1. `run_agent.py` defines `AIAgent` — the canonical conversation loop.
2. `model_tools.py` assembles tool schemas and dispatches function calls.
3. `tools/registry.py` maintains the central registry; all tool files import it.
4. `gateway/run.py` adapts platform events into `AIAgent.run_conversation()` calls.
5. `cli.py` (`HermesCLI`) provides the interactive shell and slash-command routing.
## Known Coupling Risks
- `run_agent.py` is ~9k SLOC (8,948 lines per the module spec above) and contains the core loop, todo/memory interception, context compression, and trajectory saving. High blast radius.
- `cli.py` is ~8k SLOC (8,280 lines per the module spec above) and combines UI (Rich/prompt_toolkit), config loading, and command dispatch. Tightly coupled to display state.
- `model_tools.py` holds a process-global `_last_resolved_tool_names`. Subagent execution saves/restores this global.
- `tools/registry.py` is imported by ALL tool files; schema generation happens at import time.
## Next Actions (Phase II Prep)
1. Decompose `AIAgent` into: `ConversationLoop`, `ContextManager`, `ToolDispatcher`, `MemoryInterceptor`.
2. Extract CLI display logic from command dispatch.
3. Define strict interfaces between gateway → agent → tools.
4. Write property-based tests for the conversation loop invariant: *given the same message history and tool results, the agent must produce deterministic tool_call ordering*.
---
Generated: 2026-04-05 by Ezra (Phase I)

View File

@@ -0,0 +1,137 @@
"""
Property-based test stubs for Hermes core invariants.
Part of EPIC-999 Phase I — The Mirror.
These tests define behavioral contracts that ANY rewrite of the runtime
must satisfy, including the Hermes Ω target.
"""
import pytest
from unittest.mock import Mock, patch
# -----------------------------------------------------------------------------
# Conversation Loop Invariants
# -----------------------------------------------------------------------------
class TestConversationLoopInvariants:
    """
    Invariants for AIAgent.run_conversation and its successors.

    Each test is a contract stub: it documents a behavioral invariant the
    current loop must keep and any rewrite (including Hermes Ω) must satisfy.
    """

    def test_deterministic_tool_ordering(self) -> None:
        """
        Given the same message history and available tools,
        the agent must produce the same tool_call ordering.
        (If non-determinism is introduced by temperature > 0,
        this becomes a statistical test.)
        """
        pytest.skip("TODO: implement with seeded mock model responses")

    def test_tool_result_always_appended_to_history(self) -> None:
        """
        After any tool_call is executed, its result MUST appear
        in the conversation history before the next assistant turn.
        """
        pytest.skip("TODO: mock model with forced tool_call and verify history")

    def test_iteration_budget_never_exceeded(self) -> None:
        """
        The loop must terminate before api_call_count >= max_iterations
        AND before iteration_budget.remaining <= 0.
        """
        pytest.skip("TODO: mock model to always return tool_calls; verify termination")

    def test_system_prompt_presence(self) -> None:
        """
        Every API call must include a system message as the first message
        (or system parameter for providers that support it).
        """
        pytest.skip("TODO: intercept all client.chat.completions.create calls")

    def test_compression_preserves_last_n_messages(self) -> None:
        """
        After context compression, the final N messages (configurable,
        default ~4) must remain uncompressed to preserve local context.
        """
        pytest.skip("TODO: create history > threshold, compress, verify tail")
# -----------------------------------------------------------------------------
# Tool Registry Invariants
# -----------------------------------------------------------------------------
class TestToolRegistryInvariants:
    """
    Invariants for tools.registry.ToolRegistry (the singleton registry that
    collects tool schemas and handlers).
    """

    def test_register_then_list_contains_tool(self) -> None:
        """
        After register() is called with a valid schema and handler,
        list_tools() must include the registered name.
        """
        pytest.skip("TODO: instantiate fresh Registry, register, assert membership")

    def test_dispatch_unknown_tool_returns_error_json(self) -> None:
        """
        Calling dispatch() with an unregistered tool name must return
        a JSON string containing an error key, never raise raw.
        """
        pytest.skip("TODO: call dispatch with 'nonexistent_tool', parse result")

    def test_handler_receives_task_id_kwarg(self) -> None:
        """
        Registered handlers that accept **kwargs must receive task_id
        when dispatch is called with one.
        """
        pytest.skip("TODO: register mock handler, dispatch with task_id, verify")
# -----------------------------------------------------------------------------
# State Persistence Invariants
# -----------------------------------------------------------------------------
class TestStatePersistenceInvariants:
    """
    Invariants for hermes_state.SessionDB (SQLite-backed session storage
    with FTS5 search).
    """

    def test_saved_message_is_retrievable_by_session_id(self) -> None:
        """
        After save_message(session_id, ...), get_messages(session_id)
        must return the message.
        """
        pytest.skip("TODO: use temp SQLite DB, save, query, assert")

    def test_fts_search_returns_relevant_messages(self) -> None:
        """
        After indexing messages, FTS search for a unique keyword
        must return the message containing it.
        """
        pytest.skip("TODO: seed DB with messages, search unique token")
# -----------------------------------------------------------------------------
# Context Compressor Invariants
# -----------------------------------------------------------------------------
class TestContextCompressorInvariants:
    """
    Invariants for agent.context_compressor.ContextCompressor.
    """

    def test_compression_reduces_token_count(self) -> None:
        """
        compress_messages(output) must have fewer tokens than
        the uncompressed input (for any input > threshold).
        """
        pytest.skip("TODO: mock tokenizer, provide long history, assert reduction")

    def test_compression_never_drops_system_message(self) -> None:
        """
        The system message must survive compression and remain
        at index 0 of the returned message list.
        """
        pytest.skip("TODO: compress history with system msg, verify position")

191
scripts/forge.py Normal file
View File

@@ -0,0 +1,191 @@
#!/usr/bin/env python3
"""
forge.py — The Forge: competing sub-agent rewrite pipeline.
Part of EPIC-999 Phase II.
Given a target module, spawn N sub-agents to rewrite it independently.
An Arbiter evaluates each candidate on:
1. Test pass rate
2. SLOC reduction (or bounded increase)
3. Cyclomatic complexity
4. API surface stability (diff against original public interface)
The winner is promoted to the integration branch.
"""
import argparse
import json
import os
import shutil
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict
@dataclass
class RewriteCandidate:
    """One competing rewrite attempt plus the metrics the Arbiter scores."""

    agent_name: str    # name of the sub-agent that produced this rewrite
    branch_name: str   # unique workspace/branch identifier
    module_path: Path  # rewritten module inside the candidate's repo copy
    temp_dir: Path     # candidate's isolated workspace directory
    test_pass_rate: float = 0.0     # fraction of tests passing, 0.0-1.0
    sloc_delta: int = 0             # absolute SLOC of the module (baseline diff TODO)
    complexity_score: float = 0.0   # reserved: cyclomatic complexity metric
    api_surface_score: float = 0.0  # reserved: public-interface stability metric
    total_score: float = 0.0        # Arbiter's weighted score
    logs: List[str] = field(default_factory=list)  # per-candidate event log


class ForgePipeline:
    """Competing rewrite pipeline for clean-room module evolution.

    Spawns one isolated workspace per agent, (eventually) runs each
    sub-agent rewrite, evaluates every candidate, and promotes the winner.
    """

    def __init__(self, repo_path: Path, target_module: str, agents: List[str]):
        """
        Args:
            repo_path: Path to the repository to rewrite.
            target_module: Module path relative to the repo root.
            agents: Names of the competing sub-agents.
        """
        self.repo_path = repo_path.resolve()
        self.target_module = target_module
        self.agents = agents
        # Scratch area holding one sub-directory per candidate workspace.
        self.work_dir = Path(tempfile.mkdtemp(prefix="forge_"))
        self.candidates: List[RewriteCandidate] = []

    def _spawn_agent_rewrite(self, agent_name: str, index: int) -> RewriteCandidate:
        """Create an isolated repo copy for a single sub-agent rewrite."""
        branch_name = f"forge-{agent_name}-{int(time.time())}-{index}"
        candidate_dir = self.work_dir / branch_name
        candidate_dir.mkdir(parents=True, exist_ok=True)
        # Copy the repo into the candidate workspace. shutil.copytree is
        # portable (no external `cp` binary needed) and raises on failure,
        # matching the previous check=True subprocess behavior.
        shutil.copytree(self.repo_path, candidate_dir / "repo")
        candidate = RewriteCandidate(
            agent_name=agent_name,
            branch_name=branch_name,
            module_path=candidate_dir / "repo" / self.target_module,
            temp_dir=candidate_dir,
        )
        # TODO: replace with actual sub-agent invocation via delegate_tool.py
        # For now, we write a marker file so the pipeline structure is testable.
        marker = candidate_dir / "rewrite.marker"
        marker.write_text(
            f"agent={agent_name}\n"
            f"target={self.target_module}\n"
            f"timestamp={time.time()}\n"
        )
        candidate.logs.append(f"Spawned {agent_name} in {branch_name}")
        return candidate

    def run_rewrites(self) -> List[RewriteCandidate]:
        """Run all competing rewrites in parallel."""
        print(f"[Forge] Starting {len(self.agents)} competing rewrites for {self.target_module}")
        for idx, agent in enumerate(self.agents):
            candidate = self._spawn_agent_rewrite(agent, idx)
            self.candidates.append(candidate)
            print(f" -> {candidate.branch_name}")
        return self.candidates

    @staticmethod
    def _parse_pass_rate(stdout: str, returncode: int) -> float:
        """Parse a pytest summary line into a pass fraction.

        Understands summaries like "3 passed, 2 failed, 1 error in 0.5s"
        and falls back to the process returncode when no counts appear.
        (Fixes the previous parser, which seeded total=1 and reported e.g.
        5/6 for "5 passed, 5 failed" instead of 0.5.)
        """
        passed = 0
        failures = 0
        for part in stdout.split(","):
            tokens = part.strip().split()
            if not tokens or not tokens[0].isdigit():
                continue
            count = int(tokens[0])
            if "passed" in part:
                passed = count
            elif "failed" in part or "error" in part:
                failures += count
        total = passed + failures
        if total:
            return passed / total
        return 1.0 if returncode == 0 else 0.0

    def evaluate_candidate(self, candidate: RewriteCandidate) -> RewriteCandidate:
        """Run test suite and metrics on a candidate."""
        repo = candidate.temp_dir / "repo"
        # SLOC calculation.
        # NOTE(review): this records the module's absolute line count, not a
        # true delta against the original — baseline diffing is still TODO.
        try:
            with open(candidate.module_path, "r", encoding="utf-8") as f:
                candidate.sloc_delta = len(f.readlines())
        except Exception as e:
            candidate.logs.append(f"SLOC error: {e}")
        # Test execution (best-effort; requires venv + deps)
        test_cmd = [
            sys.executable, "-m", "pytest",
            "tests/", "-q", "--tb=short",
            "-x",
        ]
        try:
            result = subprocess.run(
                test_cmd,
                cwd=repo,
                capture_output=True,
                text=True,
                timeout=300,
            )
            candidate.test_pass_rate = self._parse_pass_rate(result.stdout, result.returncode)
            candidate.logs.append(f"Tests: returncode={result.returncode}")
        except Exception as e:
            candidate.logs.append(f"Test error: {e}")
            candidate.test_pass_rate = 0.0
        # Scoring (naive weighted sum; will be refined)
        candidate.total_score = (
            candidate.test_pass_rate * 100.0
            - max(candidate.sloc_delta - 500, 0) * 0.01  # penalty for bloat
        )
        return candidate

    def arbitrate(self) -> RewriteCandidate:
        """Evaluate all candidates and return the winner."""
        print("[Forge] Evaluating candidates...")
        for candidate in self.candidates:
            self.evaluate_candidate(candidate)
            print(f" {candidate.agent_name}: tests={candidate.test_pass_rate:.2%} "
                  f"sloc={candidate.sloc_delta} score={candidate.total_score:.2f}")
        winner = max(self.candidates, key=lambda c: c.total_score)
        print(f"[Forge] Winner: {winner.agent_name} ({winner.branch_name}) "
              f"score={winner.total_score:.2f}")
        return winner

    def promote_winner(self, winner: RewriteCandidate, integration_branch: str):
        """Promote the winning candidate to the integration branch."""
        # TODO: git checkout -b integration_branch, copy winner module, commit, push
        print(f"[Forge] Promoting {winner.branch_name} -> {integration_branch}")
        marker = self.repo_path / "FORGE_WINNER.marker"
        marker.write_text(
            f"winner={winner.agent_name}\n"
            f"branch={winner.branch_name}\n"
            f"score={winner.total_score}\n"
        )
def main():
    """CLI entry point: parse arguments, race the rewrites, promote the winner."""
    parser = argparse.ArgumentParser(description="The Forge — competing rewrite pipeline")
    parser.add_argument("--repo", required=True, help="Path to the target repo")
    parser.add_argument("--module", required=True, help="Target module path (relative to repo)")
    parser.add_argument("--agents", nargs="+", default=["allegro", "bezalel"],
                        help="Agent names to compete")
    parser.add_argument("--integration-branch", default="forge-integration",
                        help="Branch to promote winner into")
    args = parser.parse_args()

    # Build the pipeline, run the competing rewrites, then let the Arbiter
    # pick and promote the highest-scoring candidate.
    pipeline = ForgePipeline(
        repo_path=Path(args.repo),
        target_module=args.module,
        agents=args.agents,
    )
    pipeline.run_rewrites()
    best = pipeline.arbitrate()
    pipeline.promote_winner(best, args.integration_branch)


if __name__ == "__main__":
    main()