feat: Hands Infrastructure - Models, Registry, Scheduler (Phase 3.1-3.3)
Add core Hands infrastructure: - hands/models.py: Pydantic models for HAND.toml schema - HandConfig: Complete hand configuration - HandState: Runtime state tracking - HandExecution: Execution records - ApprovalRequest: Approval queue entries - hands/registry.py: HandRegistry for loading and indexing - Load Hands from hands/ directory - Parse HAND.toml manifests - SQLite indexing for fast lookup - Approval queue management - Execution history logging - hands/scheduler.py: APScheduler-based scheduling - Cron and interval triggers - Job management (schedule, pause, resume, unschedule) - Hand execution wrapper - Manual trigger support
This commit is contained in:
252
src/hands/models.py
Normal file
252
src/hands/models.py
Normal file
@@ -0,0 +1,252 @@
|
||||
"""Hands Models — Pydantic schemas for HAND.toml manifests.
|
||||
|
||||
Defines the data structures for autonomous Hand agents:
|
||||
- HandConfig: Complete hand configuration from HAND.toml
|
||||
- HandState: Runtime state tracking
|
||||
- HandExecution: Execution record for audit trail
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
from pydantic import BaseModel, Field, validator
|
||||
|
||||
|
||||
class HandStatus(str, Enum):
|
||||
"""Runtime status of a Hand."""
|
||||
DISABLED = "disabled"
|
||||
IDLE = "idle"
|
||||
SCHEDULED = "scheduled"
|
||||
RUNNING = "running"
|
||||
PAUSED = "paused"
|
||||
ERROR = "error"
|
||||
|
||||
|
||||
class HandOutcome(str, Enum):
|
||||
"""Outcome of a Hand execution."""
|
||||
SUCCESS = "success"
|
||||
FAILURE = "failure"
|
||||
APPROVAL_PENDING = "approval_pending"
|
||||
TIMEOUT = "timeout"
|
||||
SKIPPED = "skipped"
|
||||
|
||||
|
||||
class TriggerType(str, Enum):
|
||||
"""Types of execution triggers."""
|
||||
SCHEDULE = "schedule" # Cron schedule
|
||||
MANUAL = "manual" # User triggered
|
||||
EVENT = "event" # Event-driven
|
||||
WEBHOOK = "webhook" # External webhook
|
||||
|
||||
|
||||
# ── HAND.toml Schema Models ───────────────────────────────────────────────
|
||||
|
||||
class ToolRequirement(BaseModel):
|
||||
"""A required tool for the Hand."""
|
||||
name: str
|
||||
version: Optional[str] = None
|
||||
optional: bool = False
|
||||
|
||||
|
||||
class OutputConfig(BaseModel):
|
||||
"""Output configuration for Hand results."""
|
||||
dashboard: bool = True
|
||||
channel: Optional[str] = None # e.g., "telegram", "discord"
|
||||
format: str = "markdown" # markdown, json, html
|
||||
file_drop: Optional[str] = None # Path to write output files
|
||||
|
||||
|
||||
class ApprovalGate(BaseModel):
|
||||
"""An approval gate for sensitive operations."""
|
||||
action: str # e.g., "post_tweet", "send_payment"
|
||||
description: str
|
||||
auto_approve_after: Optional[int] = None # Seconds to auto-approve
|
||||
|
||||
|
||||
class ScheduleConfig(BaseModel):
|
||||
"""Schedule configuration for the Hand."""
|
||||
cron: Optional[str] = None # Cron expression
|
||||
interval: Optional[int] = None # Seconds between runs
|
||||
at: Optional[str] = None # Specific time (HH:MM)
|
||||
timezone: str = "UTC"
|
||||
|
||||
@validator('cron')
|
||||
def validate_cron(cls, v: Optional[str]) -> Optional[str]:
|
||||
if v is None:
|
||||
return v
|
||||
# Basic cron validation (5 fields)
|
||||
parts = v.split()
|
||||
if len(parts) != 5:
|
||||
raise ValueError("Cron expression must have 5 fields: minute hour day month weekday")
|
||||
return v
|
||||
|
||||
|
||||
class HandConfig(BaseModel):
|
||||
"""Complete Hand configuration from HAND.toml.
|
||||
|
||||
Example HAND.toml:
|
||||
[hand]
|
||||
name = "oracle"
|
||||
schedule = "0 7,19 * * *"
|
||||
description = "Bitcoin and on-chain intelligence briefing"
|
||||
|
||||
[tools]
|
||||
required = ["mempool_fetch", "fee_estimate"]
|
||||
|
||||
[approval_gates]
|
||||
post_tweet = { action = "post_tweet", description = "Post to Twitter" }
|
||||
|
||||
[output]
|
||||
dashboard = true
|
||||
channel = "telegram"
|
||||
"""
|
||||
|
||||
# Required fields
|
||||
name: str = Field(..., description="Unique hand identifier")
|
||||
description: str = Field(..., description="What this Hand does")
|
||||
|
||||
# Schedule (one of these must be set)
|
||||
schedule: Optional[ScheduleConfig] = None
|
||||
trigger: Optional[TriggerType] = TriggerType.SCHEDULE
|
||||
|
||||
# Optional fields
|
||||
enabled: bool = True
|
||||
version: str = "1.0.0"
|
||||
author: Optional[str] = None
|
||||
|
||||
# Tools
|
||||
tools_required: list[str] = Field(default_factory=list)
|
||||
tools_optional: list[str] = Field(default_factory=list)
|
||||
|
||||
# Approval gates
|
||||
approval_gates: list[ApprovalGate] = Field(default_factory=list)
|
||||
|
||||
# Output configuration
|
||||
output: OutputConfig = Field(default_factory=OutputConfig)
|
||||
|
||||
# File paths (set at runtime)
|
||||
hand_dir: Optional[Path] = Field(None, exclude=True)
|
||||
system_prompt_path: Optional[Path] = None
|
||||
skill_paths: list[Path] = Field(default_factory=list)
|
||||
|
||||
class Config:
|
||||
extra = "allow" # Allow additional fields for extensibility
|
||||
|
||||
@property
|
||||
def system_md_path(self) -> Optional[Path]:
|
||||
"""Path to SYSTEM.md file."""
|
||||
if self.hand_dir:
|
||||
return self.hand_dir / "SYSTEM.md"
|
||||
return None
|
||||
|
||||
@property
|
||||
def skill_md_paths(self) -> list[Path]:
|
||||
"""Paths to SKILL.md files."""
|
||||
if self.hand_dir:
|
||||
skill_dir = self.hand_dir / "skills"
|
||||
if skill_dir.exists():
|
||||
return list(skill_dir.glob("*.md"))
|
||||
return []
|
||||
|
||||
|
||||
# ── Runtime State Models ─────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class HandState:
|
||||
"""Runtime state of a Hand."""
|
||||
name: str
|
||||
status: HandStatus = HandStatus.IDLE
|
||||
last_run: Optional[datetime] = None
|
||||
next_run: Optional[datetime] = None
|
||||
run_count: int = 0
|
||||
success_count: int = 0
|
||||
failure_count: int = 0
|
||||
error_message: Optional[str] = None
|
||||
is_paused: bool = False
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"name": self.name,
|
||||
"status": self.status.value,
|
||||
"last_run": self.last_run.isoformat() if self.last_run else None,
|
||||
"next_run": self.next_run.isoformat() if self.next_run else None,
|
||||
"run_count": self.run_count,
|
||||
"success_count": self.success_count,
|
||||
"failure_count": self.failure_count,
|
||||
"error_message": self.error_message,
|
||||
"is_paused": self.is_paused,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class HandExecution:
|
||||
"""Record of a Hand execution."""
|
||||
id: str
|
||||
hand_name: str
|
||||
trigger: TriggerType
|
||||
started_at: datetime
|
||||
completed_at: Optional[datetime] = None
|
||||
outcome: HandOutcome = HandOutcome.SKIPPED
|
||||
output: str = ""
|
||||
error: Optional[str] = None
|
||||
approval_id: Optional[str] = None
|
||||
files_generated: list[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"id": self.id,
|
||||
"hand_name": self.hand_name,
|
||||
"trigger": self.trigger.value,
|
||||
"started_at": self.started_at.isoformat(),
|
||||
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
|
||||
"outcome": self.outcome.value,
|
||||
"output": self.output,
|
||||
"error": self.error,
|
||||
"approval_id": self.approval_id,
|
||||
"files_generated": self.files_generated,
|
||||
}
|
||||
|
||||
|
||||
# ── Approval Queue Models ────────────────────────────────────────────────
|
||||
|
||||
class ApprovalStatus(str, Enum):
|
||||
"""Status of an approval request."""
|
||||
PENDING = "pending"
|
||||
APPROVED = "approved"
|
||||
REJECTED = "rejected"
|
||||
EXPIRED = "expired"
|
||||
AUTO_APPROVED = "auto_approved"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ApprovalRequest:
|
||||
"""A request for user approval."""
|
||||
id: str
|
||||
hand_name: str
|
||||
action: str
|
||||
description: str
|
||||
context: dict[str, Any] = field(default_factory=dict)
|
||||
status: ApprovalStatus = ApprovalStatus.PENDING
|
||||
created_at: datetime = field(default_factory=datetime.utcnow)
|
||||
expires_at: Optional[datetime] = None
|
||||
resolved_at: Optional[datetime] = None
|
||||
resolved_by: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"id": self.id,
|
||||
"hand_name": self.hand_name,
|
||||
"action": self.action,
|
||||
"description": self.description,
|
||||
"context": self.context,
|
||||
"status": self.status.value,
|
||||
"created_at": self.created_at.isoformat(),
|
||||
"expires_at": self.expires_at.isoformat() if self.expires_at else None,
|
||||
"resolved_at": self.resolved_at.isoformat() if self.resolved_at else None,
|
||||
"resolved_by": self.resolved_by,
|
||||
}
|
||||
526
src/hands/registry.py
Normal file
526
src/hands/registry.py
Normal file
@@ -0,0 +1,526 @@
|
||||
"""Hand Registry — Load, validate, and index Hands from the hands directory.
|
||||
|
||||
The HandRegistry discovers all Hand packages in the hands/ directory,
|
||||
loads their HAND.toml manifests, and maintains an index for fast lookup.
|
||||
|
||||
Usage:
|
||||
from hands.registry import HandRegistry
|
||||
|
||||
registry = HandRegistry(hands_dir="hands/")
|
||||
await registry.load_all()
|
||||
|
||||
oracle = registry.get_hand("oracle")
|
||||
all_hands = registry.list_hands()
|
||||
scheduled = registry.get_scheduled_hands()
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
import tomllib
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from hands.models import ApprovalGate, ApprovalRequest, ApprovalStatus, HandConfig, HandState, HandStatus, OutputConfig, ScheduleConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HandRegistryError(Exception):
|
||||
"""Base exception for HandRegistry errors."""
|
||||
pass
|
||||
|
||||
|
||||
class HandNotFoundError(HandRegistryError):
|
||||
"""Raised when a Hand is not found."""
|
||||
pass
|
||||
|
||||
|
||||
class HandValidationError(HandRegistryError):
|
||||
"""Raised when a Hand fails validation."""
|
||||
pass
|
||||
|
||||
|
||||
class HandRegistry:
|
||||
"""Registry for autonomous Hands.
|
||||
|
||||
Discovers Hands from the filesystem, loads their configurations,
|
||||
and maintains a SQLite index for fast lookups.
|
||||
|
||||
Attributes:
|
||||
hands_dir: Directory containing Hand packages
|
||||
db_path: SQLite database for indexing
|
||||
_hands: In-memory cache of loaded HandConfigs
|
||||
_states: Runtime state of each Hand
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hands_dir: str | Path = "hands/",
|
||||
db_path: str | Path = "data/hands.db",
|
||||
) -> None:
|
||||
"""Initialize HandRegistry.
|
||||
|
||||
Args:
|
||||
hands_dir: Directory containing Hand subdirectories
|
||||
db_path: SQLite database path for indexing
|
||||
"""
|
||||
self.hands_dir = Path(hands_dir)
|
||||
self.db_path = Path(db_path)
|
||||
self._hands: dict[str, HandConfig] = {}
|
||||
self._states: dict[str, HandState] = {}
|
||||
self._ensure_schema()
|
||||
logger.info("HandRegistry initialized (hands_dir=%s)", self.hands_dir)
|
||||
|
||||
def _get_conn(self) -> sqlite3.Connection:
|
||||
"""Get database connection."""
|
||||
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(str(self.db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn
|
||||
|
||||
def _ensure_schema(self) -> None:
|
||||
"""Create database tables if they don't exist."""
|
||||
with self._get_conn() as conn:
|
||||
# Hands index
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS hands (
|
||||
name TEXT PRIMARY KEY,
|
||||
config_json TEXT NOT NULL,
|
||||
enabled INTEGER DEFAULT 1,
|
||||
loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
""")
|
||||
|
||||
# Hand execution history
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS hand_executions (
|
||||
id TEXT PRIMARY KEY,
|
||||
hand_name TEXT NOT NULL,
|
||||
trigger TEXT NOT NULL,
|
||||
started_at TIMESTAMP NOT NULL,
|
||||
completed_at TIMESTAMP,
|
||||
outcome TEXT NOT NULL,
|
||||
output TEXT,
|
||||
error TEXT,
|
||||
approval_id TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
# Approval queue
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS approval_queue (
|
||||
id TEXT PRIMARY KEY,
|
||||
hand_name TEXT NOT NULL,
|
||||
action TEXT NOT NULL,
|
||||
description TEXT NOT NULL,
|
||||
context_json TEXT,
|
||||
status TEXT DEFAULT 'pending',
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
expires_at TIMESTAMP,
|
||||
resolved_at TIMESTAMP,
|
||||
resolved_by TEXT
|
||||
)
|
||||
""")
|
||||
|
||||
conn.commit()
|
||||
|
||||
async def load_all(self) -> dict[str, HandConfig]:
|
||||
"""Load all Hands from the hands directory.
|
||||
|
||||
Returns:
|
||||
Dict mapping hand names to HandConfigs
|
||||
"""
|
||||
if not self.hands_dir.exists():
|
||||
logger.warning("Hands directory does not exist: %s", self.hands_dir)
|
||||
return {}
|
||||
|
||||
loaded = {}
|
||||
|
||||
for hand_dir in self.hands_dir.iterdir():
|
||||
if not hand_dir.is_dir():
|
||||
continue
|
||||
|
||||
try:
|
||||
hand = self._load_hand_from_dir(hand_dir)
|
||||
if hand:
|
||||
loaded[hand.name] = hand
|
||||
self._hands[hand.name] = hand
|
||||
|
||||
# Initialize state if not exists
|
||||
if hand.name not in self._states:
|
||||
self._states[hand.name] = HandState(name=hand.name)
|
||||
|
||||
# Store in database
|
||||
self._store_hand(conn=None, hand=hand)
|
||||
|
||||
logger.info("Loaded Hand: %s (%s)", hand.name, hand.description[:50])
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to load Hand from %s: %s", hand_dir, e)
|
||||
|
||||
logger.info("Loaded %d Hands", len(loaded))
|
||||
return loaded
|
||||
|
||||
def _load_hand_from_dir(self, hand_dir: Path) -> Optional[HandConfig]:
|
||||
"""Load a single Hand from its directory.
|
||||
|
||||
Args:
|
||||
hand_dir: Directory containing HAND.toml
|
||||
|
||||
Returns:
|
||||
HandConfig or None if invalid
|
||||
"""
|
||||
manifest_path = hand_dir / "HAND.toml"
|
||||
|
||||
if not manifest_path.exists():
|
||||
logger.debug("No HAND.toml in %s", hand_dir)
|
||||
return None
|
||||
|
||||
# Parse TOML
|
||||
try:
|
||||
with open(manifest_path, "rb") as f:
|
||||
data = tomllib.load(f)
|
||||
except Exception as e:
|
||||
raise HandValidationError(f"Invalid HAND.toml: {e}")
|
||||
|
||||
# Extract hand section
|
||||
hand_data = data.get("hand", {})
|
||||
if not hand_data:
|
||||
raise HandValidationError("Missing [hand] section in HAND.toml")
|
||||
|
||||
# Build HandConfig
|
||||
config = HandConfig(
|
||||
name=hand_data.get("name", hand_dir.name),
|
||||
description=hand_data.get("description", ""),
|
||||
enabled=hand_data.get("enabled", True),
|
||||
version=hand_data.get("version", "1.0.0"),
|
||||
author=hand_data.get("author"),
|
||||
hand_dir=hand_dir,
|
||||
)
|
||||
|
||||
# Parse schedule
|
||||
if "schedule" in hand_data:
|
||||
schedule_data = hand_data["schedule"]
|
||||
if isinstance(schedule_data, str):
|
||||
# Simple cron string
|
||||
config.schedule = ScheduleConfig(cron=schedule_data)
|
||||
elif isinstance(schedule_data, dict):
|
||||
config.schedule = ScheduleConfig(**schedule_data)
|
||||
|
||||
# Parse tools
|
||||
tools_data = data.get("tools", {})
|
||||
config.tools_required = tools_data.get("required", [])
|
||||
config.tools_optional = tools_data.get("optional", [])
|
||||
|
||||
# Parse approval gates
|
||||
gates_data = data.get("approval_gates", {})
|
||||
for action, gate_data in gates_data.items():
|
||||
if isinstance(gate_data, dict):
|
||||
config.approval_gates.append(ApprovalGate(
|
||||
action=gate_data.get("action", action),
|
||||
description=gate_data.get("description", ""),
|
||||
auto_approve_after=gate_data.get("auto_approve_after"),
|
||||
))
|
||||
|
||||
# Parse output config
|
||||
output_data = data.get("output", {})
|
||||
config.output = OutputConfig(**output_data)
|
||||
|
||||
return config
|
||||
|
||||
def _store_hand(self, conn: Optional[sqlite3.Connection], hand: HandConfig) -> None:
|
||||
"""Store hand config in database."""
|
||||
import json
|
||||
|
||||
if conn is None:
|
||||
with self._get_conn() as conn:
|
||||
self._store_hand(conn, hand)
|
||||
return
|
||||
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT OR REPLACE INTO hands (name, config_json, enabled)
|
||||
VALUES (?, ?, ?)
|
||||
""",
|
||||
(hand.name, hand.json(), 1 if hand.enabled else 0),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
def get_hand(self, name: str) -> HandConfig:
|
||||
"""Get a Hand by name.
|
||||
|
||||
Args:
|
||||
name: Hand name
|
||||
|
||||
Returns:
|
||||
HandConfig
|
||||
|
||||
Raises:
|
||||
HandNotFoundError: If Hand doesn't exist
|
||||
"""
|
||||
if name not in self._hands:
|
||||
raise HandNotFoundError(f"Hand not found: {name}")
|
||||
return self._hands[name]
|
||||
|
||||
def list_hands(self) -> list[HandConfig]:
|
||||
"""List all loaded Hands.
|
||||
|
||||
Returns:
|
||||
List of HandConfigs
|
||||
"""
|
||||
return list(self._hands.values())
|
||||
|
||||
def get_scheduled_hands(self) -> list[HandConfig]:
|
||||
"""Get all Hands with schedule configuration.
|
||||
|
||||
Returns:
|
||||
List of HandConfigs with schedules
|
||||
"""
|
||||
return [h for h in self._hands.values() if h.schedule is not None and h.enabled]
|
||||
|
||||
def get_enabled_hands(self) -> list[HandConfig]:
|
||||
"""Get all enabled Hands.
|
||||
|
||||
Returns:
|
||||
List of enabled HandConfigs
|
||||
"""
|
||||
return [h for h in self._hands.values() if h.enabled]
|
||||
|
||||
def get_state(self, name: str) -> HandState:
|
||||
"""Get runtime state of a Hand.
|
||||
|
||||
Args:
|
||||
name: Hand name
|
||||
|
||||
Returns:
|
||||
HandState
|
||||
"""
|
||||
if name not in self._states:
|
||||
self._states[name] = HandState(name=name)
|
||||
return self._states[name]
|
||||
|
||||
def update_state(self, name: str, **kwargs) -> None:
|
||||
"""Update Hand state.
|
||||
|
||||
Args:
|
||||
name: Hand name
|
||||
**kwargs: State fields to update
|
||||
"""
|
||||
state = self.get_state(name)
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(state, key):
|
||||
setattr(state, key, value)
|
||||
|
||||
async def log_execution(
|
||||
self,
|
||||
hand_name: str,
|
||||
trigger: str,
|
||||
outcome: str,
|
||||
output: str = "",
|
||||
error: Optional[str] = None,
|
||||
approval_id: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Log a Hand execution.
|
||||
|
||||
Args:
|
||||
hand_name: Name of the Hand
|
||||
trigger: Trigger type
|
||||
outcome: Execution outcome
|
||||
output: Execution output
|
||||
error: Error message if failed
|
||||
approval_id: Associated approval ID
|
||||
|
||||
Returns:
|
||||
Execution ID
|
||||
"""
|
||||
execution_id = str(uuid.uuid4())
|
||||
|
||||
with self._get_conn() as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO hand_executions
|
||||
(id, hand_name, trigger, started_at, completed_at, outcome, output, error, approval_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
execution_id,
|
||||
hand_name,
|
||||
trigger,
|
||||
datetime.now(timezone.utc).isoformat(),
|
||||
datetime.now(timezone.utc).isoformat(),
|
||||
outcome,
|
||||
output,
|
||||
error,
|
||||
approval_id,
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
return execution_id
|
||||
|
||||
async def create_approval(
|
||||
self,
|
||||
hand_name: str,
|
||||
action: str,
|
||||
description: str,
|
||||
context: dict,
|
||||
expires_after: Optional[int] = None,
|
||||
) -> ApprovalRequest:
|
||||
"""Create an approval request.
|
||||
|
||||
Args:
|
||||
hand_name: Hand requesting approval
|
||||
action: Action to approve
|
||||
description: Human-readable description
|
||||
context: Additional context
|
||||
expires_after: Seconds until expiration
|
||||
|
||||
Returns:
|
||||
ApprovalRequest
|
||||
"""
|
||||
approval_id = str(uuid.uuid4())
|
||||
|
||||
created_at = datetime.now(timezone.utc)
|
||||
expires_at = None
|
||||
if expires_after:
|
||||
from datetime import timedelta
|
||||
expires_at = created_at + timedelta(seconds=expires_after)
|
||||
|
||||
request = ApprovalRequest(
|
||||
id=approval_id,
|
||||
hand_name=hand_name,
|
||||
action=action,
|
||||
description=description,
|
||||
context=context,
|
||||
created_at=created_at,
|
||||
expires_at=expires_at,
|
||||
)
|
||||
|
||||
# Store in database
|
||||
import json
|
||||
with self._get_conn() as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO approval_queue
|
||||
(id, hand_name, action, description, context_json, status, created_at, expires_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
request.id,
|
||||
request.hand_name,
|
||||
request.action,
|
||||
request.description,
|
||||
json.dumps(request.context),
|
||||
request.status.value,
|
||||
request.created_at.isoformat(),
|
||||
request.expires_at.isoformat() if request.expires_at else None,
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
return request
|
||||
|
||||
async def get_pending_approvals(self) -> list[ApprovalRequest]:
|
||||
"""Get all pending approval requests.
|
||||
|
||||
Returns:
|
||||
List of pending ApprovalRequests
|
||||
"""
|
||||
import json
|
||||
|
||||
with self._get_conn() as conn:
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT * FROM approval_queue
|
||||
WHERE status = 'pending'
|
||||
ORDER BY created_at DESC
|
||||
"""
|
||||
).fetchall()
|
||||
|
||||
requests = []
|
||||
for row in rows:
|
||||
requests.append(ApprovalRequest(
|
||||
id=row["id"],
|
||||
hand_name=row["hand_name"],
|
||||
action=row["action"],
|
||||
description=row["description"],
|
||||
context=json.loads(row["context_json"] or "{}"),
|
||||
status=ApprovalStatus(row["status"]),
|
||||
created_at=datetime.fromisoformat(row["created_at"]),
|
||||
expires_at=datetime.fromisoformat(row["expires_at"]) if row["expires_at"] else None,
|
||||
))
|
||||
|
||||
return requests
|
||||
|
||||
async def resolve_approval(
|
||||
self,
|
||||
approval_id: str,
|
||||
approved: bool,
|
||||
resolved_by: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""Resolve an approval request.
|
||||
|
||||
Args:
|
||||
approval_id: ID of the approval request
|
||||
approved: True to approve, False to reject
|
||||
resolved_by: Who resolved the request
|
||||
|
||||
Returns:
|
||||
True if resolved successfully
|
||||
"""
|
||||
status = ApprovalStatus.APPROVED if approved else ApprovalStatus.REJECTED
|
||||
resolved_at = datetime.now(timezone.utc)
|
||||
|
||||
with self._get_conn() as conn:
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
UPDATE approval_queue
|
||||
SET status = ?, resolved_at = ?, resolved_by = ?
|
||||
WHERE id = ? AND status = 'pending'
|
||||
""",
|
||||
(status.value, resolved_at.isoformat(), resolved_by, approval_id),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
return cursor.rowcount > 0
|
||||
|
||||
async def get_recent_executions(
|
||||
self,
|
||||
hand_name: Optional[str] = None,
|
||||
limit: int = 50,
|
||||
) -> list[dict]:
|
||||
"""Get recent Hand executions.
|
||||
|
||||
Args:
|
||||
hand_name: Filter by Hand name
|
||||
limit: Maximum results
|
||||
|
||||
Returns:
|
||||
List of execution records
|
||||
"""
|
||||
with self._get_conn() as conn:
|
||||
if hand_name:
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT * FROM hand_executions
|
||||
WHERE hand_name = ?
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(hand_name, limit),
|
||||
).fetchall()
|
||||
else:
|
||||
rows = conn.execute(
|
||||
"""
|
||||
SELECT * FROM hand_executions
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(limit,),
|
||||
).fetchall()
|
||||
|
||||
return [dict(row) for row in rows]
|
||||
410
src/hands/scheduler.py
Normal file
410
src/hands/scheduler.py
Normal file
@@ -0,0 +1,410 @@
|
||||
"""Hand Scheduler — APScheduler-based cron scheduling for Hands.
|
||||
|
||||
Manages the scheduling of autonomous Hands using APScheduler.
|
||||
Supports cron expressions, intervals, and specific times.
|
||||
|
||||
Usage:
|
||||
from hands.scheduler import HandScheduler
|
||||
from hands.registry import HandRegistry
|
||||
|
||||
registry = HandRegistry()
|
||||
await registry.load_all()
|
||||
|
||||
scheduler = HandScheduler(registry)
|
||||
await scheduler.start()
|
||||
|
||||
# Hands are now scheduled and will run automatically
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
from hands.models import HandConfig, HandState, HandStatus, TriggerType
|
||||
from hands.registry import HandRegistry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Try to import APScheduler
|
||||
try:
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
APSCHEDULER_AVAILABLE = True
|
||||
except ImportError:
|
||||
APSCHEDULER_AVAILABLE = False
|
||||
logger.warning("APScheduler not installed. Scheduling will be disabled.")
|
||||
|
||||
|
||||
class HandScheduler:
|
||||
"""Scheduler for autonomous Hands.
|
||||
|
||||
Uses APScheduler to manage cron-based execution of Hands.
|
||||
Each Hand with a schedule gets its own job in the scheduler.
|
||||
|
||||
Attributes:
|
||||
registry: HandRegistry for Hand configurations
|
||||
_scheduler: APScheduler instance
|
||||
_running: Whether scheduler is running
|
||||
_job_ids: Mapping of hand names to job IDs
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
registry: HandRegistry,
|
||||
job_defaults: Optional[dict] = None,
|
||||
) -> None:
|
||||
"""Initialize HandScheduler.
|
||||
|
||||
Args:
|
||||
registry: HandRegistry instance
|
||||
job_defaults: Default job configuration for APScheduler
|
||||
"""
|
||||
self.registry = registry
|
||||
self._scheduler: Optional[Any] = None
|
||||
self._running = False
|
||||
self._job_ids: dict[str, str] = {}
|
||||
|
||||
if APSCHEDULER_AVAILABLE:
|
||||
self._scheduler = AsyncIOScheduler(job_defaults=job_defaults or {
|
||||
'coalesce': True, # Coalesce missed jobs into one
|
||||
'max_instances': 1, # Only one instance per Hand
|
||||
})
|
||||
|
||||
logger.info("HandScheduler initialized")
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the scheduler and schedule all enabled Hands."""
|
||||
if not APSCHEDULER_AVAILABLE:
|
||||
logger.error("Cannot start scheduler: APScheduler not installed")
|
||||
return
|
||||
|
||||
if self._running:
|
||||
logger.warning("Scheduler already running")
|
||||
return
|
||||
|
||||
# Schedule all enabled Hands
|
||||
hands = self.registry.get_scheduled_hands()
|
||||
for hand in hands:
|
||||
await self.schedule_hand(hand)
|
||||
|
||||
# Start the scheduler
|
||||
self._scheduler.start()
|
||||
self._running = True
|
||||
|
||||
logger.info("HandScheduler started with %d scheduled Hands", len(hands))
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop the scheduler."""
|
||||
if not self._running or not self._scheduler:
|
||||
return
|
||||
|
||||
self._scheduler.shutdown(wait=True)
|
||||
self._running = False
|
||||
self._job_ids.clear()
|
||||
|
||||
logger.info("HandScheduler stopped")
|
||||
|
||||
async def schedule_hand(self, hand: HandConfig) -> Optional[str]:
|
||||
"""Schedule a Hand for execution.
|
||||
|
||||
Args:
|
||||
hand: HandConfig to schedule
|
||||
|
||||
Returns:
|
||||
Job ID if scheduled successfully
|
||||
"""
|
||||
if not APSCHEDULER_AVAILABLE or not self._scheduler:
|
||||
logger.warning("Cannot schedule %s: APScheduler not available", hand.name)
|
||||
return None
|
||||
|
||||
if not hand.schedule:
|
||||
logger.debug("Hand %s has no schedule", hand.name)
|
||||
return None
|
||||
|
||||
if not hand.enabled:
|
||||
logger.debug("Hand %s is disabled", hand.name)
|
||||
return None
|
||||
|
||||
# Remove existing job if any
|
||||
if hand.name in self._job_ids:
|
||||
self.unschedule_hand(hand.name)
|
||||
|
||||
# Create the trigger
|
||||
trigger = self._create_trigger(hand.schedule)
|
||||
if not trigger:
|
||||
logger.error("Failed to create trigger for Hand %s", hand.name)
|
||||
return None
|
||||
|
||||
# Add job to scheduler
|
||||
try:
|
||||
job = self._scheduler.add_job(
|
||||
func=self._execute_hand_wrapper,
|
||||
trigger=trigger,
|
||||
id=f"hand_{hand.name}",
|
||||
name=f"Hand: {hand.name}",
|
||||
args=[hand.name],
|
||||
replace_existing=True,
|
||||
)
|
||||
|
||||
self._job_ids[hand.name] = job.id
|
||||
|
||||
# Update state
|
||||
self.registry.update_state(
|
||||
hand.name,
|
||||
status=HandStatus.SCHEDULED,
|
||||
next_run=job.next_run_time,
|
||||
)
|
||||
|
||||
logger.info("Scheduled Hand %s (next run: %s)", hand.name, job.next_run_time)
|
||||
return job.id
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to schedule Hand %s: %s", hand.name, e)
|
||||
return None
|
||||
|
||||
def unschedule_hand(self, name: str) -> bool:
|
||||
"""Remove a Hand from the scheduler.
|
||||
|
||||
Args:
|
||||
name: Hand name
|
||||
|
||||
Returns:
|
||||
True if unscheduled successfully
|
||||
"""
|
||||
if not self._scheduler:
|
||||
return False
|
||||
|
||||
if name not in self._job_ids:
|
||||
return False
|
||||
|
||||
try:
|
||||
self._scheduler.remove_job(self._job_ids[name])
|
||||
del self._job_ids[name]
|
||||
|
||||
self.registry.update_state(name, status=HandStatus.IDLE)
|
||||
|
||||
logger.info("Unscheduled Hand %s", name)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to unschedule Hand %s: %s", name, e)
|
||||
return False
|
||||
|
||||
def pause_hand(self, name: str) -> bool:
|
||||
"""Pause a scheduled Hand.
|
||||
|
||||
Args:
|
||||
name: Hand name
|
||||
|
||||
Returns:
|
||||
True if paused successfully
|
||||
"""
|
||||
if not self._scheduler:
|
||||
return False
|
||||
|
||||
if name not in self._job_ids:
|
||||
return False
|
||||
|
||||
try:
|
||||
self._scheduler.pause_job(self._job_ids[name])
|
||||
self.registry.update_state(name, status=HandStatus.PAUSED, is_paused=True)
|
||||
logger.info("Paused Hand %s", name)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("Failed to pause Hand %s: %s", name, e)
|
||||
return False
|
||||
|
||||
def resume_hand(self, name: str) -> bool:
|
||||
"""Resume a paused Hand.
|
||||
|
||||
Args:
|
||||
name: Hand name
|
||||
|
||||
Returns:
|
||||
True if resumed successfully
|
||||
"""
|
||||
if not self._scheduler:
|
||||
return False
|
||||
|
||||
if name not in self._job_ids:
|
||||
return False
|
||||
|
||||
try:
|
||||
self._scheduler.resume_job(self._job_ids[name])
|
||||
self.registry.update_state(name, status=HandStatus.SCHEDULED, is_paused=False)
|
||||
logger.info("Resumed Hand %s", name)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("Failed to resume Hand %s: %s", name, e)
|
||||
return False
|
||||
|
||||
def get_scheduled_jobs(self) -> list[dict]:
|
||||
"""Get all scheduled jobs.
|
||||
|
||||
Returns:
|
||||
List of job information dicts
|
||||
"""
|
||||
if not self._scheduler:
|
||||
return []
|
||||
|
||||
jobs = []
|
||||
for job in self._scheduler.get_jobs():
|
||||
if job.id.startswith("hand_"):
|
||||
hand_name = job.id[5:] # Remove "hand_" prefix
|
||||
jobs.append({
|
||||
"hand_name": hand_name,
|
||||
"job_id": job.id,
|
||||
"next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
|
||||
"trigger": str(job.trigger),
|
||||
})
|
||||
|
||||
return jobs
|
||||
|
||||
def _create_trigger(self, schedule: Any) -> Optional[Any]:
|
||||
"""Create an APScheduler trigger from ScheduleConfig.
|
||||
|
||||
Args:
|
||||
schedule: ScheduleConfig
|
||||
|
||||
Returns:
|
||||
APScheduler trigger
|
||||
"""
|
||||
if not APSCHEDULER_AVAILABLE:
|
||||
return None
|
||||
|
||||
# Cron trigger
|
||||
if schedule.cron:
|
||||
try:
|
||||
parts = schedule.cron.split()
|
||||
if len(parts) == 5:
|
||||
return CronTrigger(
|
||||
minute=parts[0],
|
||||
hour=parts[1],
|
||||
day=parts[2],
|
||||
month=parts[3],
|
||||
day_of_week=parts[4],
|
||||
timezone=schedule.timezone,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Invalid cron expression '%s': %s", schedule.cron, e)
|
||||
return None
|
||||
|
||||
# Interval trigger
|
||||
if schedule.interval:
|
||||
return IntervalTrigger(
|
||||
seconds=schedule.interval,
|
||||
timezone=schedule.timezone,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
async def _execute_hand_wrapper(self, hand_name: str) -> None:
|
||||
"""Wrapper for Hand execution.
|
||||
|
||||
This is called by APScheduler when a Hand's trigger fires.
|
||||
|
||||
Args:
|
||||
hand_name: Name of the Hand to execute
|
||||
"""
|
||||
logger.info("Triggering Hand: %s", hand_name)
|
||||
|
||||
try:
|
||||
# Update state
|
||||
self.registry.update_state(
|
||||
hand_name,
|
||||
status=HandStatus.RUNNING,
|
||||
last_run=datetime.now(timezone.utc),
|
||||
)
|
||||
|
||||
# Execute the Hand
|
||||
await self._run_hand(hand_name, TriggerType.SCHEDULE)
|
||||
|
||||
except Exception as e:
|
||||
logger.exception("Hand %s execution failed", hand_name)
|
||||
self.registry.update_state(
|
||||
hand_name,
|
||||
status=HandStatus.ERROR,
|
||||
error_message=str(e),
|
||||
)
|
||||
|
||||
async def _run_hand(self, hand_name: str, trigger: TriggerType) -> None:
|
||||
"""Execute a Hand.
|
||||
|
||||
This is the core execution logic. In Phase 4+, this will
|
||||
call the actual Hand implementation.
|
||||
|
||||
Args:
|
||||
hand_name: Name of the Hand
|
||||
trigger: What triggered the execution
|
||||
"""
|
||||
from hands.models import HandOutcome
|
||||
|
||||
try:
|
||||
hand = self.registry.get_hand(hand_name)
|
||||
except Exception:
|
||||
logger.error("Hand %s not found", hand_name)
|
||||
return
|
||||
|
||||
logger.info("Executing Hand %s (trigger: %s)", hand_name, trigger.value)
|
||||
|
||||
# TODO: Phase 4+ - Call actual Hand implementation via HandRunner
|
||||
# For now, just log the execution
|
||||
|
||||
output = f"Hand {hand_name} executed (placeholder implementation)"
|
||||
|
||||
# Log execution
|
||||
await self.registry.log_execution(
|
||||
hand_name=hand_name,
|
||||
trigger=trigger.value,
|
||||
outcome=HandOutcome.SUCCESS.value,
|
||||
output=output,
|
||||
)
|
||||
|
||||
# Update state
|
||||
state = self.registry.get_state(hand_name)
|
||||
self.registry.update_state(
|
||||
hand_name,
|
||||
status=HandStatus.SCHEDULED,
|
||||
run_count=state.run_count + 1,
|
||||
success_count=state.success_count + 1,
|
||||
)
|
||||
|
||||
logger.info("Hand %s completed successfully", hand_name)
|
||||
|
||||
async def trigger_hand_now(self, name: str) -> bool:
|
||||
"""Manually trigger a Hand to run immediately.
|
||||
|
||||
Args:
|
||||
name: Hand name
|
||||
|
||||
Returns:
|
||||
True if triggered successfully
|
||||
"""
|
||||
try:
|
||||
await self._run_hand(name, TriggerType.MANUAL)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("Failed to trigger Hand %s: %s", name, e)
|
||||
return False
|
||||
|
||||
def get_next_run_time(self, name: str) -> Optional[datetime]:
|
||||
"""Get next scheduled run time for a Hand.
|
||||
|
||||
Args:
|
||||
name: Hand name
|
||||
|
||||
Returns:
|
||||
Next run time or None if not scheduled
|
||||
"""
|
||||
if not self._scheduler or name not in self._job_ids:
|
||||
return None
|
||||
|
||||
try:
|
||||
job = self._scheduler.get_job(self._job_ids[name])
|
||||
return job.next_run_time if job else None
|
||||
except Exception:
|
||||
return None
|
||||
Reference in New Issue
Block a user