Files
hermes-agent/hermes_cli/banner.py

431 lines
18 KiB
Python
Raw Normal View History

"""Welcome banner, ASCII art, skills summary, and update check for the CLI.
Pure display functions with no HermesCLI state dependency.
"""
import json
import logging
import os
import shutil
import subprocess
import threading
import time
from pathlib import Path
from typing import Dict, List, Any, Optional
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from prompt_toolkit import print_formatted_text as _pt_print
from prompt_toolkit.formatted_text import ANSI as _PT_ANSI
logger = logging.getLogger(__name__)
# =========================================================================
# ANSI building blocks for conversation display
# =========================================================================
_GOLD = "\033[1;33m"
_BOLD = "\033[1m"
_DIM = "\033[2m"
_RST = "\033[0m"
def cprint(text: str):
"""Print ANSI-colored text through prompt_toolkit's renderer."""
_pt_print(_PT_ANSI(text))
# =========================================================================
# Skin-aware color helpers
# =========================================================================
def _skin_color(key: str, fallback: str) -> str:
"""Get a color from the active skin, or return fallback."""
try:
from hermes_cli.skin_engine import get_active_skin
return get_active_skin().get_color(key, fallback)
except Exception:
return fallback
def _skin_branding(key: str, fallback: str) -> str:
"""Get a branding string from the active skin, or return fallback."""
try:
from hermes_cli.skin_engine import get_active_skin
return get_active_skin().get_branding(key, fallback)
except Exception:
return fallback
# =========================================================================
# ASCII Art & Branding
# =========================================================================
from hermes_cli import __version__ as VERSION, __release_date__ as RELEASE_DATE
HERMES_AGENT_LOGO = """[bold #FFD700]██╗ ██╗███████╗██████╗ ███╗ ███╗███████╗███████╗ █████╗ ██████╗ ███████╗███╗ ██╗████████╗[/]
[bold #FFD700]██║ ██║██╔════╝██╔══██╗████╗ ████║██╔════╝██╔════╝ ██╔══██╗██╔════╝ ██╔════╝████╗ ██║╚══██╔══╝[/]
[#FFBF00]███████║█████╗ ██████╔╝██╔████╔██║█████╗ ███████╗█████╗███████║██║ ███╗█████╗ ██╔██╗ ██║ ██║[/]
[#FFBF00]██╔══██║██╔══╝ ██╔══██╗██║╚██╔╝██║██╔══╝ ╚════██║╚════╝██╔══██║██║ ██║██╔══╝ ██║╚██╗██║ ██║[/]
[#CD7F32]██║ ██║███████╗██║ ██║██║ ╚═╝ ██║███████╗███████║ ██║ ██║╚██████╔╝███████╗██║ ╚████║ ██║[/]
[#CD7F32]╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝╚══════╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝╚═╝ ╚═══╝ ╚═╝[/]"""
HERMES_CADUCEUS = """[#CD7F32]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣀⡀⠀⣀⣀⠀⢀⣀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#CD7F32]⠀⠀⠀⠀⠀⠀⢀⣠⣴⣾⣿⣿⣇⠸⣿⣿⠇⣸⣿⣿⣷⣦⣄⡀⠀⠀⠀⠀⠀⠀[/]
[#FFBF00]⠀⢀⣠⣴⣶⠿⠋⣩⡿⣿⡿⠻⣿⡇⢠⡄⢸⣿⠟⢿⣿⢿⣍⠙⠿⣶⣦⣄⡀⠀[/]
[#FFBF00]⠀⠀⠉⠉⠁⠶⠟⠋⠀⠉⠀⢀⣈⣁⡈⢁⣈⣁⡀⠀⠉⠀⠙⠻⠶⠈⠉⠉⠀⠀[/]
[#FFD700]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣴⣿⡿⠛⢁⡈⠛⢿⣿⣦⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#FFD700]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠿⣿⣦⣤⣈⠁⢠⣴⣿⠿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#FFBF00]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠻⢿⣿⣦⡉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#FFBF00]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠘⢷⣦⣈⠛⠃⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#CD7F32]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢠⣴⠦⠈⠙⠿⣦⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#CD7F32]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠸⣿⣤⡈⠁⢤⣿⠇⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#B8860B]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠉⠛⠷⠄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#B8860B]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣀⠑⢶⣄⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#B8860B]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⠁⢰⡆⠈⡿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#B8860B]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠳⠈⣡⠞⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
[#B8860B]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]"""
COMPACT_BANNER = """
[bold #FFD700]╔══════════════════════════════════════════════════════════════╗[/]
[bold #FFD700]║[/] [#FFBF00]⚕ NOUS HERMES[/] [dim #B8860B]- AI Agent Framework[/] [bold #FFD700]║[/]
[bold #FFD700]║[/] [#CD7F32]Messenger of the Digital Gods[/] [dim #B8860B]Nous Research[/] [bold #FFD700]║[/]
[bold #FFD700]╚══════════════════════════════════════════════════════════════╝[/]
"""
# =========================================================================
# Skills scanning
# =========================================================================
def get_available_skills() -> Dict[str, List[str]]:
"""Scan ~/.hermes/skills/ and return skills grouped by category."""
import os
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
skills_dir = hermes_home / "skills"
skills_by_category = {}
if not skills_dir.exists():
return skills_by_category
for skill_file in skills_dir.rglob("SKILL.md"):
rel_path = skill_file.relative_to(skills_dir)
parts = rel_path.parts
if len(parts) >= 2:
category = parts[0]
skill_name = parts[-2]
else:
category = "general"
skill_name = skill_file.parent.name
skills_by_category.setdefault(category, []).append(skill_name)
return skills_by_category
# =========================================================================
# Update check
# =========================================================================
# Cache update check results for 6 hours to avoid repeated git fetches
_UPDATE_CHECK_CACHE_SECONDS = 6 * 3600
def check_for_updates() -> Optional[int]:
"""Check how many commits behind origin/main the local repo is.
Does a ``git fetch`` at most once every 6 hours (cached to
``~/.hermes/.update_check``). Returns the number of commits behind,
or ``None`` if the check fails or isn't applicable.
"""
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
repo_dir = hermes_home / "hermes-agent"
cache_file = hermes_home / ".update_check"
# Must be a git repo — fall back to project root for dev installs
if not (repo_dir / ".git").exists():
repo_dir = Path(__file__).parent.parent.resolve()
if not (repo_dir / ".git").exists():
return None
# Read cache
now = time.time()
try:
if cache_file.exists():
cached = json.loads(cache_file.read_text())
if now - cached.get("ts", 0) < _UPDATE_CHECK_CACHE_SECONDS:
return cached.get("behind")
except Exception:
pass
# Fetch latest refs (fast — only downloads ref metadata, no files)
try:
subprocess.run(
["git", "fetch", "origin", "--quiet"],
capture_output=True, timeout=10,
cwd=str(repo_dir),
)
except Exception:
pass # Offline or timeout — use stale refs, that's fine
# Count commits behind
try:
result = subprocess.run(
["git", "rev-list", "--count", "HEAD..origin/main"],
capture_output=True, text=True, timeout=5,
cwd=str(repo_dir),
)
if result.returncode == 0:
behind = int(result.stdout.strip())
else:
behind = None
except Exception:
behind = None
# Write cache
try:
cache_file.write_text(json.dumps({"ts": now, "behind": behind}))
except Exception:
pass
return behind
# =========================================================================
# Non-blocking update check
# =========================================================================
_update_result: Optional[int] = None
_update_check_done = threading.Event()
def prefetch_update_check():
"""Kick off update check in a background daemon thread."""
def _run():
global _update_result
_update_result = check_for_updates()
_update_check_done.set()
t = threading.Thread(target=_run, daemon=True)
t.start()
def get_update_result(timeout: float = 0.5) -> Optional[int]:
"""Get result of prefetched check. Returns None if not ready."""
_update_check_done.wait(timeout=timeout)
return _update_result
# =========================================================================
# Welcome banner
# =========================================================================
def _format_context_length(tokens: int) -> str:
"""Format a token count for display (e.g. 128000 → '128K', 1048576 → '1M')."""
if tokens >= 1_000_000:
val = tokens / 1_000_000
return f"{val:g}M"
elif tokens >= 1_000:
val = tokens / 1_000
return f"{val:g}K"
return str(tokens)
def build_welcome_banner(console: Console, model: str, cwd: str,
tools: List[dict] = None,
enabled_toolsets: List[str] = None,
session_id: str = None,
get_toolset_for_tool=None,
context_length: int = None):
"""Build and print a welcome banner with caduceus on left and info on right.
Args:
console: Rich Console instance.
model: Current model name.
cwd: Current working directory.
tools: List of tool definitions.
enabled_toolsets: List of enabled toolset names.
session_id: Session identifier.
get_toolset_for_tool: Callable to map tool name -> toolset name.
context_length: Model's context window size in tokens.
"""
from model_tools import check_tool_availability, TOOLSET_REQUIREMENTS
if get_toolset_for_tool is None:
from model_tools import get_toolset_for_tool
tools = tools or []
enabled_toolsets = enabled_toolsets or []
_, unavailable_toolsets = check_tool_availability(quiet=True)
disabled_tools = set()
for item in unavailable_toolsets:
disabled_tools.update(item.get("tools", []))
layout_table = Table.grid(padding=(0, 2))
layout_table.add_column("left", justify="center")
layout_table.add_column("right", justify="left")
# Resolve skin colors once for the entire banner
accent = _skin_color("banner_accent", "#FFBF00")
dim = _skin_color("banner_dim", "#B8860B")
text = _skin_color("banner_text", "#FFF8DC")
session_color = _skin_color("session_border", "#8B8682")
# Use skin's custom caduceus art if provided
try:
from hermes_cli.skin_engine import get_active_skin
_bskin = get_active_skin()
_hero = _bskin.banner_hero if hasattr(_bskin, 'banner_hero') and _bskin.banner_hero else HERMES_CADUCEUS
except Exception:
_bskin = None
_hero = HERMES_CADUCEUS
left_lines = ["", _hero, ""]
model_short = model.split("/")[-1] if "/" in model else model
if len(model_short) > 28:
model_short = model_short[:25] + "..."
ctx_str = f" [dim {dim}]·[/] [dim {dim}]{_format_context_length(context_length)} context[/]" if context_length else ""
left_lines.append(f"[{accent}]{model_short}[/]{ctx_str} [dim {dim}]·[/] [dim {dim}]Nous Research[/]")
left_lines.append(f"[dim {dim}]{cwd}[/]")
if session_id:
left_lines.append(f"[dim {session_color}]Session: {session_id}[/]")
left_content = "\n".join(left_lines)
right_lines = [f"[bold {accent}]Available Tools[/]"]
toolsets_dict: Dict[str, list] = {}
for tool in tools:
tool_name = tool["function"]["name"]
toolset = get_toolset_for_tool(tool_name) or "other"
toolsets_dict.setdefault(toolset, []).append(tool_name)
for item in unavailable_toolsets:
toolset_id = item.get("id", item.get("name", "unknown"))
display_name = f"{toolset_id}_tools" if not toolset_id.endswith("_tools") else toolset_id
if display_name not in toolsets_dict:
toolsets_dict[display_name] = []
for tool_name in item.get("tools", []):
if tool_name not in toolsets_dict[display_name]:
toolsets_dict[display_name].append(tool_name)
sorted_toolsets = sorted(toolsets_dict.keys())
display_toolsets = sorted_toolsets[:8]
remaining_toolsets = len(sorted_toolsets) - 8
for toolset in display_toolsets:
tool_names = toolsets_dict[toolset]
colored_names = []
for name in sorted(tool_names):
if name in disabled_tools:
colored_names.append(f"[red]{name}[/]")
else:
colored_names.append(f"[{text}]{name}[/]")
tools_str = ", ".join(colored_names)
if len(", ".join(sorted(tool_names))) > 45:
short_names = []
length = 0
for name in sorted(tool_names):
if length + len(name) + 2 > 42:
short_names.append("...")
break
short_names.append(name)
length += len(name) + 2
colored_names = []
for name in short_names:
if name == "...":
colored_names.append("[dim]...[/]")
elif name in disabled_tools:
colored_names.append(f"[red]{name}[/]")
else:
colored_names.append(f"[{text}]{name}[/]")
tools_str = ", ".join(colored_names)
right_lines.append(f"[dim #B8860B]{toolset}:[/] {tools_str}")
if remaining_toolsets > 0:
right_lines.append(f"[dim #B8860B](and {remaining_toolsets} more toolsets...)[/]")
# MCP Servers section (only if configured)
try:
from tools.mcp_tool import get_mcp_status
mcp_status = get_mcp_status()
except Exception:
mcp_status = []
if mcp_status:
right_lines.append("")
right_lines.append("[bold #FFBF00]MCP Servers[/]")
for srv in mcp_status:
if srv["connected"]:
right_lines.append(
f"[dim #B8860B]{srv['name']}[/] [#FFF8DC]({srv['transport']})[/] "
f"[dim #B8860B]—[/] [#FFF8DC]{srv['tools']} tool(s)[/]"
)
else:
right_lines.append(
f"[red]{srv['name']}[/] [dim]({srv['transport']})[/] "
f"[red]— failed[/]"
)
right_lines.append("")
right_lines.append(f"[bold {accent}]Available Skills[/]")
skills_by_category = get_available_skills()
total_skills = sum(len(s) for s in skills_by_category.values())
if skills_by_category:
for category in sorted(skills_by_category.keys()):
skill_names = sorted(skills_by_category[category])
if len(skill_names) > 8:
display_names = skill_names[:8]
skills_str = ", ".join(display_names) + f" +{len(skill_names) - 8} more"
else:
skills_str = ", ".join(skill_names)
if len(skills_str) > 50:
skills_str = skills_str[:47] + "..."
right_lines.append(f"[dim {dim}]{category}:[/] [{text}]{skills_str}[/]")
else:
right_lines.append(f"[dim {dim}]No skills installed[/]")
right_lines.append("")
mcp_connected = sum(1 for s in mcp_status if s["connected"]) if mcp_status else 0
summary_parts = [f"{len(tools)} tools", f"{total_skills} skills"]
if mcp_connected:
summary_parts.append(f"{mcp_connected} MCP servers")
summary_parts.append("/help for commands")
right_lines.append(f"[dim {dim}]{' · '.join(summary_parts)}[/]")
# Update check — use prefetched result if available
try:
behind = get_update_result(timeout=0.5)
if behind and behind > 0:
commits_word = "commit" if behind == 1 else "commits"
right_lines.append(
f"[bold yellow]⚠ {behind} {commits_word} behind[/]"
f"[dim yellow] — run [bold]hermes update[/bold] to update[/]"
)
except Exception:
pass # Never break the banner over an update check
right_content = "\n".join(right_lines)
layout_table.add_row(left_content, right_content)
agent_name = _skin_branding("agent_name", "Hermes Agent")
title_color = _skin_color("banner_title", "#FFD700")
border_color = _skin_color("banner_border", "#CD7F32")
outer_panel = Panel(
layout_table,
title=f"[bold {title_color}]{agent_name} v{VERSION} ({RELEASE_DATE})[/]",
border_style=border_color,
padding=(0, 2),
)
console.print()
term_width = shutil.get_terminal_size().columns
if term_width >= 95:
_logo = _bskin.banner_logo if _bskin and hasattr(_bskin, 'banner_logo') and _bskin.banner_logo else HERMES_AGENT_LOGO
console.print(_logo)
console.print()
console.print(outer_panel)