fix(cli): non-blocking startup update check and banner deduplication
- Add background thread mechanism (prefetch_update_check/get_update_result) so git fetch runs in parallel with skill sync and agent init - Fix repo path fallback in check_for_updates() for dev installs - Remove duplicate build_welcome_banner (~180 lines) and _format_context_length from cli.py — the banner.py version is now the single source of truth - Port skin banner_hero/banner_logo support and terminal width check from cli.py's version into banner.py - Add update status output to hermes version command - Add unit tests for update check, prefetch, and version string
This commit is contained in:
227
cli.py
227
cli.py
@@ -454,7 +454,6 @@ from model_tools import get_tool_definitions, get_toolset_for_tool
|
||||
from hermes_cli.banner import (
|
||||
cprint as _cprint, _GOLD, _BOLD, _DIM, _RST,
|
||||
VERSION, RELEASE_DATE, HERMES_AGENT_LOGO, HERMES_CADUCEUS, COMPACT_BANNER,
|
||||
get_available_skills as _get_available_skills,
|
||||
build_welcome_banner,
|
||||
)
|
||||
from hermes_cli.commands import COMMANDS, SlashCommandCompleter
|
||||
@@ -845,232 +844,6 @@ def _build_compact_banner() -> str:
|
||||
)
|
||||
|
||||
|
||||
def _get_available_skills() -> Dict[str, List[str]]:
|
||||
"""
|
||||
Scan ~/.hermes/skills/ and return skills grouped by category.
|
||||
|
||||
Returns:
|
||||
Dict mapping category name to list of skill names
|
||||
"""
|
||||
import os
|
||||
|
||||
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
|
||||
skills_dir = hermes_home / "skills"
|
||||
skills_by_category = {}
|
||||
|
||||
if not skills_dir.exists():
|
||||
return skills_by_category
|
||||
|
||||
for skill_file in skills_dir.rglob("SKILL.md"):
|
||||
rel_path = skill_file.relative_to(skills_dir)
|
||||
parts = rel_path.parts
|
||||
|
||||
if len(parts) >= 2:
|
||||
category = parts[0]
|
||||
skill_name = parts[-2]
|
||||
else:
|
||||
category = "general"
|
||||
skill_name = skill_file.parent.name
|
||||
|
||||
skills_by_category.setdefault(category, []).append(skill_name)
|
||||
|
||||
return skills_by_category
|
||||
|
||||
|
||||
def _format_context_length(tokens: int) -> str:
|
||||
"""Format a token count for display (e.g. 128000 → '128K', 1048576 → '1M')."""
|
||||
if tokens >= 1_000_000:
|
||||
val = tokens / 1_000_000
|
||||
return f"{val:g}M"
|
||||
elif tokens >= 1_000:
|
||||
val = tokens / 1_000
|
||||
return f"{val:g}K"
|
||||
return str(tokens)
|
||||
|
||||
|
||||
def build_welcome_banner(console: Console, model: str, cwd: str, tools: List[dict] = None, enabled_toolsets: List[str] = None, session_id: str = None, context_length: int = None):
|
||||
"""
|
||||
Build and print a Claude Code-style welcome banner with caduceus on left and info on right.
|
||||
|
||||
Args:
|
||||
console: Rich Console instance for printing
|
||||
model: The current model name (e.g., "anthropic/claude-opus-4")
|
||||
cwd: Current working directory
|
||||
tools: List of tool definitions
|
||||
enabled_toolsets: List of enabled toolset names
|
||||
session_id: Unique session identifier for logging
|
||||
context_length: Model's context window size in tokens
|
||||
"""
|
||||
from model_tools import check_tool_availability, TOOLSET_REQUIREMENTS
|
||||
|
||||
tools = tools or []
|
||||
enabled_toolsets = enabled_toolsets or []
|
||||
|
||||
# Get unavailable tools info for coloring
|
||||
_, unavailable_toolsets = check_tool_availability(quiet=True)
|
||||
disabled_tools = set()
|
||||
for item in unavailable_toolsets:
|
||||
disabled_tools.update(item.get("tools", []))
|
||||
|
||||
# Build the side-by-side content using a table for precise control
|
||||
layout_table = Table.grid(padding=(0, 2))
|
||||
layout_table.add_column("left", justify="center")
|
||||
layout_table.add_column("right", justify="left")
|
||||
|
||||
# Build left content: caduceus + model info
|
||||
# Resolve skin colors for the banner
|
||||
try:
|
||||
from hermes_cli.skin_engine import get_active_skin
|
||||
_bskin = get_active_skin()
|
||||
_accent = _bskin.get_color("banner_accent", "#FFBF00")
|
||||
_dim = _bskin.get_color("banner_dim", "#B8860B")
|
||||
_text = _bskin.get_color("banner_text", "#FFF8DC")
|
||||
_session_c = _bskin.get_color("session_border", "#8B8682")
|
||||
_title_c = _bskin.get_color("banner_title", "#FFD700")
|
||||
_border_c = _bskin.get_color("banner_border", "#CD7F32")
|
||||
_agent_name = _bskin.get_branding("agent_name", "Hermes Agent")
|
||||
except Exception:
|
||||
_bskin = None
|
||||
_accent, _dim, _text = "#FFBF00", "#B8860B", "#FFF8DC"
|
||||
_session_c, _title_c, _border_c = "#8B8682", "#FFD700", "#CD7F32"
|
||||
_agent_name = "Hermes Agent"
|
||||
|
||||
_hero = _bskin.banner_hero if hasattr(_bskin, 'banner_hero') and _bskin.banner_hero else HERMES_CADUCEUS
|
||||
left_lines = ["", _hero, ""]
|
||||
|
||||
# Shorten model name for display
|
||||
model_short = model.split("/")[-1] if "/" in model else model
|
||||
if len(model_short) > 28:
|
||||
model_short = model_short[:25] + "..."
|
||||
|
||||
ctx_str = f" [dim {_dim}]·[/] [dim {_dim}]{_format_context_length(context_length)} context[/]" if context_length else ""
|
||||
left_lines.append(f"[{_accent}]{model_short}[/]{ctx_str} [dim {_dim}]·[/] [dim {_dim}]Nous Research[/]")
|
||||
left_lines.append(f"[dim {_dim}]{cwd}[/]")
|
||||
|
||||
# Add session ID if provided
|
||||
if session_id:
|
||||
left_lines.append(f"[dim {_session_c}]Session: {session_id}[/]")
|
||||
left_content = "\n".join(left_lines)
|
||||
|
||||
# Build right content: tools list grouped by toolset
|
||||
right_lines = []
|
||||
right_lines.append(f"[bold {_accent}]Available Tools[/]")
|
||||
|
||||
# Group tools by toolset (include all possible tools, both enabled and disabled)
|
||||
toolsets_dict = {}
|
||||
|
||||
# First, add all enabled tools
|
||||
for tool in tools:
|
||||
tool_name = tool["function"]["name"]
|
||||
toolset = get_toolset_for_tool(tool_name) or "other"
|
||||
if toolset not in toolsets_dict:
|
||||
toolsets_dict[toolset] = []
|
||||
toolsets_dict[toolset].append(tool_name)
|
||||
|
||||
# Also add disabled toolsets so they show in the banner
|
||||
for item in unavailable_toolsets:
|
||||
# Map the internal toolset ID to display name
|
||||
toolset_id = item.get("id", item.get("name", "unknown"))
|
||||
display_name = f"{toolset_id}_tools" if not toolset_id.endswith("_tools") else toolset_id
|
||||
if display_name not in toolsets_dict:
|
||||
toolsets_dict[display_name] = []
|
||||
for tool_name in item.get("tools", []):
|
||||
if tool_name not in toolsets_dict[display_name]:
|
||||
toolsets_dict[display_name].append(tool_name)
|
||||
|
||||
# Display tools grouped by toolset (compact format, max 8 groups)
|
||||
sorted_toolsets = sorted(toolsets_dict.keys())
|
||||
display_toolsets = sorted_toolsets[:8]
|
||||
remaining_toolsets = len(sorted_toolsets) - 8
|
||||
|
||||
for toolset in display_toolsets:
|
||||
tool_names = toolsets_dict[toolset]
|
||||
# Color each tool name - red if disabled, normal if enabled
|
||||
colored_names = []
|
||||
for name in sorted(tool_names):
|
||||
if name in disabled_tools:
|
||||
colored_names.append(f"[red]{name}[/]")
|
||||
else:
|
||||
colored_names.append(f"[{_text}]{name}[/]")
|
||||
|
||||
tools_str = ", ".join(colored_names)
|
||||
# Truncate if too long (accounting for markup)
|
||||
if len(", ".join(sorted(tool_names))) > 45:
|
||||
# Rebuild with truncation
|
||||
short_names = []
|
||||
length = 0
|
||||
for name in sorted(tool_names):
|
||||
if length + len(name) + 2 > 42:
|
||||
short_names.append("...")
|
||||
break
|
||||
short_names.append(name)
|
||||
length += len(name) + 2
|
||||
# Re-color the truncated list
|
||||
colored_names = []
|
||||
for name in short_names:
|
||||
if name == "...":
|
||||
colored_names.append("[dim]...[/]")
|
||||
elif name in disabled_tools:
|
||||
colored_names.append(f"[red]{name}[/]")
|
||||
else:
|
||||
colored_names.append(f"[{_text}]{name}[/]")
|
||||
tools_str = ", ".join(colored_names)
|
||||
|
||||
right_lines.append(f"[dim {_dim}]{toolset}:[/] {tools_str}")
|
||||
|
||||
if remaining_toolsets > 0:
|
||||
right_lines.append(f"[dim {_dim}](and {remaining_toolsets} more toolsets...)[/]")
|
||||
|
||||
right_lines.append("")
|
||||
|
||||
# Add skills section
|
||||
right_lines.append(f"[bold {_accent}]Available Skills[/]")
|
||||
skills_by_category = _get_available_skills()
|
||||
total_skills = sum(len(s) for s in skills_by_category.values())
|
||||
|
||||
if skills_by_category:
|
||||
for category in sorted(skills_by_category.keys()):
|
||||
skill_names = sorted(skills_by_category[category])
|
||||
# Show first 8 skills, then "..." if more
|
||||
if len(skill_names) > 8:
|
||||
display_names = skill_names[:8]
|
||||
skills_str = ", ".join(display_names) + f" +{len(skill_names) - 8} more"
|
||||
else:
|
||||
skills_str = ", ".join(skill_names)
|
||||
# Truncate if still too long
|
||||
if len(skills_str) > 50:
|
||||
skills_str = skills_str[:47] + "..."
|
||||
right_lines.append(f"[dim {_dim}]{category}:[/] [{_text}]{skills_str}[/]")
|
||||
else:
|
||||
right_lines.append(f"[dim {_dim}]No skills installed[/]")
|
||||
|
||||
right_lines.append("")
|
||||
right_lines.append(f"[dim {_dim}]{len(tools)} tools · {total_skills} skills · /help for commands[/]")
|
||||
|
||||
right_content = "\n".join(right_lines)
|
||||
|
||||
# Add to table
|
||||
layout_table.add_row(left_content, right_content)
|
||||
|
||||
# Wrap in a panel with the title
|
||||
outer_panel = Panel(
|
||||
layout_table,
|
||||
title=f"[bold {_title_c}]{_agent_name} v{VERSION} ({RELEASE_DATE})[/]",
|
||||
border_style=_border_c,
|
||||
padding=(0, 2),
|
||||
)
|
||||
|
||||
# Print the big logo — use skin's custom logo if available
|
||||
console.print()
|
||||
term_width = shutil.get_terminal_size().columns
|
||||
if term_width >= 95:
|
||||
_logo = _bskin.banner_logo if hasattr(_bskin, 'banner_logo') and _bskin.banner_logo else HERMES_AGENT_LOGO
|
||||
console.print(_logo)
|
||||
console.print()
|
||||
|
||||
# Print the panel with caduceus and info
|
||||
console.print(outer_panel)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Skill Slash Commands — dynamic commands generated from installed skills
|
||||
|
||||
@@ -6,7 +6,9 @@ Pure display functions with no HermesCLI state dependency.
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Any, Optional
|
||||
@@ -143,7 +145,9 @@ def check_for_updates() -> Optional[int]:
|
||||
repo_dir = hermes_home / "hermes-agent"
|
||||
cache_file = hermes_home / ".update_check"
|
||||
|
||||
# Must be a git repo
|
||||
# Must be a git repo — fall back to project root for dev installs
|
||||
if not (repo_dir / ".git").exists():
|
||||
repo_dir = Path(__file__).parent.parent.resolve()
|
||||
if not (repo_dir / ".git").exists():
|
||||
return None
|
||||
|
||||
@@ -190,6 +194,30 @@ def check_for_updates() -> Optional[int]:
|
||||
return behind
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Non-blocking update check
|
||||
# =========================================================================
|
||||
|
||||
_update_result: Optional[int] = None
|
||||
_update_check_done = threading.Event()
|
||||
|
||||
|
||||
def prefetch_update_check():
|
||||
"""Kick off update check in a background daemon thread."""
|
||||
def _run():
|
||||
global _update_result
|
||||
_update_result = check_for_updates()
|
||||
_update_check_done.set()
|
||||
t = threading.Thread(target=_run, daemon=True)
|
||||
t.start()
|
||||
|
||||
|
||||
def get_update_result(timeout: float = 0.5) -> Optional[int]:
|
||||
"""Get result of prefetched check. Returns None if not ready."""
|
||||
_update_check_done.wait(timeout=timeout)
|
||||
return _update_result
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Welcome banner
|
||||
# =========================================================================
|
||||
@@ -245,7 +273,15 @@ def build_welcome_banner(console: Console, model: str, cwd: str,
|
||||
text = _skin_color("banner_text", "#FFF8DC")
|
||||
session_color = _skin_color("session_border", "#8B8682")
|
||||
|
||||
left_lines = ["", HERMES_CADUCEUS, ""]
|
||||
# Use skin's custom caduceus art if provided
|
||||
try:
|
||||
from hermes_cli.skin_engine import get_active_skin
|
||||
_bskin = get_active_skin()
|
||||
_hero = _bskin.banner_hero if hasattr(_bskin, 'banner_hero') and _bskin.banner_hero else HERMES_CADUCEUS
|
||||
except Exception:
|
||||
_bskin = None
|
||||
_hero = HERMES_CADUCEUS
|
||||
left_lines = ["", _hero, ""]
|
||||
model_short = model.split("/")[-1] if "/" in model else model
|
||||
if len(model_short) > 28:
|
||||
model_short = model_short[:25] + "..."
|
||||
@@ -360,9 +396,9 @@ def build_welcome_banner(console: Console, model: str, cwd: str,
|
||||
summary_parts.append("/help for commands")
|
||||
right_lines.append(f"[dim {dim}]{' · '.join(summary_parts)}[/]")
|
||||
|
||||
# Update check — show if behind origin/main
|
||||
# Update check — use prefetched result if available
|
||||
try:
|
||||
behind = check_for_updates()
|
||||
behind = get_update_result(timeout=0.5)
|
||||
if behind and behind > 0:
|
||||
commits_word = "commit" if behind == 1 else "commits"
|
||||
right_lines.append(
|
||||
@@ -386,6 +422,9 @@ def build_welcome_banner(console: Console, model: str, cwd: str,
|
||||
)
|
||||
|
||||
console.print()
|
||||
console.print(HERMES_AGENT_LOGO)
|
||||
console.print()
|
||||
term_width = shutil.get_terminal_size().columns
|
||||
if term_width >= 95:
|
||||
_logo = _bskin.banner_logo if _bskin and hasattr(_bskin, 'banner_logo') and _bskin.banner_logo else HERMES_AGENT_LOGO
|
||||
console.print(_logo)
|
||||
console.print()
|
||||
console.print(outer_panel)
|
||||
|
||||
@@ -480,6 +480,13 @@ def cmd_chat(args):
|
||||
print("You can run 'hermes setup' at any time to configure.")
|
||||
sys.exit(1)
|
||||
|
||||
# Start update check in background (runs while other init happens)
|
||||
try:
|
||||
from hermes_cli.banner import prefetch_update_check
|
||||
prefetch_update_check()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Sync bundled skills on every CLI launch (fast -- skips unchanged skills)
|
||||
try:
|
||||
from tools.skills_sync import sync_skills
|
||||
@@ -1863,6 +1870,18 @@ def cmd_version(args):
|
||||
except ImportError:
|
||||
print("OpenAI SDK: Not installed")
|
||||
|
||||
# Show update status (synchronous — acceptable since user asked for version info)
|
||||
try:
|
||||
from hermes_cli.banner import check_for_updates
|
||||
behind = check_for_updates()
|
||||
if behind and behind > 0:
|
||||
commits_word = "commit" if behind == 1 else "commits"
|
||||
print(f"Update available: {behind} {commits_word} behind — run 'hermes update'")
|
||||
elif behind == 0:
|
||||
print("Up to date")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def cmd_uninstall(args):
|
||||
"""Uninstall Hermes Agent."""
|
||||
|
||||
135
tests/hermes_cli/test_update_check.py
Normal file
135
tests/hermes_cli/test_update_check.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""Tests for the update check mechanism in hermes_cli.banner."""
|
||||
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def test_version_string_no_v_prefix():
|
||||
"""__version__ should be bare semver without a 'v' prefix."""
|
||||
from hermes_cli import __version__
|
||||
assert not __version__.startswith("v"), f"__version__ should not start with 'v', got {__version__!r}"
|
||||
|
||||
|
||||
def test_check_for_updates_uses_cache(tmp_path):
|
||||
"""When cache is fresh, check_for_updates should return cached value without calling git."""
|
||||
from hermes_cli.banner import check_for_updates
|
||||
|
||||
# Create a fake git repo and fresh cache
|
||||
repo_dir = tmp_path / "hermes-agent"
|
||||
repo_dir.mkdir()
|
||||
(repo_dir / ".git").mkdir()
|
||||
|
||||
cache_file = tmp_path / ".update_check"
|
||||
cache_file.write_text(json.dumps({"ts": time.time(), "behind": 3}))
|
||||
|
||||
with patch("hermes_cli.banner.os.getenv", return_value=str(tmp_path)):
|
||||
with patch("hermes_cli.banner.subprocess.run") as mock_run:
|
||||
result = check_for_updates()
|
||||
|
||||
assert result == 3
|
||||
mock_run.assert_not_called()
|
||||
|
||||
|
||||
def test_check_for_updates_expired_cache(tmp_path):
|
||||
"""When cache is expired, check_for_updates should call git fetch."""
|
||||
from hermes_cli.banner import check_for_updates
|
||||
|
||||
repo_dir = tmp_path / "hermes-agent"
|
||||
repo_dir.mkdir()
|
||||
(repo_dir / ".git").mkdir()
|
||||
|
||||
# Write an expired cache (timestamp far in the past)
|
||||
cache_file = tmp_path / ".update_check"
|
||||
cache_file.write_text(json.dumps({"ts": 0, "behind": 1}))
|
||||
|
||||
mock_result = MagicMock(returncode=0, stdout="5\n")
|
||||
|
||||
with patch("hermes_cli.banner.os.getenv", return_value=str(tmp_path)):
|
||||
with patch("hermes_cli.banner.subprocess.run", return_value=mock_result) as mock_run:
|
||||
result = check_for_updates()
|
||||
|
||||
assert result == 5
|
||||
assert mock_run.call_count == 2 # git fetch + git rev-list
|
||||
|
||||
|
||||
def test_check_for_updates_no_git_dir(tmp_path):
|
||||
"""Returns None when .git directory doesn't exist anywhere."""
|
||||
import hermes_cli.banner as banner
|
||||
|
||||
# Create a fake banner.py so the fallback path also has no .git
|
||||
fake_banner = tmp_path / "hermes_cli" / "banner.py"
|
||||
fake_banner.parent.mkdir(parents=True, exist_ok=True)
|
||||
fake_banner.touch()
|
||||
|
||||
original = banner.__file__
|
||||
try:
|
||||
banner.__file__ = str(fake_banner)
|
||||
with patch("hermes_cli.banner.os.getenv", return_value=str(tmp_path)):
|
||||
with patch("hermes_cli.banner.subprocess.run") as mock_run:
|
||||
result = banner.check_for_updates()
|
||||
assert result is None
|
||||
mock_run.assert_not_called()
|
||||
finally:
|
||||
banner.__file__ = original
|
||||
|
||||
|
||||
def test_check_for_updates_fallback_to_project_root():
|
||||
"""Dev install: falls back to Path(__file__).parent.parent when HERMES_HOME has no git repo."""
|
||||
import hermes_cli.banner as banner
|
||||
|
||||
project_root = Path(banner.__file__).parent.parent.resolve()
|
||||
if not (project_root / ".git").exists():
|
||||
pytest.skip("Not running from a git checkout")
|
||||
|
||||
# Point HERMES_HOME at a temp dir with no hermes-agent/.git
|
||||
import tempfile
|
||||
with tempfile.TemporaryDirectory() as td:
|
||||
with patch("hermes_cli.banner.os.getenv", return_value=td):
|
||||
with patch("hermes_cli.banner.subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(returncode=0, stdout="0\n")
|
||||
result = banner.check_for_updates()
|
||||
# Should have fallen back to project root and run git commands
|
||||
assert mock_run.call_count >= 1
|
||||
|
||||
|
||||
def test_prefetch_non_blocking():
|
||||
"""prefetch_update_check() should return immediately without blocking."""
|
||||
import hermes_cli.banner as banner
|
||||
|
||||
# Reset module state
|
||||
banner._update_result = None
|
||||
banner._update_check_done = threading.Event()
|
||||
|
||||
with patch.object(banner, "check_for_updates", return_value=5):
|
||||
start = time.monotonic()
|
||||
banner.prefetch_update_check()
|
||||
elapsed = time.monotonic() - start
|
||||
|
||||
# Should return almost immediately (well under 1 second)
|
||||
assert elapsed < 1.0
|
||||
|
||||
# Wait for the background thread to finish
|
||||
banner._update_check_done.wait(timeout=5)
|
||||
assert banner._update_result == 5
|
||||
|
||||
|
||||
def test_get_update_result_timeout():
|
||||
"""get_update_result() returns None when check hasn't completed within timeout."""
|
||||
import hermes_cli.banner as banner
|
||||
|
||||
# Reset module state — don't set the event
|
||||
banner._update_result = None
|
||||
banner._update_check_done = threading.Event()
|
||||
|
||||
start = time.monotonic()
|
||||
result = banner.get_update_result(timeout=0.1)
|
||||
elapsed = time.monotonic() - start
|
||||
|
||||
# Should have waited ~0.1s and returned None
|
||||
assert result is None
|
||||
assert elapsed < 0.5
|
||||
Reference in New Issue
Block a user