Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
418e601f74 docs: add human confirmation firewall research report
All checks were successful
Lint / lint (pull_request) Successful in 9s
2026-04-22 11:22:24 -04:00
6 changed files with 515 additions and 607 deletions

View File

@@ -29,8 +29,6 @@ import logging
import os
import ssl
import threading
import time
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Any, Callable, Dict, Optional
@@ -443,244 +441,3 @@ class A2AMTLSClient:
def post(self, url: str, json: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
data = (__import__("json").dumps(json).encode() if json is not None else None)
return self._request("POST", url, data=data, **kwargs)
# ---------------------------------------------------------------------------
# Structured A2A task delegation over mTLS
# ---------------------------------------------------------------------------
_TERMINAL_TASK_STATES = {"completed", "failed", "canceled", "rejected"}
def _iso_now() -> str:
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def _task_status(state: str, message: str) -> Dict[str, Any]:
return {
"state": state,
"message": message,
"timestamp": _iso_now(),
}
def _coerce_artifact(result: Any) -> Dict[str, Any]:
if isinstance(result, dict):
if "text" in result:
return result
if "artifact" in result and isinstance(result["artifact"], dict):
return result["artifact"]
return {"text": str(result)}
def _build_task_record(task_id: str, task: str, requester: Optional[str], metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return {
"taskId": task_id,
"task": task,
"requester": requester,
"metadata": metadata or {},
"artifacts": [],
"status": _task_status("submitted", "Task submitted"),
}
def _default_agent_card(host: str, port: int) -> Dict[str, Any]:
base_url = f"https://{host}:{port}"
try:
from agent.agent_card import build_agent_card
from dataclasses import asdict
card = asdict(build_agent_card())
except Exception as exc: # pragma: no cover - fallback only exercised when card build breaks
logger.warning("Falling back to minimal agent card: %s", exc)
card = {
"name": os.environ.get("HERMES_AGENT_NAME", "hermes"),
"description": "Hermes A2A task server",
"version": "unknown",
}
card["url"] = base_url
card["a2aTaskEndpoint"] = f"{base_url}/a2a/rpc"
return card
def _default_local_hermes_executor(task_payload: Dict[str, Any]) -> Dict[str, Any]:
task_text = str(task_payload.get("task", "")).strip()
if not task_text:
return {"text": ""}
from run_agent import AIAgent
agent = AIAgent(quiet_mode=True)
result = agent.chat(task_text)
return {
"text": result,
"metadata": {"executor": "local-hermes"},
}
class A2ATaskServer:
"""JSON-RPC A2A task server running over the routing mTLS server."""
def __init__(
self,
cert: str | Path,
key: str | Path,
ca: str | Path,
host: str = "127.0.0.1",
port: int = 9443,
executor: Optional[Callable[[Dict[str, Any]], Dict[str, Any]]] = None,
card_factory: Optional[Callable[[], Dict[str, Any]]] = None,
) -> None:
self.host = host
self.port = port
self._server = A2AMTLSServer(cert=cert, key=key, ca=ca, host=host, port=port)
self._executor = executor or _default_local_hermes_executor
self._card_factory = card_factory or (lambda: _default_agent_card(self.host, self.port))
self._tasks: Dict[str, Dict[str, Any]] = {}
self._lock = threading.Lock()
self._server.add_route("/.well-known/agent-card.json", self._handle_agent_card)
self._server.add_route("/agent-card.json", self._handle_agent_card)
self._server.add_route("/a2a/rpc", self._handle_rpc)
def __enter__(self) -> "A2ATaskServer":
self.start()
return self
def __exit__(self, *_: Any) -> None:
self.stop()
def start(self) -> None:
self._server.start()
def stop(self) -> None:
self._server.stop()
def _handle_agent_card(self, payload: Dict[str, Any], *, peer_cn: str | None = None) -> Dict[str, Any]:
return self._card_factory()
def _handle_rpc(self, payload: Dict[str, Any], *, peer_cn: str | None = None) -> Dict[str, Any]:
req_id = payload.get("id")
if payload.get("jsonrpc") != "2.0":
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32600, "message": "invalid jsonrpc version"}}
method = payload.get("method")
params = payload.get("params") or {}
try:
if method == "tasks/send":
result = self._rpc_send_task(params, peer_cn=peer_cn)
elif method == "tasks/get":
result = self._rpc_get_task(params)
else:
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32601, "message": f"unknown method: {method}"}}
except Exception as exc:
logger.exception("A2A task RPC failed: %s", exc)
return {"jsonrpc": "2.0", "id": req_id, "error": {"code": -32000, "message": str(exc)}}
return {"jsonrpc": "2.0", "id": req_id, "result": result}
def _rpc_send_task(self, params: Dict[str, Any], *, peer_cn: str | None = None) -> Dict[str, Any]:
task_text = str(params.get("task", "")).strip()
if not task_text:
raise ValueError("task is required")
task_id = params.get("taskId") or uuid.uuid4().hex
requester = params.get("requester") or peer_cn
metadata = dict(params.get("metadata") or {})
if peer_cn:
metadata.setdefault("peer_cn", peer_cn)
record = _build_task_record(task_id, task_text, requester, metadata)
with self._lock:
self._tasks[task_id] = record
worker = threading.Thread(target=self._run_task, args=(task_id,), daemon=True, name=f"a2a-task-{task_id[:8]}")
worker.start()
return self._copy_task(task_id)
def _rpc_get_task(self, params: Dict[str, Any]) -> Dict[str, Any]:
task_id = str(params.get("taskId", "")).strip()
if not task_id:
raise ValueError("taskId is required")
return self._copy_task(task_id)
def _copy_task(self, task_id: str) -> Dict[str, Any]:
with self._lock:
if task_id not in self._tasks:
raise KeyError(f"unknown taskId: {task_id}")
return json.loads(json.dumps(self._tasks[task_id]))
def _run_task(self, task_id: str) -> None:
with self._lock:
task = self._tasks[task_id]
task["status"] = _task_status("working", "Task is running")
task_payload = {
"taskId": task["taskId"],
"task": task["task"],
"requester": task.get("requester"),
"metadata": dict(task.get("metadata") or {}),
}
try:
result = self._executor(task_payload)
artifact = _coerce_artifact(result)
with self._lock:
task = self._tasks[task_id]
task["artifacts"] = [artifact]
task["status"] = _task_status("completed", "Task completed")
except Exception as exc:
with self._lock:
task = self._tasks[task_id]
task["status"] = _task_status("failed", f"Task failed: {exc}")
class A2ATaskClient(A2AMTLSClient):
"""Client helper for A2A JSON-RPC task send/get flows."""
def discover_card(self, base_url: str) -> Dict[str, Any]:
return self.get(f"{base_url.rstrip('/')}/.well-known/agent-card.json")
def _rpc_call(self, base_url: str, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
payload = {
"jsonrpc": "2.0",
"id": uuid.uuid4().hex,
"method": method,
"params": params,
}
response = self.post(f"{base_url.rstrip('/')}/a2a/rpc", json=payload)
if "error" in response:
error = response["error"]
raise RuntimeError(error.get("message") or str(error))
return response.get("result", {})
def send_task(
self,
base_url: str,
*,
task: str,
requester: str | None = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
return self._rpc_call(
base_url,
"tasks/send",
{
"task": task,
"requester": requester,
"metadata": metadata or {},
},
)
def get_task(self, base_url: str, task_id: str) -> Dict[str, Any]:
return self._rpc_call(base_url, "tasks/get", {"taskId": task_id})
def wait_for_task(
self,
base_url: str,
task_id: str,
*,
timeout: float = 30.0,
poll_interval: float = 0.5,
) -> Dict[str, Any]:
deadline = time.monotonic() + timeout
while True:
task = self.get_task(base_url, task_id)
state = str(((task.get("status") or {}).get("state") or "")).lower()
if state in _TERMINAL_TASK_STATES:
return task
if time.monotonic() >= deadline:
raise TimeoutError(f"Timed out waiting for task {task_id}")
time.sleep(poll_interval)

View File

@@ -1,132 +0,0 @@
"""CLI helpers for A2A task delegation."""
from __future__ import annotations
import json
import os
import re
import sys
import time
from pathlib import Path
from typing import Any
from agent.a2a_mtls import A2ATaskClient, A2ATaskServer
from hermes_cli.config import get_hermes_home
def _registry_path() -> Path:
return get_hermes_home() / "a2a_agents.json"
def _default_identity_paths() -> tuple[str, str, str]:
hermes_home = get_hermes_home()
agent_name = os.environ.get("HERMES_AGENT_NAME", "hermes").lower()
cert = os.environ.get(
"HERMES_A2A_CERT",
str(hermes_home / "pki" / "agents" / agent_name / f"{agent_name}.crt"),
)
key = os.environ.get(
"HERMES_A2A_KEY",
str(hermes_home / "pki" / "agents" / agent_name / f"{agent_name}.key"),
)
ca = os.environ.get(
"HERMES_A2A_CA",
str(hermes_home / "pki" / "ca" / "fleet-ca.crt"),
)
return cert, key, ca
def load_agent_registry(path: Path | None = None) -> dict[str, Any]:
registry_path = path or _registry_path()
if not registry_path.exists():
return {}
return json.loads(registry_path.read_text(encoding="utf-8"))
def resolve_agent_url(agent: str, *, registry_path: Path | None = None) -> str:
key = re.sub(r"[^A-Za-z0-9]+", "_", agent).upper()
env_value = os.getenv(f"HERMES_A2A_{key}_URL")
if env_value:
return env_value
registry = load_agent_registry(registry_path)
entry = registry.get(agent)
if isinstance(entry, str) and entry:
return entry
if isinstance(entry, dict):
url = entry.get("url") or entry.get("base_url") or entry.get("card_url")
if url:
return str(url)
if agent.startswith("https://") or agent.startswith("http://"):
return agent
raise SystemExit(f"Unknown A2A agent '{agent}'. Set HERMES_A2A_{key}_URL or add it to {_registry_path()}.")
def _print(data: dict[str, Any]) -> None:
print(json.dumps(data, indent=2, ensure_ascii=False))
def cmd_send(args) -> None:
base_url = args.url or resolve_agent_url(args.agent)
cert, key, ca = args.cert, args.key, args.ca
if not (cert and key and ca):
cert, key, ca = _default_identity_paths()
client = A2ATaskClient(cert=cert, key=key, ca=ca)
card = client.discover_card(base_url)
task = client.send_task(
base_url,
task=args.task,
requester=args.requester,
metadata={"agent": args.agent},
)
if args.wait:
task = client.wait_for_task(
base_url,
task["taskId"],
timeout=args.timeout,
poll_interval=args.poll_interval,
)
_print({
"agent": args.agent,
"url": base_url,
"card": card,
"task": task,
})
def cmd_status(args) -> None:
base_url = args.url or resolve_agent_url(args.agent)
cert, key, ca = args.cert, args.key, args.ca
if not (cert and key and ca):
cert, key, ca = _default_identity_paths()
client = A2ATaskClient(cert=cert, key=key, ca=ca)
task = client.get_task(base_url, args.task_id)
_print({"agent": args.agent, "url": base_url, "task": task})
def cmd_serve(args) -> None:
cert, key, ca = args.cert, args.key, args.ca
if not (cert and key and ca):
cert, key, ca = _default_identity_paths()
server = A2ATaskServer(cert=cert, key=key, ca=ca, host=args.host, port=args.port)
server.start()
print(f"A2A task server listening on https://{args.host}:{args.port}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
server.stop()
def cmd_a2a(args) -> None:
command = getattr(args, "a2a_command", None) or "send"
if command == "send":
cmd_send(args)
return
if command == "status":
cmd_status(args)
return
if command == "serve":
cmd_serve(args)
return
raise SystemExit(f"Unknown a2a command: {command}")

View File

@@ -173,13 +173,6 @@ from hermes_constants import OPENROUTER_BASE_URL
logger = logging.getLogger(__name__)
def cmd_a2a(args):
"""Dispatch A2A CLI subcommands lazily to avoid heavy imports at startup."""
from hermes_cli.a2a_cmd import cmd_a2a as _cmd_a2a
return _cmd_a2a(args)
def _relative_time(ts) -> str:
"""Format a timestamp as relative time (e.g., '2h ago', 'yesterday')."""
if not ts:
@@ -4788,45 +4781,6 @@ For more help on a command:
gateway_parser.set_defaults(func=cmd_gateway)
# =========================================================================
# a2a command
# =========================================================================
a2a_parser = subparsers.add_parser(
"a2a",
help="A2A task delegation over mutual TLS",
description="Send, inspect, and serve structured A2A tasks between Hermes agents",
)
a2a_subparsers = a2a_parser.add_subparsers(dest="a2a_command")
a2a_send = a2a_subparsers.add_parser("send", help="Send an A2A task to another agent")
a2a_send.add_argument("--agent", required=True, help="Agent alias or URL (for example: allegro)")
a2a_send.add_argument("--task", required=True, help="Task text to delegate")
a2a_send.add_argument("--url", help="Explicit base URL for the remote agent")
a2a_send.add_argument("--requester", default=None, help="Requester label included in task metadata")
a2a_send.add_argument("--wait", action="store_true", help="Poll until the task reaches a terminal state")
a2a_send.add_argument("--timeout", type=float, default=30.0, help="Wait timeout in seconds (default: 30)")
a2a_send.add_argument("--poll-interval", type=float, default=0.5, help="Polling interval in seconds while waiting (default: 0.5)")
a2a_send.add_argument("--cert", default=None, help="Client certificate path (defaults from HERMES_A2A_CERT)")
a2a_send.add_argument("--key", default=None, help="Client private key path (defaults from HERMES_A2A_KEY)")
a2a_send.add_argument("--ca", default=None, help="Fleet CA certificate path (defaults from HERMES_A2A_CA)")
a2a_status = a2a_subparsers.add_parser("status", help="Fetch the current status of an A2A task")
a2a_status.add_argument("--agent", required=True, help="Agent alias or URL (for example: allegro)")
a2a_status.add_argument("--task-id", required=True, help="Task identifier returned by a2a send")
a2a_status.add_argument("--url", help="Explicit base URL for the remote agent")
a2a_status.add_argument("--cert", default=None, help="Client certificate path (defaults from HERMES_A2A_CERT)")
a2a_status.add_argument("--key", default=None, help="Client private key path (defaults from HERMES_A2A_KEY)")
a2a_status.add_argument("--ca", default=None, help="Fleet CA certificate path (defaults from HERMES_A2A_CA)")
a2a_serve = a2a_subparsers.add_parser("serve", help="Run the local A2A task server")
a2a_serve.add_argument("--host", default=os.environ.get("HERMES_A2A_HOST", "127.0.0.1"), help="Bind host (default: HERMES_A2A_HOST or 127.0.0.1)")
a2a_serve.add_argument("--port", type=int, default=int(os.environ.get("HERMES_A2A_PORT", "9443")), help="Bind port (default: HERMES_A2A_PORT or 9443)")
a2a_serve.add_argument("--cert", default=None, help="Server certificate path (defaults from HERMES_A2A_CERT)")
a2a_serve.add_argument("--key", default=None, help="Server private key path (defaults from HERMES_A2A_KEY)")
a2a_serve.add_argument("--ca", default=None, help="Fleet CA certificate path (defaults from HERMES_A2A_CA)")
a2a_parser.set_defaults(func=cmd_a2a)
# =========================================================================
# setup command
# =========================================================================

View File

@@ -0,0 +1,515 @@
# Human Confirmation Firewall: Research Report
## Implementation Patterns for Hermes Agent
**Issue:** #878
**Parent:** #659
**Priority:** P0
**Scope:** Human-in-the-loop safety patterns for tool calls, crisis handling, and irreversible actions
---
## Executive Summary
Hermes already has a partial human confirmation firewall, but it is narrow.
Current repo state shows:
- a real **pre-execution gate** for dangerous terminal commands in `tools/approval.py`
- a partial **confidence-threshold path** via `_smart_approve()` in `tools/approval.py`
- gateway support for blocking approval resolution in `gateway/run.py`
What is still missing is the core recommendation from this research issue:
- **confidence scoring on all tool calls**, not just terminal commands that already matched a dangerous regex
- a **hard pre-execution human gate for crisis interventions**, especially any action that would auto-respond to suicidal content
- a consistent way to classify actions into:
1. pre-execution gate
2. post-execution review
3. confidence-threshold execution
Recommendation:
- use **Pattern 1: Pre-Execution Gate** for crisis interventions and irreversible/high-impact actions
- use **Pattern 3: Confidence Threshold** for normal operations
- reserve **Pattern 2: Post-Execution Review** only for low-risk and reversible actions
The next implementation step should be a **tool-call risk assessment layer** that runs before dispatch in `model_tools.handle_function_call()`, assigns a score and pattern to every tool call, and routes only the highest-risk calls into mandatory human confirmation.
---
## 1. The Three Proven Patterns
### Pattern 1: Pre-Execution Gate
Definition:
- halt before execution
- show the proposed action to the human
- require explicit approval or denial
Best for:
- destructive actions
- irreversible side effects
- crisis interventions
- actions that affect another human's safety, money, infrastructure, or private data
Strengths:
- strongest safety guarantee
- simplest audit story
- prevents the most catastrophic failure mode: acting first and apologizing later
Weaknesses:
- adds latency
- creates operator burden if overused
- should not be applied to every ordinary tool call
### Pattern 2: Post-Execution Review
Definition:
- execute first
- expose result to human
- allow rollback or follow-up correction
Best for:
- reversible operations
- low-risk actions with fast recovery
- tasks where human review matters but immediate execution is acceptable
Strengths:
- low friction
- fast iteration
- useful when rollback is practical
Weaknesses:
- unsafe for crisis or destructive actions
- only works when rollback actually exists
- a poor fit for external communication or life-safety contexts
### Pattern 3: Confidence Threshold
Definition:
- compute a risk/confidence score before execution
- auto-execute high-confidence safe actions
- request confirmation for lower-confidence or higher-risk actions
Best for:
- mixed-risk tool ecosystems
- day-to-day operations where always-confirm would be too expensive
- systems with a large volume of ordinary, safe reads and edits
Strengths:
- best balance of speed and safety
- scales across many tool types
- allows targeted human attention where it matters most
Weaknesses:
- depends on a good scoring model
- weak scoring creates false negatives or unnecessary prompts
- must remain inspectable and debuggable
---
## 2. What Hermes Already Has
## 2.1 Existing Pre-Execution Gate for Dangerous Terminal Commands
`tools/approval.py` already implements a real pre-execution confirmation path for dangerous shell commands.
Observed components:
- `DANGEROUS_PATTERNS`
- `detect_dangerous_command()`
- `prompt_dangerous_approval()`
- `check_dangerous_command()`
- gateway queueing and resolution support in the same module
This is already Pattern 1.
Current behavior:
- dangerous terminal commands are detected before execution
- the user can allow once / session / always / deny
- gateway sessions can block until approval resolves
This is a strong foundation, but it is limited to a subset of terminal commands.
## 2.2 Partial Confidence Threshold via Smart Approvals
Hermes also already has a partial Pattern 3.
Observed component:
- `_smart_approve()` in `tools/approval.py`
Current behavior:
- only runs **after** a command has already been flagged by dangerous-pattern detection
- uses the auxiliary LLM to decide:
- approve
- deny
- escalate
This means Hermes has a confidence-threshold mechanism, but only for **already-flagged dangerous terminal commands**.
What it does not yet do:
- score all tool calls
- classify non-terminal tools
- distinguish crisis interventions from normal ops
- produce a shared risk model across the tool surface
## 2.3 Blocking Approval UX in Gateway
`gateway/run.py` already routes `/approve` and `/deny` into the blocking approval path.
This means the infrastructure for a true human confirmation firewall already exists in messaging contexts.
That is important because the missing work is not "invent human approval from zero."
The missing work is:
- expand the scope from dangerous shell commands to **all tool calls that matter**
- make the routing policy explicit and inspectable
---
## 3. What Hermes Still Lacks
## 3.1 No Universal Tool-Call Risk Assessment
The current approval system is command-pattern-centric.
It is not yet a tool-call firewall.
Missing capability:
- before dispatch, every tool call should receive a structured assessment:
- tool name
- side-effect class
- reversibility
- human-impact potential
- crisis relevance
- confidence score
- recommended confirmation pattern
Natural insertion point:
- `model_tools.handle_function_call()`
That function already sits at the central dispatch boundary.
It is the right place to add a pre-dispatch classifier.
## 3.2 No Hard Crisis Gate for Outbound Intervention
Issue #878 explicitly recommends:
- Pattern 1 for crisis interventions
- never auto-respond to suicidal content
That recommendation is not yet codified as a global firewall rule.
Missing rule:
- if a tool call would directly intervene in a crisis context or send outward guidance in response to suicidal content, it must require explicit human confirmation before execution
Examples that should hard-gate:
- outbound `send_message` content aimed at a suicidal user
- any future tool that places calls, escalates emergencies, or contacts third parties about a crisis
- any autonomous action that claims a person should or should not take a life-safety step
## 3.3 No First-Class Post-Execution Review Policy
Hermes has approval and denial, but it does not yet have a formal policy for when Pattern 2 is acceptable.
Without a policy, post-execution review tends to get used implicitly rather than intentionally.
That is risky.
Hermes should define Pattern 2 narrowly:
- only for actions that are both low-risk and reversible
- only when the system can show the human exactly what happened
- never for crisis, finance, destructive config, or sensitive comms
---
## 4. Recommended Architecture for Hermes
## 4.1 Add a Tool-Call Assessment Layer
Add a pre-dispatch assessment object for every tool call.
Suggested shape:
```python
@dataclass
class ToolCallAssessment:
tool_name: str
risk_score: float # 0.0 to 1.0
confidence: float # confidence in the assessment itself
pattern: str # pre_execution_gate | post_execution_review | confidence_threshold
requires_human: bool
reasons: list[str]
reversible: bool
crisis_sensitive: bool
```
Suggested execution point:
- inside `model_tools.handle_function_call()` before `orchestrator.dispatch()`
Why here:
- one place covers all tools
- one place can emit traces
- one place can remain model-agnostic
- one place lets plugins observe or override the assessment
## 4.2 Classify Tool Calls by Side-Effect Class
Suggested first-pass taxonomy:
### A. Read-only
Examples:
- `read_file`
- `search_files`
- `browser_snapshot`
- `browser_console` read-only inspection
Pattern:
- confidence threshold
- almost always auto-execute
- human confirmation normally unnecessary
### B. Local reversible edits
Examples:
- `patch`
- `write_file`
- `todo`
Pattern:
- confidence threshold
- human confirmation only when risk score rises because of path sensitivity or scope breadth
### C. External side effects
Examples:
- `send_message`
- `cronjob`
- `delegate_task`
- smart-home actuation tools
Pattern:
- confidence threshold by default
- pre-execution gate when score exceeds threshold or when context is sensitive
### D. Critical / destructive / crisis-sensitive
Examples:
- dangerous `terminal`
- financial actions
- deletion / kill / restart / deployment in sensitive paths
- outbound crisis intervention
Pattern:
- pre-execution gate
- never auto-execute on confidence alone
## 4.3 Crisis Override Rule
Add a hard override:
```text
If tool call is crisis-sensitive AND outbound or irreversible:
requires_human = True
pattern = pre_execution_gate
```
This is the most important rule in the issue.
The model may draft the message.
The human must confirm before the system sends it.
## 4.4 Use Confidence Threshold for Normal Ops
For non-crisis operations, use Pattern 3.
Suggested logic:
- low risk + high assessment confidence -> auto-execute
- medium risk or medium confidence -> ask human
- high risk -> always ask human
Key point:
- confidence is not just "how sure the LLM is"
- confidence should combine:
- tool type certainty
- argument clarity
- path sensitivity
- external side effects
- crisis indicators
---
## 5. Recommended Initial Scoring Factors
A simple initial scorer is enough.
It does not need to be fancy.
Suggested factors:
### 5.1 Tool class risk
- read-only tools: very low base risk
- local mutation tools: moderate base risk
- external communication / automation tools: higher base risk
- shell execution: variable, often high
### 5.2 Target sensitivity
Examples:
- `/tmp` or local scratch paths -> lower
- repo files under git -> medium
- system config, credentials, secrets, gateway lifecycle -> high
- human-facing channels -> high if message content is sensitive
### 5.3 Reversibility
- reversible -> lower
- difficult but possible to undo -> medium
- practically irreversible -> high
### 5.4 Human-impact content
- no direct human impact -> low
- administrative impact -> medium
- crisis / safety / emotional intervention -> critical
### 5.5 Context certainty
- arguments are explicit and narrow -> higher confidence
- arguments are vague, inferred, or broad -> lower confidence
---
## 6. Implementation Plan
## Phase 1: Assessment Without Behavior Change
Goal:
- score all tool calls
- log assessment decisions
- emit traces for review
- do not yet block new tool categories
Files to touch:
- `tools/approval.py`
- `model_tools.py`
- tests for assessment coverage
Output:
- risk/confidence trace for every tool call
- pattern recommendation for every tool call
Why first:
- lets us calibrate before changing runtime behavior
- avoids breaking existing workflows blindly
## Phase 2: Hard-Gate Crisis-Sensitive Outbound Actions
Goal:
- enforce Pattern 1 for crisis interventions
Likely surfaces:
- `send_message`
- any future telephony / call / escalation tools
- other tools with direct human intervention side effects
Rule:
- never auto-send crisis intervention content without human confirmation
## Phase 3: General Confidence Threshold for Normal Ops
Goal:
- apply Pattern 3 to all tool calls
- auto-run clearly safe actions
- escalate ambiguous or medium-risk actions
Likely thresholds:
- score < 0.25 -> auto
- 0.25 to 0.60 -> confirm if confidence is weak
- > 0.60 -> confirm
- crisis-sensitive -> always confirm
## Phase 4: Optional Post-Execution Review Lane
Goal:
- allow Pattern 2 only for explicitly reversible operations
Examples:
- maybe low-risk messaging drafts saved locally
- maybe reversible UI actions in specific environments
Important:
- this phase is optional
- Hermes should not rely on Pattern 2 for safety-critical flows
---
## 7. Verification Criteria for the Future Implementation
The eventual implementation should prove all of the following:
1. every tool call receives a scored assessment before dispatch
2. crisis-sensitive outbound actions always require human confirmation
3. dangerous terminal commands still preserve their current pre-execution gate
4. clearly safe read-only tool calls are not slowed by unnecessary prompts
5. assessment traces can be inspected after a run
6. approval decisions remain session-safe across CLI and gateway contexts
---
## 8. Concrete Recommendations
### Recommendation 1
Do **not** replace the current dangerous-command approval path.
Generalize above it.
Why:
- existing terminal Pattern 1 already works
- this is the strongest piece of the current firewall
### Recommendation 2
Add a universal scorer in `model_tools.handle_function_call()`.
Why:
- that is the first point where Hermes knows the tool name and structured arguments
- it is the cleanest place to classify all tool calls uniformly
### Recommendation 3
Treat crisis-sensitive outbound intervention as a separate safety class.
Why:
- issue #878 explicitly calls for Pattern 1 here
- this matches Timmy's SOUL-level safety requirements
### Recommendation 4
Ship scoring traces before enforcement expansion.
Why:
- you cannot tune thresholds you cannot inspect
- false positives will otherwise frustrate normal usage
### Recommendation 5
Use Pattern 3 as the default policy for normal operations.
Why:
- full manual confirmation on every tool call is too expensive
- full autonomy is too risky
- Pattern 3 is the practical middle ground
---
## 9. Bottom Line
Hermes should implement a **two-track human confirmation firewall**:
1. **Pattern 1: Pre-Execution Gate**
- crisis interventions
- destructive terminal actions
- irreversible or safety-critical tool calls
2. **Pattern 3: Confidence Threshold**
- all ordinary tool calls
- driven by a universal tool-call assessment layer
- integrated at the central dispatch boundary
Pattern 2 should remain optional and narrow.
It is not the primary answer for Hermes.
The repo already contains the beginnings of this system.
The next step is not new theory.
It is to turn the existing approval path into a true **tool-call-wide human confirmation firewall**.
---
## References
- Issue #878 — Human Confirmation Firewall Implementation Patterns
- Issue #659 — Critical Research Tasks
- `tools/approval.py` — current dangerous-command approval flow and smart approvals
- `model_tools.py` — central tool dispatch boundary
- `gateway/run.py` — blocking approval handling for messaging sessions

View File

@@ -572,94 +572,3 @@ class TestA2AMTLSServerAndClient:
assert not errors, f"Concurrent connection errors: {errors}"
assert len(results) == 3
@_requires_crypto
class TestA2ATaskServerAndClient:
"""Structured A2A task send/get flow over mTLS."""
@pytest.fixture(autouse=True)
def _pki(self, tmp_path):
ca_dir = tmp_path / "ca"
ca_dir.mkdir()
self.ca_crt, self.ca_key = _make_ca_keypair(ca_dir)
agent_dir = tmp_path / "agents"
agent_dir.mkdir()
self.srv_crt, self.srv_key = _make_agent_keypair(
agent_dir, "timmy", self.ca_crt, self.ca_key
)
self.cli_crt, self.cli_key = _make_agent_keypair(
agent_dir, "allegro", self.ca_crt, self.ca_key
)
@pytest.fixture()
def task_server(self):
from agent.a2a_mtls import A2ATaskServer
gate = threading.Event()
def analyze_executor(task: dict[str, object]) -> dict[str, object]:
gate.wait(timeout=2)
text = str(task.get("task", ""))
return {
"text": f"analysis:{text}",
"metadata": {"tool": "local-hermes-stub"},
}
port = _find_free_port()
server = A2ATaskServer(
cert=self.srv_crt,
key=self.srv_key,
ca=self.ca_crt,
host="127.0.0.1",
port=port,
executor=analyze_executor,
)
with server:
time.sleep(0.1)
yield server, port, gate
def test_task_send_get_and_completion_flow(self, task_server):
from agent.a2a_mtls import A2ATaskClient
server, port, gate = task_server
client = A2ATaskClient(cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt)
base_url = f"https://127.0.0.1:{port}"
card = client.discover_card(base_url)
assert card["name"]
submitted = client.send_task(base_url, task="Analyze README.md", requester="timmy")
assert submitted["status"]["state"] in {"submitted", "working"}
in_flight = client.get_task(base_url, submitted["taskId"])
assert in_flight["status"]["state"] in {"submitted", "working"}
gate.set()
completed = client.wait_for_task(base_url, submitted["taskId"], timeout=5.0, poll_interval=0.05)
assert completed["status"]["state"] == "completed"
assert completed["artifacts"][0]["text"] == "analysis:Analyze README.md"
def test_failed_executor_marks_task_failed(self):
from agent.a2a_mtls import A2ATaskClient, A2ATaskServer
def failing_executor(task: dict[str, object]) -> dict[str, object]:
raise RuntimeError("boom")
port = _find_free_port()
server = A2ATaskServer(
cert=self.srv_crt,
key=self.srv_key,
ca=self.ca_crt,
host="127.0.0.1",
port=port,
executor=failing_executor,
)
with server:
time.sleep(0.1)
client = A2ATaskClient(cert=self.cli_crt, key=self.cli_key, ca=self.ca_crt)
base_url = f"https://127.0.0.1:{port}"
submitted = client.send_task(base_url, task="explode", requester="timmy")
failed = client.wait_for_task(base_url, submitted["taskId"], timeout=5.0, poll_interval=0.05)
assert failed["status"]["state"] == "failed"
assert "boom" in failed["status"]["message"]

View File

@@ -1,95 +0,0 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from unittest.mock import patch
import pytest
def test_cmd_send_uses_registry_and_waits_for_terminal_task(tmp_path, monkeypatch, capsys):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "a2a_agents.json").write_text(
json.dumps({"allegro": {"url": "https://127.0.0.1:9443"}}),
encoding="utf-8",
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
from hermes_cli.a2a_cmd import cmd_a2a
class FakeClient:
def __init__(self, **kwargs):
self.kwargs = kwargs
def discover_card(self, base_url: str):
assert base_url == "https://127.0.0.1:9443"
return {"name": "allegro", "url": base_url}
def send_task(self, base_url: str, *, task: str, requester: str | None = None, metadata=None):
assert task == "analyze README"
return {"taskId": "task-123", "status": {"state": "submitted"}}
def wait_for_task(self, base_url: str, task_id: str, *, timeout: float, poll_interval: float):
assert task_id == "task-123"
return {
"taskId": task_id,
"status": {"state": "completed"},
"artifacts": [{"text": "README looks healthy"}],
}
args = argparse.Namespace(
a2a_command="send",
agent="allegro",
task="analyze README",
url=None,
wait=True,
timeout=5.0,
poll_interval=0.01,
requester="timmy",
cert="cert.pem",
key="key.pem",
ca="ca.pem",
)
with patch("hermes_cli.a2a_cmd.A2ATaskClient", FakeClient):
cmd_a2a(args)
result = json.loads(capsys.readouterr().out)
assert result["agent"] == "allegro"
assert result["card"]["name"] == "allegro"
assert result["task"]["status"]["state"] == "completed"
assert result["task"]["artifacts"][0]["text"] == "README looks healthy"
def test_resolve_agent_url_supports_env_override(monkeypatch):
monkeypatch.setenv("HERMES_A2A_ALLEGRO_URL", "https://fleet-allegro:9443")
from hermes_cli.a2a_cmd import resolve_agent_url
assert resolve_agent_url("allegro") == "https://fleet-allegro:9443"
def test_cmd_send_requires_known_agent(tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
from hermes_cli.a2a_cmd import cmd_a2a
args = argparse.Namespace(
a2a_command="send",
agent="unknown",
task="do work",
url=None,
wait=False,
timeout=5.0,
poll_interval=0.05,
requester=None,
cert="cert.pem",
key="key.pem",
ca="ca.pem",
)
with pytest.raises(SystemExit):
cmd_a2a(args)