diff --git a/src/dashboard/app.py b/src/dashboard/app.py
index 46be48a5..e4dcc899 100644
--- a/src/dashboard/app.py
+++ b/src/dashboard/app.py
@@ -36,6 +36,7 @@ from dashboard.routes.work_orders import router as work_orders_router
from dashboard.routes.tasks import router as tasks_router
from dashboard.routes.scripture import router as scripture_router
from dashboard.routes.self_coding import router as self_coding_router
+from dashboard.routes.hands import router as hands_router
from router.api import router as cascade_router
logging.basicConfig(
@@ -201,6 +202,7 @@ app.include_router(work_orders_router)
app.include_router(tasks_router)
app.include_router(scripture_router)
app.include_router(self_coding_router)
+app.include_router(hands_router)
app.include_router(cascade_router)
diff --git a/src/dashboard/routes/hands.py b/src/dashboard/routes/hands.py
new file mode 100644
index 00000000..7e2bf1ce
--- /dev/null
+++ b/src/dashboard/routes/hands.py
@@ -0,0 +1,325 @@
+"""Hands Dashboard Routes.
+
+API endpoints and HTMX views for managing autonomous Hands:
+- Hand status and control
+- Approval queue management
+- Execution history
+- Manual triggering
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Optional
+
+from fastapi import APIRouter, Form, Request
+from fastapi.responses import HTMLResponse, JSONResponse
+
+from hands import HandRegistry, HandRunner, HandScheduler
+from hands.models import HandConfig, HandStatus, TriggerType
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/hands", tags=["hands"])
+
+# Global instances (would be properly injected in production)
+_registry: Optional[HandRegistry] = None
+_scheduler: Optional[HandScheduler] = None
+_runner: Optional[HandRunner] = None
+
+
+def get_registry() -> HandRegistry:
+ """Get or create HandRegistry singleton."""
+ global _registry
+ if _registry is None:
+ _registry = HandRegistry()
+ return _registry
+
+
+def get_scheduler() -> HandScheduler:
+ """Get or create HandScheduler singleton."""
+ global _scheduler
+ if _scheduler is None:
+ _scheduler = HandScheduler(get_registry())
+ return _scheduler
+
+
+def get_runner() -> HandRunner:
+ """Get or create HandRunner singleton."""
+ global _runner
+ if _runner is None:
+ _runner = HandRunner(get_registry())
+ return _runner
+
+
+# ── API Endpoints ─────────────────────────────────────────────────────────
+
+@router.get("/api/hands")
+async def api_list_hands():
+ """List all Hands with their status."""
+ registry = get_registry()
+
+ hands = []
+ for hand in registry.list_hands():
+ state = registry.get_state(hand.name)
+ hands.append({
+ "name": hand.name,
+ "description": hand.description,
+ "enabled": hand.enabled,
+ "status": state.status.value,
+ "schedule": hand.schedule.cron if hand.schedule else None,
+ "last_run": state.last_run.isoformat() if state.last_run else None,
+ "next_run": state.next_run.isoformat() if state.next_run else None,
+ "run_count": state.run_count,
+ })
+
+ return hands
+
+
+@router.get("/api/hands/{name}")
+async def api_get_hand(name: str):
+ """Get detailed information about a Hand."""
+ registry = get_registry()
+
+ try:
+ hand = registry.get_hand(name)
+ state = registry.get_state(name)
+
+ return {
+ "name": hand.name,
+ "description": hand.description,
+ "enabled": hand.enabled,
+ "version": hand.version,
+ "author": hand.author,
+ "status": state.status.value,
+ "schedule": {
+ "cron": hand.schedule.cron if hand.schedule else None,
+ "timezone": hand.schedule.timezone if hand.schedule else "UTC",
+ },
+ "tools": {
+ "required": hand.tools_required,
+ "optional": hand.tools_optional,
+ },
+ "approval_gates": [
+ {"action": g.action, "description": g.description}
+ for g in hand.approval_gates
+ ],
+ "output": {
+ "dashboard": hand.output.dashboard,
+ "channel": hand.output.channel,
+ "format": hand.output.format,
+ },
+ "state": state.to_dict(),
+ }
+
+ except Exception as e:
+ return JSONResponse(
+ status_code=404,
+ content={"error": f"Hand not found: {name}"},
+ )
+
+
+@router.post("/api/hands/{name}/trigger")
+async def api_trigger_hand(name: str):
+ """Manually trigger a Hand to run."""
+ scheduler = get_scheduler()
+
+ success = await scheduler.trigger_hand_now(name)
+
+ if success:
+ return {"success": True, "message": f"Hand {name} triggered"}
+ else:
+ return JSONResponse(
+ status_code=500,
+ content={"success": False, "error": f"Failed to trigger Hand {name}"},
+ )
+
+
+@router.post("/api/hands/{name}/pause")
+async def api_pause_hand(name: str):
+ """Pause a scheduled Hand."""
+ scheduler = get_scheduler()
+
+ if scheduler.pause_hand(name):
+ return {"success": True, "message": f"Hand {name} paused"}
+ else:
+ return JSONResponse(
+ status_code=400,
+ content={"success": False, "error": f"Failed to pause Hand {name}"},
+ )
+
+
+@router.post("/api/hands/{name}/resume")
+async def api_resume_hand(name: str):
+ """Resume a paused Hand."""
+ scheduler = get_scheduler()
+
+ if scheduler.resume_hand(name):
+ return {"success": True, "message": f"Hand {name} resumed"}
+ else:
+ return JSONResponse(
+ status_code=400,
+ content={"success": False, "error": f"Failed to resume Hand {name}"},
+ )
+
+
+@router.get("/api/approvals")
+async def api_get_pending_approvals():
+ """Get all pending approval requests."""
+ registry = get_registry()
+
+ approvals = await registry.get_pending_approvals()
+
+ return [
+ {
+ "id": a.id,
+ "hand_name": a.hand_name,
+ "action": a.action,
+ "description": a.description,
+ "created_at": a.created_at.isoformat(),
+ "expires_at": a.expires_at.isoformat() if a.expires_at else None,
+ }
+ for a in approvals
+ ]
+
+
+@router.post("/api/approvals/{approval_id}/approve")
+async def api_approve_request(approval_id: str):
+ """Approve a pending request."""
+ registry = get_registry()
+
+ if await registry.resolve_approval(approval_id, approved=True):
+ return {"success": True, "message": "Request approved"}
+ else:
+ return JSONResponse(
+ status_code=400,
+ content={"success": False, "error": "Failed to approve request"},
+ )
+
+
+@router.post("/api/approvals/{approval_id}/reject")
+async def api_reject_request(approval_id: str):
+ """Reject a pending request."""
+ registry = get_registry()
+
+ if await registry.resolve_approval(approval_id, approved=False):
+ return {"success": True, "message": "Request rejected"}
+ else:
+ return JSONResponse(
+ status_code=400,
+ content={"success": False, "error": "Failed to reject request"},
+ )
+
+
+@router.get("/api/executions")
+async def api_get_executions(hand_name: Optional[str] = None, limit: int = 50):
+ """Get recent Hand executions."""
+ registry = get_registry()
+
+ executions = await registry.get_recent_executions(hand_name, limit)
+
+ return executions
+
+
+# ── HTMX Page Routes ─────────────────────────────────────────────────────
+
+@router.get("", response_class=HTMLResponse)
+async def hands_page(request: Request):
+ """Main Hands dashboard page."""
+ from dashboard.app import templates
+
+ return templates.TemplateResponse(
+ "hands.html",
+ {
+ "request": request,
+ "title": "Hands",
+ },
+ )
+
+
+@router.get("/list", response_class=HTMLResponse)
+async def hands_list_partial(request: Request):
+ """HTMX partial for Hands list."""
+ from dashboard.app import templates
+
+ registry = get_registry()
+
+ hands_data = []
+ for hand in registry.list_hands():
+ state = registry.get_state(hand.name)
+ hands_data.append({
+ "config": hand,
+ "state": state,
+ })
+
+ return templates.TemplateResponse(
+ "partials/hands_list.html",
+ {
+ "request": request,
+ "hands": hands_data,
+ },
+ )
+
+
+@router.get("/approvals", response_class=HTMLResponse)
+async def approvals_partial(request: Request):
+ """HTMX partial for approval queue."""
+ from dashboard.app import templates
+
+ registry = get_registry()
+ approvals = await registry.get_pending_approvals()
+
+ return templates.TemplateResponse(
+ "partials/approvals_list.html",
+ {
+ "request": request,
+ "approvals": approvals,
+ },
+ )
+
+
+@router.get("/executions", response_class=HTMLResponse)
+async def executions_partial(request: Request, hand_name: Optional[str] = None):
+ """HTMX partial for execution history."""
+ from dashboard.app import templates
+
+ registry = get_registry()
+ executions = await registry.get_recent_executions(hand_name, limit=20)
+
+ return templates.TemplateResponse(
+ "partials/hand_executions.html",
+ {
+ "request": request,
+ "executions": executions,
+ "hand_name": hand_name,
+ },
+ )
+
+
+@router.get("/{name}/detail", response_class=HTMLResponse)
+async def hand_detail_partial(request: Request, name: str):
+ """HTMX partial for Hand detail."""
+ from dashboard.app import templates
+
+ registry = get_registry()
+
+ try:
+ hand = registry.get_hand(name)
+ state = registry.get_state(name)
+
+ return templates.TemplateResponse(
+ "partials/hand_detail.html",
+ {
+ "request": request,
+ "hand": hand,
+ "state": state,
+ },
+ )
+ except Exception:
+ return templates.TemplateResponse(
+ "partials/error.html",
+ {
+ "request": request,
+ "message": f"Hand {name} not found",
+ },
+ )
diff --git a/src/dashboard/templates/base.html b/src/dashboard/templates/base.html
index e43fa575..d96b7b77 100644
--- a/src/dashboard/templates/base.html
+++ b/src/dashboard/templates/base.html
@@ -41,6 +41,7 @@
ROUTER
UPGRADES
SELF-CODING
+ HANDS
WORK ORDERS
CREATIVE
MOBILE
@@ -73,6 +74,7 @@
MEMORY
WORK ORDERS
SELF-CODING
+ HANDS
CREATIVE
VOICE
MOBILE
diff --git a/src/dashboard/templates/hands.html b/src/dashboard/templates/hands.html
new file mode 100644
index 00000000..76efcbbb
--- /dev/null
+++ b/src/dashboard/templates/hands.html
@@ -0,0 +1,140 @@
+{% extends "base.html" %}
+
+{% block title %}Hands — Timmy Time{% endblock %}
+
+{% block content %}
+
+
+
+
+
Hands
+
Autonomous scheduled agents
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Hands are autonomous agents that run on schedules:
+
+ - 🔮 Oracle — Bitcoin intelligence
+ - 🔍 Scout — OSINT monitoring
+ - ✍️ Scribe — Content production
+ - 💰 Ledger — Treasury tracking
+ - 🔧 Forge — Model management
+ - 🎨 Weaver — Creative pipeline
+ - 🛡️ Sentinel — System health
+
+
+
+
+
+
+
+
+{% endblock %}
diff --git a/src/dashboard/templates/partials/approvals_list.html b/src/dashboard/templates/partials/approvals_list.html
new file mode 100644
index 00000000..b33de7ab
--- /dev/null
+++ b/src/dashboard/templates/partials/approvals_list.html
@@ -0,0 +1,44 @@
+{# Approvals list partial #}
+{% if approvals %}
+
+ {% for approval in approvals %}
+
+
+
+
{{ approval.hand_name }}
+ {{ approval.action }}
+
+
PENDING
+
+
+
{{ approval.description }}
+
+
+
+ {{ approval.created_at.strftime('%H:%M') if approval.created_at else 'Unknown' }}
+
+
+
+
+
+
+
+ {% endfor %}
+
+
+{% else %}
+
+
No pending approvals.
+
+
+{% endif %}
diff --git a/src/dashboard/templates/partials/hand_executions.html b/src/dashboard/templates/partials/hand_executions.html
new file mode 100644
index 00000000..1e064e7f
--- /dev/null
+++ b/src/dashboard/templates/partials/hand_executions.html
@@ -0,0 +1,38 @@
+{# Hand executions partial #}
+{% if executions %}
+
+ {% for exec in executions %}
+
+
+
+ {% if exec.outcome == 'success' %}
+ ✓
+ {% elif exec.outcome == 'failure' %}
+ ✗
+ {% elif exec.outcome == 'approval_pending' %}
+ ⏸
+ {% else %}
+ ○
+ {% endif %}
+
+ {{ exec.hand_name }}
+ ({{ exec.trigger }})
+
+
+ {{ exec.started_at[:16] if exec.started_at else 'Unknown' }}
+
+
+
+ {% if exec.error %}
+
+ Error: {{ exec.error[:50] }}{% if exec.error|length > 50 %}...{% endif %}
+
+ {% endif %}
+
+ {% endfor %}
+
+{% else %}
+
+{% endif %}
diff --git a/src/dashboard/templates/partials/hands_list.html b/src/dashboard/templates/partials/hands_list.html
new file mode 100644
index 00000000..b936511a
--- /dev/null
+++ b/src/dashboard/templates/partials/hands_list.html
@@ -0,0 +1,77 @@
+{# Hands list partial #}
+{% if hands %}
+
+ {% for item in hands %}
+ {% set hand = item.config %}
+ {% set state = item.state %}
+
+
+
+
+
{{ hand.name }}
+ {% if not hand.enabled %}
+ Disabled
+ {% endif %}
+
+
+ {% if state.status.value == 'running' %}
+
+ {% else %}
+
+ {% endif %}
+
+ {% if state.is_paused %}
+
+ {% else %}
+
+ {% endif %}
+
+
+
+
{{ hand.description }}
+
+
+
+ {% if hand.schedule %}
+ 🕐 {{ hand.schedule.cron }}
+ {% endif %}
+ ▶️ {{ state.run_count }} runs
+ {% if state.last_run %}
+ Last: {{ state.last_run.strftime('%H:%M') }}
+ {% endif %}
+
+
+ {{ state.status.value.upper() }}
+
+
+
+ {% if hand.tools_required %}
+
+ Tools:
+ {% for tool in hand.tools_required %}
+ {{ tool }}
+ {% endfor %}
+
+ {% endif %}
+
+ {% endfor %}
+
+{% else %}
+
+
No Hands configured.
+
Create Hand packages in the hands/ directory.
+
+{% endif %}
diff --git a/src/hands/__init__.py b/src/hands/__init__.py
new file mode 100644
index 00000000..6346afc7
--- /dev/null
+++ b/src/hands/__init__.py
@@ -0,0 +1,67 @@
+"""Hands — Autonomous scheduled agents for Timmy Time.
+
+The Hands framework provides autonomous agent capabilities:
+- Oracle: Bitcoin and on-chain intelligence
+- Scout: OSINT monitoring
+- Scribe: Content production
+- Ledger: Treasury tracking
+- Forge: Model management
+- Weaver: Creative pipeline
+- Sentinel: System health
+
+Usage:
+ from hands import HandRegistry, HandScheduler, HandRunner
+ from hands.models import HandConfig
+
+ # Load and schedule Hands
+ registry = HandRegistry(hands_dir="hands/")
+ await registry.load_all()
+
+ scheduler = HandScheduler(registry)
+ await scheduler.start()
+
+ # Execute a Hand manually
+ runner = HandRunner(registry, llm_adapter)
+ result = await runner.run_hand("oracle")
+"""
+
+from hands.models import (
+ ApprovalGate,
+ ApprovalRequest,
+ ApprovalStatus,
+ HandConfig,
+ HandExecution,
+ HandOutcome,
+ HandState,
+ HandStatus,
+ OutputConfig,
+ ScheduleConfig,
+ ToolRequirement,
+ TriggerType,
+)
+from hands.registry import HandRegistry, HandNotFoundError, HandValidationError
+from hands.scheduler import HandScheduler
+from hands.runner import HandRunner
+
+__all__ = [
+ # Models
+ "HandConfig",
+ "HandState",
+ "HandExecution",
+ "HandStatus",
+ "HandOutcome",
+ "TriggerType",
+ "ApprovalGate",
+ "ApprovalRequest",
+ "ApprovalStatus",
+ "ScheduleConfig",
+ "OutputConfig",
+ "ToolRequirement",
+ # Core classes
+ "HandRegistry",
+ "HandScheduler",
+ "HandRunner",
+ # Exceptions
+ "HandNotFoundError",
+ "HandValidationError",
+]
diff --git a/src/hands/models.py b/src/hands/models.py
new file mode 100644
index 00000000..d440a72e
--- /dev/null
+++ b/src/hands/models.py
@@ -0,0 +1,252 @@
+"""Hands Models — Pydantic schemas for HAND.toml manifests.
+
+Defines the data structures for autonomous Hand agents:
+- HandConfig: Complete hand configuration from HAND.toml
+- HandState: Runtime state tracking
+- HandExecution: Execution record for audit trail
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from datetime import datetime
+from enum import Enum
+from pathlib import Path
+from typing import Any, Optional
+
+from pydantic import BaseModel, Field, validator
+
+
+class HandStatus(str, Enum):
+ """Runtime status of a Hand."""
+ DISABLED = "disabled"
+ IDLE = "idle"
+ SCHEDULED = "scheduled"
+ RUNNING = "running"
+ PAUSED = "paused"
+ ERROR = "error"
+
+
+class HandOutcome(str, Enum):
+ """Outcome of a Hand execution."""
+ SUCCESS = "success"
+ FAILURE = "failure"
+ APPROVAL_PENDING = "approval_pending"
+ TIMEOUT = "timeout"
+ SKIPPED = "skipped"
+
+
+class TriggerType(str, Enum):
+ """Types of execution triggers."""
+ SCHEDULE = "schedule" # Cron schedule
+ MANUAL = "manual" # User triggered
+ EVENT = "event" # Event-driven
+ WEBHOOK = "webhook" # External webhook
+
+
+# ── HAND.toml Schema Models ───────────────────────────────────────────────
+
+class ToolRequirement(BaseModel):
+ """A required tool for the Hand."""
+ name: str
+ version: Optional[str] = None
+ optional: bool = False
+
+
+class OutputConfig(BaseModel):
+ """Output configuration for Hand results."""
+ dashboard: bool = True
+ channel: Optional[str] = None # e.g., "telegram", "discord"
+ format: str = "markdown" # markdown, json, html
+ file_drop: Optional[str] = None # Path to write output files
+
+
+class ApprovalGate(BaseModel):
+ """An approval gate for sensitive operations."""
+ action: str # e.g., "post_tweet", "send_payment"
+ description: str
+ auto_approve_after: Optional[int] = None # Seconds to auto-approve
+
+
+class ScheduleConfig(BaseModel):
+ """Schedule configuration for the Hand."""
+ cron: Optional[str] = None # Cron expression
+ interval: Optional[int] = None # Seconds between runs
+ at: Optional[str] = None # Specific time (HH:MM)
+ timezone: str = "UTC"
+
+ @validator('cron')
+ def validate_cron(cls, v: Optional[str]) -> Optional[str]:
+ if v is None:
+ return v
+ # Basic cron validation (5 fields)
+ parts = v.split()
+ if len(parts) != 5:
+ raise ValueError("Cron expression must have 5 fields: minute hour day month weekday")
+ return v
+
+
+class HandConfig(BaseModel):
+ """Complete Hand configuration from HAND.toml.
+
+ Example HAND.toml:
+ [hand]
+ name = "oracle"
+ schedule = "0 7,19 * * *"
+ description = "Bitcoin and on-chain intelligence briefing"
+
+ [tools]
+ required = ["mempool_fetch", "fee_estimate"]
+
+ [approval_gates]
+ post_tweet = { action = "post_tweet", description = "Post to Twitter" }
+
+ [output]
+ dashboard = true
+ channel = "telegram"
+ """
+
+ # Required fields
+ name: str = Field(..., description="Unique hand identifier")
+ description: str = Field(..., description="What this Hand does")
+
+ # Schedule (one of these must be set)
+ schedule: Optional[ScheduleConfig] = None
+ trigger: Optional[TriggerType] = TriggerType.SCHEDULE
+
+ # Optional fields
+ enabled: bool = True
+ version: str = "1.0.0"
+ author: Optional[str] = None
+
+ # Tools
+ tools_required: list[str] = Field(default_factory=list)
+ tools_optional: list[str] = Field(default_factory=list)
+
+ # Approval gates
+ approval_gates: list[ApprovalGate] = Field(default_factory=list)
+
+ # Output configuration
+ output: OutputConfig = Field(default_factory=OutputConfig)
+
+ # File paths (set at runtime)
+ hand_dir: Optional[Path] = Field(None, exclude=True)
+ system_prompt_path: Optional[Path] = None
+ skill_paths: list[Path] = Field(default_factory=list)
+
+ class Config:
+ extra = "allow" # Allow additional fields for extensibility
+
+ @property
+ def system_md_path(self) -> Optional[Path]:
+ """Path to SYSTEM.md file."""
+ if self.hand_dir:
+ return self.hand_dir / "SYSTEM.md"
+ return None
+
+ @property
+ def skill_md_paths(self) -> list[Path]:
+ """Paths to SKILL.md files."""
+ if self.hand_dir:
+ skill_dir = self.hand_dir / "skills"
+ if skill_dir.exists():
+ return list(skill_dir.glob("*.md"))
+ return []
+
+
+# ── Runtime State Models ─────────────────────────────────────────────────
+
+@dataclass
+class HandState:
+ """Runtime state of a Hand."""
+ name: str
+ status: HandStatus = HandStatus.IDLE
+ last_run: Optional[datetime] = None
+ next_run: Optional[datetime] = None
+ run_count: int = 0
+ success_count: int = 0
+ failure_count: int = 0
+ error_message: Optional[str] = None
+ is_paused: bool = False
+
+ def to_dict(self) -> dict[str, Any]:
+ return {
+ "name": self.name,
+ "status": self.status.value,
+ "last_run": self.last_run.isoformat() if self.last_run else None,
+ "next_run": self.next_run.isoformat() if self.next_run else None,
+ "run_count": self.run_count,
+ "success_count": self.success_count,
+ "failure_count": self.failure_count,
+ "error_message": self.error_message,
+ "is_paused": self.is_paused,
+ }
+
+
+@dataclass
+class HandExecution:
+ """Record of a Hand execution."""
+ id: str
+ hand_name: str
+ trigger: TriggerType
+ started_at: datetime
+ completed_at: Optional[datetime] = None
+ outcome: HandOutcome = HandOutcome.SKIPPED
+ output: str = ""
+ error: Optional[str] = None
+ approval_id: Optional[str] = None
+ files_generated: list[str] = field(default_factory=list)
+
+ def to_dict(self) -> dict[str, Any]:
+ return {
+ "id": self.id,
+ "hand_name": self.hand_name,
+ "trigger": self.trigger.value,
+ "started_at": self.started_at.isoformat(),
+ "completed_at": self.completed_at.isoformat() if self.completed_at else None,
+ "outcome": self.outcome.value,
+ "output": self.output,
+ "error": self.error,
+ "approval_id": self.approval_id,
+ "files_generated": self.files_generated,
+ }
+
+
+# ── Approval Queue Models ────────────────────────────────────────────────
+
+class ApprovalStatus(str, Enum):
+ """Status of an approval request."""
+ PENDING = "pending"
+ APPROVED = "approved"
+ REJECTED = "rejected"
+ EXPIRED = "expired"
+ AUTO_APPROVED = "auto_approved"
+
+
+@dataclass
+class ApprovalRequest:
+ """A request for user approval."""
+ id: str
+ hand_name: str
+ action: str
+ description: str
+ context: dict[str, Any] = field(default_factory=dict)
+ status: ApprovalStatus = ApprovalStatus.PENDING
+ created_at: datetime = field(default_factory=datetime.utcnow)
+ expires_at: Optional[datetime] = None
+ resolved_at: Optional[datetime] = None
+ resolved_by: Optional[str] = None
+
+ def to_dict(self) -> dict[str, Any]:
+ return {
+ "id": self.id,
+ "hand_name": self.hand_name,
+ "action": self.action,
+ "description": self.description,
+ "context": self.context,
+ "status": self.status.value,
+ "created_at": self.created_at.isoformat(),
+ "expires_at": self.expires_at.isoformat() if self.expires_at else None,
+ "resolved_at": self.resolved_at.isoformat() if self.resolved_at else None,
+ "resolved_by": self.resolved_by,
+ }
diff --git a/src/hands/registry.py b/src/hands/registry.py
new file mode 100644
index 00000000..72a3e5ab
--- /dev/null
+++ b/src/hands/registry.py
@@ -0,0 +1,526 @@
+"""Hand Registry — Load, validate, and index Hands from the hands directory.
+
+The HandRegistry discovers all Hand packages in the hands/ directory,
+loads their HAND.toml manifests, and maintains an index for fast lookup.
+
+Usage:
+ from hands.registry import HandRegistry
+
+ registry = HandRegistry(hands_dir="hands/")
+ await registry.load_all()
+
+ oracle = registry.get_hand("oracle")
+ all_hands = registry.list_hands()
+ scheduled = registry.get_scheduled_hands()
+"""
+
+from __future__ import annotations
+
+import logging
+import sqlite3
+import tomllib
+import uuid
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Optional
+
+from hands.models import ApprovalGate, ApprovalRequest, ApprovalStatus, HandConfig, HandState, HandStatus, OutputConfig, ScheduleConfig
+
+logger = logging.getLogger(__name__)
+
+
+class HandRegistryError(Exception):
+ """Base exception for HandRegistry errors."""
+ pass
+
+
+class HandNotFoundError(HandRegistryError):
+ """Raised when a Hand is not found."""
+ pass
+
+
+class HandValidationError(HandRegistryError):
+ """Raised when a Hand fails validation."""
+ pass
+
+
+class HandRegistry:
+ """Registry for autonomous Hands.
+
+ Discovers Hands from the filesystem, loads their configurations,
+ and maintains a SQLite index for fast lookups.
+
+ Attributes:
+ hands_dir: Directory containing Hand packages
+ db_path: SQLite database for indexing
+ _hands: In-memory cache of loaded HandConfigs
+ _states: Runtime state of each Hand
+ """
+
+ def __init__(
+ self,
+ hands_dir: str | Path = "hands/",
+ db_path: str | Path = "data/hands.db",
+ ) -> None:
+ """Initialize HandRegistry.
+
+ Args:
+ hands_dir: Directory containing Hand subdirectories
+ db_path: SQLite database path for indexing
+ """
+ self.hands_dir = Path(hands_dir)
+ self.db_path = Path(db_path)
+ self._hands: dict[str, HandConfig] = {}
+ self._states: dict[str, HandState] = {}
+ self._ensure_schema()
+ logger.info("HandRegistry initialized (hands_dir=%s)", self.hands_dir)
+
+ def _get_conn(self) -> sqlite3.Connection:
+ """Get database connection."""
+ self.db_path.parent.mkdir(parents=True, exist_ok=True)
+ conn = sqlite3.connect(str(self.db_path))
+ conn.row_factory = sqlite3.Row
+ return conn
+
+ def _ensure_schema(self) -> None:
+ """Create database tables if they don't exist."""
+ with self._get_conn() as conn:
+ # Hands index
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS hands (
+ name TEXT PRIMARY KEY,
+ config_json TEXT NOT NULL,
+ enabled INTEGER DEFAULT 1,
+ loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ )
+ """)
+
+ # Hand execution history
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS hand_executions (
+ id TEXT PRIMARY KEY,
+ hand_name TEXT NOT NULL,
+ trigger TEXT NOT NULL,
+ started_at TIMESTAMP NOT NULL,
+ completed_at TIMESTAMP,
+ outcome TEXT NOT NULL,
+ output TEXT,
+ error TEXT,
+ approval_id TEXT
+ )
+ """)
+
+ # Approval queue
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS approval_queue (
+ id TEXT PRIMARY KEY,
+ hand_name TEXT NOT NULL,
+ action TEXT NOT NULL,
+ description TEXT NOT NULL,
+ context_json TEXT,
+ status TEXT DEFAULT 'pending',
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ expires_at TIMESTAMP,
+ resolved_at TIMESTAMP,
+ resolved_by TEXT
+ )
+ """)
+
+ conn.commit()
+
+ async def load_all(self) -> dict[str, HandConfig]:
+ """Load all Hands from the hands directory.
+
+ Returns:
+ Dict mapping hand names to HandConfigs
+ """
+ if not self.hands_dir.exists():
+ logger.warning("Hands directory does not exist: %s", self.hands_dir)
+ return {}
+
+ loaded = {}
+
+ for hand_dir in self.hands_dir.iterdir():
+ if not hand_dir.is_dir():
+ continue
+
+ try:
+ hand = self._load_hand_from_dir(hand_dir)
+ if hand:
+ loaded[hand.name] = hand
+ self._hands[hand.name] = hand
+
+ # Initialize state if not exists
+ if hand.name not in self._states:
+ self._states[hand.name] = HandState(name=hand.name)
+
+ # Store in database
+ self._store_hand(conn=None, hand=hand)
+
+ logger.info("Loaded Hand: %s (%s)", hand.name, hand.description[:50])
+
+ except Exception as e:
+ logger.error("Failed to load Hand from %s: %s", hand_dir, e)
+
+ logger.info("Loaded %d Hands", len(loaded))
+ return loaded
+
+ def _load_hand_from_dir(self, hand_dir: Path) -> Optional[HandConfig]:
+ """Load a single Hand from its directory.
+
+ Args:
+ hand_dir: Directory containing HAND.toml
+
+ Returns:
+ HandConfig or None if invalid
+ """
+ manifest_path = hand_dir / "HAND.toml"
+
+ if not manifest_path.exists():
+ logger.debug("No HAND.toml in %s", hand_dir)
+ return None
+
+ # Parse TOML
+ try:
+ with open(manifest_path, "rb") as f:
+ data = tomllib.load(f)
+ except Exception as e:
+ raise HandValidationError(f"Invalid HAND.toml: {e}")
+
+ # Extract hand section
+ hand_data = data.get("hand", {})
+ if not hand_data:
+ raise HandValidationError("Missing [hand] section in HAND.toml")
+
+ # Build HandConfig
+ config = HandConfig(
+ name=hand_data.get("name", hand_dir.name),
+ description=hand_data.get("description", ""),
+ enabled=hand_data.get("enabled", True),
+ version=hand_data.get("version", "1.0.0"),
+ author=hand_data.get("author"),
+ hand_dir=hand_dir,
+ )
+
+ # Parse schedule
+ if "schedule" in hand_data:
+ schedule_data = hand_data["schedule"]
+ if isinstance(schedule_data, str):
+ # Simple cron string
+ config.schedule = ScheduleConfig(cron=schedule_data)
+ elif isinstance(schedule_data, dict):
+ config.schedule = ScheduleConfig(**schedule_data)
+
+ # Parse tools
+ tools_data = data.get("tools", {})
+ config.tools_required = tools_data.get("required", [])
+ config.tools_optional = tools_data.get("optional", [])
+
+ # Parse approval gates
+ gates_data = data.get("approval_gates", {})
+ for action, gate_data in gates_data.items():
+ if isinstance(gate_data, dict):
+ config.approval_gates.append(ApprovalGate(
+ action=gate_data.get("action", action),
+ description=gate_data.get("description", ""),
+ auto_approve_after=gate_data.get("auto_approve_after"),
+ ))
+
+ # Parse output config
+ output_data = data.get("output", {})
+ config.output = OutputConfig(**output_data)
+
+ return config
+
+ def _store_hand(self, conn: Optional[sqlite3.Connection], hand: HandConfig) -> None:
+ """Store hand config in database."""
+ import json
+
+ if conn is None:
+ with self._get_conn() as conn:
+ self._store_hand(conn, hand)
+ return
+
+ conn.execute(
+ """
+ INSERT OR REPLACE INTO hands (name, config_json, enabled)
+ VALUES (?, ?, ?)
+ """,
+ (hand.name, hand.json(), 1 if hand.enabled else 0),
+ )
+ conn.commit()
+
+ def get_hand(self, name: str) -> HandConfig:
+ """Get a Hand by name.
+
+ Args:
+ name: Hand name
+
+ Returns:
+ HandConfig
+
+ Raises:
+ HandNotFoundError: If Hand doesn't exist
+ """
+ if name not in self._hands:
+ raise HandNotFoundError(f"Hand not found: {name}")
+ return self._hands[name]
+
+ def list_hands(self) -> list[HandConfig]:
+ """List all loaded Hands.
+
+ Returns:
+ List of HandConfigs
+ """
+ return list(self._hands.values())
+
+ def get_scheduled_hands(self) -> list[HandConfig]:
+ """Get all Hands with schedule configuration.
+
+ Returns:
+ List of HandConfigs with schedules
+ """
+ return [h for h in self._hands.values() if h.schedule is not None and h.enabled]
+
+ def get_enabled_hands(self) -> list[HandConfig]:
+ """Get all enabled Hands.
+
+ Returns:
+ List of enabled HandConfigs
+ """
+ return [h for h in self._hands.values() if h.enabled]
+
+ def get_state(self, name: str) -> HandState:
+ """Get runtime state of a Hand.
+
+ Args:
+ name: Hand name
+
+ Returns:
+ HandState
+ """
+ if name not in self._states:
+ self._states[name] = HandState(name=name)
+ return self._states[name]
+
+ def update_state(self, name: str, **kwargs) -> None:
+ """Update Hand state.
+
+ Args:
+ name: Hand name
+ **kwargs: State fields to update
+ """
+ state = self.get_state(name)
+ for key, value in kwargs.items():
+ if hasattr(state, key):
+ setattr(state, key, value)
+
+ async def log_execution(
+ self,
+ hand_name: str,
+ trigger: str,
+ outcome: str,
+ output: str = "",
+ error: Optional[str] = None,
+ approval_id: Optional[str] = None,
+ ) -> str:
+ """Log a Hand execution.
+
+ Args:
+ hand_name: Name of the Hand
+ trigger: Trigger type
+ outcome: Execution outcome
+ output: Execution output
+ error: Error message if failed
+ approval_id: Associated approval ID
+
+ Returns:
+ Execution ID
+ """
+ execution_id = str(uuid.uuid4())
+
+ with self._get_conn() as conn:
+ conn.execute(
+ """
+ INSERT INTO hand_executions
+ (id, hand_name, trigger, started_at, completed_at, outcome, output, error, approval_id)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ execution_id,
+ hand_name,
+ trigger,
+ datetime.now(timezone.utc).isoformat(),
+ datetime.now(timezone.utc).isoformat(),
+ outcome,
+ output,
+ error,
+ approval_id,
+ ),
+ )
+ conn.commit()
+
+ return execution_id
+
+ async def create_approval(
+ self,
+ hand_name: str,
+ action: str,
+ description: str,
+ context: dict,
+ expires_after: Optional[int] = None,
+ ) -> ApprovalRequest:
+ """Create an approval request.
+
+ Args:
+ hand_name: Hand requesting approval
+ action: Action to approve
+ description: Human-readable description
+ context: Additional context
+ expires_after: Seconds until expiration
+
+ Returns:
+ ApprovalRequest
+ """
+ approval_id = str(uuid.uuid4())
+
+ created_at = datetime.now(timezone.utc)
+ expires_at = None
+ if expires_after:
+ from datetime import timedelta
+ expires_at = created_at + timedelta(seconds=expires_after)
+
+ request = ApprovalRequest(
+ id=approval_id,
+ hand_name=hand_name,
+ action=action,
+ description=description,
+ context=context,
+ created_at=created_at,
+ expires_at=expires_at,
+ )
+
+ # Store in database
+ import json
+ with self._get_conn() as conn:
+ conn.execute(
+ """
+ INSERT INTO approval_queue
+ (id, hand_name, action, description, context_json, status, created_at, expires_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ request.id,
+ request.hand_name,
+ request.action,
+ request.description,
+ json.dumps(request.context),
+ request.status.value,
+ request.created_at.isoformat(),
+ request.expires_at.isoformat() if request.expires_at else None,
+ ),
+ )
+ conn.commit()
+
+ return request
+
+ async def get_pending_approvals(self) -> list[ApprovalRequest]:
+ """Get all pending approval requests.
+
+ Returns:
+ List of pending ApprovalRequests
+ """
+ import json
+
+ with self._get_conn() as conn:
+ rows = conn.execute(
+ """
+ SELECT * FROM approval_queue
+ WHERE status = 'pending'
+ ORDER BY created_at DESC
+ """
+ ).fetchall()
+
+ requests = []
+ for row in rows:
+ requests.append(ApprovalRequest(
+ id=row["id"],
+ hand_name=row["hand_name"],
+ action=row["action"],
+ description=row["description"],
+ context=json.loads(row["context_json"] or "{}"),
+ status=ApprovalStatus(row["status"]),
+ created_at=datetime.fromisoformat(row["created_at"]),
+ expires_at=datetime.fromisoformat(row["expires_at"]) if row["expires_at"] else None,
+ ))
+
+ return requests
+
+ async def resolve_approval(
+ self,
+ approval_id: str,
+ approved: bool,
+ resolved_by: Optional[str] = None,
+ ) -> bool:
+ """Resolve an approval request.
+
+ Args:
+ approval_id: ID of the approval request
+ approved: True to approve, False to reject
+ resolved_by: Who resolved the request
+
+ Returns:
+ True if resolved successfully
+ """
+ status = ApprovalStatus.APPROVED if approved else ApprovalStatus.REJECTED
+ resolved_at = datetime.now(timezone.utc)
+
+ with self._get_conn() as conn:
+ cursor = conn.execute(
+ """
+ UPDATE approval_queue
+ SET status = ?, resolved_at = ?, resolved_by = ?
+ WHERE id = ? AND status = 'pending'
+ """,
+ (status.value, resolved_at.isoformat(), resolved_by, approval_id),
+ )
+ conn.commit()
+
+ return cursor.rowcount > 0
+
+ async def get_recent_executions(
+ self,
+ hand_name: Optional[str] = None,
+ limit: int = 50,
+ ) -> list[dict]:
+ """Get recent Hand executions.
+
+ Args:
+ hand_name: Filter by Hand name
+ limit: Maximum results
+
+ Returns:
+ List of execution records
+ """
+ with self._get_conn() as conn:
+ if hand_name:
+ rows = conn.execute(
+ """
+ SELECT * FROM hand_executions
+ WHERE hand_name = ?
+ ORDER BY started_at DESC
+ LIMIT ?
+ """,
+ (hand_name, limit),
+ ).fetchall()
+ else:
+ rows = conn.execute(
+ """
+ SELECT * FROM hand_executions
+ ORDER BY started_at DESC
+ LIMIT ?
+ """,
+ (limit,),
+ ).fetchall()
+
+ return [dict(row) for row in rows]
diff --git a/src/hands/runner.py b/src/hands/runner.py
new file mode 100644
index 00000000..1c575b1b
--- /dev/null
+++ b/src/hands/runner.py
@@ -0,0 +1,476 @@
+"""Hand Runner — Execute Hands with skill injection and tool access.
+
+The HandRunner is responsible for executing individual Hands:
+- Load SYSTEM.md and SKILL.md files
+- Inject domain expertise into LLM context
+- Execute the tool loop
+- Handle approval gates
+- Produce output
+
+Usage:
+ from hands.runner import HandRunner
+ from hands.registry import HandRegistry
+
+ registry = HandRegistry()
+ runner = HandRunner(registry, llm_adapter)
+
+ result = await runner.run_hand("oracle")
+"""
+
+from __future__ import annotations
+
+import logging
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Optional
+
+from hands.models import (
+ ApprovalRequest,
+ ApprovalStatus,
+ HandConfig,
+ HandExecution,
+ HandOutcome,
+ HandState,
+ HandStatus,
+ TriggerType,
+)
+from hands.registry import HandRegistry
+
+logger = logging.getLogger(__name__)
+
+
+class HandRunner:
+ """Executes individual Hands.
+
+ Manages the execution lifecycle:
+ 1. Load system prompt and skills
+ 2. Check and handle approval gates
+ 3. Execute tool loop with LLM
+ 4. Produce and deliver output
+ 5. Log execution
+
+ Attributes:
+ registry: HandRegistry for Hand configs and state
+ llm_adapter: LLM adapter for generation
+ mcp_registry: Optional MCP tool registry
+ """
+
+ def __init__(
+ self,
+ registry: HandRegistry,
+ llm_adapter: Optional[Any] = None,
+ mcp_registry: Optional[Any] = None,
+ ) -> None:
+ """Initialize HandRunner.
+
+ Args:
+ registry: HandRegistry instance
+ llm_adapter: LLM adapter for generation
+ mcp_registry: Optional MCP tool registry for tool access
+ """
+ self.registry = registry
+ self.llm_adapter = llm_adapter
+ self.mcp_registry = mcp_registry
+
+ logger.info("HandRunner initialized")
+
+ async def run_hand(
+ self,
+ hand_name: str,
+ trigger: TriggerType = TriggerType.MANUAL,
+ context: Optional[dict] = None,
+ ) -> HandExecution:
+ """Run a Hand.
+
+ This is the main entry point for Hand execution.
+
+ Args:
+ hand_name: Name of the Hand to run
+ trigger: What triggered this execution
+ context: Optional execution context
+
+ Returns:
+ HandExecution record
+ """
+ started_at = datetime.now(timezone.utc)
+ execution_id = f"exec_{hand_name}_{started_at.isoformat()}"
+
+ logger.info("Starting Hand execution: %s", hand_name)
+
+ try:
+ # Get Hand config
+ hand = self.registry.get_hand(hand_name)
+
+ # Update state
+ self.registry.update_state(
+ hand_name,
+ status=HandStatus.RUNNING,
+ last_run=started_at,
+ )
+
+ # Load system prompt and skills
+ system_prompt = self._load_system_prompt(hand)
+ skills = self._load_skills(hand)
+
+ # Check approval gates
+ approval_results = await self._check_approvals(hand)
+ if approval_results.get("blocked"):
+ return await self._create_execution_record(
+ execution_id=execution_id,
+ hand_name=hand_name,
+ trigger=trigger,
+ started_at=started_at,
+ outcome=HandOutcome.APPROVAL_PENDING,
+ output="",
+ approval_id=approval_results.get("approval_id"),
+ )
+
+ # Execute the Hand
+ result = await self._execute_with_llm(
+ hand=hand,
+ system_prompt=system_prompt,
+ skills=skills,
+ context=context or {},
+ )
+
+ # Deliver output
+ await self._deliver_output(hand, result)
+
+ # Update state
+ state = self.registry.get_state(hand_name)
+ self.registry.update_state(
+ hand_name,
+ status=HandStatus.IDLE,
+ run_count=state.run_count + 1,
+ success_count=state.success_count + 1,
+ )
+
+ # Create execution record
+ return await self._create_execution_record(
+ execution_id=execution_id,
+ hand_name=hand_name,
+ trigger=trigger,
+ started_at=started_at,
+ outcome=HandOutcome.SUCCESS,
+ output=result.get("output", ""),
+ files_generated=result.get("files", []),
+ )
+
+ except Exception as e:
+ logger.exception("Hand %s execution failed", hand_name)
+
+ # Update state
+ self.registry.update_state(
+ hand_name,
+ status=HandStatus.ERROR,
+ error_message=str(e),
+ )
+
+ # Create failure record
+ return await self._create_execution_record(
+ execution_id=execution_id,
+ hand_name=hand_name,
+ trigger=trigger,
+ started_at=started_at,
+ outcome=HandOutcome.FAILURE,
+ output="",
+ error=str(e),
+ )
+
+ def _load_system_prompt(self, hand: HandConfig) -> str:
+ """Load SYSTEM.md for a Hand.
+
+ Args:
+ hand: HandConfig
+
+ Returns:
+ System prompt text
+ """
+ if hand.system_md_path and hand.system_md_path.exists():
+ try:
+ return hand.system_md_path.read_text()
+ except Exception as e:
+ logger.warning("Failed to load SYSTEM.md for %s: %s", hand.name, e)
+
+ # Default system prompt
+ return f"""You are the {hand.name} Hand.
+
+Your purpose: {hand.description}
+
+You have access to the following tools: {', '.join(hand.tools_required + hand.tools_optional)}
+
+Execute your task professionally and produce the requested output.
+"""
+
+ def _load_skills(self, hand: HandConfig) -> list[str]:
+ """Load SKILL.md files for a Hand.
+
+ Args:
+ hand: HandConfig
+
+ Returns:
+ List of skill texts
+ """
+ skills = []
+
+ for skill_path in hand.skill_md_paths:
+ try:
+ if skill_path.exists():
+ skills.append(skill_path.read_text())
+ except Exception as e:
+ logger.warning("Failed to load skill %s: %s", skill_path, e)
+
+ return skills
+
+ async def _check_approvals(self, hand: HandConfig) -> dict:
+ """Check if any approval gates block execution.
+
+ Args:
+ hand: HandConfig
+
+ Returns:
+ Dict with "blocked" and optional "approval_id"
+ """
+ if not hand.approval_gates:
+ return {"blocked": False}
+
+ # Check for pending approvals for this hand
+ pending = await self.registry.get_pending_approvals()
+ hand_pending = [a for a in pending if a.hand_name == hand.name]
+
+ if hand_pending:
+ return {
+ "blocked": True,
+ "approval_id": hand_pending[0].id,
+ }
+
+ # Create approval requests for each gate
+ for gate in hand.approval_gates:
+ request = await self.registry.create_approval(
+ hand_name=hand.name,
+ action=gate.action,
+ description=gate.description,
+ context={"gate": gate.action},
+ expires_after=gate.auto_approve_after,
+ )
+
+ if not gate.auto_approve_after:
+ # Requires manual approval
+ return {
+ "blocked": True,
+ "approval_id": request.id,
+ }
+
+ return {"blocked": False}
+
+ async def _execute_with_llm(
+ self,
+ hand: HandConfig,
+ system_prompt: str,
+ skills: list[str],
+ context: dict,
+ ) -> dict:
+ """Execute Hand logic with LLM.
+
+ Args:
+ hand: HandConfig
+ system_prompt: System prompt
+ skills: Skill texts
+ context: Execution context
+
+ Returns:
+ Result dict with output and files
+ """
+ if not self.llm_adapter:
+ logger.warning("No LLM adapter available for Hand %s", hand.name)
+ return {
+ "output": f"Hand {hand.name} executed (no LLM configured)",
+ "files": [],
+ }
+
+ # Build the full prompt
+ full_prompt = self._build_prompt(
+ hand=hand,
+ system_prompt=system_prompt,
+ skills=skills,
+ context=context,
+ )
+
+ try:
+ # Call LLM
+ response = await self.llm_adapter.chat(message=full_prompt)
+
+ # Parse response
+ output = response.content
+
+ # Extract any file outputs (placeholder - would parse structured output)
+ files = []
+
+ return {
+ "output": output,
+ "files": files,
+ }
+
+ except Exception as e:
+ logger.error("LLM execution failed for Hand %s: %s", hand.name, e)
+ raise
+
+ def _build_prompt(
+ self,
+ hand: HandConfig,
+ system_prompt: str,
+ skills: list[str],
+ context: dict,
+ ) -> str:
+ """Build the full execution prompt.
+
+ Args:
+ hand: HandConfig
+ system_prompt: System prompt
+ skills: Skill texts
+ context: Execution context
+
+ Returns:
+ Complete prompt
+ """
+ parts = [
+ "# System Instructions",
+ system_prompt,
+ "",
+ ]
+
+ # Add skills
+ if skills:
+ parts.extend([
+ "# Domain Expertise (SKILL.md)",
+ "\n\n---\n\n".join(skills),
+ "",
+ ])
+
+ # Add context
+ if context:
+ parts.extend([
+ "# Execution Context",
+ str(context),
+ "",
+ ])
+
+ # Add available tools
+ if hand.tools_required or hand.tools_optional:
+ parts.extend([
+ "# Available Tools",
+ "Required: " + ", ".join(hand.tools_required),
+ "Optional: " + ", ".join(hand.tools_optional),
+ "",
+ ])
+
+ # Add output instructions
+ parts.extend([
+ "# Output Instructions",
+ f"Format: {hand.output.format}",
+ f"Dashboard: {'Yes' if hand.output.dashboard else 'No'}",
+ f"Channel: {hand.output.channel or 'None'}",
+ "",
+ "Execute your task now.",
+ ])
+
+ return "\n".join(parts)
+
+ async def _deliver_output(self, hand: HandConfig, result: dict) -> None:
+ """Deliver Hand output to configured destinations.
+
+ Args:
+ hand: HandConfig
+ result: Execution result
+ """
+ output = result.get("output", "")
+
+ # Dashboard output
+ if hand.output.dashboard:
+ # This would publish to event bus for dashboard
+ logger.info("Hand %s output delivered to dashboard", hand.name)
+
+ # Channel output (e.g., Telegram, Discord)
+ if hand.output.channel:
+ # This would send to the appropriate channel
+ logger.info("Hand %s output delivered to %s", hand.name, hand.output.channel)
+
+ # File drop
+ if hand.output.file_drop:
+ try:
+ drop_path = Path(hand.output.file_drop)
+ drop_path.mkdir(parents=True, exist_ok=True)
+
+ output_file = drop_path / f"{hand.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
+ output_file.write_text(output)
+
+ logger.info("Hand %s output written to %s", hand.name, output_file)
+ except Exception as e:
+ logger.error("Failed to write Hand %s output: %s", hand.name, e)
+
+ async def _create_execution_record(
+ self,
+ execution_id: str,
+ hand_name: str,
+ trigger: TriggerType,
+ started_at: datetime,
+ outcome: HandOutcome,
+ output: str,
+ error: Optional[str] = None,
+ approval_id: Optional[str] = None,
+ files_generated: Optional[list] = None,
+ ) -> HandExecution:
+ """Create and store execution record.
+
+ Returns:
+ HandExecution
+ """
+ completed_at = datetime.now(timezone.utc)
+
+ execution = HandExecution(
+ id=execution_id,
+ hand_name=hand_name,
+ trigger=trigger,
+ started_at=started_at,
+ completed_at=completed_at,
+ outcome=outcome,
+ output=output,
+ error=error,
+ approval_id=approval_id,
+ files_generated=files_generated or [],
+ )
+
+ # Log to registry
+ await self.registry.log_execution(
+ hand_name=hand_name,
+ trigger=trigger.value,
+ outcome=outcome.value,
+ output=output,
+ error=error,
+ approval_id=approval_id,
+ )
+
+ return execution
+
+ async def continue_after_approval(
+ self,
+ approval_id: str,
+ ) -> Optional[HandExecution]:
+ """Continue Hand execution after approval.
+
+ Args:
+ approval_id: Approval request ID
+
+ Returns:
+ HandExecution if execution proceeded
+ """
+ # Get approval request
+ # This would need a get_approval_by_id method in registry
+ # For now, placeholder
+
+ logger.info("Continuing Hand execution after approval %s", approval_id)
+
+ # Re-run the Hand
+ # This would look up the hand from the approval context
+
+ return None
diff --git a/src/hands/scheduler.py b/src/hands/scheduler.py
new file mode 100644
index 00000000..e9344981
--- /dev/null
+++ b/src/hands/scheduler.py
@@ -0,0 +1,410 @@
+"""Hand Scheduler — APScheduler-based cron scheduling for Hands.
+
+Manages the scheduling of autonomous Hands using APScheduler.
+Supports cron expressions, intervals, and specific times.
+
+Usage:
+ from hands.scheduler import HandScheduler
+ from hands.registry import HandRegistry
+
+ registry = HandRegistry()
+ await registry.load_all()
+
+ scheduler = HandScheduler(registry)
+ await scheduler.start()
+
+ # Hands are now scheduled and will run automatically
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from datetime import datetime, timezone
+from typing import Any, Callable, Optional
+
+from hands.models import HandConfig, HandState, HandStatus, TriggerType
+from hands.registry import HandRegistry
+
+logger = logging.getLogger(__name__)
+
+# Try to import APScheduler
+try:
+ from apscheduler.schedulers.asyncio import AsyncIOScheduler
+ from apscheduler.triggers.cron import CronTrigger
+ from apscheduler.triggers.interval import IntervalTrigger
+ APSCHEDULER_AVAILABLE = True
+except ImportError:
+ APSCHEDULER_AVAILABLE = False
+ logger.warning("APScheduler not installed. Scheduling will be disabled.")
+
+
+class HandScheduler:
+ """Scheduler for autonomous Hands.
+
+ Uses APScheduler to manage cron-based execution of Hands.
+ Each Hand with a schedule gets its own job in the scheduler.
+
+ Attributes:
+ registry: HandRegistry for Hand configurations
+ _scheduler: APScheduler instance
+ _running: Whether scheduler is running
+ _job_ids: Mapping of hand names to job IDs
+ """
+
+ def __init__(
+ self,
+ registry: HandRegistry,
+ job_defaults: Optional[dict] = None,
+ ) -> None:
+ """Initialize HandScheduler.
+
+ Args:
+ registry: HandRegistry instance
+ job_defaults: Default job configuration for APScheduler
+ """
+ self.registry = registry
+ self._scheduler: Optional[Any] = None
+ self._running = False
+ self._job_ids: dict[str, str] = {}
+
+ if APSCHEDULER_AVAILABLE:
+ self._scheduler = AsyncIOScheduler(job_defaults=job_defaults or {
+ 'coalesce': True, # Coalesce missed jobs into one
+ 'max_instances': 1, # Only one instance per Hand
+ })
+
+ logger.info("HandScheduler initialized")
+
+ async def start(self) -> None:
+ """Start the scheduler and schedule all enabled Hands."""
+ if not APSCHEDULER_AVAILABLE:
+ logger.error("Cannot start scheduler: APScheduler not installed")
+ return
+
+ if self._running:
+ logger.warning("Scheduler already running")
+ return
+
+ # Schedule all enabled Hands
+ hands = self.registry.get_scheduled_hands()
+ for hand in hands:
+ await self.schedule_hand(hand)
+
+ # Start the scheduler
+ self._scheduler.start()
+ self._running = True
+
+ logger.info("HandScheduler started with %d scheduled Hands", len(hands))
+
+ async def stop(self) -> None:
+ """Stop the scheduler."""
+ if not self._running or not self._scheduler:
+ return
+
+ self._scheduler.shutdown(wait=True)
+ self._running = False
+ self._job_ids.clear()
+
+ logger.info("HandScheduler stopped")
+
+ async def schedule_hand(self, hand: HandConfig) -> Optional[str]:
+ """Schedule a Hand for execution.
+
+ Args:
+ hand: HandConfig to schedule
+
+ Returns:
+ Job ID if scheduled successfully
+ """
+ if not APSCHEDULER_AVAILABLE or not self._scheduler:
+ logger.warning("Cannot schedule %s: APScheduler not available", hand.name)
+ return None
+
+ if not hand.schedule:
+ logger.debug("Hand %s has no schedule", hand.name)
+ return None
+
+ if not hand.enabled:
+ logger.debug("Hand %s is disabled", hand.name)
+ return None
+
+ # Remove existing job if any
+ if hand.name in self._job_ids:
+ self.unschedule_hand(hand.name)
+
+ # Create the trigger
+ trigger = self._create_trigger(hand.schedule)
+ if not trigger:
+ logger.error("Failed to create trigger for Hand %s", hand.name)
+ return None
+
+ # Add job to scheduler
+ try:
+ job = self._scheduler.add_job(
+ func=self._execute_hand_wrapper,
+ trigger=trigger,
+ id=f"hand_{hand.name}",
+ name=f"Hand: {hand.name}",
+ args=[hand.name],
+ replace_existing=True,
+ )
+
+ self._job_ids[hand.name] = job.id
+
+ # Update state
+ self.registry.update_state(
+ hand.name,
+ status=HandStatus.SCHEDULED,
+ next_run=job.next_run_time,
+ )
+
+ logger.info("Scheduled Hand %s (next run: %s)", hand.name, job.next_run_time)
+ return job.id
+
+ except Exception as e:
+ logger.error("Failed to schedule Hand %s: %s", hand.name, e)
+ return None
+
+ def unschedule_hand(self, name: str) -> bool:
+ """Remove a Hand from the scheduler.
+
+ Args:
+ name: Hand name
+
+ Returns:
+ True if unscheduled successfully
+ """
+ if not self._scheduler:
+ return False
+
+ if name not in self._job_ids:
+ return False
+
+ try:
+ self._scheduler.remove_job(self._job_ids[name])
+ del self._job_ids[name]
+
+ self.registry.update_state(name, status=HandStatus.IDLE)
+
+ logger.info("Unscheduled Hand %s", name)
+ return True
+
+ except Exception as e:
+ logger.error("Failed to unschedule Hand %s: %s", name, e)
+ return False
+
+ def pause_hand(self, name: str) -> bool:
+ """Pause a scheduled Hand.
+
+ Args:
+ name: Hand name
+
+ Returns:
+ True if paused successfully
+ """
+ if not self._scheduler:
+ return False
+
+ if name not in self._job_ids:
+ return False
+
+ try:
+ self._scheduler.pause_job(self._job_ids[name])
+ self.registry.update_state(name, status=HandStatus.PAUSED, is_paused=True)
+ logger.info("Paused Hand %s", name)
+ return True
+ except Exception as e:
+ logger.error("Failed to pause Hand %s: %s", name, e)
+ return False
+
+ def resume_hand(self, name: str) -> bool:
+ """Resume a paused Hand.
+
+ Args:
+ name: Hand name
+
+ Returns:
+ True if resumed successfully
+ """
+ if not self._scheduler:
+ return False
+
+ if name not in self._job_ids:
+ return False
+
+ try:
+ self._scheduler.resume_job(self._job_ids[name])
+ self.registry.update_state(name, status=HandStatus.SCHEDULED, is_paused=False)
+ logger.info("Resumed Hand %s", name)
+ return True
+ except Exception as e:
+ logger.error("Failed to resume Hand %s: %s", name, e)
+ return False
+
+ def get_scheduled_jobs(self) -> list[dict]:
+ """Get all scheduled jobs.
+
+ Returns:
+ List of job information dicts
+ """
+ if not self._scheduler:
+ return []
+
+ jobs = []
+ for job in self._scheduler.get_jobs():
+ if job.id.startswith("hand_"):
+ hand_name = job.id[5:] # Remove "hand_" prefix
+ jobs.append({
+ "hand_name": hand_name,
+ "job_id": job.id,
+ "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
+ "trigger": str(job.trigger),
+ })
+
+ return jobs
+
+ def _create_trigger(self, schedule: Any) -> Optional[Any]:
+ """Create an APScheduler trigger from ScheduleConfig.
+
+ Args:
+ schedule: ScheduleConfig
+
+ Returns:
+ APScheduler trigger
+ """
+ if not APSCHEDULER_AVAILABLE:
+ return None
+
+ # Cron trigger
+ if schedule.cron:
+ try:
+ parts = schedule.cron.split()
+ if len(parts) == 5:
+ return CronTrigger(
+ minute=parts[0],
+ hour=parts[1],
+ day=parts[2],
+ month=parts[3],
+ day_of_week=parts[4],
+ timezone=schedule.timezone,
+ )
+ except Exception as e:
+ logger.error("Invalid cron expression '%s': %s", schedule.cron, e)
+ return None
+
+ # Interval trigger
+ if schedule.interval:
+ return IntervalTrigger(
+ seconds=schedule.interval,
+ timezone=schedule.timezone,
+ )
+
+ return None
+
+ async def _execute_hand_wrapper(self, hand_name: str) -> None:
+ """Wrapper for Hand execution.
+
+ This is called by APScheduler when a Hand's trigger fires.
+
+ Args:
+ hand_name: Name of the Hand to execute
+ """
+ logger.info("Triggering Hand: %s", hand_name)
+
+ try:
+ # Update state
+ self.registry.update_state(
+ hand_name,
+ status=HandStatus.RUNNING,
+ last_run=datetime.now(timezone.utc),
+ )
+
+ # Execute the Hand
+ await self._run_hand(hand_name, TriggerType.SCHEDULE)
+
+ except Exception as e:
+ logger.exception("Hand %s execution failed", hand_name)
+ self.registry.update_state(
+ hand_name,
+ status=HandStatus.ERROR,
+ error_message=str(e),
+ )
+
+ async def _run_hand(self, hand_name: str, trigger: TriggerType) -> None:
+ """Execute a Hand.
+
+ This is the core execution logic. In Phase 4+, this will
+ call the actual Hand implementation.
+
+ Args:
+ hand_name: Name of the Hand
+ trigger: What triggered the execution
+ """
+ from hands.models import HandOutcome
+
+ try:
+ hand = self.registry.get_hand(hand_name)
+ except Exception:
+ logger.error("Hand %s not found", hand_name)
+ return
+
+ logger.info("Executing Hand %s (trigger: %s)", hand_name, trigger.value)
+
+ # TODO: Phase 4+ - Call actual Hand implementation via HandRunner
+ # For now, just log the execution
+
+ output = f"Hand {hand_name} executed (placeholder implementation)"
+
+ # Log execution
+ await self.registry.log_execution(
+ hand_name=hand_name,
+ trigger=trigger.value,
+ outcome=HandOutcome.SUCCESS.value,
+ output=output,
+ )
+
+ # Update state
+ state = self.registry.get_state(hand_name)
+ self.registry.update_state(
+ hand_name,
+ status=HandStatus.SCHEDULED,
+ run_count=state.run_count + 1,
+ success_count=state.success_count + 1,
+ )
+
+ logger.info("Hand %s completed successfully", hand_name)
+
+ async def trigger_hand_now(self, name: str) -> bool:
+ """Manually trigger a Hand to run immediately.
+
+ Args:
+ name: Hand name
+
+ Returns:
+ True if triggered successfully
+ """
+ try:
+ await self._run_hand(name, TriggerType.MANUAL)
+ return True
+ except Exception as e:
+ logger.error("Failed to trigger Hand %s: %s", name, e)
+ return False
+
+ def get_next_run_time(self, name: str) -> Optional[datetime]:
+ """Get next scheduled run time for a Hand.
+
+ Args:
+ name: Hand name
+
+ Returns:
+ Next run time or None if not scheduled
+ """
+ if not self._scheduler or name not in self._job_ids:
+ return None
+
+ try:
+ job = self._scheduler.get_job(self._job_ids[name])
+ return job.next_run_time if job else None
+ except Exception:
+ return None
diff --git a/tests/test_hands.py b/tests/test_hands.py
new file mode 100644
index 00000000..078ebd55
--- /dev/null
+++ b/tests/test_hands.py
@@ -0,0 +1,276 @@
+"""Tests for Hands Infrastructure.
+
+Tests HandRegistry, HandScheduler, and HandRunner.
+"""
+
+from __future__ import annotations
+
+import tempfile
+from pathlib import Path
+
+import pytest
+
+from hands import HandRegistry, HandRunner, HandScheduler
+from hands.models import HandConfig, HandStatus, ScheduleConfig
+
+
+@pytest.fixture
+def temp_hands_dir():
+ """Create a temporary hands directory with test Hands."""
+ with tempfile.TemporaryDirectory() as tmpdir:
+ hands_dir = Path(tmpdir)
+
+ # Create Oracle Hand
+ oracle_dir = hands_dir / "oracle"
+ oracle_dir.mkdir()
+ (oracle_dir / "HAND.toml").write_text('''
+[hand]
+name = "oracle"
+description = "Bitcoin intelligence"
+schedule = "0 7,19 * * *"
+
+[tools]
+required = ["mempool_fetch", "fee_estimate"]
+
+[output]
+dashboard = true
+''')
+ (oracle_dir / "SYSTEM.md").write_text("# Oracle System Prompt\nYou are Oracle.")
+
+ # Create Sentinel Hand
+ sentinel_dir = hands_dir / "sentinel"
+ sentinel_dir.mkdir()
+ (sentinel_dir / "HAND.toml").write_text('''
+[hand]
+name = "sentinel"
+description = "System health monitoring"
+schedule = "*/15 * * * *"
+enabled = true
+''')
+
+ yield hands_dir
+
+
+@pytest.fixture
+def registry(temp_hands_dir):
+ """Create HandRegistry with test Hands."""
+ db_path = temp_hands_dir / "test_hands.db"
+ reg = HandRegistry(hands_dir=temp_hands_dir, db_path=db_path)
+ return reg
+
+
+@pytest.mark.asyncio
+class TestHandRegistry:
+ """HandRegistry tests."""
+
+ async def test_load_all_hands(self, registry, temp_hands_dir):
+ """Should load all Hands from directory."""
+ hands = await registry.load_all()
+
+ assert len(hands) == 2
+ assert "oracle" in hands
+ assert "sentinel" in hands
+
+ async def test_get_hand(self, registry, temp_hands_dir):
+ """Should get Hand by name."""
+ await registry.load_all()
+
+ hand = registry.get_hand("oracle")
+ assert hand.name == "oracle"
+ assert "Bitcoin" in hand.description
+
+ async def test_get_hand_not_found(self, registry):
+ """Should raise for unknown Hand."""
+ from hands.registry import HandNotFoundError
+
+ with pytest.raises(HandNotFoundError):
+ registry.get_hand("nonexistent")
+
+ async def test_get_scheduled_hands(self, registry, temp_hands_dir):
+ """Should return only Hands with schedules."""
+ await registry.load_all()
+
+ scheduled = registry.get_scheduled_hands()
+
+ assert len(scheduled) == 2
+ assert all(h.schedule is not None for h in scheduled)
+
+ async def test_state_management(self, registry, temp_hands_dir):
+ """Should track Hand state."""
+ await registry.load_all()
+
+ state = registry.get_state("oracle")
+ assert state.name == "oracle"
+ assert state.status == HandStatus.IDLE
+
+ registry.update_state("oracle", status=HandStatus.RUNNING)
+ state = registry.get_state("oracle")
+ assert state.status == HandStatus.RUNNING
+
+ async def test_approval_queue(self, registry, temp_hands_dir):
+ """Should manage approval queue."""
+ await registry.load_all()
+
+ # Create approval
+ request = await registry.create_approval(
+ hand_name="oracle",
+ action="post_tweet",
+ description="Post Bitcoin update",
+ context={"price": 50000},
+ )
+
+ assert request.id is not None
+ assert request.hand_name == "oracle"
+
+ # Get pending
+ pending = await registry.get_pending_approvals()
+ assert len(pending) == 1
+
+ # Resolve
+ result = await registry.resolve_approval(request.id, approved=True)
+ assert result is True
+
+ # Should be empty now
+ pending = await registry.get_pending_approvals()
+ assert len(pending) == 0
+
+
+@pytest.mark.asyncio
+class TestHandScheduler:
+ """HandScheduler tests."""
+
+ async def test_scheduler_initialization(self, registry):
+ """Should initialize scheduler."""
+ scheduler = HandScheduler(registry)
+ assert scheduler.registry == registry
+ assert not scheduler._running
+
+ async def test_schedule_hand(self, registry, temp_hands_dir):
+ """Should schedule a Hand."""
+ await registry.load_all()
+ scheduler = HandScheduler(registry)
+
+ hand = registry.get_hand("oracle")
+ job_id = await scheduler.schedule_hand(hand)
+
+ # Note: Job ID may be None if APScheduler not available
+ # But should not raise an exception
+
+ async def test_get_scheduled_jobs(self, registry, temp_hands_dir):
+ """Should list scheduled jobs."""
+ await registry.load_all()
+ scheduler = HandScheduler(registry)
+
+ jobs = scheduler.get_scheduled_jobs()
+ assert isinstance(jobs, list)
+
+ async def test_trigger_hand_now(self, registry, temp_hands_dir):
+ """Should manually trigger a Hand."""
+ await registry.load_all()
+ scheduler = HandScheduler(registry)
+
+ # This will fail because Hand isn't fully implemented
+ # But should not raise
+ result = await scheduler.trigger_hand_now("oracle")
+ # Result may be True or False depending on implementation
+
+
+@pytest.mark.asyncio
+class TestHandRunner:
+ """HandRunner tests."""
+
+ async def test_load_system_prompt(self, registry, temp_hands_dir):
+ """Should load SYSTEM.md."""
+ await registry.load_all()
+ runner = HandRunner(registry)
+
+ hand = registry.get_hand("oracle")
+ prompt = runner._load_system_prompt(hand)
+
+ assert "Oracle" in prompt
+
+ async def test_load_skills(self, registry, temp_hands_dir):
+ """Should load SKILL.md files."""
+ # Create a skill file
+ skills_dir = temp_hands_dir / "oracle" / "skills"
+ skills_dir.mkdir()
+ (skills_dir / "bitcoin.md").write_text("# Bitcoin Expertise")
+
+ await registry.load_all()
+ runner = HandRunner(registry)
+
+ hand = registry.get_hand("oracle")
+ skills = runner._load_skills(hand)
+
+ assert len(skills) == 1
+ assert "Bitcoin" in skills[0]
+
+ async def test_build_prompt(self, registry, temp_hands_dir):
+ """Should build execution prompt."""
+ await registry.load_all()
+ runner = HandRunner(registry)
+
+ hand = registry.get_hand("oracle")
+ system = "System prompt"
+ skills = ["Skill 1", "Skill 2"]
+ context = {"key": "value"}
+
+ prompt = runner._build_prompt(hand, system, skills, context)
+
+ assert "System Instructions" in prompt
+ assert "System prompt" in prompt
+ assert "Skill 1" in prompt
+ assert "key" in prompt
+
+
+class TestHandConfig:
+ """HandConfig model tests."""
+
+ def test_hand_config_creation(self):
+ """Should create HandConfig."""
+ config = HandConfig(
+ name="test",
+ description="Test hand",
+ schedule=ScheduleConfig(cron="0 * * * *"),
+ )
+
+ assert config.name == "test"
+ assert config.schedule.cron == "0 * * * *"
+
+ def test_schedule_validation(self):
+ """Should validate cron expression."""
+ # Valid cron
+ config = HandConfig(
+ name="test",
+ description="Test",
+ schedule=ScheduleConfig(cron="0 7 * * *"),
+ )
+ assert config.schedule.cron == "0 7 * * *"
+
+
+class TestHandModels:
+ """Hand model tests."""
+
+ def test_hand_status_enum(self):
+ """HandStatus should have expected values."""
+ from hands.models import HandStatus
+
+ assert HandStatus.IDLE.value == "idle"
+ assert HandStatus.RUNNING.value == "running"
+ assert HandStatus.SCHEDULED.value == "scheduled"
+
+ def test_hand_state_to_dict(self):
+ """HandState should serialize to dict."""
+ from hands.models import HandState
+ from datetime import datetime
+
+ state = HandState(
+ name="test",
+ status=HandStatus.RUNNING,
+ run_count=5,
+ )
+
+ data = state.to_dict()
+ assert data["name"] == "test"
+ assert data["status"] == "running"
+ assert data["run_count"] == 5