From 8a952f68186e7db3cf35cfa8cbf3566cedd26c6f Mon Sep 17 00:00:00 2001 From: Alexander Payne Date: Thu, 26 Feb 2026 12:41:52 -0500 Subject: [PATCH] feat: Hands Infrastructure - Models, Registry, Scheduler (Phase 3.1-3.3) Add core Hands infrastructure: - hands/models.py: Pydantic models for HAND.toml schema - HandConfig: Complete hand configuration - HandState: Runtime state tracking - HandExecution: Execution records - ApprovalRequest: Approval queue entries - hands/registry.py: HandRegistry for loading and indexing - Load Hands from hands/ directory - Parse HAND.toml manifests - SQLite indexing for fast lookup - Approval queue management - Execution history logging - hands/scheduler.py: APScheduler-based scheduling - Cron and interval triggers - Job management (schedule, pause, resume, unschedule) - Hand execution wrapper - Manual trigger support --- src/hands/models.py | 252 ++++++++++++++++++++ src/hands/registry.py | 526 +++++++++++++++++++++++++++++++++++++++++ src/hands/scheduler.py | 410 ++++++++++++++++++++++++++++++++ 3 files changed, 1188 insertions(+) create mode 100644 src/hands/models.py create mode 100644 src/hands/registry.py create mode 100644 src/hands/scheduler.py diff --git a/src/hands/models.py b/src/hands/models.py new file mode 100644 index 0000000..d440a72 --- /dev/null +++ b/src/hands/models.py @@ -0,0 +1,252 @@ +"""Hands Models — Pydantic schemas for HAND.toml manifests. + +Defines the data structures for autonomous Hand agents: +- HandConfig: Complete hand configuration from HAND.toml +- HandState: Runtime state tracking +- HandExecution: Execution record for audit trail +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from pathlib import Path +from typing import Any, Optional + +from pydantic import BaseModel, Field, validator + + +class HandStatus(str, Enum): + """Runtime status of a Hand.""" + DISABLED = "disabled" + IDLE = "idle" + SCHEDULED = "scheduled" + RUNNING = "running" + PAUSED = "paused" + ERROR = "error" + + +class HandOutcome(str, Enum): + """Outcome of a Hand execution.""" + SUCCESS = "success" + FAILURE = "failure" + APPROVAL_PENDING = "approval_pending" + TIMEOUT = "timeout" + SKIPPED = "skipped" + + +class TriggerType(str, Enum): + """Types of execution triggers.""" + SCHEDULE = "schedule" # Cron schedule + MANUAL = "manual" # User triggered + EVENT = "event" # Event-driven + WEBHOOK = "webhook" # External webhook + + +# ── HAND.toml Schema Models ─────────────────────────────────────────────── + +class ToolRequirement(BaseModel): + """A required tool for the Hand.""" + name: str + version: Optional[str] = None + optional: bool = False + + +class OutputConfig(BaseModel): + """Output configuration for Hand results.""" + dashboard: bool = True + channel: Optional[str] = None # e.g., "telegram", "discord" + format: str = "markdown" # markdown, json, html + file_drop: Optional[str] = None # Path to write output files + + +class ApprovalGate(BaseModel): + """An approval gate for sensitive operations.""" + action: str # e.g., "post_tweet", "send_payment" + description: str + auto_approve_after: Optional[int] = None # Seconds to auto-approve + + +class ScheduleConfig(BaseModel): + """Schedule configuration for the Hand.""" + cron: Optional[str] = None # Cron expression + interval: Optional[int] = None # Seconds between runs + at: Optional[str] = None # Specific time (HH:MM) + timezone: str = "UTC" + + @validator('cron') + def validate_cron(cls, v: Optional[str]) -> Optional[str]: + if v is None: + return v + # Basic cron validation (5 fields) + parts = v.split() + if len(parts) != 5: + raise ValueError("Cron expression must have 5 fields: minute hour day month weekday") + return v + + +class HandConfig(BaseModel): + """Complete Hand configuration from HAND.toml. + + Example HAND.toml: + [hand] + name = "oracle" + schedule = "0 7,19 * * *" + description = "Bitcoin and on-chain intelligence briefing" + + [tools] + required = ["mempool_fetch", "fee_estimate"] + + [approval_gates] + post_tweet = { action = "post_tweet", description = "Post to Twitter" } + + [output] + dashboard = true + channel = "telegram" + """ + + # Required fields + name: str = Field(..., description="Unique hand identifier") + description: str = Field(..., description="What this Hand does") + + # Schedule (one of these must be set) + schedule: Optional[ScheduleConfig] = None + trigger: Optional[TriggerType] = TriggerType.SCHEDULE + + # Optional fields + enabled: bool = True + version: str = "1.0.0" + author: Optional[str] = None + + # Tools + tools_required: list[str] = Field(default_factory=list) + tools_optional: list[str] = Field(default_factory=list) + + # Approval gates + approval_gates: list[ApprovalGate] = Field(default_factory=list) + + # Output configuration + output: OutputConfig = Field(default_factory=OutputConfig) + + # File paths (set at runtime) + hand_dir: Optional[Path] = Field(None, exclude=True) + system_prompt_path: Optional[Path] = None + skill_paths: list[Path] = Field(default_factory=list) + + class Config: + extra = "allow" # Allow additional fields for extensibility + + @property + def system_md_path(self) -> Optional[Path]: + """Path to SYSTEM.md file.""" + if self.hand_dir: + return self.hand_dir / "SYSTEM.md" + return None + + @property + def skill_md_paths(self) -> list[Path]: + """Paths to SKILL.md files.""" + if self.hand_dir: + skill_dir = self.hand_dir / "skills" + if skill_dir.exists(): + return list(skill_dir.glob("*.md")) + return [] + + +# ── Runtime State Models ───────────────────────────────────────────────── + +@dataclass +class HandState: + """Runtime state of a Hand.""" + name: str + status: HandStatus = HandStatus.IDLE + last_run: Optional[datetime] = None + next_run: Optional[datetime] = None + run_count: int = 0 + success_count: int = 0 + failure_count: int = 0 + error_message: Optional[str] = None + is_paused: bool = False + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "status": self.status.value, + "last_run": self.last_run.isoformat() if self.last_run else None, + "next_run": self.next_run.isoformat() if self.next_run else None, + "run_count": self.run_count, + "success_count": self.success_count, + "failure_count": self.failure_count, + "error_message": self.error_message, + "is_paused": self.is_paused, + } + + +@dataclass +class HandExecution: + """Record of a Hand execution.""" + id: str + hand_name: str + trigger: TriggerType + started_at: datetime + completed_at: Optional[datetime] = None + outcome: HandOutcome = HandOutcome.SKIPPED + output: str = "" + error: Optional[str] = None + approval_id: Optional[str] = None + files_generated: list[str] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "id": self.id, + "hand_name": self.hand_name, + "trigger": self.trigger.value, + "started_at": self.started_at.isoformat(), + "completed_at": self.completed_at.isoformat() if self.completed_at else None, + "outcome": self.outcome.value, + "output": self.output, + "error": self.error, + "approval_id": self.approval_id, + "files_generated": self.files_generated, + } + + +# ── Approval Queue Models ──────────────────────────────────────────────── + +class ApprovalStatus(str, Enum): + """Status of an approval request.""" + PENDING = "pending" + APPROVED = "approved" + REJECTED = "rejected" + EXPIRED = "expired" + AUTO_APPROVED = "auto_approved" + + +@dataclass +class ApprovalRequest: + """A request for user approval.""" + id: str + hand_name: str + action: str + description: str + context: dict[str, Any] = field(default_factory=dict) + status: ApprovalStatus = ApprovalStatus.PENDING + created_at: datetime = field(default_factory=datetime.utcnow) + expires_at: Optional[datetime] = None + resolved_at: Optional[datetime] = None + resolved_by: Optional[str] = None + + def to_dict(self) -> dict[str, Any]: + return { + "id": self.id, + "hand_name": self.hand_name, + "action": self.action, + "description": self.description, + "context": self.context, + "status": self.status.value, + "created_at": self.created_at.isoformat(), + "expires_at": self.expires_at.isoformat() if self.expires_at else None, + "resolved_at": self.resolved_at.isoformat() if self.resolved_at else None, + "resolved_by": self.resolved_by, + } diff --git a/src/hands/registry.py b/src/hands/registry.py new file mode 100644 index 0000000..72a3e5a --- /dev/null +++ b/src/hands/registry.py @@ -0,0 +1,526 @@ +"""Hand Registry — Load, validate, and index Hands from the hands directory. + +The HandRegistry discovers all Hand packages in the hands/ directory, +loads their HAND.toml manifests, and maintains an index for fast lookup. + +Usage: + from hands.registry import HandRegistry + + registry = HandRegistry(hands_dir="hands/") + await registry.load_all() + + oracle = registry.get_hand("oracle") + all_hands = registry.list_hands() + scheduled = registry.get_scheduled_hands() +""" + +from __future__ import annotations + +import logging +import sqlite3 +import tomllib +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +from hands.models import ApprovalGate, ApprovalRequest, ApprovalStatus, HandConfig, HandState, HandStatus, OutputConfig, ScheduleConfig + +logger = logging.getLogger(__name__) + + +class HandRegistryError(Exception): + """Base exception for HandRegistry errors.""" + pass + + +class HandNotFoundError(HandRegistryError): + """Raised when a Hand is not found.""" + pass + + +class HandValidationError(HandRegistryError): + """Raised when a Hand fails validation.""" + pass + + +class HandRegistry: + """Registry for autonomous Hands. + + Discovers Hands from the filesystem, loads their configurations, + and maintains a SQLite index for fast lookups. + + Attributes: + hands_dir: Directory containing Hand packages + db_path: SQLite database for indexing + _hands: In-memory cache of loaded HandConfigs + _states: Runtime state of each Hand + """ + + def __init__( + self, + hands_dir: str | Path = "hands/", + db_path: str | Path = "data/hands.db", + ) -> None: + """Initialize HandRegistry. + + Args: + hands_dir: Directory containing Hand subdirectories + db_path: SQLite database path for indexing + """ + self.hands_dir = Path(hands_dir) + self.db_path = Path(db_path) + self._hands: dict[str, HandConfig] = {} + self._states: dict[str, HandState] = {} + self._ensure_schema() + logger.info("HandRegistry initialized (hands_dir=%s)", self.hands_dir) + + def _get_conn(self) -> sqlite3.Connection: + """Get database connection.""" + self.db_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(self.db_path)) + conn.row_factory = sqlite3.Row + return conn + + def _ensure_schema(self) -> None: + """Create database tables if they don't exist.""" + with self._get_conn() as conn: + # Hands index + conn.execute(""" + CREATE TABLE IF NOT EXISTS hands ( + name TEXT PRIMARY KEY, + config_json TEXT NOT NULL, + enabled INTEGER DEFAULT 1, + loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + # Hand execution history + conn.execute(""" + CREATE TABLE IF NOT EXISTS hand_executions ( + id TEXT PRIMARY KEY, + hand_name TEXT NOT NULL, + trigger TEXT NOT NULL, + started_at TIMESTAMP NOT NULL, + completed_at TIMESTAMP, + outcome TEXT NOT NULL, + output TEXT, + error TEXT, + approval_id TEXT + ) + """) + + # Approval queue + conn.execute(""" + CREATE TABLE IF NOT EXISTS approval_queue ( + id TEXT PRIMARY KEY, + hand_name TEXT NOT NULL, + action TEXT NOT NULL, + description TEXT NOT NULL, + context_json TEXT, + status TEXT DEFAULT 'pending', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + expires_at TIMESTAMP, + resolved_at TIMESTAMP, + resolved_by TEXT + ) + """) + + conn.commit() + + async def load_all(self) -> dict[str, HandConfig]: + """Load all Hands from the hands directory. + + Returns: + Dict mapping hand names to HandConfigs + """ + if not self.hands_dir.exists(): + logger.warning("Hands directory does not exist: %s", self.hands_dir) + return {} + + loaded = {} + + for hand_dir in self.hands_dir.iterdir(): + if not hand_dir.is_dir(): + continue + + try: + hand = self._load_hand_from_dir(hand_dir) + if hand: + loaded[hand.name] = hand + self._hands[hand.name] = hand + + # Initialize state if not exists + if hand.name not in self._states: + self._states[hand.name] = HandState(name=hand.name) + + # Store in database + self._store_hand(conn=None, hand=hand) + + logger.info("Loaded Hand: %s (%s)", hand.name, hand.description[:50]) + + except Exception as e: + logger.error("Failed to load Hand from %s: %s", hand_dir, e) + + logger.info("Loaded %d Hands", len(loaded)) + return loaded + + def _load_hand_from_dir(self, hand_dir: Path) -> Optional[HandConfig]: + """Load a single Hand from its directory. + + Args: + hand_dir: Directory containing HAND.toml + + Returns: + HandConfig or None if invalid + """ + manifest_path = hand_dir / "HAND.toml" + + if not manifest_path.exists(): + logger.debug("No HAND.toml in %s", hand_dir) + return None + + # Parse TOML + try: + with open(manifest_path, "rb") as f: + data = tomllib.load(f) + except Exception as e: + raise HandValidationError(f"Invalid HAND.toml: {e}") + + # Extract hand section + hand_data = data.get("hand", {}) + if not hand_data: + raise HandValidationError("Missing [hand] section in HAND.toml") + + # Build HandConfig + config = HandConfig( + name=hand_data.get("name", hand_dir.name), + description=hand_data.get("description", ""), + enabled=hand_data.get("enabled", True), + version=hand_data.get("version", "1.0.0"), + author=hand_data.get("author"), + hand_dir=hand_dir, + ) + + # Parse schedule + if "schedule" in hand_data: + schedule_data = hand_data["schedule"] + if isinstance(schedule_data, str): + # Simple cron string + config.schedule = ScheduleConfig(cron=schedule_data) + elif isinstance(schedule_data, dict): + config.schedule = ScheduleConfig(**schedule_data) + + # Parse tools + tools_data = data.get("tools", {}) + config.tools_required = tools_data.get("required", []) + config.tools_optional = tools_data.get("optional", []) + + # Parse approval gates + gates_data = data.get("approval_gates", {}) + for action, gate_data in gates_data.items(): + if isinstance(gate_data, dict): + config.approval_gates.append(ApprovalGate( + action=gate_data.get("action", action), + description=gate_data.get("description", ""), + auto_approve_after=gate_data.get("auto_approve_after"), + )) + + # Parse output config + output_data = data.get("output", {}) + config.output = OutputConfig(**output_data) + + return config + + def _store_hand(self, conn: Optional[sqlite3.Connection], hand: HandConfig) -> None: + """Store hand config in database.""" + import json + + if conn is None: + with self._get_conn() as conn: + self._store_hand(conn, hand) + return + + conn.execute( + """ + INSERT OR REPLACE INTO hands (name, config_json, enabled) + VALUES (?, ?, ?) + """, + (hand.name, hand.json(), 1 if hand.enabled else 0), + ) + conn.commit() + + def get_hand(self, name: str) -> HandConfig: + """Get a Hand by name. + + Args: + name: Hand name + + Returns: + HandConfig + + Raises: + HandNotFoundError: If Hand doesn't exist + """ + if name not in self._hands: + raise HandNotFoundError(f"Hand not found: {name}") + return self._hands[name] + + def list_hands(self) -> list[HandConfig]: + """List all loaded Hands. + + Returns: + List of HandConfigs + """ + return list(self._hands.values()) + + def get_scheduled_hands(self) -> list[HandConfig]: + """Get all Hands with schedule configuration. + + Returns: + List of HandConfigs with schedules + """ + return [h for h in self._hands.values() if h.schedule is not None and h.enabled] + + def get_enabled_hands(self) -> list[HandConfig]: + """Get all enabled Hands. + + Returns: + List of enabled HandConfigs + """ + return [h for h in self._hands.values() if h.enabled] + + def get_state(self, name: str) -> HandState: + """Get runtime state of a Hand. + + Args: + name: Hand name + + Returns: + HandState + """ + if name not in self._states: + self._states[name] = HandState(name=name) + return self._states[name] + + def update_state(self, name: str, **kwargs) -> None: + """Update Hand state. + + Args: + name: Hand name + **kwargs: State fields to update + """ + state = self.get_state(name) + for key, value in kwargs.items(): + if hasattr(state, key): + setattr(state, key, value) + + async def log_execution( + self, + hand_name: str, + trigger: str, + outcome: str, + output: str = "", + error: Optional[str] = None, + approval_id: Optional[str] = None, + ) -> str: + """Log a Hand execution. + + Args: + hand_name: Name of the Hand + trigger: Trigger type + outcome: Execution outcome + output: Execution output + error: Error message if failed + approval_id: Associated approval ID + + Returns: + Execution ID + """ + execution_id = str(uuid.uuid4()) + + with self._get_conn() as conn: + conn.execute( + """ + INSERT INTO hand_executions + (id, hand_name, trigger, started_at, completed_at, outcome, output, error, approval_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + execution_id, + hand_name, + trigger, + datetime.now(timezone.utc).isoformat(), + datetime.now(timezone.utc).isoformat(), + outcome, + output, + error, + approval_id, + ), + ) + conn.commit() + + return execution_id + + async def create_approval( + self, + hand_name: str, + action: str, + description: str, + context: dict, + expires_after: Optional[int] = None, + ) -> ApprovalRequest: + """Create an approval request. + + Args: + hand_name: Hand requesting approval + action: Action to approve + description: Human-readable description + context: Additional context + expires_after: Seconds until expiration + + Returns: + ApprovalRequest + """ + approval_id = str(uuid.uuid4()) + + created_at = datetime.now(timezone.utc) + expires_at = None + if expires_after: + from datetime import timedelta + expires_at = created_at + timedelta(seconds=expires_after) + + request = ApprovalRequest( + id=approval_id, + hand_name=hand_name, + action=action, + description=description, + context=context, + created_at=created_at, + expires_at=expires_at, + ) + + # Store in database + import json + with self._get_conn() as conn: + conn.execute( + """ + INSERT INTO approval_queue + (id, hand_name, action, description, context_json, status, created_at, expires_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + request.id, + request.hand_name, + request.action, + request.description, + json.dumps(request.context), + request.status.value, + request.created_at.isoformat(), + request.expires_at.isoformat() if request.expires_at else None, + ), + ) + conn.commit() + + return request + + async def get_pending_approvals(self) -> list[ApprovalRequest]: + """Get all pending approval requests. + + Returns: + List of pending ApprovalRequests + """ + import json + + with self._get_conn() as conn: + rows = conn.execute( + """ + SELECT * FROM approval_queue + WHERE status = 'pending' + ORDER BY created_at DESC + """ + ).fetchall() + + requests = [] + for row in rows: + requests.append(ApprovalRequest( + id=row["id"], + hand_name=row["hand_name"], + action=row["action"], + description=row["description"], + context=json.loads(row["context_json"] or "{}"), + status=ApprovalStatus(row["status"]), + created_at=datetime.fromisoformat(row["created_at"]), + expires_at=datetime.fromisoformat(row["expires_at"]) if row["expires_at"] else None, + )) + + return requests + + async def resolve_approval( + self, + approval_id: str, + approved: bool, + resolved_by: Optional[str] = None, + ) -> bool: + """Resolve an approval request. + + Args: + approval_id: ID of the approval request + approved: True to approve, False to reject + resolved_by: Who resolved the request + + Returns: + True if resolved successfully + """ + status = ApprovalStatus.APPROVED if approved else ApprovalStatus.REJECTED + resolved_at = datetime.now(timezone.utc) + + with self._get_conn() as conn: + cursor = conn.execute( + """ + UPDATE approval_queue + SET status = ?, resolved_at = ?, resolved_by = ? + WHERE id = ? AND status = 'pending' + """, + (status.value, resolved_at.isoformat(), resolved_by, approval_id), + ) + conn.commit() + + return cursor.rowcount > 0 + + async def get_recent_executions( + self, + hand_name: Optional[str] = None, + limit: int = 50, + ) -> list[dict]: + """Get recent Hand executions. + + Args: + hand_name: Filter by Hand name + limit: Maximum results + + Returns: + List of execution records + """ + with self._get_conn() as conn: + if hand_name: + rows = conn.execute( + """ + SELECT * FROM hand_executions + WHERE hand_name = ? + ORDER BY started_at DESC + LIMIT ? + """, + (hand_name, limit), + ).fetchall() + else: + rows = conn.execute( + """ + SELECT * FROM hand_executions + ORDER BY started_at DESC + LIMIT ? + """, + (limit,), + ).fetchall() + + return [dict(row) for row in rows] diff --git a/src/hands/scheduler.py b/src/hands/scheduler.py new file mode 100644 index 0000000..e934498 --- /dev/null +++ b/src/hands/scheduler.py @@ -0,0 +1,410 @@ +"""Hand Scheduler — APScheduler-based cron scheduling for Hands. + +Manages the scheduling of autonomous Hands using APScheduler. +Supports cron expressions, intervals, and specific times. + +Usage: + from hands.scheduler import HandScheduler + from hands.registry import HandRegistry + + registry = HandRegistry() + await registry.load_all() + + scheduler = HandScheduler(registry) + await scheduler.start() + + # Hands are now scheduled and will run automatically +""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timezone +from typing import Any, Callable, Optional + +from hands.models import HandConfig, HandState, HandStatus, TriggerType +from hands.registry import HandRegistry + +logger = logging.getLogger(__name__) + +# Try to import APScheduler +try: + from apscheduler.schedulers.asyncio import AsyncIOScheduler + from apscheduler.triggers.cron import CronTrigger + from apscheduler.triggers.interval import IntervalTrigger + APSCHEDULER_AVAILABLE = True +except ImportError: + APSCHEDULER_AVAILABLE = False + logger.warning("APScheduler not installed. Scheduling will be disabled.") + + +class HandScheduler: + """Scheduler for autonomous Hands. + + Uses APScheduler to manage cron-based execution of Hands. + Each Hand with a schedule gets its own job in the scheduler. + + Attributes: + registry: HandRegistry for Hand configurations + _scheduler: APScheduler instance + _running: Whether scheduler is running + _job_ids: Mapping of hand names to job IDs + """ + + def __init__( + self, + registry: HandRegistry, + job_defaults: Optional[dict] = None, + ) -> None: + """Initialize HandScheduler. + + Args: + registry: HandRegistry instance + job_defaults: Default job configuration for APScheduler + """ + self.registry = registry + self._scheduler: Optional[Any] = None + self._running = False + self._job_ids: dict[str, str] = {} + + if APSCHEDULER_AVAILABLE: + self._scheduler = AsyncIOScheduler(job_defaults=job_defaults or { + 'coalesce': True, # Coalesce missed jobs into one + 'max_instances': 1, # Only one instance per Hand + }) + + logger.info("HandScheduler initialized") + + async def start(self) -> None: + """Start the scheduler and schedule all enabled Hands.""" + if not APSCHEDULER_AVAILABLE: + logger.error("Cannot start scheduler: APScheduler not installed") + return + + if self._running: + logger.warning("Scheduler already running") + return + + # Schedule all enabled Hands + hands = self.registry.get_scheduled_hands() + for hand in hands: + await self.schedule_hand(hand) + + # Start the scheduler + self._scheduler.start() + self._running = True + + logger.info("HandScheduler started with %d scheduled Hands", len(hands)) + + async def stop(self) -> None: + """Stop the scheduler.""" + if not self._running or not self._scheduler: + return + + self._scheduler.shutdown(wait=True) + self._running = False + self._job_ids.clear() + + logger.info("HandScheduler stopped") + + async def schedule_hand(self, hand: HandConfig) -> Optional[str]: + """Schedule a Hand for execution. + + Args: + hand: HandConfig to schedule + + Returns: + Job ID if scheduled successfully + """ + if not APSCHEDULER_AVAILABLE or not self._scheduler: + logger.warning("Cannot schedule %s: APScheduler not available", hand.name) + return None + + if not hand.schedule: + logger.debug("Hand %s has no schedule", hand.name) + return None + + if not hand.enabled: + logger.debug("Hand %s is disabled", hand.name) + return None + + # Remove existing job if any + if hand.name in self._job_ids: + self.unschedule_hand(hand.name) + + # Create the trigger + trigger = self._create_trigger(hand.schedule) + if not trigger: + logger.error("Failed to create trigger for Hand %s", hand.name) + return None + + # Add job to scheduler + try: + job = self._scheduler.add_job( + func=self._execute_hand_wrapper, + trigger=trigger, + id=f"hand_{hand.name}", + name=f"Hand: {hand.name}", + args=[hand.name], + replace_existing=True, + ) + + self._job_ids[hand.name] = job.id + + # Update state + self.registry.update_state( + hand.name, + status=HandStatus.SCHEDULED, + next_run=job.next_run_time, + ) + + logger.info("Scheduled Hand %s (next run: %s)", hand.name, job.next_run_time) + return job.id + + except Exception as e: + logger.error("Failed to schedule Hand %s: %s", hand.name, e) + return None + + def unschedule_hand(self, name: str) -> bool: + """Remove a Hand from the scheduler. + + Args: + name: Hand name + + Returns: + True if unscheduled successfully + """ + if not self._scheduler: + return False + + if name not in self._job_ids: + return False + + try: + self._scheduler.remove_job(self._job_ids[name]) + del self._job_ids[name] + + self.registry.update_state(name, status=HandStatus.IDLE) + + logger.info("Unscheduled Hand %s", name) + return True + + except Exception as e: + logger.error("Failed to unschedule Hand %s: %s", name, e) + return False + + def pause_hand(self, name: str) -> bool: + """Pause a scheduled Hand. + + Args: + name: Hand name + + Returns: + True if paused successfully + """ + if not self._scheduler: + return False + + if name not in self._job_ids: + return False + + try: + self._scheduler.pause_job(self._job_ids[name]) + self.registry.update_state(name, status=HandStatus.PAUSED, is_paused=True) + logger.info("Paused Hand %s", name) + return True + except Exception as e: + logger.error("Failed to pause Hand %s: %s", name, e) + return False + + def resume_hand(self, name: str) -> bool: + """Resume a paused Hand. + + Args: + name: Hand name + + Returns: + True if resumed successfully + """ + if not self._scheduler: + return False + + if name not in self._job_ids: + return False + + try: + self._scheduler.resume_job(self._job_ids[name]) + self.registry.update_state(name, status=HandStatus.SCHEDULED, is_paused=False) + logger.info("Resumed Hand %s", name) + return True + except Exception as e: + logger.error("Failed to resume Hand %s: %s", name, e) + return False + + def get_scheduled_jobs(self) -> list[dict]: + """Get all scheduled jobs. + + Returns: + List of job information dicts + """ + if not self._scheduler: + return [] + + jobs = [] + for job in self._scheduler.get_jobs(): + if job.id.startswith("hand_"): + hand_name = job.id[5:] # Remove "hand_" prefix + jobs.append({ + "hand_name": hand_name, + "job_id": job.id, + "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None, + "trigger": str(job.trigger), + }) + + return jobs + + def _create_trigger(self, schedule: Any) -> Optional[Any]: + """Create an APScheduler trigger from ScheduleConfig. + + Args: + schedule: ScheduleConfig + + Returns: + APScheduler trigger + """ + if not APSCHEDULER_AVAILABLE: + return None + + # Cron trigger + if schedule.cron: + try: + parts = schedule.cron.split() + if len(parts) == 5: + return CronTrigger( + minute=parts[0], + hour=parts[1], + day=parts[2], + month=parts[3], + day_of_week=parts[4], + timezone=schedule.timezone, + ) + except Exception as e: + logger.error("Invalid cron expression '%s': %s", schedule.cron, e) + return None + + # Interval trigger + if schedule.interval: + return IntervalTrigger( + seconds=schedule.interval, + timezone=schedule.timezone, + ) + + return None + + async def _execute_hand_wrapper(self, hand_name: str) -> None: + """Wrapper for Hand execution. + + This is called by APScheduler when a Hand's trigger fires. + + Args: + hand_name: Name of the Hand to execute + """ + logger.info("Triggering Hand: %s", hand_name) + + try: + # Update state + self.registry.update_state( + hand_name, + status=HandStatus.RUNNING, + last_run=datetime.now(timezone.utc), + ) + + # Execute the Hand + await self._run_hand(hand_name, TriggerType.SCHEDULE) + + except Exception as e: + logger.exception("Hand %s execution failed", hand_name) + self.registry.update_state( + hand_name, + status=HandStatus.ERROR, + error_message=str(e), + ) + + async def _run_hand(self, hand_name: str, trigger: TriggerType) -> None: + """Execute a Hand. + + This is the core execution logic. In Phase 4+, this will + call the actual Hand implementation. + + Args: + hand_name: Name of the Hand + trigger: What triggered the execution + """ + from hands.models import HandOutcome + + try: + hand = self.registry.get_hand(hand_name) + except Exception: + logger.error("Hand %s not found", hand_name) + return + + logger.info("Executing Hand %s (trigger: %s)", hand_name, trigger.value) + + # TODO: Phase 4+ - Call actual Hand implementation via HandRunner + # For now, just log the execution + + output = f"Hand {hand_name} executed (placeholder implementation)" + + # Log execution + await self.registry.log_execution( + hand_name=hand_name, + trigger=trigger.value, + outcome=HandOutcome.SUCCESS.value, + output=output, + ) + + # Update state + state = self.registry.get_state(hand_name) + self.registry.update_state( + hand_name, + status=HandStatus.SCHEDULED, + run_count=state.run_count + 1, + success_count=state.success_count + 1, + ) + + logger.info("Hand %s completed successfully", hand_name) + + async def trigger_hand_now(self, name: str) -> bool: + """Manually trigger a Hand to run immediately. + + Args: + name: Hand name + + Returns: + True if triggered successfully + """ + try: + await self._run_hand(name, TriggerType.MANUAL) + return True + except Exception as e: + logger.error("Failed to trigger Hand %s: %s", name, e) + return False + + def get_next_run_time(self, name: str) -> Optional[datetime]: + """Get next scheduled run time for a Hand. + + Args: + name: Hand name + + Returns: + Next run time or None if not scheduled + """ + if not self._scheduler or name not in self._job_ids: + return None + + try: + job = self._scheduler.get_job(self._job_ids[name]) + return job.next_run_time if job else None + except Exception: + return None