Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
a23f54d15f fix: add numbered approval and clarify shortcuts (#956)
All checks were successful
Lint / lint (pull_request) Successful in 21s
2026-04-22 10:44:09 -04:00
Alexander Whitestone
ffb3257cb5 fix: repair auxiliary client syntax corruption 2026-04-22 10:44:00 -04:00
5 changed files with 416 additions and 595 deletions

View File

@@ -1,70 +1,43 @@
from __future__ import annotations
"""
A2A agent card generation for fleet discovery.
Agent Card — A2A-compliant agent discovery.
Part of #843: fix: implement A2A agent card for fleet discovery (#819)
Refs #801.
Closes #802.
Provides metadata about the agent's identity, capabilities, and installed skills
for discovery by other agents in the fleet.
"""
import argparse
import json
import logging
import os
import socket
import sys
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Sequence
from urllib.parse import urlparse, urlunparse
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_cli import __version__
from hermes_cli.config import load_config
from hermes_cli.config import load_config, get_hermes_home
from agent.skill_utils import (
get_all_skills_dirs,
get_disabled_skill_names,
iter_skill_index_files,
parse_frontmatter,
skill_matches_platform,
get_all_skills_dirs,
get_disabled_skill_names,
skill_matches_platform
)
logger = logging.getLogger(__name__)
DEFAULT_DESCRIPTION = "Sovereign AI agent — orchestration, code, research"
DEFAULT_INPUT_MODES = ["text/plain", "application/json"]
DEFAULT_OUTPUT_MODES = ["text/plain", "application/json"]
_REQUIRED_CAPABILITY_FLAGS = (
"streaming",
"pushNotifications",
"stateTransitionHistory",
)
@dataclass
class AgentSkill:
id: str
name: str
description: str = ""
tags: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
data: Dict[str, Any] = {"id": self.id, "name": self.name}
if self.description:
data["description"] = self.description
if self.tags:
data["tags"] = self.tags
return data
version: str = "1.0.0"
@dataclass
class AgentCapabilities:
streaming: bool = True
pushNotifications: bool = False
stateTransitionHistory: bool = True
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
tools: bool = True
vision: bool = False
reasoning: bool = False
@dataclass
class AgentCard:
@@ -74,81 +47,14 @@ class AgentCard:
version: str = __version__
capabilities: AgentCapabilities = field(default_factory=AgentCapabilities)
skills: List[AgentSkill] = field(default_factory=list)
defaultInputModes: List[str] = field(default_factory=lambda: list(DEFAULT_INPUT_MODES))
defaultOutputModes: List[str] = field(default_factory=lambda: list(DEFAULT_OUTPUT_MODES))
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
data: Dict[str, Any] = {
"name": self.name,
"description": self.description,
"url": self.url,
"version": self.version,
"capabilities": self.capabilities.to_dict(),
"skills": [skill.to_dict() for skill in self.skills],
"defaultInputModes": list(self.defaultInputModes),
"defaultOutputModes": list(self.defaultOutputModes),
}
if self.metadata:
data["metadata"] = dict(self.metadata)
return data
def to_json(self, indent: int = 2) -> str:
return json.dumps(self.to_dict(), indent=indent)
def _env_or_empty(key: str) -> str:
return os.environ.get(key, "").strip()
def _as_agent_config(config: Mapping[str, Any] | None) -> Dict[str, Any]:
if not isinstance(config, Mapping):
return {}
agent_cfg = config.get("agent")
return dict(agent_cfg) if isinstance(agent_cfg, Mapping) else {}
def _as_a2a_config(config: Mapping[str, Any] | None) -> Dict[str, Any]:
if not isinstance(config, Mapping):
return {}
a2a_cfg = config.get("a2a")
return dict(a2a_cfg) if isinstance(a2a_cfg, Mapping) else {}
def _normalize_string_list(value: Any) -> List[str]:
if value is None:
return []
if isinstance(value, str):
parts = value.split(",")
elif isinstance(value, Sequence) and not isinstance(value, (bytes, bytearray, str)):
parts = list(value)
else:
parts = [value]
out: List[str] = []
seen = set()
for item in parts:
text = str(item).strip()
if not text or text in seen:
continue
seen.add(text)
out.append(text)
return out
def _normalize_skill_tags(frontmatter: Mapping[str, Any]) -> List[str]:
tags = _normalize_string_list(frontmatter.get("tags"))
category = str(frontmatter.get("category") or "").strip()
if category and category not in tags:
tags.append(category)
return tags
defaultInputModes: List[str] = field(default_factory=lambda: ["text/plain"])
defaultOutputModes: List[str] = field(default_factory=lambda: ["text/plain"])
def _load_skills() -> List[AgentSkill]:
"""Scan enabled skills and return A2A skill metadata."""
skills: List[AgentSkill] = []
"""Scan all enabled skills and return metadata."""
skills = []
disabled = get_disabled_skill_names()
seen_ids = set()
for skills_dir in get_all_skills_dirs():
if not skills_dir.is_dir():
continue
@@ -159,262 +65,71 @@ def _load_skills() -> List[AgentSkill]:
except Exception:
continue
skill_name = frontmatter.get("name") or skill_file.parent.name
if str(skill_name) in disabled:
continue
if not skill_matches_platform(frontmatter):
continue
skill_id = str(frontmatter.get("name") or skill_file.parent.name).strip().lower().replace(" ", "-")
if skill_id in disabled or skill_id in seen_ids:
continue
seen_ids.add(skill_id)
skills.append(AgentSkill(
id=str(skill_name),
name=str(frontmatter.get("name", skill_name)),
description=str(frontmatter.get("description", "")),
version=str(frontmatter.get("version", "1.0.0"))
))
return skills
display_name = str(frontmatter.get("title") or frontmatter.get("name") or skill_file.parent.name).strip()
description = str(frontmatter.get("description") or "").strip()
tags = _normalize_skill_tags(frontmatter)
skills.append(
AgentSkill(
id=skill_id,
name=display_name,
description=description,
tags=tags,
)
)
def build_agent_card() -> AgentCard:
"""Build the agent card from current configuration and environment."""
config = load_config()
# Identity
name = os.environ.get("HERMES_AGENT_NAME") or config.get("agent", {}).get("name") or "hermes"
description = os.environ.get("HERMES_AGENT_DESCRIPTION") or config.get("agent", {}).get("description") or "Sovereign AI agent"
# URL - try to determine from environment or config
port = os.environ.get("HERMES_WEB_PORT") or "9119"
host = os.environ.get("HERMES_WEB_HOST") or "localhost"
url = f"http://{host}:{port}"
# Capabilities
# In a real scenario, we'd check model metadata for vision/reasoning
capabilities = AgentCapabilities(
streaming=True,
tools=True,
vision=False, # Default to false unless we can confirm
reasoning=False
)
# Skills
skills = _load_skills()
return AgentCard(
name=name,
description=description,
url=url,
version=__version__,
capabilities=capabilities,
skills=skills
)
return sorted(skills, key=lambda skill: skill.id)
def _get_agent_name(config: Mapping[str, Any] | None, override: str | None = None) -> str:
if override:
return override
env_name = _env_or_empty("HERMES_AGENT_NAME") or _env_or_empty("AGENT_NAME")
if env_name:
return env_name
agent_cfg = _as_agent_config(config)
if agent_cfg.get("name"):
return str(agent_cfg["name"]).strip()
def get_agent_card_json() -> str:
"""Return the agent card as a JSON string."""
try:
hostname = socket.gethostname().split(".", 1)[0].strip()
if hostname:
return hostname
except Exception:
pass
return "hermes"
def _get_description(config: Mapping[str, Any] | None, override: str | None = None) -> str:
if override:
return override
env_description = _env_or_empty("HERMES_AGENT_DESCRIPTION") or _env_or_empty("AGENT_DESCRIPTION")
if env_description:
return env_description
agent_cfg = _as_agent_config(config)
if agent_cfg.get("description"):
return str(agent_cfg["description"]).strip()
return DEFAULT_DESCRIPTION
def _normalize_a2a_url(url: str) -> str:
raw = (url or "").strip()
if not raw:
return ""
parsed = urlparse(raw if "://" in raw else f"https://{raw}")
scheme = parsed.scheme or "https"
netloc = parsed.netloc or parsed.path
path = parsed.path if parsed.netloc else ""
normalized_path = path.rstrip("/") if path not in ("", "/") else ""
if not normalized_path.endswith("/a2a"):
normalized_path = f"{normalized_path}/a2a" if normalized_path else "/a2a"
return urlunparse((scheme, netloc, normalized_path, "", "", ""))
def _get_agent_url(config: Mapping[str, Any] | None, override: str | None = None) -> str:
if override:
return _normalize_a2a_url(override)
agent_cfg = _as_agent_config(config)
a2a_cfg = _as_a2a_config(config)
explicit = (
_env_or_empty("HERMES_A2A_PUBLIC_URL")
or str(a2a_cfg.get("public_url") or "").strip()
or str(agent_cfg.get("a2a_public_url") or "").strip()
)
if explicit:
return _normalize_a2a_url(explicit)
host = (
_env_or_empty("HERMES_A2A_HOST")
or str(a2a_cfg.get("host") or "").strip()
or _env_or_empty("HERMES_WEB_HOST")
or str(agent_cfg.get("host") or "").strip()
or "localhost"
)
port = (
_env_or_empty("HERMES_A2A_PORT")
or str(a2a_cfg.get("port") or "").strip()
or _env_or_empty("HERMES_WEB_PORT")
or str(agent_cfg.get("port") or "").strip()
or "9119"
)
scheme = (
_env_or_empty("HERMES_A2A_SCHEME")
or str(a2a_cfg.get("scheme") or "").strip()
or ("https" if (_env_or_empty("HERMES_MTLS_CERT") or str(port) == "9443") else "http")
)
return _normalize_a2a_url(f"{scheme}://{host}:{port}")
def _merge_skills(base_skills: Iterable[AgentSkill], extra_skills: Iterable[AgentSkill] | None = None) -> List[AgentSkill]:
merged: Dict[str, AgentSkill] = {}
for skill in list(base_skills) + list(extra_skills or []):
if skill.id not in merged:
merged[skill.id] = skill
return [merged[key] for key in sorted(merged)]
def build_agent_card(
*,
name: str | None = None,
description: str | None = None,
url: str | None = None,
extra_skills: Iterable[AgentSkill] | None = None,
metadata: Mapping[str, Any] | None = None,
) -> AgentCard:
"""Build an A2A-compliant agent card from config, env, and installed skills."""
try:
config = load_config()
except Exception as exc:
logger.debug("Falling back to empty config while building agent card: %s", exc)
config = {}
card = AgentCard(
name=_get_agent_name(config, override=name),
description=_get_description(config, override=description),
url=_get_agent_url(config, override=url),
skills=_merge_skills(_load_skills(), extra_skills),
metadata=dict(metadata or {}),
)
return card
def validate_agent_card(card: AgentCard | Dict[str, Any]) -> List[str]:
"""Return a list of schema-validation errors for an agent card."""
data = card.to_dict() if isinstance(card, AgentCard) else dict(card)
errors: List[str] = []
for field_name in ("name", "description", "url", "version"):
value = data.get(field_name)
if not isinstance(value, str) or not value.strip():
errors.append(f"{field_name} must be a non-empty string")
url_value = str(data.get("url") or "")
parsed = urlparse(url_value)
if not parsed.scheme or not parsed.netloc:
errors.append("url must be an absolute http/https URL")
elif parsed.scheme not in {"http", "https"}:
errors.append("url must use http or https")
elif not parsed.path.rstrip("/").endswith("/a2a"):
errors.append("url must point to the /a2a endpoint")
capabilities = data.get("capabilities")
if not isinstance(capabilities, Mapping):
errors.append("capabilities must be an object")
else:
for capability_name in _REQUIRED_CAPABILITY_FLAGS:
if not isinstance(capabilities.get(capability_name), bool):
errors.append(f"capabilities.{capability_name} must be a boolean")
for field_name, required_modes in (
("defaultInputModes", DEFAULT_INPUT_MODES),
("defaultOutputModes", DEFAULT_OUTPUT_MODES),
):
modes = data.get(field_name)
if not isinstance(modes, list) or not modes:
errors.append(f"{field_name} must be a non-empty list of MIME types")
continue
for mode in modes:
if not isinstance(mode, str) or "/" not in mode:
errors.append(f"{field_name} entries must be MIME types")
for required_mode in required_modes:
if required_mode not in modes:
errors.append(f"{field_name} must include {required_mode}")
skills = data.get("skills")
if not isinstance(skills, list):
errors.append("skills must be a list")
else:
for index, skill in enumerate(skills):
if not isinstance(skill, Mapping):
errors.append(f"skills[{index}] must be an object")
continue
if not str(skill.get("id") or "").strip():
errors.append(f"skills[{index}] missing id")
if not str(skill.get("name") or "").strip():
errors.append(f"skills[{index}] missing name")
tags = skill.get("tags", [])
if tags is None:
tags = []
if not isinstance(tags, list):
errors.append(f"skills[{index}].tags must be a list")
else:
for tag in tags:
if not isinstance(tag, str) or not tag.strip():
errors.append(f"skills[{index}].tags entries must be non-empty strings")
metadata = data.get("metadata")
if metadata is not None and not isinstance(metadata, Mapping):
errors.append("metadata must be an object when present")
return errors
def get_agent_card_json(
*,
name: str | None = None,
description: str | None = None,
url: str | None = None,
metadata: Mapping[str, Any] | None = None,
indent: int = 2,
) -> str:
"""Return the local agent card as JSON, falling back to an error card on failure."""
try:
card = build_agent_card(name=name, description=description, url=url, metadata=metadata)
errors = validate_agent_card(card)
if errors:
raise ValueError("; ".join(errors))
return card.to_json(indent=indent)
except Exception as exc:
logger.error("Failed to build agent card: %s", exc)
card = build_agent_card()
return json.dumps(asdict(card), indent=2)
except Exception as e:
logger.error(f"Failed to build agent card: {e}")
# Minimal fallback card
fallback = {
"name": name or _env_or_empty("HERMES_AGENT_NAME") or "hermes",
"description": "Sovereign AI agent (agent card fallback)",
"url": url or "http://localhost:9119/a2a",
"name": "hermes",
"description": "Sovereign AI agent (fallback)",
"version": __version__,
"capabilities": AgentCapabilities().to_dict(),
"skills": [],
"defaultInputModes": list(DEFAULT_INPUT_MODES),
"defaultOutputModes": list(DEFAULT_OUTPUT_MODES),
"error": str(exc),
"error": str(e)
}
return json.dumps(fallback, indent=indent)
return json.dumps(fallback, indent=2)
def main(argv: Sequence[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Generate an A2A-compliant Hermes agent card")
parser.add_argument("--name", help="Override the agent name")
parser.add_argument("--description", help="Override the agent description")
parser.add_argument("--url", help="Override the public A2A URL")
parser.add_argument("--validate", action="store_true", help="Validate before printing; exit 1 on schema errors")
args = parser.parse_args(list(argv) if argv is not None else None)
card = build_agent_card(name=args.name, description=args.description, url=args.url)
errors = validate_agent_card(card)
if args.validate and errors:
for error in errors:
print(error, file=sys.stderr)
return 1
print(card.to_json(indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
def validate_agent_card(card_data: Dict[str, Any]) -> bool:
"""Check if the card data complies with the A2A schema."""
required = ["name", "description", "url", "version"]
return all(k in card_data for k in required)

View File

@@ -1,4 +1,4 @@
from agent.telemetry_logger import log_token_usage\n"""Shared auxiliary client router for side tasks.
"""Shared auxiliary client router for side tasks.
Provides a single resolution chain so every consumer (context compression,
session search, web extraction, vision analysis, browser vision) picks up
@@ -34,6 +34,8 @@ Payment / credit exhaustion fallback:
their OpenRouter balance but has Codex OAuth or another provider available.
"""
from agent.telemetry_logger import log_token_usage
import json
import logging
import os
@@ -396,7 +398,8 @@ class _CodexCompletionsAdapter:
prompt_tokens=getattr(resp_usage, "input_tokens", 0),
completion_tokens=getattr(resp_usage, "output_tokens", 0),
total_tokens=getattr(resp_usage, "total_tokens", 0),
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
)
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
except Exception as exc:
logger.debug("Codex auxiliary Responses API call failed: %s", exc)
raise
@@ -529,7 +532,8 @@ class _AnthropicCompletionsAdapter:
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
)
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
choice = SimpleNamespace(
index=0,

240
cli.py
View File

@@ -7254,6 +7254,40 @@ class HermesCLI:
"Use your best judgement to make the choice and proceed."
)
def _handle_clarify_selection(self) -> None:
"""Process the currently selected clarify choice."""
state = self._clarify_state
if not state or self._clarify_freetext:
return
selected = state.get("selected", 0)
choices = state.get("choices") or []
if selected < len(choices):
state["response_queue"].put(choices[selected])
self._clarify_state = None
self._clarify_freetext = False
self._invalidate()
return
if selected == len(choices):
self._clarify_freetext = True
self._invalidate()
def _handle_clarify_number_shortcut(self, number: int) -> bool:
"""Select a clarify option by number key."""
state = self._clarify_state
if not state or self._clarify_freetext:
return False
choices = state.get("choices") or []
max_option = len(choices) + 1
if number < 1 or number > max_option:
return False
state["selected"] = number - 1
self._handle_clarify_selection()
return True
def _sudo_password_callback(self) -> str:
"""
Prompt for sudo password through the prompt_toolkit UI.
@@ -7362,6 +7396,20 @@ class HermesCLI:
choices.append("view")
return choices
def _handle_approval_number_shortcut(self, number: int) -> bool:
"""Select an approval option by number key."""
state = self._approval_state
if not state:
return False
choices = state.get("choices") or []
if number < 1 or number > len(choices):
return False
state["selected"] = number - 1
self._handle_approval_selection()
return True
def _handle_approval_selection(self) -> None:
"""Process the currently selected dangerous-command approval choice."""
state = self._approval_state
@@ -7437,8 +7485,9 @@ class HermesCLI:
preview_lines.extend(_wrap_panel_text(cmd_display, 60))
for i, choice in enumerate(choices):
prefix = ' ' if i == selected else ' '
label = f"{i + 1}. {choice_labels.get(choice, choice)}"
preview_lines.extend(_wrap_panel_text(
f"{prefix}{choice_labels.get(choice, choice)}",
f"{prefix}{label}",
60,
subsequent_indent=" ",
))
@@ -7456,7 +7505,7 @@ class HermesCLI:
_append_panel_line(lines, 'class:approval-border', 'class:approval-cmd', wrapped, box_width)
_append_blank_panel_line(lines, 'class:approval-border', box_width)
for i, choice in enumerate(choices):
label = choice_labels.get(choice, choice)
label = f"{i + 1}. {choice_labels.get(choice, choice)}"
style = 'class:approval-selected' if i == selected else 'class:approval-choice'
prefix = ' ' if i == selected else ' '
for wrapped in _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" "):
@@ -7465,6 +7514,97 @@ class HermesCLI:
lines.append(('class:approval-border', '' + ('' * box_width) + '\n'))
return lines
def _get_clarify_display_fragments(self):
"""Render the clarify panel for the prompt_toolkit UI."""
state = self._clarify_state
if not state:
return []
def _panel_box_width(title: str, content_lines: list[str], min_width: int = 46, max_width: int = 76) -> int:
term_cols = shutil.get_terminal_size((100, 20)).columns
longest = max([len(title)] + [len(line) for line in content_lines] + [min_width - 4])
inner = min(max(longest + 4, min_width - 2), max_width - 2, max(24, term_cols - 6))
return inner + 2
def _wrap_panel_text(text: str, width: int, subsequent_indent: str = "") -> list[str]:
wrapped = textwrap.wrap(
text,
width=max(8, width),
break_long_words=False,
break_on_hyphens=False,
subsequent_indent=subsequent_indent,
)
return wrapped or [""]
def _append_panel_line(lines, border_style: str, content_style: str, text: str, box_width: int) -> None:
inner_width = max(0, box_width - 2)
lines.append((border_style, ""))
lines.append((content_style, text.ljust(inner_width)))
lines.append((border_style, "\n"))
def _append_blank_panel_line(lines, border_style: str, box_width: int) -> None:
lines.append((border_style, "" + (" " * box_width) + "\n"))
question = state["question"]
choices = state.get("choices") or []
selected = state.get("selected", 0)
preview_lines = _wrap_panel_text(question, 60)
for i, choice in enumerate(choices):
prefix = " " if i == selected and not self._clarify_freetext else " "
label = f"{i + 1}. {choice}"
preview_lines.extend(_wrap_panel_text(f"{prefix}{label}", 60, subsequent_indent=" "))
other_number = len(choices) + 1
other_label = (
f" {other_number}. Other (type below)" if self._clarify_freetext
else f" {other_number}. Other (type your answer)" if selected == len(choices)
else f" {other_number}. Other (type your answer)"
)
preview_lines.extend(_wrap_panel_text(other_label, 60, subsequent_indent=" "))
box_width = _panel_box_width("Hermes needs your input", preview_lines)
inner_text_width = max(8, box_width - 2)
lines = []
lines.append(('class:clarify-border', '╭─ '))
lines.append(('class:clarify-title', 'Hermes needs your input'))
lines.append(('class:clarify-border', ' ' + ('' * max(0, box_width - len("Hermes needs your input") - 3)) + '\n'))
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
for wrapped in _wrap_panel_text(question, inner_text_width):
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-question', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if self._clarify_freetext and not choices:
guidance = "Type your answer in the prompt below, then press Enter."
for wrapped in _wrap_panel_text(guidance, inner_text_width):
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-choice', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if choices:
for i, choice in enumerate(choices):
style = 'class:clarify-selected' if i == selected and not self._clarify_freetext else 'class:clarify-choice'
prefix = ' ' if i == selected and not self._clarify_freetext else ' '
label = f"{i + 1}. {choice}"
wrapped_lines = _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" ")
for wrapped in wrapped_lines:
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
other_idx = len(choices)
if selected == other_idx and not self._clarify_freetext:
other_style = 'class:clarify-selected'
other_label = f' {other_number}. Other (type your answer)'
elif self._clarify_freetext:
other_style = 'class:clarify-active-other'
other_label = f' {other_number}. Other (type below)'
else:
other_style = 'class:clarify-choice'
other_label = f' {other_number}. Other (type your answer)'
for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
_append_panel_line(lines, 'class:clarify-border', other_style, wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
lines.append(('class:clarify-border', '' + ('' * box_width) + '\n'))
return lines
def _secret_capture_callback(self, var_name: str, prompt: str, metadata=None) -> dict:
return prompt_for_secret(self, var_name, prompt, metadata)
@@ -8371,17 +8511,8 @@ class HermesCLI:
# --- Clarify choice mode: confirm the highlighted selection ---
if self._clarify_state and not self._clarify_freetext:
state = self._clarify_state
selected = state["selected"]
choices = state.get("choices") or []
if selected < len(choices):
state["response_queue"].put(choices[selected])
self._clarify_state = None
event.app.invalidate()
else:
# "Other" selected → switch to freetext
self._clarify_freetext = True
event.app.invalidate()
self._handle_clarify_selection()
event.app.invalidate()
return
# --- Normal input routing ---
@@ -8501,6 +8632,19 @@ class HermesCLI:
self._approval_state["selected"] = min(max_idx, self._approval_state["selected"] + 1)
event.app.invalidate()
# --- Numbered shortcuts for clarify / approval modal prompts ---
for _digit in '123456789':
@kb.add(_digit, filter=Condition(lambda: bool(self._approval_state) or (bool(self._clarify_state) and not self._clarify_freetext)))
def _handle_modal_number(event, digit=_digit):
number = int(digit)
handled = False
if self._approval_state:
handled = self._handle_approval_number_shortcut(number)
elif self._clarify_state and not self._clarify_freetext:
handled = self._handle_clarify_number_shortcut(number)
if handled:
event.app.invalidate()
# --- /model picker: arrow-key navigation ---
@kb.add('up', filter=Condition(lambda: bool(self._model_picker_state)))
def model_picker_up(event):
@@ -8995,7 +9139,7 @@ class HermesCLI:
if cli_ref._approval_state:
remaining = max(0, int(cli_ref._approval_deadline - _time.monotonic()))
return [
('class:hint', ' ↑/↓ to select, Enter to confirm'),
('class:hint', ' 1-9 or ↑/↓ to select, Enter to confirm'),
('class:clarify-countdown', f' ({remaining}s)'),
]
@@ -9008,7 +9152,7 @@ class HermesCLI:
('class:clarify-countdown', countdown),
]
return [
('class:hint', ' ↑/↓ to select, Enter to confirm'),
('class:hint', ' 1-9 or ↑/↓ to select, Enter to confirm'),
('class:clarify-countdown', countdown),
]
@@ -9086,71 +9230,7 @@ class HermesCLI:
lines.append((border_style, "" + (" " * box_width) + "\n"))
def _get_clarify_display():
"""Build styled text for the clarify question/choices panel."""
state = cli_ref._clarify_state
if not state:
return []
question = state["question"]
choices = state.get("choices") or []
selected = state.get("selected", 0)
preview_lines = _wrap_panel_text(question, 60)
for i, choice in enumerate(choices):
prefix = " " if i == selected and not cli_ref._clarify_freetext else " "
preview_lines.extend(_wrap_panel_text(f"{prefix}{choice}", 60, subsequent_indent=" "))
other_label = (
" Other (type below)" if cli_ref._clarify_freetext
else " Other (type your answer)" if selected == len(choices)
else " Other (type your answer)"
)
preview_lines.extend(_wrap_panel_text(other_label, 60, subsequent_indent=" "))
box_width = _panel_box_width("Hermes needs your input", preview_lines)
inner_text_width = max(8, box_width - 2)
lines = []
# Box top border
lines.append(('class:clarify-border', '╭─ '))
lines.append(('class:clarify-title', 'Hermes needs your input'))
lines.append(('class:clarify-border', ' ' + ('' * max(0, box_width - len("Hermes needs your input") - 3)) + '\n'))
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
# Question text
for wrapped in _wrap_panel_text(question, inner_text_width):
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-question', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if cli_ref._clarify_freetext and not choices:
guidance = "Type your answer in the prompt below, then press Enter."
for wrapped in _wrap_panel_text(guidance, inner_text_width):
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-choice', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if choices:
# Multiple-choice mode: show selectable options
for i, choice in enumerate(choices):
style = 'class:clarify-selected' if i == selected and not cli_ref._clarify_freetext else 'class:clarify-choice'
prefix = ' ' if i == selected and not cli_ref._clarify_freetext else ' '
wrapped_lines = _wrap_panel_text(f"{prefix}{choice}", inner_text_width, subsequent_indent=" ")
for wrapped in wrapped_lines:
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
# "Other" option (5th line, only shown when choices exist)
other_idx = len(choices)
if selected == other_idx and not cli_ref._clarify_freetext:
other_style = 'class:clarify-selected'
other_label = ' Other (type your answer)'
elif cli_ref._clarify_freetext:
other_style = 'class:clarify-active-other'
other_label = ' Other (type below)'
else:
other_style = 'class:clarify-choice'
other_label = ' Other (type your answer)'
for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
_append_panel_line(lines, 'class:clarify-border', other_style, wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
lines.append(('class:clarify-border', '' + ('' * box_width) + '\n'))
return lines
return cli_ref._get_clarify_display_fragments()
clarify_widget = ConditionalContainer(
Window(

View File

@@ -1,150 +0,0 @@
from __future__ import annotations
import json
from pathlib import Path
import pytest
from agent import agent_card as mod
DEFAULT_DESCRIPTION = "Sovereign AI agent — orchestration, code, research"
def _set_base_context(monkeypatch, *, name: str = "Timmy", description: str = DEFAULT_DESCRIPTION, url: str = "https://timmy.local:9443/a2a", skills=None):
monkeypatch.setattr(mod, "load_config", lambda: {"agent": {"name": name, "description": description}})
monkeypatch.setattr(
mod,
"_load_skills",
lambda: list(
skills
if skills is not None
else [
mod.AgentSkill(
id="code",
name="Code Implementation",
description="Implement and patch code",
tags=["python", "gitea"],
)
]
),
)
monkeypatch.setenv("HERMES_A2A_PUBLIC_URL", url)
monkeypatch.delenv("HERMES_AGENT_NAME", raising=False)
monkeypatch.delenv("AGENT_NAME", raising=False)
monkeypatch.delenv("HERMES_AGENT_DESCRIPTION", raising=False)
monkeypatch.delenv("AGENT_DESCRIPTION", raising=False)
def test_build_agent_card_matches_issue_802_schema(monkeypatch):
_set_base_context(monkeypatch)
card = mod.build_agent_card()
payload = card.to_dict()
assert payload["name"] == "Timmy"
assert payload["description"] == DEFAULT_DESCRIPTION
assert payload["url"] == "https://timmy.local:9443/a2a"
assert payload["capabilities"] == {
"streaming": True,
"pushNotifications": False,
"stateTransitionHistory": True,
}
assert payload["defaultInputModes"] == ["text/plain", "application/json"]
assert payload["defaultOutputModes"] == ["text/plain", "application/json"]
assert payload["skills"][0]["tags"] == ["python", "gitea"]
assert mod.validate_agent_card(payload) == []
@pytest.mark.parametrize(
("name", "url"),
[
("Timmy", "https://timmy.local:9443/a2a"),
("Allegro", "https://allegro.local:9443/a2a"),
("Ezra", "https://ezra.local:9443/a2a"),
],
)
def test_build_agent_card_supports_fleet_members(monkeypatch, name, url):
_set_base_context(monkeypatch, name=name, url=url, skills=[])
payload = mod.build_agent_card().to_dict()
assert payload["name"] == name
assert payload["url"] == url
assert mod.validate_agent_card(payload) == []
def test_load_skills_collects_tags_and_category(monkeypatch, tmp_path):
skill_root = tmp_path / "skills"
skill_dir = skill_root / "code-implementation"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"""---
name: Code Implementation
description: Implement and patch code
tags: [python, gitea]
category: discovery
---
# Code Implementation
""",
encoding="utf-8",
)
monkeypatch.setattr(mod, "get_all_skills_dirs", lambda: [skill_root])
monkeypatch.setattr(mod, "get_disabled_skill_names", lambda: set())
monkeypatch.setattr(mod, "skill_matches_platform", lambda _frontmatter: True)
skills = mod._load_skills()
assert len(skills) == 1
assert skills[0].id == "code-implementation"
assert skills[0].name == "Code Implementation"
assert skills[0].description == "Implement and patch code"
assert skills[0].tags == ["python", "gitea", "discovery"]
def test_validate_agent_card_reports_schema_errors():
errors = mod.validate_agent_card(
{
"name": "",
"description": "",
"url": "timmy.local",
"version": "",
"capabilities": {"streaming": True},
"skills": [{"id": "", "name": "", "tags": "python"}],
"defaultInputModes": ["text/plain"],
"defaultOutputModes": ["plain"],
"metadata": [],
}
)
assert any("name must be a non-empty string" in error for error in errors)
assert any("url must be an absolute http/https URL" in error for error in errors)
assert any("capabilities.pushNotifications" in error for error in errors)
assert any("skills[0] missing id" in error for error in errors)
assert any("skills[0].tags must be a list" in error for error in errors)
assert any("defaultInputModes must include application/json" in error for error in errors)
assert any("defaultOutputModes entries must be MIME types" in error for error in errors)
assert any("metadata must be an object" in error for error in errors)
def test_get_agent_card_json_emits_valid_json(monkeypatch):
_set_base_context(monkeypatch)
payload = json.loads(mod.get_agent_card_json())
assert payload["name"] == "Timmy"
assert mod.validate_agent_card(payload) == []
def test_main_validate_prints_card(monkeypatch, capsys):
_set_base_context(monkeypatch)
exit_code = mod.main(["--validate"])
captured = capsys.readouterr()
assert exit_code == 0
payload = json.loads(captured.out)
assert payload["url"] == "https://timmy.local:9443/a2a"
assert captured.err == ""

View File

@@ -0,0 +1,172 @@
import queue
import threading
from types import SimpleNamespace
from unittest.mock import MagicMock
from cli import HermesCLI
class _FakeBuffer:
def __init__(self, text="", cursor_position=None):
self.text = text
self.cursor_position = len(text) if cursor_position is None else cursor_position
def reset(self, append_to_history=False):
self.text = ""
self.cursor_position = 0
def _make_cli_stub():
cli = HermesCLI.__new__(HermesCLI)
cli._approval_state = None
cli._approval_deadline = 0
cli._approval_lock = threading.Lock()
cli._clarify_state = None
cli._clarify_freetext = False
cli._clarify_deadline = 0
cli._sudo_state = None
cli._sudo_deadline = 0
cli._secret_state = None
cli._secret_deadline = 0
cli._modal_input_snapshot = None
cli._invalidate = MagicMock()
cli._app = SimpleNamespace(invalidate=MagicMock(), current_buffer=_FakeBuffer())
return cli
def test_approval_display_numbers_choices():
cli = _make_cli_stub()
cli._approval_state = {
"command": "sudo rm -rf /tmp/example",
"description": "dangerous command",
"choices": ["once", "session", "always", "deny"],
"selected": 0,
"response_queue": queue.Queue(),
}
rendered = "".join(text for _style, text in cli._get_approval_display_fragments())
assert " 1. Allow once" in rendered
assert "2. Allow for this session" in rendered
assert "3. Add to permanent allowlist" in rendered
assert "4. Deny" in rendered
def test_approval_number_shortcut_submits_choice():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._approval_state = {
"command": "sudo rm -rf /tmp/example",
"description": "dangerous command",
"choices": ["once", "session", "always", "deny"],
"selected": 0,
"response_queue": response_queue,
}
assert cli._handle_approval_number_shortcut(2) is True
assert response_queue.get_nowait() == "session"
assert cli._approval_state is None
def test_approval_selection_still_submits_selected_choice():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._approval_state = {
"command": "sudo rm -rf /tmp/example",
"description": "dangerous command",
"choices": ["once", "session", "always", "deny"],
"selected": 1,
"response_queue": response_queue,
}
cli._handle_approval_selection()
assert response_queue.get_nowait() == "session"
assert cli._approval_state is None
def test_approval_number_shortcut_handles_view_in_place():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._approval_state = {
"command": "sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress",
"description": "disk copy",
"choices": ["once", "session", "always", "deny", "view"],
"selected": 0,
"response_queue": response_queue,
}
assert cli._handle_approval_number_shortcut(5) is True
assert cli._approval_state is not None
assert cli._approval_state["show_full"] is True
assert "view" not in cli._approval_state["choices"]
assert cli._approval_state["selected"] == 3
assert response_queue.empty()
def test_clarify_display_numbers_choices_and_other():
cli = _make_cli_stub()
cli._clarify_state = {
"question": "Pick the best option",
"choices": ["Alpha", "Beta", "Gamma", "Delta"],
"selected": 1,
"response_queue": queue.Queue(),
}
rendered = "".join(text for _style, text in cli._get_clarify_display_fragments())
assert "1. Alpha" in rendered
assert " 2. Beta" in rendered
assert "3. Gamma" in rendered
assert "4. Delta" in rendered
assert "5. Other (type your answer)" in rendered
def test_clarify_number_shortcut_submits_choice():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._clarify_state = {
"question": "Pick the best option",
"choices": ["Alpha", "Beta", "Gamma"],
"selected": 0,
"response_queue": response_queue,
}
assert cli._handle_clarify_number_shortcut(3) is True
assert response_queue.get_nowait() == "Gamma"
assert cli._clarify_state is None
assert cli._clarify_freetext is False
def test_clarify_selection_still_submits_selected_choice():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._clarify_state = {
"question": "Pick the best option",
"choices": ["Alpha", "Beta", "Gamma"],
"selected": 1,
"response_queue": response_queue,
}
cli._handle_clarify_selection()
assert response_queue.get_nowait() == "Beta"
assert cli._clarify_state is None
assert cli._clarify_freetext is False
def test_clarify_number_shortcut_activates_other_freetext():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._clarify_state = {
"question": "Pick the best option",
"choices": ["Alpha", "Beta", "Gamma"],
"selected": 0,
"response_queue": response_queue,
}
assert cli._handle_clarify_number_shortcut(4) is True
assert cli._clarify_state is not None
assert cli._clarify_state["selected"] == 3
assert cli._clarify_freetext is True
assert response_queue.empty()