Files
hermes-agent/agent/claw_runtime.py
Ezra ab7fd52ae3
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Docs Site Checks / docs-site-checks (pull_request) Failing after 4s
Nix / nix (ubuntu-latest) (pull_request) Failing after 1s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Failing after 1s
Tests / test (pull_request) Failing after 4s
Tests / e2e (pull_request) Failing after 2s
Nix / nix (macos-latest) (pull_request) Has been cancelled
[EPIC-999] Phase II — The Forge: claw_runtime scaffold + forge pipeline
- agent/claw_runtime.py: 5-class decomposition of AIAgent (ConversationLoop, ModelDispatcher, ToolExecutor, MemoryInterceptor, PromptBuilder)
- scripts/forge.py: competing sub-agent rewrite pipeline with Arbiter scoring

Both are facades today; logic migrates incrementally from run_agent.py.

Authored-by: Ezra
2026-04-05 23:32:53 +00:00

159 lines
5.6 KiB
Python

"""
agent/claw_runtime.py — Claw Code runtime decomposition scaffold.
Part of EPIC-999 Phase II — The Forge.
This module introduces the 5-class decomposition of the monolithic AIAgent
to enable competing sub-agent rewrites and future runtime replacement.
Migration rule: each class begins as a thin facade over AIAgent methods.
Logic migrates incrementally from run_agent.py into these classes.
"""
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass
class ModelResponse:
"""Normalized model response, independent of provider."""
def __init__(self, content: str = None, tool_calls: list = None, reasoning: str = None):
self.content = content or ""
self.tool_calls = tool_calls or []
self.reasoning = reasoning or ""
class ToolResult:
"""Normalized tool execution result."""
def __init__(self, tool_call_id: str, output: str, error: str = None):
self.tool_call_id = tool_call_id
self.output = output
self.error = error
class ConversationLoop:
"""
Owns the while-loop invariant: iteration budget, termination conditions,
and the high-level orchestration of turn-taking.
"""
def __init__(self, agent: "AIAgent"):
self.agent = agent
def run(
self,
messages: List[Dict[str, Any]],
tools: List[Dict[str, Any]],
system_message: str = None,
) -> Dict[str, Any]:
"""
Run the conversation until completion or budget exhaustion.
Invariant: must terminate before max_iterations and iteration_budget <= 0.
"""
# Facade: delegate to AIAgent.run_conversation for now.
return self.agent.run_conversation(
user_message=messages[-1]["content"] if messages else "",
system_message=system_message,
conversation_history=messages[:-1] if len(messages) > 1 else None,
)
class ModelDispatcher:
"""
Owns all interaction with the LLM client: streaming, fallback activation,
response normalization, and provider-specific quirks.
"""
def __init__(self, agent: "AIAgent"):
self.agent = agent
def call(self, model: str, messages: List[Dict], tools: List[Dict], **kwargs) -> ModelResponse:
"""
Dispatch a single API call and return a normalized response.
Invariant: always returns a ModelResponse with .content, .tool_calls, .reasoning.
"""
# Facade: will be populated with logic from AIAgent._interruptible_streaming_api_call
# and related normalization helpers.
raise NotImplementedError("ModelDispatcher.call() — migrate from AIAgent streaming logic")
class ToolExecutor:
"""
Owns tool execution: sequential vs concurrent dispatch, error wrapping,
and result formatting.
"""
def __init__(self, agent: "AIAgent"):
self.agent = agent
def execute(self, tool_calls: List[Any], task_id: str = None) -> List[ToolResult]:
"""
Execute a list of tool calls and return normalized results.
Invariant: every tool_call produces exactly one ToolResult.
"""
# Facade: delegate to AIAgent._execute_tool_calls_sequential / _concurrent
if hasattr(self.agent, "_execute_tool_calls_sequential"):
return self.agent._execute_tool_calls_sequential(tool_calls, task_id=task_id)
raise NotImplementedError("ToolExecutor.execute() — migrate from AIAgent tool execution")
class MemoryInterceptor:
"""
Intercepts agent-level tools (memory, todo) before they reach the global registry.
Also handles flush-on-exit for pending memories.
"""
def __init__(self, agent: "AIAgent"):
self.agent = agent
def intercept(self, tool_name: str, args: Dict[str, Any], task_id: str = None) -> Optional[str]:
"""
If the tool_name is 'memory' or 'todo', handle it directly and return the result.
Otherwise return None to signal pass-through to the ToolExecutor.
Invariant: must not mutate agent state except through explicit flush().
"""
# Facade: will be populated with logic from run_agent.py memory/todo interception.
if tool_name in ("memory", "todo"):
# Placeholder: actual migration will move the interception block here.
return None
return None
def flush(self):
"""Flush any pending memories to persistent storage."""
if hasattr(self.agent, "flush_memories"):
self.agent.flush_memories()
class PromptBuilder:
"""
Owns system prompt assembly, skill injection, context compression,
and prompt caching marker placement.
"""
def __init__(self, agent: "AIAgent"):
self.agent = agent
def build(
self,
user_message: str,
conversation_history: List[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
"""
Build the full message list for the API call.
Invariant: output list must start with a system message (or equivalent).
"""
# Facade: delegate to AIAgent._build_system_prompt and related helpers.
if hasattr(self.agent, "_build_system_prompt"):
system_msg = self.agent._build_system_prompt(user_message)
messages = []
if system_msg:
messages.append({"role": "system", "content": system_msg})
if conversation_history:
messages.extend(conversation_history)
messages.append({"role": "user", "content": user_message})
return messages
raise NotImplementedError("PromptBuilder.build() — migrate from AIAgent prompt assembly")