Tools from check_fn-gated toolsets (honcho, homeassistant) showed as red (disabled) in the startup banner even when properly configured. This happened because check_fn runs lazily after session context is set, but the banner renders before agent init. Now distinguishes three states: - red: truly unavailable (missing env var, no API key) - yellow: lazy-initialized (check_fn pending, will activate on use) - normal: available and ready Only the banner fix was salvaged from the original PR; unrelated bundled changes (context_compressor, STT config, auth default_model, SessionResetPolicy) were discarded. Co-authored-by: Jah-yee <Jah-yee@users.noreply.github.com>
463 lines
19 KiB
Python
463 lines
19 KiB
Python
"""Welcome banner, ASCII art, skills summary, and update check for the CLI.
|
|
|
|
Pure display functions with no HermesCLI state dependency.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from hermes_constants import get_hermes_home
|
|
from typing import Dict, List, Optional
|
|
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
from rich.table import Table
|
|
|
|
from prompt_toolkit import print_formatted_text as _pt_print
|
|
from prompt_toolkit.formatted_text import ANSI as _PT_ANSI
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =========================================================================
|
|
# ANSI building blocks for conversation display
|
|
# =========================================================================
|
|
|
|
# Raw ANSI SGR escape sequences used when printing through cprint().
_GOLD = "\033[1;38;2;255;215;0m"  # True-color #FFD700 bold
_BOLD = "\033[1m"  # SGR 1: bold
_DIM = "\033[2m"  # SGR 2: faint/dim
_RST = "\033[0m"  # SGR 0: reset all attributes
|
|
|
|
|
|
def cprint(text: str):
    """Print *text* with its ANSI escape codes honoured by prompt_toolkit.

    The string is wrapped in prompt_toolkit's ``ANSI`` formatted-text type and
    passed to ``print_formatted_text``.
    """
    formatted = _PT_ANSI(text)
    _pt_print(formatted)
|
|
|
|
|
|
# =========================================================================
|
|
# Skin-aware color helpers
|
|
# =========================================================================
|
|
|
|
def _skin_color(key: str, fallback: str) -> str:
|
|
"""Get a color from the active skin, or return fallback."""
|
|
try:
|
|
from hermes_cli.skin_engine import get_active_skin
|
|
return get_active_skin().get_color(key, fallback)
|
|
except Exception:
|
|
return fallback
|
|
|
|
|
|
def _skin_branding(key: str, fallback: str) -> str:
|
|
"""Get a branding string from the active skin, or return fallback."""
|
|
try:
|
|
from hermes_cli.skin_engine import get_active_skin
|
|
return get_active_skin().get_branding(key, fallback)
|
|
except Exception:
|
|
return fallback
|
|
|
|
|
|
# =========================================================================
|
|
# ASCII Art & Branding
|
|
# =========================================================================
|
|
|
|
from hermes_cli import __version__ as VERSION, __release_date__ as RELEASE_DATE
|
|
|
|
HERMES_AGENT_LOGO = """[bold #FFD700]██╗ ██╗███████╗██████╗ ███╗ ███╗███████╗███████╗ █████╗ ██████╗ ███████╗███╗ ██╗████████╗[/]
|
|
[bold #FFD700]██║ ██║██╔════╝██╔══██╗████╗ ████║██╔════╝██╔════╝ ██╔══██╗██╔════╝ ██╔════╝████╗ ██║╚══██╔══╝[/]
|
|
[#FFBF00]███████║█████╗ ██████╔╝██╔████╔██║█████╗ ███████╗█████╗███████║██║ ███╗█████╗ ██╔██╗ ██║ ██║[/]
|
|
[#FFBF00]██╔══██║██╔══╝ ██╔══██╗██║╚██╔╝██║██╔══╝ ╚════██║╚════╝██╔══██║██║ ██║██╔══╝ ██║╚██╗██║ ██║[/]
|
|
[#CD7F32]██║ ██║███████╗██║ ██║██║ ╚═╝ ██║███████╗███████║ ██║ ██║╚██████╔╝███████╗██║ ╚████║ ██║[/]
|
|
[#CD7F32]╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝╚══════╝ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝╚═╝ ╚═══╝ ╚═╝[/]"""
|
|
|
|
HERMES_CADUCEUS = """[#CD7F32]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣀⡀⠀⣀⣀⠀⢀⣀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#CD7F32]⠀⠀⠀⠀⠀⠀⢀⣠⣴⣾⣿⣿⣇⠸⣿⣿⠇⣸⣿⣿⣷⣦⣄⡀⠀⠀⠀⠀⠀⠀[/]
|
|
[#FFBF00]⠀⢀⣠⣴⣶⠿⠋⣩⡿⣿⡿⠻⣿⡇⢠⡄⢸⣿⠟⢿⣿⢿⣍⠙⠿⣶⣦⣄⡀⠀[/]
|
|
[#FFBF00]⠀⠀⠉⠉⠁⠶⠟⠋⠀⠉⠀⢀⣈⣁⡈⢁⣈⣁⡀⠀⠉⠀⠙⠻⠶⠈⠉⠉⠀⠀[/]
|
|
[#FFD700]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣴⣿⡿⠛⢁⡈⠛⢿⣿⣦⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#FFD700]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠿⣿⣦⣤⣈⠁⢠⣴⣿⠿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#FFBF00]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠉⠻⢿⣿⣦⡉⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#FFBF00]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠘⢷⣦⣈⠛⠃⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#CD7F32]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢠⣴⠦⠈⠙⠿⣦⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#CD7F32]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠸⣿⣤⡈⠁⢤⣿⠇⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#B8860B]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠉⠛⠷⠄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#B8860B]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣀⠑⢶⣄⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#B8860B]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⠁⢰⡆⠈⡿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#B8860B]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠳⠈⣡⠞⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]
|
|
[#B8860B]⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀[/]"""
|
|
|
|
COMPACT_BANNER = """
|
|
[bold #FFD700]╔══════════════════════════════════════════════════════════════╗[/]
|
|
[bold #FFD700]║[/] [#FFBF00]⚕ NOUS HERMES[/] [dim #B8860B]- AI Agent Framework[/] [bold #FFD700]║[/]
|
|
[bold #FFD700]║[/] [#CD7F32]Messenger of the Digital Gods[/] [dim #B8860B]Nous Research[/] [bold #FFD700]║[/]
|
|
[bold #FFD700]╚══════════════════════════════════════════════════════════════╝[/]
|
|
"""
|
|
|
|
|
|
# =========================================================================
|
|
# Skills scanning
|
|
# =========================================================================
|
|
|
|
def get_available_skills() -> Dict[str, List[str]]:
    """Group the names of all discoverable skills by their category.

    Skill discovery is delegated to ``_find_all_skills()`` from
    ``tools/skills_tool``, which already applies platform gating
    (``platforms:`` frontmatter) and the user's ``skills.disabled`` list.
    Skills without a category are filed under ``"general"``.

    Returns:
        Mapping of category name to skill names, or an empty dict when
        discovery cannot be imported or raises.
    """
    try:
        from tools.skills_tool import _find_all_skills

        discovered = _find_all_skills()  # already filtered
    except Exception:
        return {}

    grouped: Dict[str, List[str]] = {}
    for entry in discovered:
        bucket = entry.get("category") or "general"
        if bucket not in grouped:
            grouped[bucket] = []
        grouped[bucket].append(entry["name"])
    return grouped
|
|
|
|
|
|
# =========================================================================
|
|
# Update check
|
|
# =========================================================================
|
|
|
|
# Cache update check results for 6 hours to avoid repeated git fetches.
# check_for_updates() reuses a cached result younger than this many seconds.
_UPDATE_CHECK_CACHE_SECONDS = 6 * 3600
|
|
|
|
|
|
def check_for_updates() -> Optional[int]:
    """Report how many commits the local checkout is behind ``origin/main``.

    The answer is cached in ``.update_check`` under the Hermes home directory
    and reused for ``_UPDATE_CHECK_CACHE_SECONDS``, so ``git fetch`` runs at
    most once per cache window.

    Returns:
        The number of commits behind, or ``None`` when no git checkout is
        found or the count cannot be determined.
    """
    home = get_hermes_home()
    cache_path = home / ".update_check"

    # Prefer the managed checkout; dev installs fall back to the project root.
    checkout = home / "hermes-agent"
    if not (checkout / ".git").exists():
        checkout = Path(__file__).parent.parent.resolve()
        if not (checkout / ".git").exists():
            return None

    # Serve a fresh-enough cached answer without touching git.
    started = time.time()
    try:
        if cache_path.exists():
            entry = json.loads(cache_path.read_text())
            age = started - entry.get("ts", 0)
            if age < _UPDATE_CHECK_CACHE_SECONDS:
                return entry.get("behind")
    except Exception:
        pass

    git_cwd = str(checkout)

    # Refresh remote refs; failure (offline, timeout) just means stale refs.
    try:
        subprocess.run(
            ["git", "fetch", "origin", "--quiet"],
            capture_output=True, timeout=10,
            cwd=git_cwd,
        )
    except Exception:
        pass

    # Count the commits reachable from origin/main but not from HEAD.
    commits_behind = None
    try:
        proc = subprocess.run(
            ["git", "rev-list", "--count", "HEAD..origin/main"],
            capture_output=True, text=True, timeout=5,
            cwd=git_cwd,
        )
        if proc.returncode == 0:
            commits_behind = int(proc.stdout.strip())
    except Exception:
        commits_behind = None

    # Persist the outcome (including None) for the next call; best effort.
    try:
        payload = json.dumps({"ts": started, "behind": commits_behind})
        cache_path.write_text(payload)
    except Exception:
        pass

    return commits_behind
|
|
|
|
|
|
# =========================================================================
|
|
# Non-blocking update check
|
|
# =========================================================================
|
|
|
|
# Shared state for the background update check: the worker thread stores its
# answer in _update_result and then sets _update_check_done, which
# get_update_result() waits on.
_update_result: Optional[int] = None
_update_check_done = threading.Event()
|
|
|
|
|
|
def prefetch_update_check():
    """Kick off the update check in a background daemon thread.

    The worker stores the outcome of ``check_for_updates()`` in the
    module-level ``_update_result`` and then sets ``_update_check_done``.
    The event is set even if the check raises, so waiters are never left
    blocking for their full timeout on a check that has already died.
    """
    def _run():
        global _update_result
        try:
            _update_result = check_for_updates()
        except Exception:
            # Best effort: a failed check leaves the result untouched, and
            # must not print a thread traceback over the banner.
            logger.debug("Background update check failed", exc_info=True)
        finally:
            _update_check_done.set()

    worker = threading.Thread(target=_run, daemon=True)
    worker.start()
|
|
|
|
|
|
def get_update_result(timeout: float = 0.5) -> Optional[int]:
    """Return the prefetched update-check result, waiting briefly for it.

    Blocks for up to *timeout* seconds on the completion event, then returns
    whatever ``_update_result`` currently holds (``None`` until the
    background check has stored a value).
    """
    _update_check_done.wait(timeout)
    return _update_result
|
|
|
|
|
|
# =========================================================================
|
|
# Welcome banner
|
|
# =========================================================================
|
|
|
|
def _format_context_length(tokens: int) -> str:
|
|
"""Format a token count for display (e.g. 128000 → '128K', 1048576 → '1M')."""
|
|
if tokens >= 1_000_000:
|
|
val = tokens / 1_000_000
|
|
return f"{val:g}M"
|
|
elif tokens >= 1_000:
|
|
val = tokens / 1_000
|
|
return f"{val:g}K"
|
|
return str(tokens)
|
|
|
|
|
|
def _display_toolset_name(toolset_name: str) -> str:
|
|
"""Normalize internal/legacy toolset identifiers for banner display."""
|
|
if not toolset_name:
|
|
return "unknown"
|
|
return (
|
|
toolset_name[:-6]
|
|
if toolset_name.endswith("_tools")
|
|
else toolset_name
|
|
)
|
|
|
|
|
|
def _tool_name_markup(name: str, disabled_tools: set, lazy_tools: set,
                      default_color: str) -> str:
    """Wrap one tool name in the Rich style tag matching its availability."""
    from rich.markup import escape

    if name in disabled_tools:
        style = "red"
    elif name in lazy_tools:
        style = "yellow"
    else:
        style = default_color
    return f"[{style}]{escape(name)}[/]"


def _format_toolset_tools(tool_names: List[str], disabled_tools: set,
                          lazy_tools: set, default_color: str) -> str:
    """Render a toolset's tools as a sorted, colour-coded, comma-separated string.

    When the plain joined names exceed 45 characters, names are kept only
    while they fit a 42-character budget and a dim ``...`` marker is appended.
    """
    ordered = sorted(tool_names)
    truncated = False

    if len(", ".join(ordered)) > 45:
        shown = []
        used = 0
        for name in ordered:
            if used + len(name) + 2 > 42:
                truncated = True
                break
            shown.append(name)
            used += len(name) + 2
    else:
        shown = ordered

    parts = [
        _tool_name_markup(name, disabled_tools, lazy_tools, default_color)
        for name in shown
    ]
    if truncated:
        parts.append("[dim]...[/]")
    return ", ".join(parts)


def build_welcome_banner(console: Console, model: str, cwd: str,
                         tools: Optional[List[dict]] = None,
                         enabled_toolsets: Optional[List[str]] = None,
                         session_id: Optional[str] = None,
                         get_toolset_for_tool=None,
                         context_length: Optional[int] = None):
    """Build and print a welcome banner with caduceus on left and info on right.

    Tool names are colour-coded by availability:

    - red: the toolset is unavailable and has no ``check_fn``
    - yellow: the toolset has a ``check_fn`` (lazy-initialized; the check has
      not run yet at banner time)
    - normal text colour: available

    Dynamic text (model name, cwd, session id, tool/skill/profile/MCP names)
    is escaped before being embedded in Rich markup, so values containing
    square brackets are shown literally instead of being parsed as tags.

    Args:
        console: Rich Console instance.
        model: Current model name.
        cwd: Current working directory.
        tools: List of tool definitions.
        enabled_toolsets: List of enabled toolset names (accepted for
            interface compatibility; not read when rendering).
        session_id: Session identifier.
        get_toolset_for_tool: Callable to map tool name -> toolset name.
        context_length: Model's context window size in tokens.
    """
    from model_tools import check_tool_availability, TOOLSET_REQUIREMENTS
    from rich.markup import escape as _esc
    if get_toolset_for_tool is None:
        from model_tools import get_toolset_for_tool

    tools = tools or []
    enabled_toolsets = enabled_toolsets or []

    _, unavailable_toolsets = check_tool_availability(quiet=True)
    disabled_tools = set()
    # Tools whose toolset has a check_fn are lazy-initialized (e.g. honcho,
    # homeassistant) — they show as unavailable at banner time because the
    # check hasn't run yet, but they aren't misconfigured.
    lazy_tools = set()
    for item in unavailable_toolsets:
        toolset_name = item.get("name", "")
        ts_req = TOOLSET_REQUIREMENTS.get(toolset_name, {})
        tools_in_ts = item.get("tools", [])
        if ts_req.get("check_fn"):
            lazy_tools.update(tools_in_ts)
        else:
            disabled_tools.update(tools_in_ts)

    layout_table = Table.grid(padding=(0, 2))
    layout_table.add_column("left", justify="center")
    layout_table.add_column("right", justify="left")

    # Resolve skin colors once for the entire banner
    accent = _skin_color("banner_accent", "#FFBF00")
    dim = _skin_color("banner_dim", "#B8860B")
    text = _skin_color("banner_text", "#FFF8DC")
    session_color = _skin_color("session_border", "#8B8682")

    # Use skin's custom caduceus art if provided
    try:
        from hermes_cli.skin_engine import get_active_skin
        _bskin = get_active_skin()
        _hero = _bskin.banner_hero if hasattr(_bskin, 'banner_hero') and _bskin.banner_hero else HERMES_CADUCEUS
    except Exception:
        _bskin = None
        _hero = HERMES_CADUCEUS

    # ---- Left column: art, model, cwd, session --------------------------
    left_lines = ["", _hero, ""]
    model_short = model.split("/")[-1]
    if model_short.endswith(".gguf"):
        model_short = model_short[:-5]
    if len(model_short) > 28:
        model_short = model_short[:25] + "..."
    ctx_str = f" [dim {dim}]·[/] [dim {dim}]{_format_context_length(context_length)} context[/]" if context_length else ""
    left_lines.append(f"[{accent}]{_esc(model_short)}[/]{ctx_str} [dim {dim}]·[/] [dim {dim}]Nous Research[/]")
    # str() keeps non-str values (e.g. a Path) working as they did in the f-string.
    left_lines.append(f"[dim {dim}]{_esc(str(cwd))}[/]")
    if session_id:
        left_lines.append(f"[dim {session_color}]Session: {_esc(str(session_id))}[/]")
    left_content = "\n".join(left_lines)

    # ---- Right column: tools --------------------------------------------
    right_lines = [f"[bold {accent}]Available Tools[/]"]
    toolsets_dict: Dict[str, list] = {}

    for tool in tools:
        tool_name = tool["function"]["name"]
        toolset = _display_toolset_name(get_toolset_for_tool(tool_name) or "other")
        toolsets_dict.setdefault(toolset, []).append(tool_name)

    # Unavailable toolsets are listed too, so their tools appear colour-coded.
    for item in unavailable_toolsets:
        toolset_id = item.get("id", item.get("name", "unknown"))
        display_name = _display_toolset_name(toolset_id)
        known = toolsets_dict.setdefault(display_name, [])
        for tool_name in item.get("tools", []):
            if tool_name not in known:
                known.append(tool_name)

    sorted_toolsets = sorted(toolsets_dict.keys())
    display_toolsets = sorted_toolsets[:8]
    remaining_toolsets = len(sorted_toolsets) - 8

    for toolset in display_toolsets:
        tools_str = _format_toolset_tools(
            toolsets_dict[toolset], disabled_tools, lazy_tools, text
        )
        right_lines.append(f"[dim {dim}]{_esc(toolset)}:[/] {tools_str}")

    if remaining_toolsets > 0:
        right_lines.append(f"[dim {dim}](and {remaining_toolsets} more toolsets...)[/]")

    # MCP Servers section (only if configured)
    try:
        from tools.mcp_tool import get_mcp_status
        mcp_status = get_mcp_status()
    except Exception:
        mcp_status = []

    if mcp_status:
        right_lines.append("")
        right_lines.append(f"[bold {accent}]MCP Servers[/]")
        for srv in mcp_status:
            srv_name = _esc(str(srv['name']))
            srv_transport = _esc(str(srv['transport']))
            if srv["connected"]:
                right_lines.append(
                    f"[dim {dim}]{srv_name}[/] [{text}]({srv_transport})[/] "
                    f"[dim {dim}]—[/] [{text}]{srv['tools']} tool(s)[/]"
                )
            else:
                right_lines.append(
                    f"[red]{srv_name}[/] [dim]({srv_transport})[/] "
                    f"[red]— failed[/]"
                )

    # ---- Right column: skills -------------------------------------------
    right_lines.append("")
    right_lines.append(f"[bold {accent}]Available Skills[/]")
    skills_by_category = get_available_skills()
    total_skills = sum(len(s) for s in skills_by_category.values())

    if skills_by_category:
        for category in sorted(skills_by_category.keys()):
            skill_names = sorted(skills_by_category[category])
            if len(skill_names) > 8:
                display_names = skill_names[:8]
                skills_str = ", ".join(display_names) + f" +{len(skill_names) - 8} more"
            else:
                skills_str = ", ".join(skill_names)
            if len(skills_str) > 50:
                skills_str = skills_str[:47] + "..."
            # Escape after truncation so the cut never splits an escape sequence.
            right_lines.append(f"[dim {dim}]{_esc(category)}:[/] [{text}]{_esc(skills_str)}[/]")
    else:
        right_lines.append(f"[dim {dim}]No skills installed[/]")

    # ---- Right column: summary, profile, update notice ------------------
    right_lines.append("")
    mcp_connected = sum(1 for s in mcp_status if s["connected"]) if mcp_status else 0
    summary_parts = [f"{len(tools)} tools", f"{total_skills} skills"]
    if mcp_connected:
        summary_parts.append(f"{mcp_connected} MCP servers")
    summary_parts.append("/help for commands")
    # Show active profile name when not 'default'
    try:
        from hermes_cli.profiles import get_active_profile_name
        _profile_name = get_active_profile_name()
        if _profile_name and _profile_name != "default":
            right_lines.append(f"[bold {accent}]Profile:[/] [{text}]{_esc(str(_profile_name))}[/]")
    except Exception:
        pass  # Never break the banner over a profiles.py bug

    right_lines.append(f"[dim {dim}]{' · '.join(summary_parts)}[/]")

    # Update check — use prefetched result if available
    try:
        behind = get_update_result(timeout=0.5)
        if behind and behind > 0:
            commits_word = "commit" if behind == 1 else "commits"
            right_lines.append(
                f"[bold yellow]⚠ {behind} {commits_word} behind[/]"
                f"[dim yellow] — run [bold]hermes update[/bold] to update[/]"
            )
    except Exception:
        pass  # Never break the banner over an update check

    right_content = "\n".join(right_lines)
    layout_table.add_row(left_content, right_content)

    agent_name = _skin_branding("agent_name", "Hermes Agent")
    title_color = _skin_color("banner_title", "#FFD700")
    border_color = _skin_color("banner_border", "#CD7F32")
    outer_panel = Panel(
        layout_table,
        title=f"[bold {title_color}]{agent_name} v{VERSION} ({RELEASE_DATE})[/]",
        border_style=border_color,
        padding=(0, 2),
    )

    console.print()
    # The wide logo is only printed when the terminal has at least 95 columns.
    term_width = shutil.get_terminal_size().columns
    if term_width >= 95:
        _logo = _bskin.banner_logo if _bskin and hasattr(_bskin, 'banner_logo') and _bskin.banner_logo else HERMES_AGENT_LOGO
        console.print(_logo)
        console.print()
    console.print(outer_panel)
|