feat(hooks): introduce event hooks system for lifecycle management

Add a new hooks system allowing users to run custom code at key lifecycle points in the agent's operation. This includes support for events such as `gateway:startup`, `session:start`, `agent:step`, and more. Documentation for creating hooks and available events has been added to `README.md` and a new `hooks.md` file. Additionally, integrate step callbacks in the agent to facilitate hook execution during tool-calling iterations.
This commit is contained in:
teknium1
2026-02-28 17:09:26 -08:00
parent bf52468a91
commit 7f7643cf63
4 changed files with 252 additions and 0 deletions

View File

@@ -709,6 +709,21 @@ hermes cron status # Check if gateway is running
Even if no messaging platforms are configured, the gateway stays running for cron. A file lock prevents duplicate execution if multiple processes overlap.
### 🪝 Event Hooks
Run custom code at key lifecycle points — log activity, send alerts, post to webhooks. Hooks are Python handlers that fire automatically during gateway operation.
```
~/.hermes/hooks/
└── my-hook/
├── HOOK.yaml # name + events to subscribe to
└── handler.py # async def handle(event_type, context)
```
**Available events:** `gateway:startup`, `session:start`, `session:reset`, `agent:start`, `agent:step`, `agent:end`, `command:*` (wildcard — fires for any slash command).
Hooks are non-blocking — errors are caught and logged, never crashing the agent. See [docs/hooks.md](docs/hooks.md) for the full event reference, context keys, and examples.
### 🛡️ Exec Approval (Messaging Platforms)
When the agent tries to run a potentially dangerous command (`rm -rf`, `chmod 777`, etc.) on Telegram/Discord/WhatsApp, instead of blocking it silently, it asks the user for approval:

174
docs/hooks.md Normal file
View File

@@ -0,0 +1,174 @@
# Event Hooks
The hooks system lets you run custom code at key points in the agent lifecycle — session creation, slash commands, each tool-calling step, and more. Hooks are discovered automatically from `~/.hermes/hooks/` and fire without blocking the main agent pipeline.
## Creating a Hook
Each hook is a directory under `~/.hermes/hooks/` containing two files:
```
~/.hermes/hooks/
└── my-hook/
├── HOOK.yaml # Declares which events to listen for
└── handler.py # Python handler function
```
### HOOK.yaml
```yaml
name: my-hook
description: Log all agent activity to a file
events:
- agent:start
- agent:end
- agent:step
```
The `events` list determines which events trigger your handler. You can subscribe to any combination of events, including wildcards like `command:*`.
### handler.py
```python
import json
from datetime import datetime
from pathlib import Path
LOG_FILE = Path.home() / ".hermes" / "hooks" / "my-hook" / "activity.log"
async def handle(event_type: str, context: dict):
"""Called for each subscribed event. Must be named 'handle'."""
entry = {
"timestamp": datetime.now().isoformat(),
"event": event_type,
**context,
}
with open(LOG_FILE, "a") as f:
f.write(json.dumps(entry) + "\n")
```
The handler function:
- Must be named `handle`
- Receives `event_type` (string) and `context` (dict)
- Can be `async def` or regular `def` — both work
- Errors are caught and logged, never crashing the agent
## Available Events
| Event | When it fires | Context keys |
|-------|---------------|--------------|
| `gateway:startup` | Gateway process starts | `platforms` (list of active platform names) |
| `session:start` | New messaging session created | `platform`, `user_id`, `session_id`, `session_key` |
| `session:reset` | User ran `/new` or `/reset` | `platform`, `user_id`, `session_key` |
| `agent:start` | Agent begins processing a message | `platform`, `user_id`, `session_id`, `message` |
| `agent:step` | Each iteration of the tool-calling loop | `platform`, `user_id`, `session_id`, `iteration`, `tool_names` |
| `agent:end` | Agent finishes processing | `platform`, `user_id`, `session_id`, `message`, `response` |
| `command:*` | Any slash command executed | `platform`, `user_id`, `command`, `args` |
### Wildcard Matching
Handlers registered for `command:*` fire for any `command:` event (`command:model`, `command:reset`, etc.). This lets you monitor all slash commands with a single subscription.
## Examples
### Telegram Notification on Long Tasks
Send yourself a Telegram message when the agent takes more than 10 tool-calling steps:
```yaml
# ~/.hermes/hooks/long-task-alert/HOOK.yaml
name: long-task-alert
description: Alert when agent is taking many steps
events:
- agent:step
```
```python
# ~/.hermes/hooks/long-task-alert/handler.py
import os
import httpx
THRESHOLD = 10
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
CHAT_ID = os.getenv("TELEGRAM_HOME_CHANNEL")
async def handle(event_type: str, context: dict):
iteration = context.get("iteration", 0)
if iteration == THRESHOLD and BOT_TOKEN and CHAT_ID:
tools = ", ".join(context.get("tool_names", []))
text = f"⚠️ Agent has been running for {iteration} steps. Last tools: {tools}"
async with httpx.AsyncClient() as client:
await client.post(
f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage",
json={"chat_id": CHAT_ID, "text": text},
)
```
### Command Usage Logger
Track which slash commands are used and how often:
```yaml
# ~/.hermes/hooks/command-logger/HOOK.yaml
name: command-logger
description: Log slash command usage
events:
- command:*
```
```python
# ~/.hermes/hooks/command-logger/handler.py
import json
from datetime import datetime
from pathlib import Path
LOG = Path.home() / ".hermes" / "logs" / "command_usage.jsonl"
def handle(event_type: str, context: dict):
LOG.parent.mkdir(parents=True, exist_ok=True)
entry = {
"ts": datetime.now().isoformat(),
"command": context.get("command"),
"args": context.get("args"),
"platform": context.get("platform"),
"user": context.get("user_id"),
}
with open(LOG, "a") as f:
f.write(json.dumps(entry) + "\n")
```
### Session Start Webhook
POST to an external service whenever a new session starts:
```yaml
# ~/.hermes/hooks/session-webhook/HOOK.yaml
name: session-webhook
description: Notify external service on new sessions
events:
- session:start
- session:reset
```
```python
# ~/.hermes/hooks/session-webhook/handler.py
import httpx
WEBHOOK_URL = "https://your-service.example.com/hermes-events"
async def handle(event_type: str, context: dict):
async with httpx.AsyncClient() as client:
await client.post(WEBHOOK_URL, json={
"event": event_type,
**context,
}, timeout=5)
```
## How It Works
1. On gateway startup, `HookRegistry.discover_and_load()` scans `~/.hermes/hooks/`
2. Each subdirectory with `HOOK.yaml` + `handler.py` is loaded dynamically
3. Handlers are registered for their declared events
4. At each lifecycle point, `hooks.emit()` fires all matching handlers
5. Errors in any handler are caught and logged — a broken hook never crashes the agent
Hooks only fire in the **gateway** (Telegram, Discord, Slack, WhatsApp). The CLI does not currently load hooks. The `agent:step` event bridges from the sync agent thread to the async hook system via `asyncio.run_coroutine_threadsafe`.

View File

@@ -609,6 +609,18 @@ class GatewayRunner:
# Check for commands
command = event.get_command()
# Emit command:* hook for any recognized slash command
_known_commands = {"new", "reset", "help", "status", "stop", "model",
"personality", "retry", "undo", "sethome", "set-home"}
if command and command in _known_commands:
await self.hooks.emit(f"command:{command}", {
"platform": source.platform.value if source.platform else "",
"user_id": source.user_id,
"command": command,
"args": event.get_command_args().strip(),
})
if command in ["new", "reset"]:
return await self._handle_reset_command(event)
@@ -679,6 +691,19 @@ class GatewayRunner:
session_entry = self.session_store.get_or_create_session(source)
session_key = session_entry.session_key
# Emit session:start for new or auto-reset sessions
_is_new_session = (
session_entry.created_at == session_entry.updated_at
or getattr(session_entry, "was_auto_reset", False)
)
if _is_new_session:
await self.hooks.emit("session:start", {
"platform": source.platform.value if source.platform else "",
"user_id": source.user_id,
"session_id": session_entry.session_id,
"session_key": session_key,
})
# Build session context
context = build_session_context(source, self.config, session_entry)
@@ -1618,6 +1643,25 @@ class GatewayRunner:
result_holder = [None] # Mutable container for the result
tools_holder = [None] # Mutable container for the tool definitions
# Bridge sync step_callback → async hooks.emit for agent:step events
_loop_for_step = asyncio.get_event_loop()
_hooks_ref = self.hooks
def _step_callback_sync(iteration: int, tool_names: list) -> None:
try:
asyncio.run_coroutine_threadsafe(
_hooks_ref.emit("agent:step", {
"platform": source.platform.value if source.platform else "",
"user_id": source.user_id,
"session_id": session_id,
"iteration": iteration,
"tool_names": tool_names,
}),
_loop_for_step,
)
except Exception as _e:
logger.debug("agent:step hook error: %s", _e)
def run_sync():
# Pass session_key to process registry via env var so background
# processes can be mapped back to this gateway session
@@ -1687,6 +1731,7 @@ class GatewayRunner:
reasoning_config=self._reasoning_config,
session_id=session_id,
tool_progress_callback=progress_callback if tool_progress_enabled else None,
step_callback=_step_callback_sync if _hooks_ref.loaded_hooks else None,
platform=platform_key,
honcho_session_key=session_key,
session_db=self._session_db,

View File

@@ -124,6 +124,7 @@ class AIAgent:
session_id: str = None,
tool_progress_callback: callable = None,
clarify_callback: callable = None,
step_callback: callable = None,
max_tokens: int = None,
reasoning_config: Dict[str, Any] = None,
prefill_messages: List[Dict[str, Any]] = None,
@@ -195,6 +196,7 @@ class AIAgent:
)
self.tool_progress_callback = tool_progress_callback
self.clarify_callback = clarify_callback
self.step_callback = step_callback
self._last_reported_tool = None # Track for "new tool" mode
# Interrupt mechanism for breaking out of tool loops
@@ -1936,6 +1938,22 @@ class AIAgent:
api_call_count += 1
# Fire step_callback for gateway hooks (agent:step event)
if self.step_callback is not None:
try:
prev_tools = []
for _m in reversed(messages):
if _m.get("role") == "assistant" and _m.get("tool_calls"):
prev_tools = [
tc["function"]["name"]
for tc in _m["tool_calls"]
if isinstance(tc, dict)
]
break
self.step_callback(api_call_count, prev_tools)
except Exception as _step_err:
logger.debug("step_callback error (iteration %s): %s", api_call_count, _step_err)
# Track tool-calling iterations for skill nudge.
# Counter resets whenever skill_manage is actually used.
if (self._skill_nudge_interval > 0