commit 6255b698353b8d6b409cf7b298aa68e68c796ce3 Author: Allegro Date: Tue Mar 31 21:09:41 2026 +0000 feat: Initial Claw Agent core architecture diff --git a/README.md b/README.md new file mode 100644 index 0000000..2fee275 --- /dev/null +++ b/README.md @@ -0,0 +1,36 @@ +# Claw Agent + +Agent harness built using architectural patterns from [Claw Code](http://143.198.27.163:3000/Timmy/claw-code). + +## Components + +- **permissions.py** - Fine-grained tool access control +- **execution_registry.py** - Command/tool routing registry +- **session_store.py** - JSON-based session persistence + +## Usage + +```python +from claw_agent import ToolPermissionContext, ExecutionRegistry, SessionStore + +# Create permission context +ctx = ToolPermissionContext( + deny_tools={"bash"}, + deny_prefixes={"dangerous_"} +) + +# Build registry +registry = build_default_registry() + +# Create session +store = SessionStore() +session = RuntimeSession.create(prompt="Hello") +session.history.add("user", "Hello") +store.save(session) +``` + +## Architecture + +This agent replaces idle Allegro-Primus with real work capabilities. + +See EPIC-202: http://143.198.27.163:3000/Timmy_Foundation/timmy-home/issues/191 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..a8955e0 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,22 @@ +"""Claw Agent - Agent harness inspired by Claw Code architecture.""" + +from .permissions import ToolPermissionContext, READONLY_CONTEXT, SAFE_CONTEXT, UNRESTRICTED_CONTEXT +from .execution_registry import ExecutionRegistry, ExecutionResult, CommandHandler, ToolHandler, build_default_registry +from .session_store import SessionStore, RuntimeSession, HistoryLog, HistoryEntry + +__version__ = "0.1.0" +__all__ = [ + "ToolPermissionContext", + "READONLY_CONTEXT", + "SAFE_CONTEXT", + "UNRESTRICTED_CONTEXT", + "ExecutionRegistry", + "ExecutionResult", + "CommandHandler", + "ToolHandler", + "build_default_registry", + "SessionStore", + "RuntimeSession", + "HistoryLog", + "HistoryEntry", +] diff --git a/src/__pycache__/execution_registry.cpython-312.pyc b/src/__pycache__/execution_registry.cpython-312.pyc new file mode 100644 index 0000000..5ea6225 Binary files /dev/null and b/src/__pycache__/execution_registry.cpython-312.pyc differ diff --git a/src/__pycache__/permissions.cpython-312.pyc b/src/__pycache__/permissions.cpython-312.pyc new file mode 100644 index 0000000..ef29223 Binary files /dev/null and b/src/__pycache__/permissions.cpython-312.pyc differ diff --git a/src/__pycache__/session_store.cpython-312.pyc b/src/__pycache__/session_store.cpython-312.pyc new file mode 100644 index 0000000..9d3a5f5 Binary files /dev/null and b/src/__pycache__/session_store.cpython-312.pyc differ diff --git a/src/execution_registry.py b/src/execution_registry.py new file mode 100644 index 0000000..2fde2d6 --- /dev/null +++ b/src/execution_registry.py @@ -0,0 +1,151 @@ +"""Execution registry for command and tool routing. + +Inspired by Claw Code's execution patterns. +Provides clean separation between routing and execution. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Callable, Dict, Optional +from pathlib import Path +import json + +from permissions import ToolPermissionContext + + +@dataclass +class ExecutionResult: + """Result of a command or tool execution.""" + success: bool + output: str + error: Optional[str] = None + metadata: Dict[str, Any] = None + + def to_json(self) -> str: + return json.dumps({ + "success": self.success, + "output": self.output, + "error": self.error, + "metadata": self.metadata or {} + }) + + @classmethod + def from_json(cls, data: str) -> ExecutionResult: + d = json.loads(data) + return cls(**d) + + +class CommandHandler: + """Handler for a specific command.""" + + def __init__(self, name: str, fn: Callable[..., ExecutionResult], description: str = ""): + self.name = name + self.fn = fn + self.description = description + + def execute(self, prompt: str, context: ToolPermissionContext = None) -> ExecutionResult: + """Execute the command with given prompt.""" + try: + return self.fn(prompt, context) + except Exception as e: + return ExecutionResult( + success=False, + output="", + error=str(e) + ) + + +class ToolHandler: + """Handler for a specific tool.""" + + def __init__(self, name: str, fn: Callable[..., ExecutionResult], description: str = ""): + self.name = name + self.fn = fn + self.description = description + + def execute(self, payload: str, context: ToolPermissionContext = None) -> ExecutionResult: + """Execute the tool with given payload.""" + # Check permission + if context and context.blocks(self.name): + return ExecutionResult( + success=False, + output="", + error=f"Tool '{self.name}' is blocked by permission context" + ) + + try: + return self.fn(payload, context) + except Exception as e: + return ExecutionResult( + success=False, + output="", + error=str(e) + ) + + +class ExecutionRegistry: + """Registry for commands and tools. + + Routes prompts to appropriate handlers and manages execution. + """ + + def __init__(self): + self._commands: Dict[str, CommandHandler] = {} + self._tools: Dict[str, ToolHandler] = {} + + def register_command(self, name: str, fn: Callable, description: str = "") -> None: + """Register a command handler.""" + self._commands[name] = CommandHandler(name, fn, description) + + def register_tool(self, name: str, fn: Callable, description: str = "") -> None: + """Register a tool handler.""" + self._tools[name] = ToolHandler(name, fn, description) + + def command(self, name: str) -> Optional[CommandHandler]: + """Get a command handler by name.""" + return self._commands.get(name) + + def tool(self, name: str) -> Optional[ToolHandler]: + """Get a tool handler by name.""" + return self._tools.get(name) + + def list_commands(self) -> list[str]: + """List all registered command names.""" + return list(self._commands.keys()) + + def list_tools(self) -> list[str]: + """List all registered tool names.""" + return list(self._tools.keys()) + + def execute_command(self, name: str, prompt: str, context: ToolPermissionContext = None) -> ExecutionResult: + """Execute a command by name.""" + handler = self.command(name) + if not handler: + return ExecutionResult( + success=False, + output="", + error=f"Unknown command: {name}" + ) + return handler.execute(prompt, context) + + def execute_tool(self, name: str, payload: str, context: ToolPermissionContext = None) -> ExecutionResult: + """Execute a tool by name.""" + handler = self.tool(name) + if not handler: + return ExecutionResult( + success=False, + output="", + error=f"Unknown tool: {name}" + ) + return handler.execute(payload, context) + + +def build_default_registry() -> ExecutionRegistry: + """Build a registry with default handlers.""" + registry = ExecutionRegistry() + + # Register basic commands + registry.register_command("help", lambda p, c: ExecutionResult(True, "Available commands: help, status")) + registry.register_command("status", lambda p, c: ExecutionResult(True, "Claw Agent running")) + + return registry diff --git a/src/permissions.py b/src/permissions.py new file mode 100644 index 0000000..32cf6f8 --- /dev/null +++ b/src/permissions.py @@ -0,0 +1,87 @@ +"""Tool permission system inspired by Claw Code architecture. + +Provides fine-grained access control for tool execution. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Set, Tuple + + +@dataclass(frozen=True) +class ToolPermissionContext: + """Context for tool permission checking. + + Inspired by Claw Code's permission system. + Allows fine-grained control over which tools can execute. + + Example: + ctx = ToolPermissionContext( + deny_tools={"bash", "file_write"}, + deny_prefixes={"dangerous_", "system_"} + ) + ctx.blocks("bash") # True + ctx.blocks("dangerous_delete") # True + ctx.blocks("file_read") # False + """ + deny_tools: Set[str] = field(default_factory=set) + deny_prefixes: Tuple[str, ...] = () + + def blocks(self, tool_name: str) -> bool: + """Check if a tool is blocked by this context. + + Args: + tool_name: Name of the tool to check + + Returns: + True if the tool should be blocked + """ + # Exact match check + if tool_name in self.deny_tools: + return True + + # Prefix check + return any(tool_name.startswith(prefix) for prefix in self.deny_prefixes) + + def allows(self, tool_name: str) -> bool: + """Check if a tool is explicitly allowed. + + This is the inverse of blocks() for convenience. + """ + return not self.blocks(tool_name) + + @classmethod + def from_config(cls, config: dict) -> ToolPermissionContext: + """Create context from configuration dict. + + Expected config format: + { + "deny_tools": ["bash", "file_write"], + "deny_prefixes": ["dangerous_", "system_"] + } + """ + return cls( + deny_tools=set(config.get("deny_tools", [])), + deny_prefixes=tuple(config.get("deny_prefixes", [])) + ) + + def to_config(self) -> dict: + """Export context to configuration dict.""" + return { + "deny_tools": list(self.deny_tools), + "deny_prefixes": list(self.deny_prefixes) + } + + +# Predefined permission contexts +READONLY_CONTEXT = ToolPermissionContext( + deny_tools={"bash", "file_write", "file_edit", "terminal"}, + deny_prefixes={"write_", "delete_", "modify_"} +) + +SAFE_CONTEXT = ToolPermissionContext( + deny_tools={"bash"}, + deny_prefixes={"system_", "dangerous_"} +) + +UNRESTRICTED_CONTEXT = ToolPermissionContext() diff --git a/src/session_store.py b/src/session_store.py new file mode 100644 index 0000000..9bfaeec --- /dev/null +++ b/src/session_store.py @@ -0,0 +1,128 @@ +"""Session persistence layer. + +JSON-based session storage inspired by Claw Code. +More portable and inspectable than SQLite. +""" +from __future__ import annotations + +from dataclasses import dataclass, field, asdict +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional +import json +import uuid + + +@dataclass +class HistoryEntry: + """Single entry in session history.""" + timestamp: str + role: str # 'user', 'assistant', 'system', 'tool' + content: str + metadata: Dict = field(default_factory=dict) + + +@dataclass +class HistoryLog: + """Log of all interactions in a session.""" + entries: List[HistoryEntry] = field(default_factory=list) + + def add(self, role: str, content: str, metadata: Dict = None): + """Add an entry to the log.""" + self.entries.append(HistoryEntry( + timestamp=datetime.now().isoformat(), + role=role, + content=content, + metadata=metadata or {} + )) + + def as_markdown(self) -> str: + """Export as markdown.""" + lines = ["## Session History", ""] + for entry in self.entries: + lines.append(f"**{entry.role}** ({entry.timestamp}):") + lines.append(entry.content) + lines.append("") + return "\n".join(lines) + + +@dataclass +class RuntimeSession: + """Complete runtime session state. + + Inspired by Claw Code's session structure. + Persisted as JSON for portability. + """ + session_id: str + created_at: str + prompt: str + context: Dict + history: HistoryLog + persisted_path: Optional[Path] = None + + def __post_init__(self): + if isinstance(self.history, list): + # Convert from dict if loaded from JSON + self.history = HistoryLog(entries=[HistoryEntry(**e) for e in self.history]) + + def save(self) -> Path: + """Save session to disk.""" + if not self.persisted_path: + # Generate path based on session_id + base = Path.home() / ".claw-agent" / "sessions" + base.mkdir(parents=True, exist_ok=True) + self.persisted_path = base / f"{self.session_id}.json" + + data = { + "session_id": self.session_id, + "created_at": self.created_at, + "prompt": self.prompt, + "context": self.context, + "history": [asdict(e) for e in self.history.entries], + } + + self.persisted_path.write_text(json.dumps(data, indent=2)) + return self.persisted_path + + @classmethod + def load(cls, path: Path) -> RuntimeSession: + """Load session from disk.""" + data = json.loads(path.read_text()) + data["persisted_path"] = path + return cls(**data) + + @classmethod + def create(cls, prompt: str = "", context: Dict = None) -> RuntimeSession: + """Create a new session.""" + return cls( + session_id=str(uuid.uuid4())[:8], + created_at=datetime.now().isoformat(), + prompt=prompt, + context=context or {}, + history=HistoryLog() + ) + + +class SessionStore: + """Persistent store for sessions.""" + + def __init__(self, base_path: Path = None): + self.base_path = base_path or (Path.home() / ".claw-agent" / "sessions") + self.base_path.mkdir(parents=True, exist_ok=True) + + def list_sessions(self) -> List[Path]: + """List all session files.""" + return sorted(self.base_path.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True) + + def load_latest(self) -> Optional[RuntimeSession]: + """Load the most recent session.""" + sessions = self.list_sessions() + if sessions: + return RuntimeSession.load(sessions[0]) + return None + + def save(self, session: RuntimeSession) -> Path: + """Save a session.""" + if not session.persisted_path: + session.persisted_path = self.base_path / f"{session.session_id}.json" + return session.save()