Merge pull request #55 from AlexanderWhitestone/feature/hands-infrastructure-phase3

Feature/hands infrastructure phase3
This commit is contained in:
Alexander Whitestone
2026-02-26 12:55:28 -05:00
committed by GitHub
13 changed files with 2635 additions and 0 deletions

View File

@@ -36,6 +36,7 @@ from dashboard.routes.work_orders import router as work_orders_router
from dashboard.routes.tasks import router as tasks_router
from dashboard.routes.scripture import router as scripture_router
from dashboard.routes.self_coding import router as self_coding_router
from dashboard.routes.hands import router as hands_router
from router.api import router as cascade_router
logging.basicConfig(
@@ -201,6 +202,7 @@ app.include_router(work_orders_router)
app.include_router(tasks_router)
app.include_router(scripture_router)
app.include_router(self_coding_router)
app.include_router(hands_router)
app.include_router(cascade_router)

View File

@@ -0,0 +1,325 @@
"""Hands Dashboard Routes.
API endpoints and HTMX views for managing autonomous Hands:
- Hand status and control
- Approval queue management
- Execution history
- Manual triggering
"""
from __future__ import annotations
import logging
from typing import Optional
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse, JSONResponse
from hands import HandRegistry, HandRunner, HandScheduler
from hands.models import HandConfig, HandStatus, TriggerType
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/hands", tags=["hands"])
# Global instances (would be properly injected in production)
_registry: Optional[HandRegistry] = None
_scheduler: Optional[HandScheduler] = None
_runner: Optional[HandRunner] = None
def get_registry() -> HandRegistry:
"""Get or create HandRegistry singleton."""
global _registry
if _registry is None:
_registry = HandRegistry()
return _registry
def get_scheduler() -> HandScheduler:
"""Get or create HandScheduler singleton."""
global _scheduler
if _scheduler is None:
_scheduler = HandScheduler(get_registry())
return _scheduler
def get_runner() -> HandRunner:
"""Get or create HandRunner singleton."""
global _runner
if _runner is None:
_runner = HandRunner(get_registry())
return _runner
# ── API Endpoints ─────────────────────────────────────────────────────────
@router.get("/api/hands")
async def api_list_hands():
"""List all Hands with their status."""
registry = get_registry()
hands = []
for hand in registry.list_hands():
state = registry.get_state(hand.name)
hands.append({
"name": hand.name,
"description": hand.description,
"enabled": hand.enabled,
"status": state.status.value,
"schedule": hand.schedule.cron if hand.schedule else None,
"last_run": state.last_run.isoformat() if state.last_run else None,
"next_run": state.next_run.isoformat() if state.next_run else None,
"run_count": state.run_count,
})
return hands
@router.get("/api/hands/{name}")
async def api_get_hand(name: str):
"""Get detailed information about a Hand."""
registry = get_registry()
try:
hand = registry.get_hand(name)
state = registry.get_state(name)
return {
"name": hand.name,
"description": hand.description,
"enabled": hand.enabled,
"version": hand.version,
"author": hand.author,
"status": state.status.value,
"schedule": {
"cron": hand.schedule.cron if hand.schedule else None,
"timezone": hand.schedule.timezone if hand.schedule else "UTC",
},
"tools": {
"required": hand.tools_required,
"optional": hand.tools_optional,
},
"approval_gates": [
{"action": g.action, "description": g.description}
for g in hand.approval_gates
],
"output": {
"dashboard": hand.output.dashboard,
"channel": hand.output.channel,
"format": hand.output.format,
},
"state": state.to_dict(),
}
except Exception as e:
return JSONResponse(
status_code=404,
content={"error": f"Hand not found: {name}"},
)
@router.post("/api/hands/{name}/trigger")
async def api_trigger_hand(name: str):
"""Manually trigger a Hand to run."""
scheduler = get_scheduler()
success = await scheduler.trigger_hand_now(name)
if success:
return {"success": True, "message": f"Hand {name} triggered"}
else:
return JSONResponse(
status_code=500,
content={"success": False, "error": f"Failed to trigger Hand {name}"},
)
@router.post("/api/hands/{name}/pause")
async def api_pause_hand(name: str):
"""Pause a scheduled Hand."""
scheduler = get_scheduler()
if scheduler.pause_hand(name):
return {"success": True, "message": f"Hand {name} paused"}
else:
return JSONResponse(
status_code=400,
content={"success": False, "error": f"Failed to pause Hand {name}"},
)
@router.post("/api/hands/{name}/resume")
async def api_resume_hand(name: str):
"""Resume a paused Hand."""
scheduler = get_scheduler()
if scheduler.resume_hand(name):
return {"success": True, "message": f"Hand {name} resumed"}
else:
return JSONResponse(
status_code=400,
content={"success": False, "error": f"Failed to resume Hand {name}"},
)
@router.get("/api/approvals")
async def api_get_pending_approvals():
"""Get all pending approval requests."""
registry = get_registry()
approvals = await registry.get_pending_approvals()
return [
{
"id": a.id,
"hand_name": a.hand_name,
"action": a.action,
"description": a.description,
"created_at": a.created_at.isoformat(),
"expires_at": a.expires_at.isoformat() if a.expires_at else None,
}
for a in approvals
]
@router.post("/api/approvals/{approval_id}/approve")
async def api_approve_request(approval_id: str):
"""Approve a pending request."""
registry = get_registry()
if await registry.resolve_approval(approval_id, approved=True):
return {"success": True, "message": "Request approved"}
else:
return JSONResponse(
status_code=400,
content={"success": False, "error": "Failed to approve request"},
)
@router.post("/api/approvals/{approval_id}/reject")
async def api_reject_request(approval_id: str):
"""Reject a pending request."""
registry = get_registry()
if await registry.resolve_approval(approval_id, approved=False):
return {"success": True, "message": "Request rejected"}
else:
return JSONResponse(
status_code=400,
content={"success": False, "error": "Failed to reject request"},
)
@router.get("/api/executions")
async def api_get_executions(hand_name: Optional[str] = None, limit: int = 50):
"""Get recent Hand executions."""
registry = get_registry()
executions = await registry.get_recent_executions(hand_name, limit)
return executions
# ── HTMX Page Routes ─────────────────────────────────────────────────────
@router.get("", response_class=HTMLResponse)
async def hands_page(request: Request):
"""Main Hands dashboard page."""
from dashboard.app import templates
return templates.TemplateResponse(
"hands.html",
{
"request": request,
"title": "Hands",
},
)
@router.get("/list", response_class=HTMLResponse)
async def hands_list_partial(request: Request):
"""HTMX partial for Hands list."""
from dashboard.app import templates
registry = get_registry()
hands_data = []
for hand in registry.list_hands():
state = registry.get_state(hand.name)
hands_data.append({
"config": hand,
"state": state,
})
return templates.TemplateResponse(
"partials/hands_list.html",
{
"request": request,
"hands": hands_data,
},
)
@router.get("/approvals", response_class=HTMLResponse)
async def approvals_partial(request: Request):
"""HTMX partial for approval queue."""
from dashboard.app import templates
registry = get_registry()
approvals = await registry.get_pending_approvals()
return templates.TemplateResponse(
"partials/approvals_list.html",
{
"request": request,
"approvals": approvals,
},
)
@router.get("/executions", response_class=HTMLResponse)
async def executions_partial(request: Request, hand_name: Optional[str] = None):
"""HTMX partial for execution history."""
from dashboard.app import templates
registry = get_registry()
executions = await registry.get_recent_executions(hand_name, limit=20)
return templates.TemplateResponse(
"partials/hand_executions.html",
{
"request": request,
"executions": executions,
"hand_name": hand_name,
},
)
@router.get("/{name}/detail", response_class=HTMLResponse)
async def hand_detail_partial(request: Request, name: str):
"""HTMX partial for Hand detail."""
from dashboard.app import templates
registry = get_registry()
try:
hand = registry.get_hand(name)
state = registry.get_state(name)
return templates.TemplateResponse(
"partials/hand_detail.html",
{
"request": request,
"hand": hand,
"state": state,
},
)
except Exception:
return templates.TemplateResponse(
"partials/error.html",
{
"request": request,
"message": f"Hand {name} not found",
},
)

View File

@@ -41,6 +41,7 @@
<a href="/router/status" class="mc-test-link">ROUTER</a>
<a href="/self-modify/queue" class="mc-test-link">UPGRADES</a>
<a href="/self-coding" class="mc-test-link">SELF-CODING</a>
<a href="/hands" class="mc-test-link">HANDS</a>
<a href="/work-orders/queue" class="mc-test-link">WORK ORDERS</a>
<a href="/creative/ui" class="mc-test-link">CREATIVE</a>
<a href="/mobile" class="mc-test-link" title="Mobile-optimized view">MOBILE</a>
@@ -73,6 +74,7 @@
<a href="/memory" class="mc-mobile-link">MEMORY</a>
<a href="/work-orders/queue" class="mc-mobile-link">WORK ORDERS</a>
<a href="/self-coding" class="mc-mobile-link">SELF-CODING</a>
<a href="/hands" class="mc-mobile-link">HANDS</a>
<a href="/creative/ui" class="mc-mobile-link">CREATIVE</a>
<a href="/voice/button" class="mc-mobile-link">VOICE</a>
<a href="/mobile" class="mc-mobile-link">MOBILE</a>

View File

@@ -0,0 +1,140 @@
{% extends "base.html" %}
{% block title %}Hands — Timmy Time{% endblock %}
{% block content %}
<div class="container-fluid py-4">
<!-- Header -->
<div class="d-flex justify-content-between align-items-center mb-4">
<div>
<h1 class="h3 mb-0">Hands</h1>
<p class="text-muted small mb-0">Autonomous scheduled agents</p>
</div>
<div class="d-flex gap-2">
<button class="btn btn-sm btn-outline-info" hx-get="/hands/list" hx-target="#hands-container">
Refresh
</button>
</div>
</div>
<!-- Main Content Grid -->
<div class="row g-4">
<!-- Left Column: Hands List -->
<div class="col-lg-8">
<div class="card border-0 shadow-sm">
<div class="card-header bg-transparent border-secondary d-flex justify-content-between align-items-center">
<h5 class="mb-0">Active Hands</h5>
<span class="badge bg-info" hx-get="/hands/api/hands" hx-trigger="every 30s" hx-swap="none">
Auto-refresh
</span>
</div>
<div class="card-body p-0">
<div id="hands-container" hx-get="/hands/list" hx-trigger="load">
<div class="d-flex justify-content-center py-5">
<div class="spinner-border text-info" role="status">
<span class="visually-hidden">Loading Hands...</span>
</div>
</div>
</div>
</div>
</div>
<!-- Executions -->
<div class="card border-0 shadow-sm mt-4">
<div class="card-header bg-transparent border-secondary">
<h5 class="mb-0">Recent Executions</h5>
</div>
<div class="card-body p-0">
<div id="executions-container" hx-get="/hands/executions" hx-trigger="load">
<div class="d-flex justify-content-center py-3">
<div class="spinner-border spinner-border-sm text-muted" role="status"></div>
</div>
</div>
</div>
</div>
</div>
<!-- Right Column: Approvals & Info -->
<div class="col-lg-4">
<!-- Pending Approvals -->
<div class="card border-0 shadow-sm mb-4">
<div class="card-header bg-transparent border-secondary d-flex justify-content-between align-items-center">
<h5 class="mb-0">Pending Approvals</h5>
<span class="badge bg-warning text-dark" id="approval-count">-</span>
</div>
<div class="card-body p-0">
<div id="approvals-container" hx-get="/hands/approvals" hx-trigger="load, every 10s">
<div class="d-flex justify-content-center py-3">
<div class="spinner-border spinner-border-sm text-muted" role="status"></div>
</div>
</div>
</div>
</div>
<!-- What are Hands -->
<div class="card border-0 shadow-sm">
<div class="card-header bg-transparent border-secondary">
<h5 class="mb-0">What are Hands?</h5>
</div>
<div class="card-body">
<p class="small mb-2">Hands are autonomous agents that run on schedules:</p>
<ul class="list-unstyled small mb-0">
<li class="mb-1">🔮 <strong>Oracle</strong> — Bitcoin intelligence</li>
<li class="mb-1">🔍 <strong>Scout</strong> — OSINT monitoring</li>
<li class="mb-1">✍️ <strong>Scribe</strong> — Content production</li>
<li class="mb-1">💰 <strong>Ledger</strong> — Treasury tracking</li>
<li class="mb-1">🔧 <strong>Forge</strong> — Model management</li>
<li class="mb-1">🎨 <strong>Weaver</strong> — Creative pipeline</li>
<li class="mb-1">🛡️ <strong>Sentinel</strong> — System health</li>
</ul>
</div>
</div>
</div>
</div>
</div>
<style>
.hand-card {
transition: all 0.2s ease;
border-left: 3px solid transparent;
}
.hand-card:hover {
background-color: rgba(255, 255, 255, 0.03);
}
.hand-card.running {
border-left-color: #0dcaf0;
}
.hand-card.scheduled {
border-left-color: #198754;
}
.hand-card.paused {
border-left-color: #ffc107;
}
.hand-card.error {
border-left-color: #dc3545;
}
.status-dot {
width: 8px;
height: 8px;
border-radius: 50%;
display: inline-block;
}
.status-dot.running { background-color: #0dcaf0; animation: pulse 1.5s infinite; }
.status-dot.scheduled { background-color: #198754; }
.status-dot.paused { background-color: #ffc107; }
.status-dot.error { background-color: #dc3545; }
.status-dot.idle { background-color: #6c757d; }
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.5; }
}
</style>
{% endblock %}

View File

@@ -0,0 +1,44 @@
{# Approvals list partial #}
{% if approvals %}
<div class="list-group list-group-flush" id="approval-list">
{% for approval in approvals %}
<div class="list-group-item p-3" id="approval-{{ approval.id }}">
<div class="d-flex justify-content-between align-items-start mb-2">
<div>
<h6 class="mb-0">{{ approval.hand_name }}</h6>
<small class="text-muted">{{ approval.action }}</small>
</div>
<span class="badge bg-warning text-dark">PENDING</span>
</div>
<p class="small mb-2">{{ approval.description }}</p>
<div class="d-flex justify-content-between align-items-center">
<small class="text-muted">
{{ approval.created_at.strftime('%H:%M') if approval.created_at else 'Unknown' }}
</small>
<div class="btn-group btn-group-sm">
<button class="btn btn-success"
hx-post="/hands/api/approvals/{{ approval.id }}/approve"
hx-target="#approvals-container"
hx-swap="outerHTML">
✓ Approve
</button>
<button class="btn btn-danger"
hx-post="/hands/api/approvals/{{ approval.id }}/reject"
hx-target="#approvals-container"
hx-swap="outerHTML">
✗ Reject
</button>
</div>
</div>
</div>
{% endfor %}
</div>
<script>document.getElementById('approval-count').textContent = '{{ approvals|length }}';</script>
{% else %}
<div class="text-center py-4 text-muted">
<p class="mb-0 small">No pending approvals.</p>
</div>
<script>document.getElementById('approval-count').textContent = '0';</script>
{% endif %}

View File

@@ -0,0 +1,38 @@
{# Hand executions partial #}
{% if executions %}
<div class="list-group list-group-flush">
{% for exec in executions %}
<div class="list-group-item py-2 px-3">
<div class="d-flex justify-content-between align-items-center">
<div class="d-flex align-items-center gap-2">
{% if exec.outcome == 'success' %}
<span class="badge bg-success"></span>
{% elif exec.outcome == 'failure' %}
<span class="badge bg-danger"></span>
{% elif exec.outcome == 'approval_pending' %}
<span class="badge bg-warning text-dark"></span>
{% else %}
<span class="badge bg-secondary"></span>
{% endif %}
<span class="fw-medium">{{ exec.hand_name }}</span>
<small class="text-muted">({{ exec.trigger }})</small>
</div>
<small class="text-muted">
{{ exec.started_at[:16] if exec.started_at else 'Unknown' }}
</small>
</div>
{% if exec.error %}
<div class="small text-danger mt-1">
Error: {{ exec.error[:50] }}{% if exec.error|length > 50 %}...{% endif %}
</div>
{% endif %}
</div>
{% endfor %}
</div>
{% else %}
<div class="text-center py-3 text-muted">
<p class="mb-0 small">No executions yet.</p>
</div>
{% endif %}

View File

@@ -0,0 +1,77 @@
{# Hands list partial #}
{% if hands %}
<div class="list-group list-group-flush">
{% for item in hands %}
{% set hand = item.config %}
{% set state = item.state %}
<div class="list-group-item hand-card {{ state.status.value }} p-3">
<div class="d-flex justify-content-between align-items-start mb-2">
<div class="d-flex align-items-center gap-2">
<span class="status-dot {{ state.status.value }}"></span>
<h6 class="mb-0">{{ hand.name }}</h6>
{% if not hand.enabled %}
<span class="badge bg-secondary">Disabled</span>
{% endif %}
</div>
<div class="btn-group btn-group-sm">
{% if state.status.value == 'running' %}
<button class="btn btn-outline-info" disabled>Running...</button>
{% else %}
<button class="btn btn-outline-success"
hx-post="/hands/api/hands/{{ hand.name }}/trigger"
hx-swap="none"
hx-confirm="Trigger {{ hand.name }} now?">
Run
</button>
{% endif %}
{% if state.is_paused %}
<button class="btn btn-outline-warning"
hx-post="/hands/api/hands/{{ hand.name }}/resume"
hx-target="#hands-container">
Resume
</button>
{% else %}
<button class="btn btn-outline-secondary"
hx-post="/hands/api/hands/{{ hand.name }}/pause"
hx-target="#hands-container">
Pause
</button>
{% endif %}
</div>
</div>
<p class="small text-muted mb-2">{{ hand.description }}</p>
<div class="d-flex justify-content-between align-items-center small">
<div class="text-muted">
{% if hand.schedule %}
<span class="me-2">🕐 {{ hand.schedule.cron }}</span>
{% endif %}
<span class="me-2">▶️ {{ state.run_count }} runs</span>
{% if state.last_run %}
<span title="Last run: {{ state.last_run }}">Last: {{ state.last_run.strftime('%H:%M') }}</span>
{% endif %}
</div>
<span class="badge bg-{{ 'success' if state.status.value == 'scheduled' else 'warning' if state.status.value == 'paused' else 'danger' if state.status.value == 'error' else 'info' }}">
{{ state.status.value.upper() }}
</span>
</div>
{% if hand.tools_required %}
<div class="mt-2">
<small class="text-muted">Tools:</small>
{% for tool in hand.tools_required %}
<span class="badge bg-dark me-1">{{ tool }}</span>
{% endfor %}
</div>
{% endif %}
</div>
{% endfor %}
</div>
{% else %}
<div class="text-center py-5 text-muted">
<p class="mb-0">No Hands configured.</p>
<small>Create Hand packages in the hands/ directory.</small>
</div>
{% endif %}

67
src/hands/__init__.py Normal file
View File

@@ -0,0 +1,67 @@
"""Hands — Autonomous scheduled agents for Timmy Time.
The Hands framework provides autonomous agent capabilities:
- Oracle: Bitcoin and on-chain intelligence
- Scout: OSINT monitoring
- Scribe: Content production
- Ledger: Treasury tracking
- Forge: Model management
- Weaver: Creative pipeline
- Sentinel: System health
Usage:
from hands import HandRegistry, HandScheduler, HandRunner
from hands.models import HandConfig
# Load and schedule Hands
registry = HandRegistry(hands_dir="hands/")
await registry.load_all()
scheduler = HandScheduler(registry)
await scheduler.start()
# Execute a Hand manually
runner = HandRunner(registry, llm_adapter)
result = await runner.run_hand("oracle")
"""
from hands.models import (
ApprovalGate,
ApprovalRequest,
ApprovalStatus,
HandConfig,
HandExecution,
HandOutcome,
HandState,
HandStatus,
OutputConfig,
ScheduleConfig,
ToolRequirement,
TriggerType,
)
from hands.registry import HandRegistry, HandNotFoundError, HandValidationError
from hands.scheduler import HandScheduler
from hands.runner import HandRunner
__all__ = [
# Models
"HandConfig",
"HandState",
"HandExecution",
"HandStatus",
"HandOutcome",
"TriggerType",
"ApprovalGate",
"ApprovalRequest",
"ApprovalStatus",
"ScheduleConfig",
"OutputConfig",
"ToolRequirement",
# Core classes
"HandRegistry",
"HandScheduler",
"HandRunner",
# Exceptions
"HandNotFoundError",
"HandValidationError",
]

252
src/hands/models.py Normal file
View File

@@ -0,0 +1,252 @@
"""Hands Models — Pydantic schemas for HAND.toml manifests.
Defines the data structures for autonomous Hand agents:
- HandConfig: Complete hand configuration from HAND.toml
- HandState: Runtime state tracking
- HandExecution: Execution record for audit trail
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, Optional
from pydantic import BaseModel, Field, validator
class HandStatus(str, Enum):
"""Runtime status of a Hand."""
DISABLED = "disabled"
IDLE = "idle"
SCHEDULED = "scheduled"
RUNNING = "running"
PAUSED = "paused"
ERROR = "error"
class HandOutcome(str, Enum):
"""Outcome of a Hand execution."""
SUCCESS = "success"
FAILURE = "failure"
APPROVAL_PENDING = "approval_pending"
TIMEOUT = "timeout"
SKIPPED = "skipped"
class TriggerType(str, Enum):
"""Types of execution triggers."""
SCHEDULE = "schedule" # Cron schedule
MANUAL = "manual" # User triggered
EVENT = "event" # Event-driven
WEBHOOK = "webhook" # External webhook
# ── HAND.toml Schema Models ───────────────────────────────────────────────
class ToolRequirement(BaseModel):
"""A required tool for the Hand."""
name: str
version: Optional[str] = None
optional: bool = False
class OutputConfig(BaseModel):
"""Output configuration for Hand results."""
dashboard: bool = True
channel: Optional[str] = None # e.g., "telegram", "discord"
format: str = "markdown" # markdown, json, html
file_drop: Optional[str] = None # Path to write output files
class ApprovalGate(BaseModel):
"""An approval gate for sensitive operations."""
action: str # e.g., "post_tweet", "send_payment"
description: str
auto_approve_after: Optional[int] = None # Seconds to auto-approve
class ScheduleConfig(BaseModel):
"""Schedule configuration for the Hand."""
cron: Optional[str] = None # Cron expression
interval: Optional[int] = None # Seconds between runs
at: Optional[str] = None # Specific time (HH:MM)
timezone: str = "UTC"
@validator('cron')
def validate_cron(cls, v: Optional[str]) -> Optional[str]:
if v is None:
return v
# Basic cron validation (5 fields)
parts = v.split()
if len(parts) != 5:
raise ValueError("Cron expression must have 5 fields: minute hour day month weekday")
return v
class HandConfig(BaseModel):
"""Complete Hand configuration from HAND.toml.
Example HAND.toml:
[hand]
name = "oracle"
schedule = "0 7,19 * * *"
description = "Bitcoin and on-chain intelligence briefing"
[tools]
required = ["mempool_fetch", "fee_estimate"]
[approval_gates]
post_tweet = { action = "post_tweet", description = "Post to Twitter" }
[output]
dashboard = true
channel = "telegram"
"""
# Required fields
name: str = Field(..., description="Unique hand identifier")
description: str = Field(..., description="What this Hand does")
# Schedule (one of these must be set)
schedule: Optional[ScheduleConfig] = None
trigger: Optional[TriggerType] = TriggerType.SCHEDULE
# Optional fields
enabled: bool = True
version: str = "1.0.0"
author: Optional[str] = None
# Tools
tools_required: list[str] = Field(default_factory=list)
tools_optional: list[str] = Field(default_factory=list)
# Approval gates
approval_gates: list[ApprovalGate] = Field(default_factory=list)
# Output configuration
output: OutputConfig = Field(default_factory=OutputConfig)
# File paths (set at runtime)
hand_dir: Optional[Path] = Field(None, exclude=True)
system_prompt_path: Optional[Path] = None
skill_paths: list[Path] = Field(default_factory=list)
class Config:
extra = "allow" # Allow additional fields for extensibility
@property
def system_md_path(self) -> Optional[Path]:
"""Path to SYSTEM.md file."""
if self.hand_dir:
return self.hand_dir / "SYSTEM.md"
return None
@property
def skill_md_paths(self) -> list[Path]:
"""Paths to SKILL.md files."""
if self.hand_dir:
skill_dir = self.hand_dir / "skills"
if skill_dir.exists():
return list(skill_dir.glob("*.md"))
return []
# ── Runtime State Models ─────────────────────────────────────────────────
@dataclass
class HandState:
"""Runtime state of a Hand."""
name: str
status: HandStatus = HandStatus.IDLE
last_run: Optional[datetime] = None
next_run: Optional[datetime] = None
run_count: int = 0
success_count: int = 0
failure_count: int = 0
error_message: Optional[str] = None
is_paused: bool = False
def to_dict(self) -> dict[str, Any]:
return {
"name": self.name,
"status": self.status.value,
"last_run": self.last_run.isoformat() if self.last_run else None,
"next_run": self.next_run.isoformat() if self.next_run else None,
"run_count": self.run_count,
"success_count": self.success_count,
"failure_count": self.failure_count,
"error_message": self.error_message,
"is_paused": self.is_paused,
}
@dataclass
class HandExecution:
"""Record of a Hand execution."""
id: str
hand_name: str
trigger: TriggerType
started_at: datetime
completed_at: Optional[datetime] = None
outcome: HandOutcome = HandOutcome.SKIPPED
output: str = ""
error: Optional[str] = None
approval_id: Optional[str] = None
files_generated: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"hand_name": self.hand_name,
"trigger": self.trigger.value,
"started_at": self.started_at.isoformat(),
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
"outcome": self.outcome.value,
"output": self.output,
"error": self.error,
"approval_id": self.approval_id,
"files_generated": self.files_generated,
}
# ── Approval Queue Models ────────────────────────────────────────────────
class ApprovalStatus(str, Enum):
"""Status of an approval request."""
PENDING = "pending"
APPROVED = "approved"
REJECTED = "rejected"
EXPIRED = "expired"
AUTO_APPROVED = "auto_approved"
@dataclass
class ApprovalRequest:
"""A request for user approval."""
id: str
hand_name: str
action: str
description: str
context: dict[str, Any] = field(default_factory=dict)
status: ApprovalStatus = ApprovalStatus.PENDING
created_at: datetime = field(default_factory=datetime.utcnow)
expires_at: Optional[datetime] = None
resolved_at: Optional[datetime] = None
resolved_by: Optional[str] = None
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"hand_name": self.hand_name,
"action": self.action,
"description": self.description,
"context": self.context,
"status": self.status.value,
"created_at": self.created_at.isoformat(),
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
"resolved_at": self.resolved_at.isoformat() if self.resolved_at else None,
"resolved_by": self.resolved_by,
}

526
src/hands/registry.py Normal file
View File

@@ -0,0 +1,526 @@
"""Hand Registry — Load, validate, and index Hands from the hands directory.
The HandRegistry discovers all Hand packages in the hands/ directory,
loads their HAND.toml manifests, and maintains an index for fast lookup.
Usage:
from hands.registry import HandRegistry
registry = HandRegistry(hands_dir="hands/")
await registry.load_all()
oracle = registry.get_hand("oracle")
all_hands = registry.list_hands()
scheduled = registry.get_scheduled_hands()
"""
from __future__ import annotations
import logging
import sqlite3
import tomllib
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from hands.models import ApprovalGate, ApprovalRequest, ApprovalStatus, HandConfig, HandState, HandStatus, OutputConfig, ScheduleConfig
logger = logging.getLogger(__name__)
class HandRegistryError(Exception):
"""Base exception for HandRegistry errors."""
pass
class HandNotFoundError(HandRegistryError):
"""Raised when a Hand is not found."""
pass
class HandValidationError(HandRegistryError):
"""Raised when a Hand fails validation."""
pass
class HandRegistry:
"""Registry for autonomous Hands.
Discovers Hands from the filesystem, loads their configurations,
and maintains a SQLite index for fast lookups.
Attributes:
hands_dir: Directory containing Hand packages
db_path: SQLite database for indexing
_hands: In-memory cache of loaded HandConfigs
_states: Runtime state of each Hand
"""
def __init__(
self,
hands_dir: str | Path = "hands/",
db_path: str | Path = "data/hands.db",
) -> None:
"""Initialize HandRegistry.
Args:
hands_dir: Directory containing Hand subdirectories
db_path: SQLite database path for indexing
"""
self.hands_dir = Path(hands_dir)
self.db_path = Path(db_path)
self._hands: dict[str, HandConfig] = {}
self._states: dict[str, HandState] = {}
self._ensure_schema()
logger.info("HandRegistry initialized (hands_dir=%s)", self.hands_dir)
def _get_conn(self) -> sqlite3.Connection:
"""Get database connection."""
self.db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
return conn
def _ensure_schema(self) -> None:
"""Create database tables if they don't exist."""
with self._get_conn() as conn:
# Hands index
conn.execute("""
CREATE TABLE IF NOT EXISTS hands (
name TEXT PRIMARY KEY,
config_json TEXT NOT NULL,
enabled INTEGER DEFAULT 1,
loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Hand execution history
conn.execute("""
CREATE TABLE IF NOT EXISTS hand_executions (
id TEXT PRIMARY KEY,
hand_name TEXT NOT NULL,
trigger TEXT NOT NULL,
started_at TIMESTAMP NOT NULL,
completed_at TIMESTAMP,
outcome TEXT NOT NULL,
output TEXT,
error TEXT,
approval_id TEXT
)
""")
# Approval queue
conn.execute("""
CREATE TABLE IF NOT EXISTS approval_queue (
id TEXT PRIMARY KEY,
hand_name TEXT NOT NULL,
action TEXT NOT NULL,
description TEXT NOT NULL,
context_json TEXT,
status TEXT DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP,
resolved_at TIMESTAMP,
resolved_by TEXT
)
""")
conn.commit()
async def load_all(self) -> dict[str, HandConfig]:
"""Load all Hands from the hands directory.
Returns:
Dict mapping hand names to HandConfigs
"""
if not self.hands_dir.exists():
logger.warning("Hands directory does not exist: %s", self.hands_dir)
return {}
loaded = {}
for hand_dir in self.hands_dir.iterdir():
if not hand_dir.is_dir():
continue
try:
hand = self._load_hand_from_dir(hand_dir)
if hand:
loaded[hand.name] = hand
self._hands[hand.name] = hand
# Initialize state if not exists
if hand.name not in self._states:
self._states[hand.name] = HandState(name=hand.name)
# Store in database
self._store_hand(conn=None, hand=hand)
logger.info("Loaded Hand: %s (%s)", hand.name, hand.description[:50])
except Exception as e:
logger.error("Failed to load Hand from %s: %s", hand_dir, e)
logger.info("Loaded %d Hands", len(loaded))
return loaded
def _load_hand_from_dir(self, hand_dir: Path) -> Optional[HandConfig]:
"""Load a single Hand from its directory.
Args:
hand_dir: Directory containing HAND.toml
Returns:
HandConfig or None if invalid
"""
manifest_path = hand_dir / "HAND.toml"
if not manifest_path.exists():
logger.debug("No HAND.toml in %s", hand_dir)
return None
# Parse TOML
try:
with open(manifest_path, "rb") as f:
data = tomllib.load(f)
except Exception as e:
raise HandValidationError(f"Invalid HAND.toml: {e}")
# Extract hand section
hand_data = data.get("hand", {})
if not hand_data:
raise HandValidationError("Missing [hand] section in HAND.toml")
# Build HandConfig
config = HandConfig(
name=hand_data.get("name", hand_dir.name),
description=hand_data.get("description", ""),
enabled=hand_data.get("enabled", True),
version=hand_data.get("version", "1.0.0"),
author=hand_data.get("author"),
hand_dir=hand_dir,
)
# Parse schedule
if "schedule" in hand_data:
schedule_data = hand_data["schedule"]
if isinstance(schedule_data, str):
# Simple cron string
config.schedule = ScheduleConfig(cron=schedule_data)
elif isinstance(schedule_data, dict):
config.schedule = ScheduleConfig(**schedule_data)
# Parse tools
tools_data = data.get("tools", {})
config.tools_required = tools_data.get("required", [])
config.tools_optional = tools_data.get("optional", [])
# Parse approval gates
gates_data = data.get("approval_gates", {})
for action, gate_data in gates_data.items():
if isinstance(gate_data, dict):
config.approval_gates.append(ApprovalGate(
action=gate_data.get("action", action),
description=gate_data.get("description", ""),
auto_approve_after=gate_data.get("auto_approve_after"),
))
# Parse output config
output_data = data.get("output", {})
config.output = OutputConfig(**output_data)
return config
def _store_hand(self, conn: Optional[sqlite3.Connection], hand: HandConfig) -> None:
"""Store hand config in database."""
import json
if conn is None:
with self._get_conn() as conn:
self._store_hand(conn, hand)
return
conn.execute(
"""
INSERT OR REPLACE INTO hands (name, config_json, enabled)
VALUES (?, ?, ?)
""",
(hand.name, hand.json(), 1 if hand.enabled else 0),
)
conn.commit()
def get_hand(self, name: str) -> HandConfig:
"""Get a Hand by name.
Args:
name: Hand name
Returns:
HandConfig
Raises:
HandNotFoundError: If Hand doesn't exist
"""
if name not in self._hands:
raise HandNotFoundError(f"Hand not found: {name}")
return self._hands[name]
def list_hands(self) -> list[HandConfig]:
"""List all loaded Hands.
Returns:
List of HandConfigs
"""
return list(self._hands.values())
def get_scheduled_hands(self) -> list[HandConfig]:
"""Get all Hands with schedule configuration.
Returns:
List of HandConfigs with schedules
"""
return [h for h in self._hands.values() if h.schedule is not None and h.enabled]
def get_enabled_hands(self) -> list[HandConfig]:
"""Get all enabled Hands.
Returns:
List of enabled HandConfigs
"""
return [h for h in self._hands.values() if h.enabled]
def get_state(self, name: str) -> HandState:
"""Get runtime state of a Hand.
Args:
name: Hand name
Returns:
HandState
"""
if name not in self._states:
self._states[name] = HandState(name=name)
return self._states[name]
def update_state(self, name: str, **kwargs) -> None:
"""Update Hand state.
Args:
name: Hand name
**kwargs: State fields to update
"""
state = self.get_state(name)
for key, value in kwargs.items():
if hasattr(state, key):
setattr(state, key, value)
async def log_execution(
self,
hand_name: str,
trigger: str,
outcome: str,
output: str = "",
error: Optional[str] = None,
approval_id: Optional[str] = None,
) -> str:
"""Log a Hand execution.
Args:
hand_name: Name of the Hand
trigger: Trigger type
outcome: Execution outcome
output: Execution output
error: Error message if failed
approval_id: Associated approval ID
Returns:
Execution ID
"""
execution_id = str(uuid.uuid4())
with self._get_conn() as conn:
conn.execute(
"""
INSERT INTO hand_executions
(id, hand_name, trigger, started_at, completed_at, outcome, output, error, approval_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
execution_id,
hand_name,
trigger,
datetime.now(timezone.utc).isoformat(),
datetime.now(timezone.utc).isoformat(),
outcome,
output,
error,
approval_id,
),
)
conn.commit()
return execution_id
async def create_approval(
self,
hand_name: str,
action: str,
description: str,
context: dict,
expires_after: Optional[int] = None,
) -> ApprovalRequest:
"""Create an approval request.
Args:
hand_name: Hand requesting approval
action: Action to approve
description: Human-readable description
context: Additional context
expires_after: Seconds until expiration
Returns:
ApprovalRequest
"""
approval_id = str(uuid.uuid4())
created_at = datetime.now(timezone.utc)
expires_at = None
if expires_after:
from datetime import timedelta
expires_at = created_at + timedelta(seconds=expires_after)
request = ApprovalRequest(
id=approval_id,
hand_name=hand_name,
action=action,
description=description,
context=context,
created_at=created_at,
expires_at=expires_at,
)
# Store in database
import json
with self._get_conn() as conn:
conn.execute(
"""
INSERT INTO approval_queue
(id, hand_name, action, description, context_json, status, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
request.id,
request.hand_name,
request.action,
request.description,
json.dumps(request.context),
request.status.value,
request.created_at.isoformat(),
request.expires_at.isoformat() if request.expires_at else None,
),
)
conn.commit()
return request
async def get_pending_approvals(self) -> list[ApprovalRequest]:
"""Get all pending approval requests.
Returns:
List of pending ApprovalRequests
"""
import json
with self._get_conn() as conn:
rows = conn.execute(
"""
SELECT * FROM approval_queue
WHERE status = 'pending'
ORDER BY created_at DESC
"""
).fetchall()
requests = []
for row in rows:
requests.append(ApprovalRequest(
id=row["id"],
hand_name=row["hand_name"],
action=row["action"],
description=row["description"],
context=json.loads(row["context_json"] or "{}"),
status=ApprovalStatus(row["status"]),
created_at=datetime.fromisoformat(row["created_at"]),
expires_at=datetime.fromisoformat(row["expires_at"]) if row["expires_at"] else None,
))
return requests
async def resolve_approval(
self,
approval_id: str,
approved: bool,
resolved_by: Optional[str] = None,
) -> bool:
"""Resolve an approval request.
Args:
approval_id: ID of the approval request
approved: True to approve, False to reject
resolved_by: Who resolved the request
Returns:
True if resolved successfully
"""
status = ApprovalStatus.APPROVED if approved else ApprovalStatus.REJECTED
resolved_at = datetime.now(timezone.utc)
with self._get_conn() as conn:
cursor = conn.execute(
"""
UPDATE approval_queue
SET status = ?, resolved_at = ?, resolved_by = ?
WHERE id = ? AND status = 'pending'
""",
(status.value, resolved_at.isoformat(), resolved_by, approval_id),
)
conn.commit()
return cursor.rowcount > 0
async def get_recent_executions(
self,
hand_name: Optional[str] = None,
limit: int = 50,
) -> list[dict]:
"""Get recent Hand executions.
Args:
hand_name: Filter by Hand name
limit: Maximum results
Returns:
List of execution records
"""
with self._get_conn() as conn:
if hand_name:
rows = conn.execute(
"""
SELECT * FROM hand_executions
WHERE hand_name = ?
ORDER BY started_at DESC
LIMIT ?
""",
(hand_name, limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT * FROM hand_executions
ORDER BY started_at DESC
LIMIT ?
""",
(limit,),
).fetchall()
return [dict(row) for row in rows]

476
src/hands/runner.py Normal file
View File

@@ -0,0 +1,476 @@
"""Hand Runner — Execute Hands with skill injection and tool access.
The HandRunner is responsible for executing individual Hands:
- Load SYSTEM.md and SKILL.md files
- Inject domain expertise into LLM context
- Execute the tool loop
- Handle approval gates
- Produce output
Usage:
from hands.runner import HandRunner
from hands.registry import HandRegistry
registry = HandRegistry()
runner = HandRunner(registry, llm_adapter)
result = await runner.run_hand("oracle")
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from hands.models import (
ApprovalRequest,
ApprovalStatus,
HandConfig,
HandExecution,
HandOutcome,
HandState,
HandStatus,
TriggerType,
)
from hands.registry import HandRegistry
logger = logging.getLogger(__name__)
class HandRunner:
"""Executes individual Hands.
Manages the execution lifecycle:
1. Load system prompt and skills
2. Check and handle approval gates
3. Execute tool loop with LLM
4. Produce and deliver output
5. Log execution
Attributes:
registry: HandRegistry for Hand configs and state
llm_adapter: LLM adapter for generation
mcp_registry: Optional MCP tool registry
"""
def __init__(
self,
registry: HandRegistry,
llm_adapter: Optional[Any] = None,
mcp_registry: Optional[Any] = None,
) -> None:
"""Initialize HandRunner.
Args:
registry: HandRegistry instance
llm_adapter: LLM adapter for generation
mcp_registry: Optional MCP tool registry for tool access
"""
self.registry = registry
self.llm_adapter = llm_adapter
self.mcp_registry = mcp_registry
logger.info("HandRunner initialized")
async def run_hand(
self,
hand_name: str,
trigger: TriggerType = TriggerType.MANUAL,
context: Optional[dict] = None,
) -> HandExecution:
"""Run a Hand.
This is the main entry point for Hand execution.
Args:
hand_name: Name of the Hand to run
trigger: What triggered this execution
context: Optional execution context
Returns:
HandExecution record
"""
started_at = datetime.now(timezone.utc)
execution_id = f"exec_{hand_name}_{started_at.isoformat()}"
logger.info("Starting Hand execution: %s", hand_name)
try:
# Get Hand config
hand = self.registry.get_hand(hand_name)
# Update state
self.registry.update_state(
hand_name,
status=HandStatus.RUNNING,
last_run=started_at,
)
# Load system prompt and skills
system_prompt = self._load_system_prompt(hand)
skills = self._load_skills(hand)
# Check approval gates
approval_results = await self._check_approvals(hand)
if approval_results.get("blocked"):
return await self._create_execution_record(
execution_id=execution_id,
hand_name=hand_name,
trigger=trigger,
started_at=started_at,
outcome=HandOutcome.APPROVAL_PENDING,
output="",
approval_id=approval_results.get("approval_id"),
)
# Execute the Hand
result = await self._execute_with_llm(
hand=hand,
system_prompt=system_prompt,
skills=skills,
context=context or {},
)
# Deliver output
await self._deliver_output(hand, result)
# Update state
state = self.registry.get_state(hand_name)
self.registry.update_state(
hand_name,
status=HandStatus.IDLE,
run_count=state.run_count + 1,
success_count=state.success_count + 1,
)
# Create execution record
return await self._create_execution_record(
execution_id=execution_id,
hand_name=hand_name,
trigger=trigger,
started_at=started_at,
outcome=HandOutcome.SUCCESS,
output=result.get("output", ""),
files_generated=result.get("files", []),
)
except Exception as e:
logger.exception("Hand %s execution failed", hand_name)
# Update state
self.registry.update_state(
hand_name,
status=HandStatus.ERROR,
error_message=str(e),
)
# Create failure record
return await self._create_execution_record(
execution_id=execution_id,
hand_name=hand_name,
trigger=trigger,
started_at=started_at,
outcome=HandOutcome.FAILURE,
output="",
error=str(e),
)
def _load_system_prompt(self, hand: HandConfig) -> str:
"""Load SYSTEM.md for a Hand.
Args:
hand: HandConfig
Returns:
System prompt text
"""
if hand.system_md_path and hand.system_md_path.exists():
try:
return hand.system_md_path.read_text()
except Exception as e:
logger.warning("Failed to load SYSTEM.md for %s: %s", hand.name, e)
# Default system prompt
return f"""You are the {hand.name} Hand.
Your purpose: {hand.description}
You have access to the following tools: {', '.join(hand.tools_required + hand.tools_optional)}
Execute your task professionally and produce the requested output.
"""
def _load_skills(self, hand: HandConfig) -> list[str]:
"""Load SKILL.md files for a Hand.
Args:
hand: HandConfig
Returns:
List of skill texts
"""
skills = []
for skill_path in hand.skill_md_paths:
try:
if skill_path.exists():
skills.append(skill_path.read_text())
except Exception as e:
logger.warning("Failed to load skill %s: %s", skill_path, e)
return skills
async def _check_approvals(self, hand: HandConfig) -> dict:
"""Check if any approval gates block execution.
Args:
hand: HandConfig
Returns:
Dict with "blocked" and optional "approval_id"
"""
if not hand.approval_gates:
return {"blocked": False}
# Check for pending approvals for this hand
pending = await self.registry.get_pending_approvals()
hand_pending = [a for a in pending if a.hand_name == hand.name]
if hand_pending:
return {
"blocked": True,
"approval_id": hand_pending[0].id,
}
# Create approval requests for each gate
for gate in hand.approval_gates:
request = await self.registry.create_approval(
hand_name=hand.name,
action=gate.action,
description=gate.description,
context={"gate": gate.action},
expires_after=gate.auto_approve_after,
)
if not gate.auto_approve_after:
# Requires manual approval
return {
"blocked": True,
"approval_id": request.id,
}
return {"blocked": False}
async def _execute_with_llm(
self,
hand: HandConfig,
system_prompt: str,
skills: list[str],
context: dict,
) -> dict:
"""Execute Hand logic with LLM.
Args:
hand: HandConfig
system_prompt: System prompt
skills: Skill texts
context: Execution context
Returns:
Result dict with output and files
"""
if not self.llm_adapter:
logger.warning("No LLM adapter available for Hand %s", hand.name)
return {
"output": f"Hand {hand.name} executed (no LLM configured)",
"files": [],
}
# Build the full prompt
full_prompt = self._build_prompt(
hand=hand,
system_prompt=system_prompt,
skills=skills,
context=context,
)
try:
# Call LLM
response = await self.llm_adapter.chat(message=full_prompt)
# Parse response
output = response.content
# Extract any file outputs (placeholder - would parse structured output)
files = []
return {
"output": output,
"files": files,
}
except Exception as e:
logger.error("LLM execution failed for Hand %s: %s", hand.name, e)
raise
def _build_prompt(
self,
hand: HandConfig,
system_prompt: str,
skills: list[str],
context: dict,
) -> str:
"""Build the full execution prompt.
Args:
hand: HandConfig
system_prompt: System prompt
skills: Skill texts
context: Execution context
Returns:
Complete prompt
"""
parts = [
"# System Instructions",
system_prompt,
"",
]
# Add skills
if skills:
parts.extend([
"# Domain Expertise (SKILL.md)",
"\n\n---\n\n".join(skills),
"",
])
# Add context
if context:
parts.extend([
"# Execution Context",
str(context),
"",
])
# Add available tools
if hand.tools_required or hand.tools_optional:
parts.extend([
"# Available Tools",
"Required: " + ", ".join(hand.tools_required),
"Optional: " + ", ".join(hand.tools_optional),
"",
])
# Add output instructions
parts.extend([
"# Output Instructions",
f"Format: {hand.output.format}",
f"Dashboard: {'Yes' if hand.output.dashboard else 'No'}",
f"Channel: {hand.output.channel or 'None'}",
"",
"Execute your task now.",
])
return "\n".join(parts)
async def _deliver_output(self, hand: HandConfig, result: dict) -> None:
"""Deliver Hand output to configured destinations.
Args:
hand: HandConfig
result: Execution result
"""
output = result.get("output", "")
# Dashboard output
if hand.output.dashboard:
# This would publish to event bus for dashboard
logger.info("Hand %s output delivered to dashboard", hand.name)
# Channel output (e.g., Telegram, Discord)
if hand.output.channel:
# This would send to the appropriate channel
logger.info("Hand %s output delivered to %s", hand.name, hand.output.channel)
# File drop
if hand.output.file_drop:
try:
drop_path = Path(hand.output.file_drop)
drop_path.mkdir(parents=True, exist_ok=True)
output_file = drop_path / f"{hand.name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
output_file.write_text(output)
logger.info("Hand %s output written to %s", hand.name, output_file)
except Exception as e:
logger.error("Failed to write Hand %s output: %s", hand.name, e)
async def _create_execution_record(
self,
execution_id: str,
hand_name: str,
trigger: TriggerType,
started_at: datetime,
outcome: HandOutcome,
output: str,
error: Optional[str] = None,
approval_id: Optional[str] = None,
files_generated: Optional[list] = None,
) -> HandExecution:
"""Create and store execution record.
Returns:
HandExecution
"""
completed_at = datetime.now(timezone.utc)
execution = HandExecution(
id=execution_id,
hand_name=hand_name,
trigger=trigger,
started_at=started_at,
completed_at=completed_at,
outcome=outcome,
output=output,
error=error,
approval_id=approval_id,
files_generated=files_generated or [],
)
# Log to registry
await self.registry.log_execution(
hand_name=hand_name,
trigger=trigger.value,
outcome=outcome.value,
output=output,
error=error,
approval_id=approval_id,
)
return execution
async def continue_after_approval(
self,
approval_id: str,
) -> Optional[HandExecution]:
"""Continue Hand execution after approval.
Args:
approval_id: Approval request ID
Returns:
HandExecution if execution proceeded
"""
# Get approval request
# This would need a get_approval_by_id method in registry
# For now, placeholder
logger.info("Continuing Hand execution after approval %s", approval_id)
# Re-run the Hand
# This would look up the hand from the approval context
return None

410
src/hands/scheduler.py Normal file
View File

@@ -0,0 +1,410 @@
"""Hand Scheduler — APScheduler-based cron scheduling for Hands.
Manages the scheduling of autonomous Hands using APScheduler.
Supports cron expressions, intervals, and specific times.
Usage:
from hands.scheduler import HandScheduler
from hands.registry import HandRegistry
registry = HandRegistry()
await registry.load_all()
scheduler = HandScheduler(registry)
await scheduler.start()
# Hands are now scheduled and will run automatically
"""
from __future__ import annotations
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, Callable, Optional
from hands.models import HandConfig, HandState, HandStatus, TriggerType
from hands.registry import HandRegistry
logger = logging.getLogger(__name__)
# Try to import APScheduler
try:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
APSCHEDULER_AVAILABLE = True
except ImportError:
APSCHEDULER_AVAILABLE = False
logger.warning("APScheduler not installed. Scheduling will be disabled.")
class HandScheduler:
"""Scheduler for autonomous Hands.
Uses APScheduler to manage cron-based execution of Hands.
Each Hand with a schedule gets its own job in the scheduler.
Attributes:
registry: HandRegistry for Hand configurations
_scheduler: APScheduler instance
_running: Whether scheduler is running
_job_ids: Mapping of hand names to job IDs
"""
def __init__(
self,
registry: HandRegistry,
job_defaults: Optional[dict] = None,
) -> None:
"""Initialize HandScheduler.
Args:
registry: HandRegistry instance
job_defaults: Default job configuration for APScheduler
"""
self.registry = registry
self._scheduler: Optional[Any] = None
self._running = False
self._job_ids: dict[str, str] = {}
if APSCHEDULER_AVAILABLE:
self._scheduler = AsyncIOScheduler(job_defaults=job_defaults or {
'coalesce': True, # Coalesce missed jobs into one
'max_instances': 1, # Only one instance per Hand
})
logger.info("HandScheduler initialized")
async def start(self) -> None:
"""Start the scheduler and schedule all enabled Hands."""
if not APSCHEDULER_AVAILABLE:
logger.error("Cannot start scheduler: APScheduler not installed")
return
if self._running:
logger.warning("Scheduler already running")
return
# Schedule all enabled Hands
hands = self.registry.get_scheduled_hands()
for hand in hands:
await self.schedule_hand(hand)
# Start the scheduler
self._scheduler.start()
self._running = True
logger.info("HandScheduler started with %d scheduled Hands", len(hands))
async def stop(self) -> None:
"""Stop the scheduler."""
if not self._running or not self._scheduler:
return
self._scheduler.shutdown(wait=True)
self._running = False
self._job_ids.clear()
logger.info("HandScheduler stopped")
async def schedule_hand(self, hand: HandConfig) -> Optional[str]:
"""Schedule a Hand for execution.
Args:
hand: HandConfig to schedule
Returns:
Job ID if scheduled successfully
"""
if not APSCHEDULER_AVAILABLE or not self._scheduler:
logger.warning("Cannot schedule %s: APScheduler not available", hand.name)
return None
if not hand.schedule:
logger.debug("Hand %s has no schedule", hand.name)
return None
if not hand.enabled:
logger.debug("Hand %s is disabled", hand.name)
return None
# Remove existing job if any
if hand.name in self._job_ids:
self.unschedule_hand(hand.name)
# Create the trigger
trigger = self._create_trigger(hand.schedule)
if not trigger:
logger.error("Failed to create trigger for Hand %s", hand.name)
return None
# Add job to scheduler
try:
job = self._scheduler.add_job(
func=self._execute_hand_wrapper,
trigger=trigger,
id=f"hand_{hand.name}",
name=f"Hand: {hand.name}",
args=[hand.name],
replace_existing=True,
)
self._job_ids[hand.name] = job.id
# Update state
self.registry.update_state(
hand.name,
status=HandStatus.SCHEDULED,
next_run=job.next_run_time,
)
logger.info("Scheduled Hand %s (next run: %s)", hand.name, job.next_run_time)
return job.id
except Exception as e:
logger.error("Failed to schedule Hand %s: %s", hand.name, e)
return None
def unschedule_hand(self, name: str) -> bool:
"""Remove a Hand from the scheduler.
Args:
name: Hand name
Returns:
True if unscheduled successfully
"""
if not self._scheduler:
return False
if name not in self._job_ids:
return False
try:
self._scheduler.remove_job(self._job_ids[name])
del self._job_ids[name]
self.registry.update_state(name, status=HandStatus.IDLE)
logger.info("Unscheduled Hand %s", name)
return True
except Exception as e:
logger.error("Failed to unschedule Hand %s: %s", name, e)
return False
def pause_hand(self, name: str) -> bool:
"""Pause a scheduled Hand.
Args:
name: Hand name
Returns:
True if paused successfully
"""
if not self._scheduler:
return False
if name not in self._job_ids:
return False
try:
self._scheduler.pause_job(self._job_ids[name])
self.registry.update_state(name, status=HandStatus.PAUSED, is_paused=True)
logger.info("Paused Hand %s", name)
return True
except Exception as e:
logger.error("Failed to pause Hand %s: %s", name, e)
return False
def resume_hand(self, name: str) -> bool:
"""Resume a paused Hand.
Args:
name: Hand name
Returns:
True if resumed successfully
"""
if not self._scheduler:
return False
if name not in self._job_ids:
return False
try:
self._scheduler.resume_job(self._job_ids[name])
self.registry.update_state(name, status=HandStatus.SCHEDULED, is_paused=False)
logger.info("Resumed Hand %s", name)
return True
except Exception as e:
logger.error("Failed to resume Hand %s: %s", name, e)
return False
def get_scheduled_jobs(self) -> list[dict]:
"""Get all scheduled jobs.
Returns:
List of job information dicts
"""
if not self._scheduler:
return []
jobs = []
for job in self._scheduler.get_jobs():
if job.id.startswith("hand_"):
hand_name = job.id[5:] # Remove "hand_" prefix
jobs.append({
"hand_name": hand_name,
"job_id": job.id,
"next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
"trigger": str(job.trigger),
})
return jobs
def _create_trigger(self, schedule: Any) -> Optional[Any]:
"""Create an APScheduler trigger from ScheduleConfig.
Args:
schedule: ScheduleConfig
Returns:
APScheduler trigger
"""
if not APSCHEDULER_AVAILABLE:
return None
# Cron trigger
if schedule.cron:
try:
parts = schedule.cron.split()
if len(parts) == 5:
return CronTrigger(
minute=parts[0],
hour=parts[1],
day=parts[2],
month=parts[3],
day_of_week=parts[4],
timezone=schedule.timezone,
)
except Exception as e:
logger.error("Invalid cron expression '%s': %s", schedule.cron, e)
return None
# Interval trigger
if schedule.interval:
return IntervalTrigger(
seconds=schedule.interval,
timezone=schedule.timezone,
)
return None
async def _execute_hand_wrapper(self, hand_name: str) -> None:
"""Wrapper for Hand execution.
This is called by APScheduler when a Hand's trigger fires.
Args:
hand_name: Name of the Hand to execute
"""
logger.info("Triggering Hand: %s", hand_name)
try:
# Update state
self.registry.update_state(
hand_name,
status=HandStatus.RUNNING,
last_run=datetime.now(timezone.utc),
)
# Execute the Hand
await self._run_hand(hand_name, TriggerType.SCHEDULE)
except Exception as e:
logger.exception("Hand %s execution failed", hand_name)
self.registry.update_state(
hand_name,
status=HandStatus.ERROR,
error_message=str(e),
)
async def _run_hand(self, hand_name: str, trigger: TriggerType) -> None:
"""Execute a Hand.
This is the core execution logic. In Phase 4+, this will
call the actual Hand implementation.
Args:
hand_name: Name of the Hand
trigger: What triggered the execution
"""
from hands.models import HandOutcome
try:
hand = self.registry.get_hand(hand_name)
except Exception:
logger.error("Hand %s not found", hand_name)
return
logger.info("Executing Hand %s (trigger: %s)", hand_name, trigger.value)
# TODO: Phase 4+ - Call actual Hand implementation via HandRunner
# For now, just log the execution
output = f"Hand {hand_name} executed (placeholder implementation)"
# Log execution
await self.registry.log_execution(
hand_name=hand_name,
trigger=trigger.value,
outcome=HandOutcome.SUCCESS.value,
output=output,
)
# Update state
state = self.registry.get_state(hand_name)
self.registry.update_state(
hand_name,
status=HandStatus.SCHEDULED,
run_count=state.run_count + 1,
success_count=state.success_count + 1,
)
logger.info("Hand %s completed successfully", hand_name)
async def trigger_hand_now(self, name: str) -> bool:
"""Manually trigger a Hand to run immediately.
Args:
name: Hand name
Returns:
True if triggered successfully
"""
try:
await self._run_hand(name, TriggerType.MANUAL)
return True
except Exception as e:
logger.error("Failed to trigger Hand %s: %s", name, e)
return False
def get_next_run_time(self, name: str) -> Optional[datetime]:
"""Get next scheduled run time for a Hand.
Args:
name: Hand name
Returns:
Next run time or None if not scheduled
"""
if not self._scheduler or name not in self._job_ids:
return None
try:
job = self._scheduler.get_job(self._job_ids[name])
return job.next_run_time if job else None
except Exception:
return None

276
tests/test_hands.py Normal file
View File

@@ -0,0 +1,276 @@
"""Tests for Hands Infrastructure.
Tests HandRegistry, HandScheduler, and HandRunner.
"""
from __future__ import annotations
import tempfile
from pathlib import Path
import pytest
from hands import HandRegistry, HandRunner, HandScheduler
from hands.models import HandConfig, HandStatus, ScheduleConfig
@pytest.fixture
def temp_hands_dir():
"""Create a temporary hands directory with test Hands."""
with tempfile.TemporaryDirectory() as tmpdir:
hands_dir = Path(tmpdir)
# Create Oracle Hand
oracle_dir = hands_dir / "oracle"
oracle_dir.mkdir()
(oracle_dir / "HAND.toml").write_text('''
[hand]
name = "oracle"
description = "Bitcoin intelligence"
schedule = "0 7,19 * * *"
[tools]
required = ["mempool_fetch", "fee_estimate"]
[output]
dashboard = true
''')
(oracle_dir / "SYSTEM.md").write_text("# Oracle System Prompt\nYou are Oracle.")
# Create Sentinel Hand
sentinel_dir = hands_dir / "sentinel"
sentinel_dir.mkdir()
(sentinel_dir / "HAND.toml").write_text('''
[hand]
name = "sentinel"
description = "System health monitoring"
schedule = "*/15 * * * *"
enabled = true
''')
yield hands_dir
@pytest.fixture
def registry(temp_hands_dir):
"""Create HandRegistry with test Hands."""
db_path = temp_hands_dir / "test_hands.db"
reg = HandRegistry(hands_dir=temp_hands_dir, db_path=db_path)
return reg
@pytest.mark.asyncio
class TestHandRegistry:
"""HandRegistry tests."""
async def test_load_all_hands(self, registry, temp_hands_dir):
"""Should load all Hands from directory."""
hands = await registry.load_all()
assert len(hands) == 2
assert "oracle" in hands
assert "sentinel" in hands
async def test_get_hand(self, registry, temp_hands_dir):
"""Should get Hand by name."""
await registry.load_all()
hand = registry.get_hand("oracle")
assert hand.name == "oracle"
assert "Bitcoin" in hand.description
async def test_get_hand_not_found(self, registry):
"""Should raise for unknown Hand."""
from hands.registry import HandNotFoundError
with pytest.raises(HandNotFoundError):
registry.get_hand("nonexistent")
async def test_get_scheduled_hands(self, registry, temp_hands_dir):
"""Should return only Hands with schedules."""
await registry.load_all()
scheduled = registry.get_scheduled_hands()
assert len(scheduled) == 2
assert all(h.schedule is not None for h in scheduled)
async def test_state_management(self, registry, temp_hands_dir):
"""Should track Hand state."""
await registry.load_all()
state = registry.get_state("oracle")
assert state.name == "oracle"
assert state.status == HandStatus.IDLE
registry.update_state("oracle", status=HandStatus.RUNNING)
state = registry.get_state("oracle")
assert state.status == HandStatus.RUNNING
async def test_approval_queue(self, registry, temp_hands_dir):
"""Should manage approval queue."""
await registry.load_all()
# Create approval
request = await registry.create_approval(
hand_name="oracle",
action="post_tweet",
description="Post Bitcoin update",
context={"price": 50000},
)
assert request.id is not None
assert request.hand_name == "oracle"
# Get pending
pending = await registry.get_pending_approvals()
assert len(pending) == 1
# Resolve
result = await registry.resolve_approval(request.id, approved=True)
assert result is True
# Should be empty now
pending = await registry.get_pending_approvals()
assert len(pending) == 0
@pytest.mark.asyncio
class TestHandScheduler:
"""HandScheduler tests."""
async def test_scheduler_initialization(self, registry):
"""Should initialize scheduler."""
scheduler = HandScheduler(registry)
assert scheduler.registry == registry
assert not scheduler._running
async def test_schedule_hand(self, registry, temp_hands_dir):
"""Should schedule a Hand."""
await registry.load_all()
scheduler = HandScheduler(registry)
hand = registry.get_hand("oracle")
job_id = await scheduler.schedule_hand(hand)
# Note: Job ID may be None if APScheduler not available
# But should not raise an exception
async def test_get_scheduled_jobs(self, registry, temp_hands_dir):
"""Should list scheduled jobs."""
await registry.load_all()
scheduler = HandScheduler(registry)
jobs = scheduler.get_scheduled_jobs()
assert isinstance(jobs, list)
async def test_trigger_hand_now(self, registry, temp_hands_dir):
"""Should manually trigger a Hand."""
await registry.load_all()
scheduler = HandScheduler(registry)
# This will fail because Hand isn't fully implemented
# But should not raise
result = await scheduler.trigger_hand_now("oracle")
# Result may be True or False depending on implementation
@pytest.mark.asyncio
class TestHandRunner:
"""HandRunner tests."""
async def test_load_system_prompt(self, registry, temp_hands_dir):
"""Should load SYSTEM.md."""
await registry.load_all()
runner = HandRunner(registry)
hand = registry.get_hand("oracle")
prompt = runner._load_system_prompt(hand)
assert "Oracle" in prompt
async def test_load_skills(self, registry, temp_hands_dir):
"""Should load SKILL.md files."""
# Create a skill file
skills_dir = temp_hands_dir / "oracle" / "skills"
skills_dir.mkdir()
(skills_dir / "bitcoin.md").write_text("# Bitcoin Expertise")
await registry.load_all()
runner = HandRunner(registry)
hand = registry.get_hand("oracle")
skills = runner._load_skills(hand)
assert len(skills) == 1
assert "Bitcoin" in skills[0]
async def test_build_prompt(self, registry, temp_hands_dir):
"""Should build execution prompt."""
await registry.load_all()
runner = HandRunner(registry)
hand = registry.get_hand("oracle")
system = "System prompt"
skills = ["Skill 1", "Skill 2"]
context = {"key": "value"}
prompt = runner._build_prompt(hand, system, skills, context)
assert "System Instructions" in prompt
assert "System prompt" in prompt
assert "Skill 1" in prompt
assert "key" in prompt
class TestHandConfig:
"""HandConfig model tests."""
def test_hand_config_creation(self):
"""Should create HandConfig."""
config = HandConfig(
name="test",
description="Test hand",
schedule=ScheduleConfig(cron="0 * * * *"),
)
assert config.name == "test"
assert config.schedule.cron == "0 * * * *"
def test_schedule_validation(self):
"""Should validate cron expression."""
# Valid cron
config = HandConfig(
name="test",
description="Test",
schedule=ScheduleConfig(cron="0 7 * * *"),
)
assert config.schedule.cron == "0 7 * * *"
class TestHandModels:
"""Hand model tests."""
def test_hand_status_enum(self):
"""HandStatus should have expected values."""
from hands.models import HandStatus
assert HandStatus.IDLE.value == "idle"
assert HandStatus.RUNNING.value == "running"
assert HandStatus.SCHEDULED.value == "scheduled"
def test_hand_state_to_dict(self):
"""HandState should serialize to dict."""
from hands.models import HandState
from datetime import datetime
state = HandState(
name="test",
status=HandStatus.RUNNING,
run_count=5,
)
data = state.to_dict()
assert data["name"] == "test"
assert data["status"] == "running"
assert data["run_count"] == 5