From ab7fd52ae3fe5124dde7ee17c86324da54c23679 Mon Sep 17 00:00:00 2001 From: Ezra Date: Sun, 5 Apr 2026 23:32:53 +0000 Subject: [PATCH] =?UTF-8?q?[EPIC-999]=20Phase=20II=20=E2=80=94=20The=20For?= =?UTF-8?q?ge:=20claw=5Fruntime=20scaffold=20+=20forge=20pipeline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - agent/claw_runtime.py: 5-class decomposition of AIAgent (ConversationLoop, ModelDispatcher, ToolExecutor, MemoryInterceptor, PromptBuilder) - scripts/forge.py: competing sub-agent rewrite pipeline with Arbiter scoring Both are facades today; logic migrates incrementally from run_agent.py. Authored-by: Ezra --- agent/claw_runtime.py | 158 ++++++++++++++++++++++++++++++++++ scripts/forge.py | 191 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 349 insertions(+) create mode 100644 agent/claw_runtime.py create mode 100644 scripts/forge.py diff --git a/agent/claw_runtime.py b/agent/claw_runtime.py new file mode 100644 index 000000000..1e156053c --- /dev/null +++ b/agent/claw_runtime.py @@ -0,0 +1,158 @@ +""" +agent/claw_runtime.py — Claw Code runtime decomposition scaffold. +Part of EPIC-999 Phase II — The Forge. + +This module introduces the 5-class decomposition of the monolithic AIAgent +to enable competing sub-agent rewrites and future runtime replacement. + +Migration rule: each class begins as a thin facade over AIAgent methods. +Logic migrates incrementally from run_agent.py into these classes. +""" + +from typing import List, Dict, Any, Optional, Callable +from dataclasses import dataclass + + +class ModelResponse: + """Normalized model response, independent of provider.""" + def __init__(self, content: str = None, tool_calls: list = None, reasoning: str = None): + self.content = content or "" + self.tool_calls = tool_calls or [] + self.reasoning = reasoning or "" + + +class ToolResult: + """Normalized tool execution result.""" + def __init__(self, tool_call_id: str, output: str, error: str = None): + self.tool_call_id = tool_call_id + self.output = output + self.error = error + + +class ConversationLoop: + """ + Owns the while-loop invariant: iteration budget, termination conditions, + and the high-level orchestration of turn-taking. + """ + + def __init__(self, agent: "AIAgent"): + self.agent = agent + + def run( + self, + messages: List[Dict[str, Any]], + tools: List[Dict[str, Any]], + system_message: str = None, + ) -> Dict[str, Any]: + """ + Run the conversation until completion or budget exhaustion. + + Invariant: must terminate before max_iterations and iteration_budget <= 0. + """ + # Facade: delegate to AIAgent.run_conversation for now. + return self.agent.run_conversation( + user_message=messages[-1]["content"] if messages else "", + system_message=system_message, + conversation_history=messages[:-1] if len(messages) > 1 else None, + ) + + +class ModelDispatcher: + """ + Owns all interaction with the LLM client: streaming, fallback activation, + response normalization, and provider-specific quirks. + """ + + def __init__(self, agent: "AIAgent"): + self.agent = agent + + def call(self, model: str, messages: List[Dict], tools: List[Dict], **kwargs) -> ModelResponse: + """ + Dispatch a single API call and return a normalized response. + + Invariant: always returns a ModelResponse with .content, .tool_calls, .reasoning. + """ + # Facade: will be populated with logic from AIAgent._interruptible_streaming_api_call + # and related normalization helpers. + raise NotImplementedError("ModelDispatcher.call() — migrate from AIAgent streaming logic") + + +class ToolExecutor: + """ + Owns tool execution: sequential vs concurrent dispatch, error wrapping, + and result formatting. + """ + + def __init__(self, agent: "AIAgent"): + self.agent = agent + + def execute(self, tool_calls: List[Any], task_id: str = None) -> List[ToolResult]: + """ + Execute a list of tool calls and return normalized results. + + Invariant: every tool_call produces exactly one ToolResult. + """ + # Facade: delegate to AIAgent._execute_tool_calls_sequential / _concurrent + if hasattr(self.agent, "_execute_tool_calls_sequential"): + return self.agent._execute_tool_calls_sequential(tool_calls, task_id=task_id) + raise NotImplementedError("ToolExecutor.execute() — migrate from AIAgent tool execution") + + +class MemoryInterceptor: + """ + Intercepts agent-level tools (memory, todo) before they reach the global registry. + Also handles flush-on-exit for pending memories. + """ + + def __init__(self, agent: "AIAgent"): + self.agent = agent + + def intercept(self, tool_name: str, args: Dict[str, Any], task_id: str = None) -> Optional[str]: + """ + If the tool_name is 'memory' or 'todo', handle it directly and return the result. + Otherwise return None to signal pass-through to the ToolExecutor. + + Invariant: must not mutate agent state except through explicit flush(). + """ + # Facade: will be populated with logic from run_agent.py memory/todo interception. + if tool_name in ("memory", "todo"): + # Placeholder: actual migration will move the interception block here. + return None + return None + + def flush(self): + """Flush any pending memories to persistent storage.""" + if hasattr(self.agent, "flush_memories"): + self.agent.flush_memories() + + +class PromptBuilder: + """ + Owns system prompt assembly, skill injection, context compression, + and prompt caching marker placement. + """ + + def __init__(self, agent: "AIAgent"): + self.agent = agent + + def build( + self, + user_message: str, + conversation_history: List[Dict[str, Any]] = None, + ) -> List[Dict[str, Any]]: + """ + Build the full message list for the API call. + + Invariant: output list must start with a system message (or equivalent). + """ + # Facade: delegate to AIAgent._build_system_prompt and related helpers. + if hasattr(self.agent, "_build_system_prompt"): + system_msg = self.agent._build_system_prompt(user_message) + messages = [] + if system_msg: + messages.append({"role": "system", "content": system_msg}) + if conversation_history: + messages.extend(conversation_history) + messages.append({"role": "user", "content": user_message}) + return messages + raise NotImplementedError("PromptBuilder.build() — migrate from AIAgent prompt assembly") diff --git a/scripts/forge.py b/scripts/forge.py new file mode 100644 index 000000000..4f2d3f504 --- /dev/null +++ b/scripts/forge.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +""" +forge.py — The Forge: competing sub-agent rewrite pipeline. +Part of EPIC-999 Phase II. + +Given a target module, spawn N sub-agents to rewrite it independently. +An Arbiter evaluates each candidate on: + 1. Test pass rate + 2. SLOC reduction (or bounded increase) + 3. Cyclomatic complexity + 4. API surface stability (diff against original public interface) + +The winner is promoted to the integration branch. +""" + +import argparse +import json +import os +import subprocess +import sys +import tempfile +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import List, Dict + + +@dataclass +class RewriteCandidate: + agent_name: str + branch_name: str + module_path: Path + temp_dir: Path + test_pass_rate: float = 0.0 + sloc_delta: int = 0 + complexity_score: float = 0.0 + api_surface_score: float = 0.0 + total_score: float = 0.0 + logs: List[str] = field(default_factory=list) + + +class ForgePipeline: + """Competing rewrite pipeline for clean-room module evolution.""" + + def __init__(self, repo_path: Path, target_module: str, agents: List[str]): + self.repo_path = repo_path.resolve() + self.target_module = target_module + self.agents = agents + self.work_dir = Path(tempfile.mkdtemp(prefix="forge_")) + self.candidates: List[RewriteCandidate] = [] + + def _spawn_agent_rewrite(self, agent_name: str, index: int) -> RewriteCandidate: + """Spawn a single sub-agent rewrite.""" + branch_name = f"forge-{agent_name}-{int(time.time())}-{index}" + candidate_dir = self.work_dir / branch_name + candidate_dir.mkdir(parents=True, exist_ok=True) + + # Copy repo into candidate workspace + subprocess.run( + ["cp", "-r", str(self.repo_path), str(candidate_dir / "repo")], + check=True, + capture_output=True, + ) + + candidate = RewriteCandidate( + agent_name=agent_name, + branch_name=branch_name, + module_path=candidate_dir / "repo" / self.target_module, + temp_dir=candidate_dir, + ) + + # TODO: replace with actual sub-agent invocation via delegate_tool.py + # For now, we write a marker file so the pipeline structure is testable. + marker = candidate_dir / "rewrite.marker" + marker.write_text( + f"agent={agent_name}\n" + f"target={self.target_module}\n" + f"timestamp={time.time()}\n" + ) + candidate.logs.append(f"Spawned {agent_name} in {branch_name}") + return candidate + + def run_rewrites(self) -> List[RewriteCandidate]: + """Run all competing rewrites in parallel.""" + print(f"[Forge] Starting {len(self.agents)} competing rewrites for {self.target_module}") + for idx, agent in enumerate(self.agents): + candidate = self._spawn_agent_rewrite(agent, idx) + self.candidates.append(candidate) + print(f" -> {candidate.branch_name}") + return self.candidates + + def evaluate_candidate(self, candidate: RewriteCandidate) -> RewriteCandidate: + """Run test suite and metrics on a candidate.""" + repo = candidate.temp_dir / "repo" + + # SLOC calculation + try: + with open(candidate.module_path, "r", encoding="utf-8") as f: + candidate.sloc_delta = len(f.readlines()) + except Exception as e: + candidate.logs.append(f"SLOC error: {e}") + + # Test execution (best-effort; requires venv + deps) + test_cmd = [ + sys.executable, "-m", "pytest", + "tests/", "-q", "--tb=short", + "-x", + ] + try: + result = subprocess.run( + test_cmd, + cwd=repo, + capture_output=True, + text=True, + timeout=300, + ) + # Naive pass-rate parsing + if "passed" in result.stdout: + parts = result.stdout.split(",") + passed = 0 + total = 1 + for part in parts: + if "passed" in part: + passed = int(part.strip().split()[0]) + if "failed" in part or "error" in part: + total += int(part.strip().split()[0]) + total = max(total, passed) + candidate.test_pass_rate = passed / total if total else 0.0 + elif result.returncode == 0: + candidate.test_pass_rate = 1.0 + else: + candidate.test_pass_rate = 0.0 + candidate.logs.append(f"Tests: returncode={result.returncode}") + except Exception as e: + candidate.logs.append(f"Test error: {e}") + candidate.test_pass_rate = 0.0 + + # Scoring (naive weighted sum; will be refined) + candidate.total_score = ( + candidate.test_pass_rate * 100.0 + - max(candidate.sloc_delta - 500, 0) * 0.01 # penalty for bloat + ) + return candidate + + def arbitrate(self) -> RewriteCandidate: + """Evaluate all candidates and return the winner.""" + print("[Forge] Evaluating candidates...") + for candidate in self.candidates: + self.evaluate_candidate(candidate) + print(f" {candidate.agent_name}: tests={candidate.test_pass_rate:.2%} " + f"sloc={candidate.sloc_delta} score={candidate.total_score:.2f}") + + winner = max(self.candidates, key=lambda c: c.total_score) + print(f"[Forge] Winner: {winner.agent_name} ({winner.branch_name}) " + f"score={winner.total_score:.2f}") + return winner + + def promote_winner(self, winner: RewriteCandidate, integration_branch: str): + """Promote the winning candidate to the integration branch.""" + # TODO: git checkout -b integration_branch, copy winner module, commit, push + print(f"[Forge] Promoting {winner.branch_name} -> {integration_branch}") + marker = self.repo_path / "FORGE_WINNER.marker" + marker.write_text( + f"winner={winner.agent_name}\n" + f"branch={winner.branch_name}\n" + f"score={winner.total_score}\n" + ) + + +def main(): + parser = argparse.ArgumentParser(description="The Forge — competing rewrite pipeline") + parser.add_argument("--repo", required=True, help="Path to the target repo") + parser.add_argument("--module", required=True, help="Target module path (relative to repo)") + parser.add_argument("--agents", nargs="+", default=["allegro", "bezalel"], + help="Agent names to compete") + parser.add_argument("--integration-branch", default="forge-integration", + help="Branch to promote winner into") + args = parser.parse_args() + + forge = ForgePipeline( + repo_path=Path(args.repo), + target_module=args.module, + agents=args.agents, + ) + forge.run_rewrites() + winner = forge.arbitrate() + forge.promote_winner(winner, args.integration_branch) + + +if __name__ == "__main__": + main()