Files
hermes-agent/hermes_cli/main.py
Teknium 88643a1ba9 feat: overhaul context length detection with models.dev and provider-aware resolution (#2158)
Replace the fragile hardcoded context length system with a multi-source
resolution chain that correctly identifies context windows per provider.

Key changes:

- New agent/models_dev.py: Fetches and caches the models.dev registry
  (3800+ models across 100+ providers with per-provider context windows).
  In-memory cache (1hr TTL) + disk cache for cold starts.

- Rewritten get_model_context_length() resolution chain:
  0. Config override (model.context_length)
  1. Custom providers per-model context_length
  2. Persistent disk cache
  3. Endpoint /models (local servers)
  4. Anthropic /v1/models API (max_input_tokens, API-key only)
  5. OpenRouter live API (existing, unchanged)
  6. Nous suffix-match via OpenRouter (dot/dash normalization)
  7. models.dev registry lookup (provider-aware)
  8. Thin hardcoded defaults (broad family patterns)
  9. 128K fallback (was 2M)

- Provider-aware context: same model now correctly resolves to different
  context windows per provider (e.g. claude-opus-4.6: 1M on Anthropic,
  128K on GitHub Copilot). Provider name flows through ContextCompressor.

- DEFAULT_CONTEXT_LENGTHS shrunk from 80+ entries to ~16 broad patterns.
  models.dev replaces the per-model hardcoding.

- CONTEXT_PROBE_TIERS changed from [2M, 1M, 512K, 200K, 128K, 64K, 32K]
  to [128K, 64K, 32K, 16K, 8K]. Unknown models no longer start at 2M.

- hermes model: prompts for context_length when configuring custom
  endpoints. Supports shorthand (32k, 128K). Saved to custom_providers
  per-model config.

- custom_providers schema extended with optional models dict for
  per-model context_length (backward compatible).

- Nous Portal: suffix-matches bare IDs (claude-opus-4-6) against
  OpenRouter's prefixed IDs (anthropic/claude-opus-4.6) with dot/dash
  normalization. Handles all 15 current Nous models.

- Anthropic direct: queries /v1/models for max_input_tokens. Only works
  with regular API keys (sk-ant-api*), not OAuth tokens. Falls through
  to models.dev for OAuth users.

Tests: 5574 passed (18 new tests for models_dev + updated probe tiers)
Docs: Updated configuration.md context length section, AGENTS.md

Co-authored-by: Test <test@test.com>
2026-03-20 06:04:33 -07:00

4071 lines
156 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Hermes CLI - Main entry point.
Usage:
hermes # Interactive chat (default)
hermes chat # Interactive chat
hermes gateway # Run gateway in foreground
hermes gateway start # Start gateway as service
hermes gateway stop # Stop gateway service
hermes gateway status # Show gateway status
hermes gateway install # Install gateway service
hermes gateway uninstall # Uninstall gateway service
hermes setup # Interactive setup wizard
hermes logout # Clear stored authentication
hermes status # Show status of all components
hermes cron # Manage cron jobs
hermes cron list # List cron jobs
hermes cron status # Check if cron scheduler is running
hermes doctor # Check configuration and dependencies
hermes honcho setup # Configure Honcho AI memory integration
hermes honcho status # Show Honcho config and connection status
hermes honcho sessions # List directory → session name mappings
hermes honcho map <name> # Map current directory to a session name
hermes honcho peer # Show peer names and dialectic settings
hermes honcho peer --user NAME # Set user peer name
hermes honcho peer --ai NAME # Set AI peer name
hermes honcho peer --reasoning LEVEL # Set dialectic reasoning level
hermes honcho mode # Show current memory mode
hermes honcho mode [hybrid|honcho|local] # Set memory mode
hermes honcho tokens # Show token budget settings
hermes honcho tokens --context N # Set session.context() token cap
hermes honcho tokens --dialectic N # Set dialectic result char cap
hermes honcho identity # Show AI peer identity representation
hermes honcho identity <file> # Seed AI peer identity from a file (SOUL.md etc.)
hermes honcho migrate # Step-by-step migration guide: OpenClaw native → Hermes + Honcho
hermes version Show version
hermes update Update to latest version
hermes uninstall Uninstall Hermes Agent
hermes acp Run as an ACP server for editor integration
hermes sessions browse Interactive session picker with search
hermes claw migrate --dry-run # Preview migration without changes
"""
import argparse
import os
import subprocess
import sys
from pathlib import Path
from typing import Optional
# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
sys.path.insert(0, str(PROJECT_ROOT))
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
# User-managed env files should override stale shell exports on restart.
from hermes_cli.config import get_hermes_home
from hermes_cli.env_loader import load_hermes_dotenv
load_hermes_dotenv(project_env=PROJECT_ROOT / '.env')
# Point mini-swe-agent at ~/.hermes/ so it shares our config
os.environ.setdefault("MSWEA_GLOBAL_CONFIG_DIR", str(get_hermes_home()))
os.environ.setdefault("MSWEA_SILENT_STARTUP", "1")
import logging
import time as _time
from datetime import datetime
from hermes_cli import __version__, __release_date__
from hermes_constants import OPENROUTER_BASE_URL
logger = logging.getLogger(__name__)
def _relative_time(ts) -> str:
"""Format a timestamp as relative time (e.g., '2h ago', 'yesterday')."""
if not ts:
return "?"
delta = _time.time() - ts
if delta < 60:
return "just now"
if delta < 3600:
return f"{int(delta / 60)}m ago"
if delta < 86400:
return f"{int(delta / 3600)}h ago"
if delta < 172800:
return "yesterday"
if delta < 604800:
return f"{int(delta / 86400)}d ago"
return datetime.fromtimestamp(ts).strftime("%Y-%m-%d")
def _has_any_provider_configured() -> bool:
"""Check if at least one inference provider is usable."""
from hermes_cli.config import get_env_path, get_hermes_home
from hermes_cli.auth import get_auth_status
# Check env vars (may be set by .env or shell).
# OPENAI_BASE_URL alone counts — local models (vLLM, llama.cpp, etc.)
# often don't require an API key.
from hermes_cli.auth import PROVIDER_REGISTRY
# Collect all provider env vars
provider_env_vars = {"OPENROUTER_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN", "OPENAI_BASE_URL"}
for pconfig in PROVIDER_REGISTRY.values():
if pconfig.auth_type == "api_key":
provider_env_vars.update(pconfig.api_key_env_vars)
if any(os.getenv(v) for v in provider_env_vars):
return True
# Check .env file for keys
env_file = get_env_path()
if env_file.exists():
try:
for line in env_file.read_text().splitlines():
line = line.strip()
if line.startswith("#") or "=" not in line:
continue
key, _, val = line.partition("=")
val = val.strip().strip("'\"")
if key.strip() in provider_env_vars and val:
return True
except Exception:
pass
# Check provider-specific auth fallbacks (for example, Copilot via gh auth).
try:
for provider_id, pconfig in PROVIDER_REGISTRY.items():
if pconfig.auth_type != "api_key":
continue
status = get_auth_status(provider_id)
if status.get("logged_in"):
return True
except Exception:
pass
# Check for Nous Portal OAuth credentials
auth_file = get_hermes_home() / "auth.json"
if auth_file.exists():
try:
import json
auth = json.loads(auth_file.read_text())
active = auth.get("active_provider")
if active:
status = get_auth_status(active)
if status.get("logged_in"):
return True
except Exception:
pass
# Check for Claude Code OAuth credentials (~/.claude/.credentials.json)
# These are used by resolve_anthropic_token() at runtime but were missing
# from this startup gate check.
try:
from agent.anthropic_adapter import read_claude_code_credentials, is_claude_code_token_valid
creds = read_claude_code_credentials()
if creds and (is_claude_code_token_valid(creds) or creds.get("refreshToken")):
return True
except Exception:
pass
return False
def _session_browse_picker(sessions: list) -> Optional[str]:
"""Interactive curses-based session browser with live search filtering.
Returns the selected session ID, or None if cancelled.
Uses curses (not simple_term_menu) to avoid the ghost-duplication rendering
bug in tmux/iTerm when arrow keys are used.
"""
if not sessions:
print("No sessions found.")
return None
# Try curses-based picker first
try:
import curses
result_holder = [None]
def _format_row(s, max_x):
"""Format a session row for display."""
title = (s.get("title") or "").strip()
preview = (s.get("preview") or "").strip()
source = s.get("source", "")[:6]
last_active = _relative_time(s.get("last_active"))
sid = s["id"][:18]
# Adaptive column widths based on terminal width
# Layout: [arrow 3] [title/preview flexible] [active 12] [src 6] [id 18]
fixed_cols = 3 + 12 + 6 + 18 + 6 # arrow + active + src + id + padding
name_width = max(20, max_x - fixed_cols)
if title:
name = title[:name_width]
elif preview:
name = preview[:name_width]
else:
name = sid
return f"{name:<{name_width}} {last_active:<10} {source:<5} {sid}"
def _match(s, query):
"""Check if a session matches the search query (case-insensitive)."""
q = query.lower()
return (
q in (s.get("title") or "").lower()
or q in (s.get("preview") or "").lower()
or q in s.get("id", "").lower()
or q in (s.get("source") or "").lower()
)
def _curses_browse(stdscr):
curses.curs_set(0)
if curses.has_colors():
curses.start_color()
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_GREEN, -1) # selected
curses.init_pair(2, curses.COLOR_YELLOW, -1) # header
curses.init_pair(3, curses.COLOR_CYAN, -1) # search
curses.init_pair(4, 8, -1) # dim
cursor = 0
scroll_offset = 0
search_text = ""
filtered = list(sessions)
while True:
stdscr.clear()
max_y, max_x = stdscr.getmaxyx()
if max_y < 5 or max_x < 40:
# Terminal too small
try:
stdscr.addstr(0, 0, "Terminal too small")
except curses.error:
pass
stdscr.refresh()
stdscr.getch()
return
# Header line
if search_text:
header = f" Browse sessions — filter: {search_text}"
header_attr = curses.A_BOLD
if curses.has_colors():
header_attr |= curses.color_pair(3)
else:
header = " Browse sessions — ↑↓ navigate Enter select Type to filter Esc quit"
header_attr = curses.A_BOLD
if curses.has_colors():
header_attr |= curses.color_pair(2)
try:
stdscr.addnstr(0, 0, header, max_x - 1, header_attr)
except curses.error:
pass
# Column header line
fixed_cols = 3 + 12 + 6 + 18 + 6
name_width = max(20, max_x - fixed_cols)
col_header = f" {'Title / Preview':<{name_width}} {'Active':<10} {'Src':<5} {'ID'}"
try:
dim_attr = curses.color_pair(4) if curses.has_colors() else curses.A_DIM
stdscr.addnstr(1, 0, col_header, max_x - 1, dim_attr)
except curses.error:
pass
# Compute visible area
visible_rows = max_y - 4 # header + col header + blank + footer
if visible_rows < 1:
visible_rows = 1
# Clamp cursor and scroll
if not filtered:
try:
msg = " No sessions match the filter."
stdscr.addnstr(3, 0, msg, max_x - 1, curses.A_DIM)
except curses.error:
pass
else:
if cursor >= len(filtered):
cursor = len(filtered) - 1
if cursor < 0:
cursor = 0
if cursor < scroll_offset:
scroll_offset = cursor
elif cursor >= scroll_offset + visible_rows:
scroll_offset = cursor - visible_rows + 1
for draw_i, i in enumerate(range(
scroll_offset,
min(len(filtered), scroll_offset + visible_rows)
)):
y = draw_i + 3
if y >= max_y - 1:
break
s = filtered[i]
arrow = "" if i == cursor else " "
row = arrow + _format_row(s, max_x - 3)
attr = curses.A_NORMAL
if i == cursor:
attr = curses.A_BOLD
if curses.has_colors():
attr |= curses.color_pair(1)
try:
stdscr.addnstr(y, 0, row, max_x - 1, attr)
except curses.error:
pass
# Footer
footer_y = max_y - 1
if filtered:
footer = f" {cursor + 1}/{len(filtered)} sessions"
if len(filtered) < len(sessions):
footer += f" (filtered from {len(sessions)})"
else:
footer = f" 0/{len(sessions)} sessions"
try:
stdscr.addnstr(footer_y, 0, footer, max_x - 1,
curses.color_pair(4) if curses.has_colors() else curses.A_DIM)
except curses.error:
pass
stdscr.refresh()
key = stdscr.getch()
if key in (curses.KEY_UP, ):
if filtered:
cursor = (cursor - 1) % len(filtered)
elif key in (curses.KEY_DOWN, ):
if filtered:
cursor = (cursor + 1) % len(filtered)
elif key in (curses.KEY_ENTER, 10, 13):
if filtered:
result_holder[0] = filtered[cursor]["id"]
return
elif key == 27: # Esc
if search_text:
# First Esc clears the search
search_text = ""
filtered = list(sessions)
cursor = 0
scroll_offset = 0
else:
# Second Esc exits
return
elif key in (curses.KEY_BACKSPACE, 127, 8):
if search_text:
search_text = search_text[:-1]
if search_text:
filtered = [s for s in sessions if _match(s, search_text)]
else:
filtered = list(sessions)
cursor = 0
scroll_offset = 0
elif key == ord('q') and not search_text:
return
elif 32 <= key <= 126:
# Printable character → add to search filter
search_text += chr(key)
filtered = [s for s in sessions if _match(s, search_text)]
cursor = 0
scroll_offset = 0
curses.wrapper(_curses_browse)
return result_holder[0]
except Exception:
pass
# Fallback: numbered list (Windows without curses, etc.)
print("\n Browse sessions (enter number to resume, q to cancel)\n")
for i, s in enumerate(sessions):
title = (s.get("title") or "").strip()
preview = (s.get("preview") or "").strip()
label = title or preview or s["id"]
if len(label) > 50:
label = label[:47] + "..."
last_active = _relative_time(s.get("last_active"))
src = s.get("source", "")[:6]
print(f" {i + 1:>3}. {label:<50} {last_active:<10} {src}")
while True:
try:
val = input(f"\n Select [1-{len(sessions)}]: ").strip()
if not val or val.lower() in ("q", "quit", "exit"):
return None
idx = int(val) - 1
if 0 <= idx < len(sessions):
return sessions[idx]["id"]
print(f" Invalid selection. Enter 1-{len(sessions)} or q to cancel.")
except ValueError:
print(f" Invalid input. Enter a number or q to cancel.")
except (KeyboardInterrupt, EOFError):
print()
return None
def _resolve_last_cli_session() -> Optional[str]:
"""Look up the most recent CLI session ID from SQLite. Returns None if unavailable."""
try:
from hermes_state import SessionDB
db = SessionDB()
sessions = db.search_sessions(source="cli", limit=1)
db.close()
if sessions:
return sessions[0]["id"]
except Exception:
pass
return None
def _resolve_session_by_name_or_id(name_or_id: str) -> Optional[str]:
"""Resolve a session name (title) or ID to a session ID.
- If it looks like a session ID (contains underscore + hex), try direct lookup first.
- Otherwise, treat it as a title and use resolve_session_by_title (auto-latest).
- Falls back to the other method if the first doesn't match.
"""
try:
from hermes_state import SessionDB
db = SessionDB()
# Try as exact session ID first
session = db.get_session(name_or_id)
if session:
db.close()
return session["id"]
# Try as title (with auto-latest for lineage)
session_id = db.resolve_session_by_title(name_or_id)
db.close()
return session_id
except Exception:
pass
return None
def cmd_chat(args):
"""Run interactive chat CLI."""
# Resolve --continue into --resume with the latest CLI session or by name
continue_val = getattr(args, "continue_last", None)
if continue_val and not getattr(args, "resume", None):
if isinstance(continue_val, str):
# -c "session name" — resolve by title or ID
resolved = _resolve_session_by_name_or_id(continue_val)
if resolved:
args.resume = resolved
else:
print(f"No session found matching '{continue_val}'.")
print("Use 'hermes sessions list' to see available sessions.")
sys.exit(1)
else:
# -c with no argument — continue the most recent session
last_id = _resolve_last_cli_session()
if last_id:
args.resume = last_id
else:
print("No previous CLI session found to continue.")
sys.exit(1)
# Resolve --resume by title if it's not a direct session ID
resume_val = getattr(args, "resume", None)
if resume_val:
resolved = _resolve_session_by_name_or_id(resume_val)
if resolved:
args.resume = resolved
# If resolution fails, keep the original value — _init_agent will
# report "Session not found" with the original input
# First-run guard: check if any provider is configured before launching
if not _has_any_provider_configured():
print()
print("It looks like Hermes isn't configured yet -- no API keys or providers found.")
print()
print(" Run: hermes setup")
print()
from hermes_cli.setup import is_interactive_stdin, print_noninteractive_setup_guidance
if not is_interactive_stdin():
print_noninteractive_setup_guidance(
"No interactive TTY detected for the first-run setup prompt."
)
sys.exit(1)
try:
reply = input("Run setup now? [Y/n] ").strip().lower()
except (EOFError, KeyboardInterrupt):
reply = "n"
if reply in ("", "y", "yes"):
cmd_setup(args)
return
print()
print("You can run 'hermes setup' at any time to configure.")
sys.exit(1)
# Start update check in background (runs while other init happens)
try:
from hermes_cli.banner import prefetch_update_check
prefetch_update_check()
except Exception:
pass
# Sync bundled skills on every CLI launch (fast -- skips unchanged skills)
try:
from tools.skills_sync import sync_skills
sync_skills(quiet=True)
except Exception:
pass
# --yolo: bypass all dangerous command approvals
if getattr(args, "yolo", False):
os.environ["HERMES_YOLO_MODE"] = "1"
# Import and run the CLI
from cli import main as cli_main
# Build kwargs from args
kwargs = {
"model": args.model,
"provider": getattr(args, "provider", None),
"toolsets": args.toolsets,
"skills": getattr(args, "skills", None),
"verbose": args.verbose,
"quiet": getattr(args, "quiet", False),
"query": args.query,
"resume": getattr(args, "resume", None),
"worktree": getattr(args, "worktree", False),
"checkpoints": getattr(args, "checkpoints", False),
"pass_session_id": getattr(args, "pass_session_id", False),
}
# Filter out None values
kwargs = {k: v for k, v in kwargs.items() if v is not None}
try:
cli_main(**kwargs)
except ValueError as e:
print(f"Error: {e}")
sys.exit(1)
def cmd_gateway(args):
"""Gateway management commands."""
from hermes_cli.gateway import gateway_command
gateway_command(args)
def cmd_whatsapp(args):
"""Set up WhatsApp: choose mode, configure, install bridge, pair via QR."""
import os
import subprocess
from pathlib import Path
from hermes_cli.config import get_env_value, save_env_value
print()
print("⚕ WhatsApp Setup")
print("=" * 50)
# ── Step 1: Choose mode ──────────────────────────────────────────────
current_mode = get_env_value("WHATSAPP_MODE") or ""
if not current_mode:
print()
print("How will you use WhatsApp with Hermes?")
print()
print(" 1. Separate bot number (recommended)")
print(" People message the bot's number directly — cleanest experience.")
print(" Requires a second phone number with WhatsApp installed on a device.")
print()
print(" 2. Personal number (self-chat)")
print(" You message yourself to talk to the agent.")
print(" Quick to set up, but the UX is less intuitive.")
print()
try:
choice = input(" Choose [1/2]: ").strip()
except (EOFError, KeyboardInterrupt):
print("\nSetup cancelled.")
return
if choice == "1":
save_env_value("WHATSAPP_MODE", "bot")
wa_mode = "bot"
print(" ✓ Mode: separate bot number")
print()
print(" ┌─────────────────────────────────────────────────┐")
print(" │ Getting a second number for the bot: │")
print(" │ │")
print(" │ Easiest: Install WhatsApp Business (free app) │")
print(" │ on your phone with a second number: │")
print(" │ • Dual-SIM: use your 2nd SIM slot │")
print(" │ • Google Voice: free US number (voice.google) │")
print(" │ • Prepaid SIM: $3-10, verify once │")
print(" │ │")
print(" │ WhatsApp Business runs alongside your personal │")
print(" │ WhatsApp — no second phone needed. │")
print(" └─────────────────────────────────────────────────┘")
else:
save_env_value("WHATSAPP_MODE", "self-chat")
wa_mode = "self-chat"
print(" ✓ Mode: personal number (self-chat)")
else:
wa_mode = current_mode
mode_label = "separate bot number" if wa_mode == "bot" else "personal number (self-chat)"
print(f"\n✓ Mode: {mode_label}")
# ── Step 2: Enable WhatsApp ──────────────────────────────────────────
print()
current = get_env_value("WHATSAPP_ENABLED")
if current and current.lower() == "true":
print("✓ WhatsApp is already enabled")
else:
save_env_value("WHATSAPP_ENABLED", "true")
print("✓ WhatsApp enabled")
# ── Step 3: Allowed users ────────────────────────────────────────────
current_users = get_env_value("WHATSAPP_ALLOWED_USERS") or ""
if current_users:
print(f"✓ Allowed users: {current_users}")
try:
response = input("\n Update allowed users? [y/N] ").strip()
except (EOFError, KeyboardInterrupt):
response = "n"
if response.lower() in ("y", "yes"):
if wa_mode == "bot":
phone = input(" Phone numbers that can message the bot (comma-separated): ").strip()
else:
phone = input(" Your phone number (e.g. 15551234567): ").strip()
if phone:
save_env_value("WHATSAPP_ALLOWED_USERS", phone.replace(" ", ""))
print(f" ✓ Updated to: {phone}")
else:
print()
if wa_mode == "bot":
print(" Who should be allowed to message the bot?")
phone = input(" Phone numbers (comma-separated, or * for anyone): ").strip()
else:
phone = input(" Your phone number (e.g. 15551234567): ").strip()
if phone:
save_env_value("WHATSAPP_ALLOWED_USERS", phone.replace(" ", ""))
print(f" ✓ Allowed users set: {phone}")
else:
print(" ⚠ No allowlist — the agent will respond to ALL incoming messages")
# ── Step 4: Install bridge dependencies ──────────────────────────────
project_root = Path(__file__).resolve().parents[1]
bridge_dir = project_root / "scripts" / "whatsapp-bridge"
bridge_script = bridge_dir / "bridge.js"
if not bridge_script.exists():
print(f"\n✗ Bridge script not found at {bridge_script}")
return
if not (bridge_dir / "node_modules").exists():
print("\n→ Installing WhatsApp bridge dependencies...")
result = subprocess.run(
["npm", "install"],
cwd=str(bridge_dir),
capture_output=True,
text=True,
timeout=120,
)
if result.returncode != 0:
print(f" ✗ npm install failed: {result.stderr}")
return
print(" ✓ Dependencies installed")
else:
print("✓ Bridge dependencies already installed")
# ── Step 5: Check for existing session ───────────────────────────────
session_dir = get_hermes_home() / "whatsapp" / "session"
session_dir.mkdir(parents=True, exist_ok=True)
if (session_dir / "creds.json").exists():
print("✓ Existing WhatsApp session found")
try:
response = input("\n Re-pair? This will clear the existing session. [y/N] ").strip()
except (EOFError, KeyboardInterrupt):
response = "n"
if response.lower() in ("y", "yes"):
import shutil
shutil.rmtree(session_dir, ignore_errors=True)
session_dir.mkdir(parents=True, exist_ok=True)
print(" ✓ Session cleared")
else:
print("\n✓ WhatsApp is configured and paired!")
print(" Start the gateway with: hermes gateway")
return
# ── Step 6: QR code pairing ──────────────────────────────────────────
print()
print("" * 50)
if wa_mode == "bot":
print("📱 Open WhatsApp (or WhatsApp Business) on the")
print(" phone with the BOT's number, then scan:")
else:
print("📱 Open WhatsApp on your phone, then scan:")
print()
print(" Settings → Linked Devices → Link a Device")
print("" * 50)
print()
try:
subprocess.run(
["node", str(bridge_script), "--pair-only", "--session", str(session_dir)],
cwd=str(bridge_dir),
)
except KeyboardInterrupt:
pass
# ── Step 7: Post-pairing ─────────────────────────────────────────────
print()
if (session_dir / "creds.json").exists():
print("✓ WhatsApp paired successfully!")
print()
if wa_mode == "bot":
print(" Next steps:")
print(" 1. Start the gateway: hermes gateway")
print(" 2. Send a message to the bot's WhatsApp number")
print(" 3. The agent will reply automatically")
print()
print(" Tip: Agent responses are prefixed with '⚕ Hermes Agent'")
else:
print(" Next steps:")
print(" 1. Start the gateway: hermes gateway")
print(" 2. Open WhatsApp → Message Yourself")
print(" 3. Type a message — the agent will reply")
print()
print(" Tip: Agent responses are prefixed with '⚕ Hermes Agent'")
print(" so you can tell them apart from your own messages.")
print()
print(" Or install as a service: hermes gateway install")
else:
print("⚠ Pairing may not have completed. Run 'hermes whatsapp' to try again.")
def cmd_setup(args):
"""Interactive setup wizard."""
from hermes_cli.setup import run_setup_wizard
run_setup_wizard(args)
def cmd_model(args):
"""Select default model — starts with provider selection, then model picker."""
from hermes_cli.auth import (
resolve_provider, get_provider_auth_state, PROVIDER_REGISTRY,
_prompt_model_selection, _save_model_choice, _update_config_for_provider,
resolve_nous_runtime_credentials, fetch_nous_models, AuthError, format_auth_error,
_login_nous,
)
from hermes_cli.config import load_config, save_config, get_env_value, save_env_value
config = load_config()
current_model = config.get("model")
if isinstance(current_model, dict):
current_model = current_model.get("default", "")
current_model = current_model or "(not set)"
# Read effective provider the same way the CLI does at startup:
# config.yaml model.provider > env var > auto-detect
import os
config_provider = None
model_cfg = config.get("model")
if isinstance(model_cfg, dict):
config_provider = model_cfg.get("provider")
effective_provider = (
config_provider
or os.getenv("HERMES_INFERENCE_PROVIDER")
or "auto"
)
try:
active = resolve_provider(effective_provider)
except AuthError as exc:
warning = format_auth_error(exc)
print(f"Warning: {warning} Falling back to auto provider detection.")
active = resolve_provider("auto")
# Detect custom endpoint
if active == "openrouter" and get_env_value("OPENAI_BASE_URL"):
active = "custom"
provider_labels = {
"openrouter": "OpenRouter",
"nous": "Nous Portal",
"openai-codex": "OpenAI Codex",
"copilot-acp": "GitHub Copilot ACP",
"copilot": "GitHub Copilot",
"anthropic": "Anthropic",
"zai": "Z.AI / GLM",
"kimi-coding": "Kimi / Moonshot",
"minimax": "MiniMax",
"minimax-cn": "MiniMax (China)",
"opencode-zen": "OpenCode Zen",
"opencode-go": "OpenCode Go",
"ai-gateway": "AI Gateway",
"kilocode": "Kilo Code",
"alibaba": "Alibaba Cloud (DashScope)",
"custom": "Custom endpoint",
}
active_label = provider_labels.get(active, active)
print()
print(f" Current model: {current_model}")
print(f" Active provider: {active_label}")
print()
# Step 1: Provider selection — put active provider first with marker
providers = [
("openrouter", "OpenRouter (100+ models, pay-per-use)"),
("nous", "Nous Portal (Nous Research subscription)"),
("openai-codex", "OpenAI Codex"),
("copilot-acp", "GitHub Copilot ACP (spawns `copilot --acp --stdio`)"),
("copilot", "GitHub Copilot (uses GITHUB_TOKEN or gh auth token)"),
("anthropic", "Anthropic (Claude models — API key or Claude Code)"),
("zai", "Z.AI / GLM (Zhipu AI direct API)"),
("kimi-coding", "Kimi / Moonshot (Moonshot AI direct API)"),
("minimax", "MiniMax (global direct API)"),
("minimax-cn", "MiniMax China (domestic direct API)"),
("kilocode", "Kilo Code (Kilo Gateway API)"),
("opencode-zen", "OpenCode Zen (35+ curated models, pay-as-you-go)"),
("opencode-go", "OpenCode Go (open models, $10/month subscription)"),
("ai-gateway", "AI Gateway (Vercel — 200+ models, pay-per-use)"),
("alibaba", "Alibaba Cloud / DashScope (Qwen models, Anthropic-compatible)"),
]
# Add user-defined custom providers from config.yaml
custom_providers_cfg = config.get("custom_providers") or []
_custom_provider_map = {} # key → {name, base_url, api_key}
if isinstance(custom_providers_cfg, list):
for entry in custom_providers_cfg:
if not isinstance(entry, dict):
continue
name = entry.get("name", "").strip()
base_url = entry.get("base_url", "").strip()
if not name or not base_url:
continue
# Generate a stable key from the name
key = "custom:" + name.lower().replace(" ", "-")
short_url = base_url.replace("https://", "").replace("http://", "").rstrip("/")
saved_model = entry.get("model", "")
model_hint = f"{saved_model}" if saved_model else ""
providers.append((key, f"{name} ({short_url}){model_hint}"))
_custom_provider_map[key] = {
"name": name,
"base_url": base_url,
"api_key": entry.get("api_key", ""),
"model": saved_model,
}
# Always add the manual custom endpoint option last
providers.append(("custom", "Custom endpoint (enter URL manually)"))
# Add removal option if there are saved custom providers
if _custom_provider_map:
providers.append(("remove-custom", "Remove a saved custom provider"))
# Reorder so the active provider is at the top
known_keys = {k for k, _ in providers}
active_key = active if active in known_keys else "custom"
ordered = []
for key, label in providers:
if key == active_key:
ordered.insert(0, (key, f"{label} ← currently active"))
else:
ordered.append((key, label))
ordered.append(("cancel", "Cancel"))
provider_idx = _prompt_provider_choice([label for _, label in ordered])
if provider_idx is None or ordered[provider_idx][0] == "cancel":
print("No change.")
return
selected_provider = ordered[provider_idx][0]
# Step 2: Provider-specific setup + model selection
if selected_provider == "openrouter":
_model_flow_openrouter(config, current_model)
elif selected_provider == "nous":
_model_flow_nous(config, current_model)
elif selected_provider == "openai-codex":
_model_flow_openai_codex(config, current_model)
elif selected_provider == "copilot-acp":
_model_flow_copilot_acp(config, current_model)
elif selected_provider == "copilot":
_model_flow_copilot(config, current_model)
elif selected_provider == "custom":
_model_flow_custom(config)
elif selected_provider.startswith("custom:") and selected_provider in _custom_provider_map:
_model_flow_named_custom(config, _custom_provider_map[selected_provider])
elif selected_provider == "remove-custom":
_remove_custom_provider(config)
elif selected_provider == "anthropic":
_model_flow_anthropic(config, current_model)
elif selected_provider == "kimi-coding":
_model_flow_kimi(config, current_model)
elif selected_provider in ("zai", "minimax", "minimax-cn", "kilocode", "opencode-zen", "opencode-go", "ai-gateway", "alibaba"):
_model_flow_api_key_provider(config, selected_provider, current_model)
def _prompt_provider_choice(choices):
"""Show provider selection menu. Returns index or None."""
try:
from simple_term_menu import TerminalMenu
menu_items = [f" {c}" for c in choices]
menu = TerminalMenu(
menu_items, cursor_index=0,
menu_cursor="-> ", menu_cursor_style=("fg_green", "bold"),
menu_highlight_style=("fg_green",),
cycle_cursor=True, clear_screen=False,
title="Select provider:",
)
idx = menu.show()
print()
return idx
except (ImportError, NotImplementedError):
pass
# Fallback: numbered list
print("Select provider:")
for i, c in enumerate(choices, 1):
print(f" {i}. {c}")
print()
while True:
try:
val = input(f"Choice [1-{len(choices)}]: ").strip()
if not val:
return None
idx = int(val) - 1
if 0 <= idx < len(choices):
return idx
print(f"Please enter 1-{len(choices)}")
except ValueError:
print("Please enter a number")
except (KeyboardInterrupt, EOFError):
print()
return None
def _model_flow_openrouter(config, current_model=""):
"""OpenRouter provider: ensure API key, then pick model."""
from hermes_cli.auth import _prompt_model_selection, _save_model_choice, deactivate_provider
from hermes_cli.config import get_env_value, save_env_value
api_key = get_env_value("OPENROUTER_API_KEY")
if not api_key:
print("No OpenRouter API key configured.")
print("Get one at: https://openrouter.ai/keys")
print()
try:
key = input("OpenRouter API key (or Enter to cancel): ").strip()
except (KeyboardInterrupt, EOFError):
print()
return
if not key:
print("Cancelled.")
return
save_env_value("OPENROUTER_API_KEY", key)
print("API key saved.")
print()
from hermes_cli.models import model_ids
openrouter_models = model_ids()
selected = _prompt_model_selection(openrouter_models, current_model=current_model)
if selected:
# Clear any custom endpoint and set provider to openrouter
if get_env_value("OPENAI_BASE_URL"):
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
_save_model_choice(selected)
# Update config provider and deactivate any OAuth provider
from hermes_cli.config import load_config, save_config
cfg = load_config()
model = cfg.get("model")
if not isinstance(model, dict):
model = {"default": model} if model else {}
cfg["model"] = model
model["provider"] = "openrouter"
model["base_url"] = OPENROUTER_BASE_URL
save_config(cfg)
deactivate_provider()
print(f"Default model set to: {selected} (via OpenRouter)")
else:
print("No change.")
def _model_flow_nous(config, current_model=""):
"""Nous Portal provider: ensure logged in, then pick model."""
from hermes_cli.auth import (
get_provider_auth_state, _prompt_model_selection, _save_model_choice,
_update_config_for_provider, resolve_nous_runtime_credentials,
fetch_nous_models, AuthError, format_auth_error,
_login_nous, PROVIDER_REGISTRY,
)
from hermes_cli.config import get_env_value, save_env_value
import argparse
state = get_provider_auth_state("nous")
if not state or not state.get("access_token"):
print("Not logged into Nous Portal. Starting login...")
print()
try:
mock_args = argparse.Namespace(
portal_url=None, inference_url=None, client_id=None,
scope=None, no_browser=False, timeout=15.0,
ca_bundle=None, insecure=False,
)
_login_nous(mock_args, PROVIDER_REGISTRY["nous"])
except SystemExit:
print("Login cancelled or failed.")
return
except Exception as exc:
print(f"Login failed: {exc}")
return
# login_nous already handles model selection + config update
return
# Already logged in — fetch models and select
print("Fetching models from Nous Portal...")
try:
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=5 * 60)
model_ids = fetch_nous_models(
inference_base_url=creds.get("base_url", ""),
api_key=creds.get("api_key", ""),
)
except Exception as exc:
relogin = isinstance(exc, AuthError) and exc.relogin_required
msg = format_auth_error(exc) if isinstance(exc, AuthError) else str(exc)
if relogin:
print(f"Session expired: {msg}")
print("Re-authenticating with Nous Portal...\n")
try:
mock_args = argparse.Namespace(
portal_url=None, inference_url=None, client_id=None,
scope=None, no_browser=False, timeout=15.0,
ca_bundle=None, insecure=False,
)
_login_nous(mock_args, PROVIDER_REGISTRY["nous"])
except Exception as login_exc:
print(f"Re-login failed: {login_exc}")
return
print(f"Could not fetch models: {msg}")
return
if not model_ids:
print("No models returned by the inference API.")
return
selected = _prompt_model_selection(model_ids, current_model=current_model)
if selected:
_save_model_choice(selected)
# Reactivate Nous as the provider and update config
inference_url = creds.get("base_url", "")
_update_config_for_provider("nous", inference_url)
# Clear any custom endpoint that might conflict
if get_env_value("OPENAI_BASE_URL"):
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
print(f"Default model set to: {selected} (via Nous Portal)")
else:
print("No change.")
def _model_flow_openai_codex(config, current_model=""):
"""OpenAI Codex provider: ensure logged in, then pick model."""
from hermes_cli.auth import (
get_codex_auth_status, _prompt_model_selection, _save_model_choice,
_update_config_for_provider, _login_openai_codex,
PROVIDER_REGISTRY, DEFAULT_CODEX_BASE_URL,
)
from hermes_cli.codex_models import get_codex_model_ids
from hermes_cli.config import get_env_value, save_env_value
import argparse
status = get_codex_auth_status()
if not status.get("logged_in"):
print("Not logged into OpenAI Codex. Starting login...")
print()
try:
mock_args = argparse.Namespace()
_login_openai_codex(mock_args, PROVIDER_REGISTRY["openai-codex"])
except SystemExit:
print("Login cancelled or failed.")
return
except Exception as exc:
print(f"Login failed: {exc}")
return
_codex_token = None
try:
from hermes_cli.auth import resolve_codex_runtime_credentials
_codex_creds = resolve_codex_runtime_credentials()
_codex_token = _codex_creds.get("api_key")
except Exception:
pass
codex_models = get_codex_model_ids(access_token=_codex_token)
selected = _prompt_model_selection(codex_models, current_model=current_model)
if selected:
_save_model_choice(selected)
_update_config_for_provider("openai-codex", DEFAULT_CODEX_BASE_URL)
# Clear custom endpoint env vars that would otherwise override Codex.
if get_env_value("OPENAI_BASE_URL"):
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
print(f"Default model set to: {selected} (via OpenAI Codex)")
else:
print("No change.")
def _model_flow_custom(config):
"""Custom endpoint: collect URL, API key, and model name.
Automatically saves the endpoint to ``custom_providers`` in config.yaml
so it appears in the provider menu on subsequent runs.
"""
from hermes_cli.auth import _save_model_choice, deactivate_provider
from hermes_cli.config import get_env_value, save_env_value, load_config, save_config
current_url = get_env_value("OPENAI_BASE_URL") or ""
current_key = get_env_value("OPENAI_API_KEY") or ""
print("Custom OpenAI-compatible endpoint configuration:")
if current_url:
print(f" Current URL: {current_url}")
if current_key:
print(f" Current key: {current_key[:8]}...")
print()
try:
base_url = input(f"API base URL [{current_url or 'e.g. https://api.example.com/v1'}]: ").strip()
api_key = input(f"API key [{current_key[:8] + '...' if current_key else 'optional'}]: ").strip()
model_name = input("Model name (e.g. gpt-4, llama-3-70b): ").strip()
context_length_str = input("Context length in tokens [leave blank for auto-detect]: ").strip()
except (KeyboardInterrupt, EOFError):
print("\nCancelled.")
return
context_length = None
if context_length_str:
try:
context_length = int(context_length_str.replace(",", "").replace("k", "000").replace("K", "000"))
if context_length <= 0:
context_length = None
except ValueError:
print(f"Invalid context length: {context_length_str} — will auto-detect.")
context_length = None
if not base_url and not current_url:
print("No URL provided. Cancelled.")
return
# Validate URL format
effective_url = base_url or current_url
if not effective_url.startswith(("http://", "https://")):
print(f"Invalid URL: {effective_url} (must start with http:// or https://)")
return
effective_key = api_key or current_key
from hermes_cli.models import probe_api_models
probe = probe_api_models(effective_key, effective_url)
if probe.get("used_fallback") and probe.get("resolved_base_url"):
print(
f"Warning: endpoint verification worked at {probe['resolved_base_url']}/models, "
f"not the exact URL you entered. Saving the working base URL instead."
)
effective_url = probe["resolved_base_url"]
if base_url:
base_url = effective_url
elif probe.get("models") is not None:
print(
f"Verified endpoint via {probe.get('probed_url')} "
f"({len(probe.get('models') or [])} model(s) visible)"
)
else:
print(
f"Warning: could not verify this endpoint via {probe.get('probed_url')}. "
f"Hermes will still save it."
)
if probe.get("suggested_base_url"):
print(f" If this server expects /v1, try base URL: {probe['suggested_base_url']}")
if base_url:
save_env_value("OPENAI_BASE_URL", effective_url)
if api_key:
save_env_value("OPENAI_API_KEY", api_key)
if model_name:
_save_model_choice(model_name)
# Update config and deactivate any OAuth provider
cfg = load_config()
model = cfg.get("model")
if not isinstance(model, dict):
model = {"default": model} if model else {}
cfg["model"] = model
model["provider"] = "custom"
model["base_url"] = effective_url
save_config(cfg)
deactivate_provider()
print(f"Default model set to: {model_name} (via {effective_url})")
else:
if base_url or api_key:
deactivate_provider()
print("Endpoint saved. Use `/model` in chat or `hermes model` to set a model.")
# Auto-save to custom_providers so it appears in the menu next time
_save_custom_provider(effective_url, effective_key, model_name or "", context_length=context_length)
def _save_custom_provider(base_url, api_key="", model="", context_length=None):
"""Save a custom endpoint to custom_providers in config.yaml.
Deduplicates by base_url — if the URL already exists, updates the
model name and context_length but doesn't add a duplicate entry.
Auto-generates a display name from the URL hostname.
"""
from hermes_cli.config import load_config, save_config
cfg = load_config()
providers = cfg.get("custom_providers") or []
if not isinstance(providers, list):
providers = []
# Check if this URL is already saved — update model/context_length if so
for entry in providers:
if isinstance(entry, dict) and entry.get("base_url", "").rstrip("/") == base_url.rstrip("/"):
changed = False
if model and entry.get("model") != model:
entry["model"] = model
changed = True
if model and context_length:
models_cfg = entry.get("models", {})
if not isinstance(models_cfg, dict):
models_cfg = {}
models_cfg[model] = {"context_length": context_length}
entry["models"] = models_cfg
changed = True
if changed:
cfg["custom_providers"] = providers
save_config(cfg)
return # already saved, updated if needed
# Auto-generate a name from the URL
import re
clean = base_url.replace("https://", "").replace("http://", "").rstrip("/")
# Remove /v1 suffix for cleaner names
clean = re.sub(r"/v1/?$", "", clean)
# Use hostname:port as the name
name = clean.split("/")[0]
# Capitalize for readability
if "localhost" in name or "127.0.0.1" in name:
name = f"Local ({name})"
elif "runpod" in name.lower():
name = f"RunPod ({name})"
else:
name = name.capitalize()
entry = {"name": name, "base_url": base_url}
if api_key:
entry["api_key"] = api_key
if model:
entry["model"] = model
if model and context_length:
entry["models"] = {model: {"context_length": context_length}}
providers.append(entry)
cfg["custom_providers"] = providers
save_config(cfg)
print(f" 💾 Saved to custom providers as \"{name}\" (edit in config.yaml)")
def _remove_custom_provider(config):
"""Let the user remove a saved custom provider from config.yaml."""
from hermes_cli.config import load_config, save_config
cfg = load_config()
providers = cfg.get("custom_providers") or []
if not isinstance(providers, list) or not providers:
print("No custom providers configured.")
return
print("Remove a custom provider:\n")
choices = []
for entry in providers:
if isinstance(entry, dict):
name = entry.get("name", "unnamed")
url = entry.get("base_url", "")
short_url = url.replace("https://", "").replace("http://", "").rstrip("/")
choices.append(f"{name} ({short_url})")
else:
choices.append(str(entry))
choices.append("Cancel")
try:
from simple_term_menu import TerminalMenu
menu = TerminalMenu(
[f" {c}" for c in choices], cursor_index=0,
menu_cursor="-> ", menu_cursor_style=("fg_red", "bold"),
menu_highlight_style=("fg_red",),
cycle_cursor=True, clear_screen=False,
title="Select provider to remove:",
)
idx = menu.show()
print()
except (ImportError, NotImplementedError):
for i, c in enumerate(choices, 1):
print(f" {i}. {c}")
print()
try:
val = input(f"Choice [1-{len(choices)}]: ").strip()
idx = int(val) - 1 if val else None
except (ValueError, KeyboardInterrupt, EOFError):
idx = None
if idx is None or idx >= len(providers):
print("No change.")
return
removed = providers.pop(idx)
cfg["custom_providers"] = providers
save_config(cfg)
removed_name = removed.get("name", "unnamed") if isinstance(removed, dict) else str(removed)
print(f"✅ Removed \"{removed_name}\" from custom providers.")
def _model_flow_named_custom(config, provider_info):
"""Handle a named custom provider from config.yaml custom_providers list.
If the entry has a saved model name, activates it immediately.
Otherwise probes the endpoint's /models API to let the user pick one.
"""
from hermes_cli.auth import _save_model_choice, deactivate_provider
from hermes_cli.config import save_env_value, load_config, save_config
from hermes_cli.models import fetch_api_models
name = provider_info["name"]
base_url = provider_info["base_url"]
api_key = provider_info.get("api_key", "")
saved_model = provider_info.get("model", "")
# If a model is saved, just activate immediately — no probing needed
if saved_model:
save_env_value("OPENAI_BASE_URL", base_url)
if api_key:
save_env_value("OPENAI_API_KEY", api_key)
_save_model_choice(saved_model)
cfg = load_config()
model = cfg.get("model")
if not isinstance(model, dict):
model = {"default": model} if model else {}
cfg["model"] = model
model["provider"] = "custom"
model["base_url"] = base_url
save_config(cfg)
deactivate_provider()
print(f"✅ Switched to: {saved_model}")
print(f" Provider: {name} ({base_url})")
return
# No saved model — probe endpoint and let user pick
print(f" Provider: {name}")
print(f" URL: {base_url}")
print()
print("No model saved for this provider. Fetching available models...")
models = fetch_api_models(api_key, base_url, timeout=8.0)
if models:
print(f"Found {len(models)} model(s):\n")
try:
from simple_term_menu import TerminalMenu
menu_items = [f" {m}" for m in models] + [" Cancel"]
menu = TerminalMenu(
menu_items, cursor_index=0,
menu_cursor="-> ", menu_cursor_style=("fg_green", "bold"),
menu_highlight_style=("fg_green",),
cycle_cursor=True, clear_screen=False,
title=f"Select model from {name}:",
)
idx = menu.show()
print()
if idx is None or idx >= len(models):
print("Cancelled.")
return
model_name = models[idx]
except (ImportError, NotImplementedError):
for i, m in enumerate(models, 1):
print(f" {i}. {m}")
print(f" {len(models) + 1}. Cancel")
print()
try:
val = input(f"Choice [1-{len(models) + 1}]: ").strip()
if not val:
print("Cancelled.")
return
idx = int(val) - 1
if idx < 0 or idx >= len(models):
print("Cancelled.")
return
model_name = models[idx]
except (ValueError, KeyboardInterrupt, EOFError):
print("\nCancelled.")
return
else:
print("Could not fetch models from endpoint. Enter model name manually.")
try:
model_name = input("Model name: ").strip()
except (KeyboardInterrupt, EOFError):
print("\nCancelled.")
return
if not model_name:
print("No model specified. Cancelled.")
return
# Activate and save the model to the custom_providers entry
save_env_value("OPENAI_BASE_URL", base_url)
if api_key:
save_env_value("OPENAI_API_KEY", api_key)
_save_model_choice(model_name)
cfg = load_config()
model = cfg.get("model")
if not isinstance(model, dict):
model = {"default": model} if model else {}
cfg["model"] = model
model["provider"] = "custom"
model["base_url"] = base_url
save_config(cfg)
deactivate_provider()
# Save model name to the custom_providers entry for next time
_save_custom_provider(base_url, api_key, model_name)
print(f"\n✅ Model set to: {model_name}")
print(f" Provider: {name} ({base_url})")
# Curated model lists for direct API-key providers
_PROVIDER_MODELS = {
"copilot-acp": [
"copilot-acp",
],
"copilot": [
"gpt-5.4",
"gpt-5.4-mini",
"gpt-5-mini",
"gpt-5.3-codex",
"gpt-5.2-codex",
"gpt-4.1",
"gpt-4o",
"gpt-4o-mini",
"claude-opus-4.6",
"claude-sonnet-4.6",
"claude-sonnet-4.5",
"claude-haiku-4.5",
"gemini-2.5-pro",
"grok-code-fast-1",
],
"zai": [
"glm-5",
"glm-4.7",
"glm-4.5",
"glm-4.5-flash",
],
"kimi-coding": [
"kimi-for-coding",
"kimi-k2.5",
"kimi-k2-thinking",
"kimi-k2-thinking-turbo",
"kimi-k2-turbo-preview",
"kimi-k2-0905-preview",
],
"moonshot": [
"kimi-k2.5",
"kimi-k2-thinking",
"kimi-k2-turbo-preview",
"kimi-k2-0905-preview",
],
"minimax": [
"MiniMax-M2.5",
"MiniMax-M2.5-highspeed",
"MiniMax-M2.1",
],
"minimax-cn": [
"MiniMax-M2.5",
"MiniMax-M2.5-highspeed",
"MiniMax-M2.1",
],
"kilocode": [
"anthropic/claude-opus-4.6",
"anthropic/claude-sonnet-4.6",
"openai/gpt-5.4",
"google/gemini-3-pro-preview",
"google/gemini-3-flash-preview",
],
}
def _current_reasoning_effort(config) -> str:
agent_cfg = config.get("agent")
if isinstance(agent_cfg, dict):
return str(agent_cfg.get("reasoning_effort") or "").strip().lower()
return ""
def _set_reasoning_effort(config, effort: str) -> None:
agent_cfg = config.get("agent")
if not isinstance(agent_cfg, dict):
agent_cfg = {}
config["agent"] = agent_cfg
agent_cfg["reasoning_effort"] = effort
def _prompt_reasoning_effort_selection(efforts, current_effort=""):
"""Prompt for a reasoning effort. Returns effort, 'none', or None to keep current."""
ordered = list(dict.fromkeys(str(effort).strip().lower() for effort in efforts if str(effort).strip()))
if not ordered:
return None
def _label(effort):
if effort == current_effort:
return f"{effort} ← currently in use"
return effort
disable_label = "Disable reasoning"
skip_label = "Skip (keep current)"
if current_effort == "none":
default_idx = len(ordered)
elif current_effort in ordered:
default_idx = ordered.index(current_effort)
elif "medium" in ordered:
default_idx = ordered.index("medium")
else:
default_idx = 0
try:
from simple_term_menu import TerminalMenu
choices = [f" {_label(effort)}" for effort in ordered]
choices.append(f" {disable_label}")
choices.append(f" {skip_label}")
menu = TerminalMenu(
choices,
cursor_index=default_idx,
menu_cursor="-> ",
menu_cursor_style=("fg_green", "bold"),
menu_highlight_style=("fg_green",),
cycle_cursor=True,
clear_screen=False,
title="Select reasoning effort:",
)
idx = menu.show()
if idx is None:
return None
print()
if idx < len(ordered):
return ordered[idx]
if idx == len(ordered):
return "none"
return None
except (ImportError, NotImplementedError):
pass
print("Select reasoning effort:")
for i, effort in enumerate(ordered, 1):
print(f" {i}. {_label(effort)}")
n = len(ordered)
print(f" {n + 1}. {disable_label}")
print(f" {n + 2}. {skip_label}")
print()
while True:
try:
choice = input(f"Choice [1-{n + 2}] (default: keep current): ").strip()
if not choice:
return None
idx = int(choice)
if 1 <= idx <= n:
return ordered[idx - 1]
if idx == n + 1:
return "none"
if idx == n + 2:
return None
print(f"Please enter 1-{n + 2}")
except ValueError:
print("Please enter a number")
except (KeyboardInterrupt, EOFError):
return None
def _model_flow_copilot(config, current_model=""):
"""GitHub Copilot flow using env vars, gh CLI, or OAuth device code."""
from hermes_cli.auth import (
PROVIDER_REGISTRY,
_prompt_model_selection,
_save_model_choice,
deactivate_provider,
resolve_api_key_provider_credentials,
)
from hermes_cli.config import get_env_value, save_env_value, load_config, save_config
from hermes_cli.models import (
fetch_api_models,
fetch_github_model_catalog,
github_model_reasoning_efforts,
copilot_model_api_mode,
normalize_copilot_model_id,
)
provider_id = "copilot"
pconfig = PROVIDER_REGISTRY[provider_id]
creds = resolve_api_key_provider_credentials(provider_id)
api_key = creds.get("api_key", "")
source = creds.get("source", "")
if not api_key:
print("No GitHub token configured for GitHub Copilot.")
print()
print(" Supported token types:")
print(" → OAuth token (gho_*) via `copilot login` or device code flow")
print(" → Fine-grained PAT (github_pat_*) with Copilot Requests permission")
print(" → GitHub App token (ghu_*) via environment variable")
print(" ✗ Classic PAT (ghp_*) NOT supported by Copilot API")
print()
print(" Options:")
print(" 1. Login with GitHub (OAuth device code flow)")
print(" 2. Enter a token manually")
print(" 3. Cancel")
print()
try:
choice = input(" Choice [1-3]: ").strip()
except (KeyboardInterrupt, EOFError):
print()
return
if choice == "1":
try:
from hermes_cli.copilot_auth import copilot_device_code_login
token = copilot_device_code_login()
if token:
save_env_value("COPILOT_GITHUB_TOKEN", token)
print(" Copilot token saved.")
print()
else:
print(" Login cancelled or failed.")
return
except Exception as exc:
print(f" Login failed: {exc}")
return
elif choice == "2":
try:
new_key = input(" Token (COPILOT_GITHUB_TOKEN): ").strip()
except (KeyboardInterrupt, EOFError):
print()
return
if not new_key:
print(" Cancelled.")
return
# Validate token type
try:
from hermes_cli.copilot_auth import validate_copilot_token
valid, msg = validate_copilot_token(new_key)
if not valid:
print(f"{msg}")
return
except ImportError:
pass
save_env_value("COPILOT_GITHUB_TOKEN", new_key)
print(" Token saved.")
print()
else:
print(" Cancelled.")
return
creds = resolve_api_key_provider_credentials(provider_id)
api_key = creds.get("api_key", "")
source = creds.get("source", "")
else:
if source in ("GITHUB_TOKEN", "GH_TOKEN"):
print(f" GitHub token: {api_key[:8]}... ✓ ({source})")
elif source == "gh auth token":
print(" GitHub token: ✓ (from `gh auth token`)")
else:
print(" GitHub token: ✓")
print()
effective_base = pconfig.inference_base_url
catalog = fetch_github_model_catalog(api_key)
live_models = [item.get("id", "") for item in catalog if item.get("id")] if catalog else fetch_api_models(api_key, effective_base)
normalized_current_model = normalize_copilot_model_id(
current_model,
catalog=catalog,
api_key=api_key,
) or current_model
if live_models:
model_list = [model_id for model_id in live_models if model_id]
print(f" Found {len(model_list)} model(s) from GitHub Copilot")
else:
model_list = _PROVIDER_MODELS.get(provider_id, [])
if model_list:
print(" ⚠ Could not auto-detect models from GitHub Copilot — showing defaults.")
print(' Use "Enter custom model name" if you do not see your model.')
if model_list:
selected = _prompt_model_selection(model_list, current_model=normalized_current_model)
else:
try:
selected = input("Model name: ").strip()
except (KeyboardInterrupt, EOFError):
selected = None
if selected:
selected = normalize_copilot_model_id(
selected,
catalog=catalog,
api_key=api_key,
) or selected
# Clear stale custom-endpoint overrides so the Copilot provider wins cleanly.
if get_env_value("OPENAI_BASE_URL"):
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
initial_cfg = load_config()
current_effort = _current_reasoning_effort(initial_cfg)
reasoning_efforts = github_model_reasoning_efforts(
selected,
catalog=catalog,
api_key=api_key,
)
selected_effort = None
if reasoning_efforts:
print(f" {selected} supports reasoning controls.")
selected_effort = _prompt_reasoning_effort_selection(
reasoning_efforts, current_effort=current_effort
)
_save_model_choice(selected)
cfg = load_config()
model = cfg.get("model")
if not isinstance(model, dict):
model = {"default": model} if model else {}
cfg["model"] = model
model["provider"] = provider_id
model["base_url"] = effective_base
model["api_mode"] = copilot_model_api_mode(
selected,
catalog=catalog,
api_key=api_key,
)
if selected_effort is not None:
_set_reasoning_effort(cfg, selected_effort)
save_config(cfg)
deactivate_provider()
print(f"Default model set to: {selected} (via {pconfig.name})")
if reasoning_efforts:
if selected_effort == "none":
print("Reasoning disabled for this model.")
elif selected_effort:
print(f"Reasoning effort set to: {selected_effort}")
else:
print("No change.")
def _model_flow_copilot_acp(config, current_model=""):
"""GitHub Copilot ACP flow using the local Copilot CLI."""
from hermes_cli.auth import (
PROVIDER_REGISTRY,
_prompt_model_selection,
_save_model_choice,
deactivate_provider,
get_external_process_provider_status,
resolve_api_key_provider_credentials,
resolve_external_process_provider_credentials,
)
from hermes_cli.models import (
fetch_github_model_catalog,
normalize_copilot_model_id,
)
from hermes_cli.config import load_config, save_config
del config
provider_id = "copilot-acp"
pconfig = PROVIDER_REGISTRY[provider_id]
status = get_external_process_provider_status(provider_id)
resolved_command = status.get("resolved_command") or status.get("command") or "copilot"
effective_base = status.get("base_url") or pconfig.inference_base_url
print(" GitHub Copilot ACP delegates Hermes turns to `copilot --acp`.")
print(" Hermes currently starts its own ACP subprocess for each request.")
print(" Hermes uses your selected model as a hint for the Copilot ACP session.")
print(f" Command: {resolved_command}")
print(f" Backend marker: {effective_base}")
print()
try:
creds = resolve_external_process_provider_credentials(provider_id)
except Exception as exc:
print(f"{exc}")
print(" Set HERMES_COPILOT_ACP_COMMAND or COPILOT_CLI_PATH if Copilot CLI is installed elsewhere.")
return
effective_base = creds.get("base_url") or effective_base
catalog_api_key = ""
try:
catalog_creds = resolve_api_key_provider_credentials("copilot")
catalog_api_key = catalog_creds.get("api_key", "")
except Exception:
pass
catalog = fetch_github_model_catalog(catalog_api_key)
normalized_current_model = normalize_copilot_model_id(
current_model,
catalog=catalog,
api_key=catalog_api_key,
) or current_model
if catalog:
model_list = [item.get("id", "") for item in catalog if item.get("id")]
print(f" Found {len(model_list)} model(s) from GitHub Copilot")
else:
model_list = _PROVIDER_MODELS.get("copilot", [])
if model_list:
print(" ⚠ Could not auto-detect models from GitHub Copilot — showing defaults.")
print(' Use "Enter custom model name" if you do not see your model.')
if model_list:
selected = _prompt_model_selection(
model_list,
current_model=normalized_current_model,
)
else:
try:
selected = input("Model name: ").strip()
except (KeyboardInterrupt, EOFError):
selected = None
if not selected:
print("No change.")
return
selected = normalize_copilot_model_id(
selected,
catalog=catalog,
api_key=catalog_api_key,
) or selected
_save_model_choice(selected)
cfg = load_config()
model = cfg.get("model")
if not isinstance(model, dict):
model = {"default": model} if model else {}
cfg["model"] = model
model["provider"] = provider_id
model["base_url"] = effective_base
model["api_mode"] = "chat_completions"
save_config(cfg)
deactivate_provider()
print(f"Default model set to: {selected} (via {pconfig.name})")
def _model_flow_kimi(config, current_model=""):
"""Kimi / Moonshot model selection with automatic endpoint routing.
- sk-kimi-* keys → api.kimi.com/coding/v1 (Kimi Coding Plan)
- Other keys → api.moonshot.ai/v1 (legacy Moonshot)
No manual base URL prompt — endpoint is determined by key prefix.
"""
from hermes_cli.auth import (
PROVIDER_REGISTRY, KIMI_CODE_BASE_URL, _prompt_model_selection,
_save_model_choice, deactivate_provider,
)
from hermes_cli.config import get_env_value, save_env_value, load_config, save_config
provider_id = "kimi-coding"
pconfig = PROVIDER_REGISTRY[provider_id]
key_env = pconfig.api_key_env_vars[0] if pconfig.api_key_env_vars else ""
base_url_env = pconfig.base_url_env_var or ""
# Step 1: Check / prompt for API key
existing_key = ""
for ev in pconfig.api_key_env_vars:
existing_key = get_env_value(ev) or os.getenv(ev, "")
if existing_key:
break
if not existing_key:
print(f"No {pconfig.name} API key configured.")
if key_env:
try:
new_key = input(f"{key_env} (or Enter to cancel): ").strip()
except (KeyboardInterrupt, EOFError):
print()
return
if not new_key:
print("Cancelled.")
return
save_env_value(key_env, new_key)
existing_key = new_key
print("API key saved.")
print()
else:
print(f" {pconfig.name} API key: {existing_key[:8]}... ✓")
print()
# Step 2: Auto-detect endpoint from key prefix
is_coding_plan = existing_key.startswith("sk-kimi-")
if is_coding_plan:
effective_base = KIMI_CODE_BASE_URL
print(f" Detected Kimi Coding Plan key → {effective_base}")
else:
effective_base = pconfig.inference_base_url
print(f" Using Moonshot endpoint → {effective_base}")
# Clear any manual base URL override so auto-detection works at runtime
if base_url_env and get_env_value(base_url_env):
save_env_value(base_url_env, "")
print()
# Step 3: Model selection — show appropriate models for the endpoint
if is_coding_plan:
# Coding Plan models (kimi-for-coding first)
model_list = [
"kimi-for-coding",
"kimi-k2.5",
"kimi-k2-thinking",
"kimi-k2-thinking-turbo",
]
else:
# Legacy Moonshot models (excludes Coding Plan-only models)
model_list = _PROVIDER_MODELS.get("moonshot", [])
if model_list:
selected = _prompt_model_selection(model_list, current_model=current_model)
else:
try:
selected = input("Enter model name: ").strip()
except (KeyboardInterrupt, EOFError):
selected = None
if selected:
# Clear custom endpoint if set (avoid confusion)
if get_env_value("OPENAI_BASE_URL"):
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
_save_model_choice(selected)
# Update config with provider and base URL
cfg = load_config()
model = cfg.get("model")
if not isinstance(model, dict):
model = {"default": model} if model else {}
cfg["model"] = model
model["provider"] = provider_id
model["base_url"] = effective_base
save_config(cfg)
deactivate_provider()
endpoint_label = "Kimi Coding" if is_coding_plan else "Moonshot"
print(f"Default model set to: {selected} (via {endpoint_label})")
else:
print("No change.")
def _model_flow_api_key_provider(config, provider_id, current_model=""):
"""Generic flow for API-key providers (z.ai, MiniMax)."""
from hermes_cli.auth import (
PROVIDER_REGISTRY, _prompt_model_selection, _save_model_choice,
_update_config_for_provider, deactivate_provider,
)
from hermes_cli.config import get_env_value, save_env_value, load_config, save_config
pconfig = PROVIDER_REGISTRY[provider_id]
key_env = pconfig.api_key_env_vars[0] if pconfig.api_key_env_vars else ""
base_url_env = pconfig.base_url_env_var or ""
# Check / prompt for API key
existing_key = ""
for ev in pconfig.api_key_env_vars:
existing_key = get_env_value(ev) or os.getenv(ev, "")
if existing_key:
break
if not existing_key:
print(f"No {pconfig.name} API key configured.")
if key_env:
try:
new_key = input(f"{key_env} (or Enter to cancel): ").strip()
except (KeyboardInterrupt, EOFError):
print()
return
if not new_key:
print("Cancelled.")
return
save_env_value(key_env, new_key)
print("API key saved.")
print()
else:
print(f" {pconfig.name} API key: {existing_key[:8]}... ✓")
print()
# Optional base URL override
current_base = ""
if base_url_env:
current_base = get_env_value(base_url_env) or os.getenv(base_url_env, "")
effective_base = current_base or pconfig.inference_base_url
try:
override = input(f"Base URL [{effective_base}]: ").strip()
except (KeyboardInterrupt, EOFError):
print()
override = ""
if override and base_url_env:
save_env_value(base_url_env, override)
effective_base = override
# Model selection — try live /models endpoint first, fall back to defaults
from hermes_cli.models import fetch_api_models
api_key_for_probe = existing_key or (get_env_value(key_env) if key_env else "")
live_models = fetch_api_models(api_key_for_probe, effective_base)
if live_models:
model_list = live_models
print(f" Found {len(model_list)} model(s) from {pconfig.name} API")
else:
model_list = _PROVIDER_MODELS.get(provider_id, [])
if model_list:
print(f" ⚠ Could not auto-detect models from API — showing defaults.")
print(f" Use \"Enter custom model name\" if you don't see your model.")
# else: no defaults either, will fall through to raw input
if model_list:
selected = _prompt_model_selection(model_list, current_model=current_model)
else:
try:
selected = input("Model name: ").strip()
except (KeyboardInterrupt, EOFError):
selected = None
if selected:
# Clear custom endpoint if set (avoid confusion)
if get_env_value("OPENAI_BASE_URL"):
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
_save_model_choice(selected)
# Update config with provider and base URL
cfg = load_config()
model = cfg.get("model")
if not isinstance(model, dict):
model = {"default": model} if model else {}
cfg["model"] = model
model["provider"] = provider_id
model["base_url"] = effective_base
save_config(cfg)
deactivate_provider()
print(f"Default model set to: {selected} (via {pconfig.name})")
else:
print("No change.")
def _run_anthropic_oauth_flow(save_env_value):
"""Run the Claude OAuth setup-token flow. Returns True if credentials were saved."""
from agent.anthropic_adapter import (
run_oauth_setup_token,
read_claude_code_credentials,
is_claude_code_token_valid,
)
from hermes_cli.config import (
save_anthropic_oauth_token,
use_anthropic_claude_code_credentials,
)
def _activate_claude_code_credentials_if_available() -> bool:
try:
creds = read_claude_code_credentials()
except Exception:
creds = None
if creds and (
is_claude_code_token_valid(creds)
or bool(creds.get("refreshToken"))
):
use_anthropic_claude_code_credentials(save_fn=save_env_value)
print(" ✓ Claude Code credentials linked.")
print(" Hermes will use Claude's credential store directly instead of copying a setup-token into ~/.hermes/.env.")
return True
return False
try:
print()
print(" Running 'claude setup-token' — follow the prompts below.")
print(" A browser window will open for you to authorize access.")
print()
token = run_oauth_setup_token()
if token:
if _activate_claude_code_credentials_if_available():
return True
save_anthropic_oauth_token(token, save_fn=save_env_value)
print(" ✓ OAuth credentials saved.")
return True
# Subprocess completed but no token auto-detected — ask user to paste
print()
print(" If the setup-token was displayed above, paste it here:")
print()
try:
manual_token = input(" Paste setup-token (or Enter to cancel): ").strip()
except (KeyboardInterrupt, EOFError):
print()
return False
if manual_token:
save_anthropic_oauth_token(manual_token, save_fn=save_env_value)
print(" ✓ Setup-token saved.")
return True
print(" ⚠ Could not detect saved credentials.")
return False
except FileNotFoundError:
# Claude CLI not installed — guide user through manual setup
print()
print(" The 'claude' CLI is required for OAuth login.")
print()
print(" To install and authenticate:")
print()
print(" 1. Install Claude Code: npm install -g @anthropic-ai/claude-code")
print(" 2. Run: claude setup-token")
print(" 3. Follow the browser prompts to authorize")
print(" 4. Re-run: hermes model")
print()
print(" Or paste an existing setup-token now (sk-ant-oat-...):")
print()
try:
token = input(" Setup-token (or Enter to cancel): ").strip()
except (KeyboardInterrupt, EOFError):
print()
return False
if token:
save_anthropic_oauth_token(token, save_fn=save_env_value)
print(" ✓ Setup-token saved.")
return True
print(" Cancelled — install Claude Code and try again.")
return False
def _model_flow_anthropic(config, current_model=""):
"""Flow for Anthropic provider — OAuth subscription, API key, or Claude Code creds."""
import os
from hermes_cli.auth import (
PROVIDER_REGISTRY, _prompt_model_selection, _save_model_choice,
_update_config_for_provider, deactivate_provider,
)
from hermes_cli.config import (
get_env_value, save_env_value, load_config, save_config,
save_anthropic_api_key,
)
from hermes_cli.models import _PROVIDER_MODELS
pconfig = PROVIDER_REGISTRY["anthropic"]
# Check ALL credential sources
existing_key = (
get_env_value("ANTHROPIC_TOKEN")
or os.getenv("ANTHROPIC_TOKEN", "")
or get_env_value("ANTHROPIC_API_KEY")
or os.getenv("ANTHROPIC_API_KEY", "")
or os.getenv("CLAUDE_CODE_OAUTH_TOKEN", "")
)
cc_available = False
try:
from agent.anthropic_adapter import read_claude_code_credentials, is_claude_code_token_valid
cc_creds = read_claude_code_credentials()
if cc_creds and is_claude_code_token_valid(cc_creds):
cc_available = True
except Exception:
pass
has_creds = bool(existing_key) or cc_available
needs_auth = not has_creds
if has_creds:
# Show what we found
if existing_key:
print(f" Anthropic credentials: {existing_key[:12]}... ✓")
elif cc_available:
print(" Claude Code credentials: ✓ (auto-detected)")
print()
print(" 1. Use existing credentials")
print(" 2. Reauthenticate (new OAuth login)")
print(" 3. Cancel")
print()
try:
choice = input(" Choice [1/2/3]: ").strip()
except (KeyboardInterrupt, EOFError):
choice = "1"
if choice == "2":
needs_auth = True
elif choice == "3":
return
# choice == "1" or default: use existing, proceed to model selection
if needs_auth:
# Show auth method choice
print()
print(" Choose authentication method:")
print()
print(" 1. Claude Pro/Max subscription (OAuth login)")
print(" 2. Anthropic API key (pay-per-token)")
print(" 3. Cancel")
print()
try:
choice = input(" Choice [1/2/3]: ").strip()
except (KeyboardInterrupt, EOFError):
print()
return
if choice == "1":
if not _run_anthropic_oauth_flow(save_env_value):
return
elif choice == "2":
print()
print(" Get an API key at: https://console.anthropic.com/settings/keys")
print()
try:
api_key = input(" API key (sk-ant-...): ").strip()
except (KeyboardInterrupt, EOFError):
print()
return
if not api_key:
print(" Cancelled.")
return
save_anthropic_api_key(api_key, save_fn=save_env_value)
print(" ✓ API key saved.")
else:
print(" No change.")
return
print()
# Model selection
model_list = _PROVIDER_MODELS.get("anthropic", [])
if model_list:
selected = _prompt_model_selection(model_list, current_model=current_model)
else:
try:
selected = input("Model name (e.g., claude-sonnet-4-20250514): ").strip()
except (KeyboardInterrupt, EOFError):
selected = None
if selected:
# Clear custom endpoint if set
if get_env_value("OPENAI_BASE_URL"):
save_env_value("OPENAI_BASE_URL", "")
save_env_value("OPENAI_API_KEY", "")
_save_model_choice(selected)
# Update config with provider — clear base_url since
# resolve_runtime_provider() always hardcodes Anthropic's URL.
# Leaving a stale base_url in config can contaminate other
# providers if the user switches without running 'hermes model'.
cfg = load_config()
model = cfg.get("model")
if not isinstance(model, dict):
model = {"default": model} if model else {}
cfg["model"] = model
model["provider"] = "anthropic"
model.pop("base_url", None)
save_config(cfg)
deactivate_provider()
print(f"Default model set to: {selected} (via Anthropic)")
else:
print("No change.")
def cmd_login(args):
"""Authenticate Hermes CLI with a provider."""
from hermes_cli.auth import login_command
login_command(args)
def cmd_logout(args):
"""Clear provider authentication."""
from hermes_cli.auth import logout_command
logout_command(args)
def cmd_status(args):
"""Show status of all components."""
from hermes_cli.status import show_status
show_status(args)
def cmd_cron(args):
"""Cron job management."""
from hermes_cli.cron import cron_command
cron_command(args)
def cmd_doctor(args):
"""Check configuration and dependencies."""
from hermes_cli.doctor import run_doctor
run_doctor(args)
def cmd_config(args):
"""Configuration management."""
from hermes_cli.config import config_command
config_command(args)
def cmd_version(args):
"""Show version."""
print(f"Hermes Agent v{__version__} ({__release_date__})")
print(f"Project: {PROJECT_ROOT}")
# Show Python version
print(f"Python: {sys.version.split()[0]}")
# Check for key dependencies
try:
import openai
print(f"OpenAI SDK: {openai.__version__}")
except ImportError:
print("OpenAI SDK: Not installed")
# Show update status (synchronous — acceptable since user asked for version info)
try:
from hermes_cli.banner import check_for_updates
behind = check_for_updates()
if behind and behind > 0:
commits_word = "commit" if behind == 1 else "commits"
print(f"Update available: {behind} {commits_word} behind — run 'hermes update'")
elif behind == 0:
print("Up to date")
except Exception:
pass
def cmd_uninstall(args):
"""Uninstall Hermes Agent."""
from hermes_cli.uninstall import run_uninstall
run_uninstall(args)
def _update_via_zip(args):
"""Update Hermes Agent by downloading a ZIP archive.
Used on Windows when git file I/O is broken (antivirus, NTFS filter
drivers causing 'Invalid argument' errors on file creation).
"""
import shutil
import tempfile
import zipfile
from urllib.request import urlretrieve
branch = "main"
zip_url = f"https://github.com/NousResearch/hermes-agent/archive/refs/heads/{branch}.zip"
print("→ Downloading latest version...")
try:
tmp_dir = tempfile.mkdtemp(prefix="hermes-update-")
zip_path = os.path.join(tmp_dir, f"hermes-agent-{branch}.zip")
urlretrieve(zip_url, zip_path)
print("→ Extracting...")
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(tmp_dir)
# GitHub ZIPs extract to hermes-agent-<branch>/
extracted = os.path.join(tmp_dir, f"hermes-agent-{branch}")
if not os.path.isdir(extracted):
# Try to find it
for d in os.listdir(tmp_dir):
candidate = os.path.join(tmp_dir, d)
if os.path.isdir(candidate) and d != "__MACOSX":
extracted = candidate
break
# Copy updated files over existing installation, preserving venv/node_modules/.git
preserve = {'venv', 'node_modules', '.git', '__pycache__', '.env'}
update_count = 0
for item in os.listdir(extracted):
if item in preserve:
continue
src = os.path.join(extracted, item)
dst = os.path.join(str(PROJECT_ROOT), item)
if os.path.isdir(src):
if os.path.exists(dst):
shutil.rmtree(dst)
shutil.copytree(src, dst)
else:
shutil.copy2(src, dst)
update_count += 1
print(f"✓ Updated {update_count} items from ZIP")
# Cleanup
shutil.rmtree(tmp_dir, ignore_errors=True)
except Exception as e:
print(f"✗ ZIP update failed: {e}")
sys.exit(1)
# Reinstall Python dependencies (try .[all] first for optional extras,
# fall back to . if extras fail — mirrors the install script behavior)
print("→ Updating Python dependencies...")
import subprocess
uv_bin = shutil.which("uv")
if uv_bin:
uv_env = {**os.environ, "VIRTUAL_ENV": str(PROJECT_ROOT / "venv")}
try:
subprocess.run(
[uv_bin, "pip", "install", "-e", ".[all]", "--quiet"],
cwd=PROJECT_ROOT, check=True, env=uv_env,
)
except subprocess.CalledProcessError:
print(" ⚠ Optional extras failed, installing base dependencies...")
subprocess.run(
[uv_bin, "pip", "install", "-e", ".", "--quiet"],
cwd=PROJECT_ROOT, check=True, env=uv_env,
)
else:
venv_pip = PROJECT_ROOT / "venv" / ("Scripts" if sys.platform == "win32" else "bin") / "pip"
pip_cmd = [str(venv_pip)] if venv_pip.exists() else ["pip"]
try:
subprocess.run(pip_cmd + ["install", "-e", ".[all]", "--quiet"], cwd=PROJECT_ROOT, check=True)
except subprocess.CalledProcessError:
print(" ⚠ Optional extras failed, installing base dependencies...")
subprocess.run(pip_cmd + ["install", "-e", ".", "--quiet"], cwd=PROJECT_ROOT, check=True)
# Sync skills
try:
from tools.skills_sync import sync_skills
print("→ Syncing bundled skills...")
result = sync_skills(quiet=True)
if result["copied"]:
print(f" + {len(result['copied'])} new: {', '.join(result['copied'])}")
if result.get("updated"):
print(f"{len(result['updated'])} updated: {', '.join(result['updated'])}")
if result.get("user_modified"):
print(f" ~ {len(result['user_modified'])} user-modified (kept)")
if result.get("cleaned"):
print(f" {len(result['cleaned'])} removed from manifest")
if not result["copied"] and not result.get("updated"):
print(" ✓ Skills are up to date")
except Exception:
pass
print()
print("✓ Update complete!")
def _stash_local_changes_if_needed(git_cmd: list[str], cwd: Path) -> Optional[str]:
status = subprocess.run(
git_cmd + ["status", "--porcelain"],
cwd=cwd,
capture_output=True,
text=True,
check=True,
)
if not status.stdout.strip():
return None
from datetime import datetime, timezone
stash_name = datetime.now(timezone.utc).strftime("hermes-update-autostash-%Y%m%d-%H%M%S")
print("→ Local changes detected — stashing before update...")
subprocess.run(
git_cmd + ["stash", "push", "--include-untracked", "-m", stash_name],
cwd=cwd,
check=True,
)
stash_ref = subprocess.run(
git_cmd + ["rev-parse", "--verify", "refs/stash"],
cwd=cwd,
capture_output=True,
text=True,
check=True,
).stdout.strip()
return stash_ref
def _resolve_stash_selector(git_cmd: list[str], cwd: Path, stash_ref: str) -> Optional[str]:
stash_list = subprocess.run(
git_cmd + ["stash", "list", "--format=%gd %H"],
cwd=cwd,
capture_output=True,
text=True,
check=True,
)
for line in stash_list.stdout.splitlines():
selector, _, commit = line.partition(" ")
if commit.strip() == stash_ref:
return selector.strip()
return None
def _print_stash_cleanup_guidance(stash_ref: str, stash_selector: Optional[str] = None) -> None:
print(" Check `git status` first so you don't accidentally reapply the same change twice.")
print(" Find the saved entry with: git stash list --format='%gd %H %s'")
if stash_selector:
print(f" Remove it with: git stash drop {stash_selector}")
else:
print(f" Look for commit {stash_ref}, then drop its selector with: git stash drop stash@{{N}}")
def _restore_stashed_changes(
git_cmd: list[str],
cwd: Path,
stash_ref: str,
prompt_user: bool = False,
) -> bool:
if prompt_user:
print()
print("⚠ Local changes were stashed before updating.")
print(" Restoring them may reapply local customizations onto the updated codebase.")
print(" Review the result afterward if Hermes behaves unexpectedly.")
print("Restore local changes now? [Y/n]")
response = input().strip().lower()
if response not in ("", "y", "yes"):
print("Skipped restoring local changes.")
print("Your changes are still preserved in git stash.")
print(f"Restore manually with: git stash apply {stash_ref}")
return False
print("→ Restoring local changes...")
restore = subprocess.run(
git_cmd + ["stash", "apply", stash_ref],
cwd=cwd,
capture_output=True,
text=True,
)
if restore.returncode != 0:
print("✗ Update pulled new code, but restoring local changes failed.")
if restore.stdout.strip():
print(restore.stdout.strip())
if restore.stderr.strip():
print(restore.stderr.strip())
print("Your changes are still preserved in git stash.")
print(f"Resolve manually with: git stash apply {stash_ref}")
sys.exit(1)
stash_selector = _resolve_stash_selector(git_cmd, cwd, stash_ref)
if stash_selector is None:
print("⚠ Local changes were restored, but Hermes couldn't find the stash entry to drop.")
print(" The stash was left in place. You can remove it manually after checking the result.")
_print_stash_cleanup_guidance(stash_ref)
else:
drop = subprocess.run(
git_cmd + ["stash", "drop", stash_selector],
cwd=cwd,
capture_output=True,
text=True,
)
if drop.returncode != 0:
print("⚠ Local changes were restored, but Hermes couldn't drop the saved stash entry.")
if drop.stdout.strip():
print(drop.stdout.strip())
if drop.stderr.strip():
print(drop.stderr.strip())
print(" The stash was left in place. You can remove it manually after checking the result.")
_print_stash_cleanup_guidance(stash_ref, stash_selector)
print("⚠ Local changes were restored on top of the updated codebase.")
print(" Review `git diff` / `git status` if Hermes behaves unexpectedly.")
return True
def _invalidate_update_cache():
"""Delete the update-check cache so ``hermes --version`` doesn't
report a stale "commits behind" count after a successful update."""
try:
cache_file = Path(os.getenv(
"HERMES_HOME", Path.home() / ".hermes"
)) / ".update_check"
if cache_file.exists():
cache_file.unlink()
except Exception:
pass
def cmd_update(args):
"""Update Hermes Agent to the latest version."""
import shutil
print("⚕ Updating Hermes Agent...")
print()
# Try git-based update first, fall back to ZIP download on Windows
# when git file I/O is broken (antivirus, NTFS filter drivers, etc.)
use_zip_update = False
git_dir = PROJECT_ROOT / '.git'
if not git_dir.exists():
if sys.platform == "win32":
use_zip_update = True
else:
print("✗ Not a git repository. Please reinstall:")
print(" curl -fsSL https://raw.githubusercontent.com/NousResearch/hermes-agent/main/scripts/install.sh | bash")
sys.exit(1)
# On Windows, git can fail with "unable to write loose object file: Invalid argument"
# due to filesystem atomicity issues. Set the recommended workaround.
if sys.platform == "win32" and git_dir.exists():
subprocess.run(
["git", "-c", "windows.appendAtomically=false", "config", "windows.appendAtomically", "false"],
cwd=PROJECT_ROOT, check=False, capture_output=True
)
if use_zip_update:
# ZIP-based update for Windows when git is broken
_update_via_zip(args)
return
# Fetch and pull
try:
print("→ Fetching updates...")
git_cmd = ["git"]
if sys.platform == "win32":
git_cmd = ["git", "-c", "windows.appendAtomically=false"]
subprocess.run(git_cmd + ["fetch", "origin"], cwd=PROJECT_ROOT, check=True)
# Get current branch
result = subprocess.run(
git_cmd + ["rev-parse", "--abbrev-ref", "HEAD"],
cwd=PROJECT_ROOT,
capture_output=True,
text=True,
check=True
)
branch = result.stdout.strip()
# Fall back to main if the current branch doesn't exist on the remote
verify = subprocess.run(
git_cmd + ["rev-parse", "--verify", f"origin/{branch}"],
cwd=PROJECT_ROOT, capture_output=True, text=True,
)
if verify.returncode != 0:
branch = "main"
# Check if there are updates
result = subprocess.run(
git_cmd + ["rev-list", f"HEAD..origin/{branch}", "--count"],
cwd=PROJECT_ROOT,
capture_output=True,
text=True,
check=True
)
commit_count = int(result.stdout.strip())
if commit_count == 0:
_invalidate_update_cache()
print("✓ Already up to date!")
return
print(f"→ Found {commit_count} new commit(s)")
auto_stash_ref = _stash_local_changes_if_needed(git_cmd, PROJECT_ROOT)
prompt_for_restore = auto_stash_ref is not None and sys.stdin.isatty() and sys.stdout.isatty()
print("→ Pulling updates...")
try:
subprocess.run(git_cmd + ["pull", "origin", branch], cwd=PROJECT_ROOT, check=True)
finally:
if auto_stash_ref is not None:
_restore_stashed_changes(
git_cmd,
PROJECT_ROOT,
auto_stash_ref,
prompt_user=prompt_for_restore,
)
_invalidate_update_cache()
# Reinstall Python dependencies (try .[all] first for optional extras,
# fall back to . if extras fail — mirrors the install script behavior)
print("→ Updating Python dependencies...")
uv_bin = shutil.which("uv")
if uv_bin:
uv_env = {**os.environ, "VIRTUAL_ENV": str(PROJECT_ROOT / "venv")}
try:
subprocess.run(
[uv_bin, "pip", "install", "-e", ".[all]", "--quiet"],
cwd=PROJECT_ROOT, check=True, env=uv_env,
)
except subprocess.CalledProcessError:
print(" ⚠ Optional extras failed, installing base dependencies...")
subprocess.run(
[uv_bin, "pip", "install", "-e", ".", "--quiet"],
cwd=PROJECT_ROOT, check=True, env=uv_env,
)
else:
venv_pip = PROJECT_ROOT / "venv" / ("Scripts" if sys.platform == "win32" else "bin") / "pip"
pip_cmd = [str(venv_pip)] if venv_pip.exists() else ["pip"]
try:
subprocess.run(pip_cmd + ["install", "-e", ".[all]", "--quiet"], cwd=PROJECT_ROOT, check=True)
except subprocess.CalledProcessError:
print(" ⚠ Optional extras failed, installing base dependencies...")
subprocess.run(pip_cmd + ["install", "-e", ".", "--quiet"], cwd=PROJECT_ROOT, check=True)
# Check for Node.js deps
if (PROJECT_ROOT / "package.json").exists():
import shutil
if shutil.which("npm"):
print("→ Updating Node.js dependencies...")
subprocess.run(["npm", "install", "--silent"], cwd=PROJECT_ROOT, check=False)
print()
print("✓ Code updated!")
# Sync bundled skills (copies new, updates changed, respects user deletions)
try:
from tools.skills_sync import sync_skills
print()
print("→ Syncing bundled skills...")
result = sync_skills(quiet=True)
if result["copied"]:
print(f" + {len(result['copied'])} new: {', '.join(result['copied'])}")
if result.get("updated"):
print(f"{len(result['updated'])} updated: {', '.join(result['updated'])}")
if result.get("user_modified"):
print(f" ~ {len(result['user_modified'])} user-modified (kept)")
if result.get("cleaned"):
print(f" {len(result['cleaned'])} removed from manifest")
if not result["copied"] and not result.get("updated"):
print(" ✓ Skills are up to date")
except Exception as e:
logger.debug("Skills sync during update failed: %s", e)
# Check for config migrations
print()
print("→ Checking configuration for new options...")
from hermes_cli.config import (
get_missing_env_vars, get_missing_config_fields,
check_config_version, migrate_config
)
missing_env = get_missing_env_vars(required_only=True)
missing_config = get_missing_config_fields()
current_ver, latest_ver = check_config_version()
needs_migration = missing_env or missing_config or current_ver < latest_ver
if needs_migration:
print()
if missing_env:
print(f" ⚠️ {len(missing_env)} new required setting(s) need configuration")
if missing_config:
print(f" {len(missing_config)} new config option(s) available")
print()
response = input("Would you like to configure them now? [Y/n]: ").strip().lower()
if response in ('', 'y', 'yes'):
print()
results = migrate_config(interactive=True, quiet=False)
if results["env_added"] or results["config_added"]:
print()
print("✓ Configuration updated!")
else:
print()
print("Skipped. Run 'hermes config migrate' later to configure.")
else:
print(" ✓ Configuration is up to date")
print()
print("✓ Update complete!")
# Auto-restart gateway if it's running.
# Uses the PID file (scoped to HERMES_HOME) to find this
# installation's gateway — safe with multiple installations.
try:
from gateway.status import get_running_pid, remove_pid_file
from hermes_cli.gateway import (
get_service_name, get_launchd_plist_path, is_macos, is_linux,
refresh_launchd_plist_if_needed,
_ensure_user_systemd_env, get_systemd_linger_status,
)
import signal as _signal
_gw_service_name = get_service_name()
existing_pid = get_running_pid()
has_systemd_service = False
has_launchd_service = False
try:
_ensure_user_systemd_env()
check = subprocess.run(
["systemctl", "--user", "is-active", _gw_service_name],
capture_output=True, text=True, timeout=5,
)
has_systemd_service = check.stdout.strip() == "active"
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
# Check for macOS launchd service
if is_macos():
try:
plist_path = get_launchd_plist_path()
if plist_path.exists():
check = subprocess.run(
["launchctl", "list", "ai.hermes.gateway"],
capture_output=True, text=True, timeout=5,
)
has_launchd_service = check.returncode == 0
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
if existing_pid or has_systemd_service or has_launchd_service:
print()
# When a service manager is handling the gateway, let it
# manage the lifecycle — don't manually SIGTERM the PID
# (launchd KeepAlive would respawn immediately, causing races).
if has_systemd_service:
import time as _time
if existing_pid:
try:
os.kill(existing_pid, _signal.SIGTERM)
print(f"→ Stopped gateway process (PID {existing_pid})")
except ProcessLookupError:
pass
except PermissionError:
print(f"⚠ Permission denied killing gateway PID {existing_pid}")
remove_pid_file()
_time.sleep(1) # Brief pause for port/socket release
print("→ Restarting gateway service...")
restart = subprocess.run(
["systemctl", "--user", "restart", _gw_service_name],
capture_output=True, text=True, timeout=15,
)
if restart.returncode == 0:
print("✓ Gateway restarted.")
else:
print(f"⚠ Gateway restart failed: {restart.stderr.strip()}")
# Check if linger is the issue
if is_linux():
linger_ok, _detail = get_systemd_linger_status()
if linger_ok is not True:
import getpass
_username = getpass.getuser()
print()
print(" Linger must be enabled for the gateway user service to function.")
print(f" Run: sudo loginctl enable-linger {_username}")
print()
print(" Then restart the gateway:")
print(" hermes gateway restart")
else:
print(" Try manually: hermes gateway restart")
elif has_launchd_service:
# Refresh the plist first (picks up --replace and other
# changes from the update we just pulled).
refresh_launchd_plist_if_needed()
# Explicit stop+start — don't rely on KeepAlive respawn
# after a manual SIGTERM, which would race with the
# PID file cleanup.
print("→ Restarting gateway service...")
stop = subprocess.run(
["launchctl", "stop", "ai.hermes.gateway"],
capture_output=True, text=True, timeout=10,
)
start = subprocess.run(
["launchctl", "start", "ai.hermes.gateway"],
capture_output=True, text=True, timeout=10,
)
if start.returncode == 0:
print("✓ Gateway restarted via launchd.")
else:
print(f"⚠ Gateway restart failed: {start.stderr.strip()}")
print(" Try manually: hermes gateway restart")
elif existing_pid:
try:
os.kill(existing_pid, _signal.SIGTERM)
print(f"→ Stopped gateway process (PID {existing_pid})")
except ProcessLookupError:
pass # Already gone
except PermissionError:
print(f"⚠ Permission denied killing gateway PID {existing_pid}")
remove_pid_file()
print(" Gateway was running manually (not as a service).")
print(" Restart it with: hermes gateway run")
except Exception as e:
logger.debug("Gateway restart during update failed: %s", e)
print()
print("Tip: You can now select a provider and model:")
print(" hermes model # Select provider and model")
except subprocess.CalledProcessError as e:
if sys.platform == "win32":
print(f"⚠ Git update failed: {e}")
print("→ Falling back to ZIP download...")
print()
_update_via_zip(args)
else:
print(f"✗ Update failed: {e}")
sys.exit(1)
def _coalesce_session_name_args(argv: list) -> list:
"""Join unquoted multi-word session names after -c/--continue and -r/--resume.
When a user types ``hermes -c Pokemon Agent Dev`` without quoting the
session name, argparse sees three separate tokens. This function merges
them into a single argument so argparse receives
``['-c', 'Pokemon Agent Dev']`` instead.
Tokens are collected after the flag until we hit another flag (``-*``)
or a known top-level subcommand.
"""
_SUBCOMMANDS = {
"chat", "model", "gateway", "setup", "whatsapp", "login", "logout",
"status", "cron", "doctor", "config", "pairing", "skills", "tools",
"sessions", "insights", "version", "update", "uninstall",
}
_SESSION_FLAGS = {"-c", "--continue", "-r", "--resume"}
result = []
i = 0
while i < len(argv):
token = argv[i]
if token in _SESSION_FLAGS:
result.append(token)
i += 1
# Collect subsequent non-flag, non-subcommand tokens as one name
parts: list = []
while i < len(argv) and not argv[i].startswith("-") and argv[i] not in _SUBCOMMANDS:
parts.append(argv[i])
i += 1
if parts:
result.append(" ".join(parts))
else:
result.append(token)
i += 1
return result
def main():
"""Main entry point for hermes CLI."""
parser = argparse.ArgumentParser(
prog="hermes",
description="Hermes Agent - AI assistant with tool-calling capabilities",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
hermes Start interactive chat
hermes chat -q "Hello" Single query mode
hermes -c Resume the most recent session
hermes -c "my project" Resume a session by name (latest in lineage)
hermes --resume <session_id> Resume a specific session by ID
hermes setup Run setup wizard
hermes logout Clear stored authentication
hermes model Select default model
hermes config View configuration
hermes config edit Edit config in $EDITOR
hermes config set model gpt-4 Set a config value
hermes gateway Run messaging gateway
hermes -s hermes-agent-dev,github-auth
hermes -w Start in isolated git worktree
hermes gateway install Install gateway background service
hermes sessions list List past sessions
hermes sessions browse Interactive session picker
hermes sessions rename ID T Rename/title a session
hermes update Update to latest version
For more help on a command:
hermes <command> --help
"""
)
parser.add_argument(
"--version", "-V",
action="store_true",
help="Show version and exit"
)
parser.add_argument(
"--resume", "-r",
metavar="SESSION",
default=None,
help="Resume a previous session by ID or title"
)
parser.add_argument(
"--continue", "-c",
dest="continue_last",
nargs="?",
const=True,
default=None,
metavar="SESSION_NAME",
help="Resume a session by name, or the most recent if no name given"
)
parser.add_argument(
"--worktree", "-w",
action="store_true",
default=False,
help="Run in an isolated git worktree (for parallel agents)"
)
parser.add_argument(
"--skills", "-s",
action="append",
default=None,
help="Preload one or more skills for the session (repeat flag or comma-separate)"
)
parser.add_argument(
"--yolo",
action="store_true",
default=False,
help="Bypass all dangerous command approval prompts (use at your own risk)"
)
parser.add_argument(
"--pass-session-id",
action="store_true",
default=False,
help="Include the session ID in the agent's system prompt"
)
subparsers = parser.add_subparsers(dest="command", help="Command to run")
# =========================================================================
# chat command
# =========================================================================
chat_parser = subparsers.add_parser(
"chat",
help="Interactive chat with the agent",
description="Start an interactive chat session with Hermes Agent"
)
chat_parser.add_argument(
"-q", "--query",
help="Single query (non-interactive mode)"
)
chat_parser.add_argument(
"-m", "--model",
help="Model to use (e.g., anthropic/claude-sonnet-4)"
)
chat_parser.add_argument(
"-t", "--toolsets",
help="Comma-separated toolsets to enable"
)
chat_parser.add_argument(
"-s", "--skills",
action="append",
default=None,
help="Preload one or more skills for the session (repeat flag or comma-separate)"
)
chat_parser.add_argument(
"--provider",
choices=["auto", "openrouter", "nous", "openai-codex", "copilot-acp", "copilot", "anthropic", "zai", "kimi-coding", "minimax", "minimax-cn", "kilocode"],
default=None,
help="Inference provider (default: auto)"
)
chat_parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Verbose output"
)
chat_parser.add_argument(
"-Q", "--quiet",
action="store_true",
help="Quiet mode for programmatic use: suppress banner, spinner, and tool previews. Only output the final response and session info."
)
chat_parser.add_argument(
"--resume", "-r",
metavar="SESSION_ID",
help="Resume a previous session by ID (shown on exit)"
)
chat_parser.add_argument(
"--continue", "-c",
dest="continue_last",
nargs="?",
const=True,
default=None,
metavar="SESSION_NAME",
help="Resume a session by name, or the most recent if no name given"
)
chat_parser.add_argument(
"--worktree", "-w",
action="store_true",
default=False,
help="Run in an isolated git worktree (for parallel agents on the same repo)"
)
chat_parser.add_argument(
"--checkpoints",
action="store_true",
default=False,
help="Enable filesystem checkpoints before destructive file operations (use /rollback to restore)"
)
chat_parser.add_argument(
"--yolo",
action="store_true",
default=False,
help="Bypass all dangerous command approval prompts (use at your own risk)"
)
chat_parser.add_argument(
"--pass-session-id",
action="store_true",
default=False,
help="Include the session ID in the agent's system prompt"
)
chat_parser.set_defaults(func=cmd_chat)
# =========================================================================
# model command
# =========================================================================
model_parser = subparsers.add_parser(
"model",
help="Select default model and provider",
description="Interactively select your inference provider and default model"
)
model_parser.set_defaults(func=cmd_model)
# =========================================================================
# gateway command
# =========================================================================
gateway_parser = subparsers.add_parser(
"gateway",
help="Messaging gateway management",
description="Manage the messaging gateway (Telegram, Discord, WhatsApp)"
)
gateway_subparsers = gateway_parser.add_subparsers(dest="gateway_command")
# gateway run (default)
gateway_run = gateway_subparsers.add_parser("run", help="Run gateway in foreground")
gateway_run.add_argument("-v", "--verbose", action="store_true")
gateway_run.add_argument("--replace", action="store_true",
help="Replace any existing gateway instance (useful for systemd)")
# gateway start
gateway_start = gateway_subparsers.add_parser("start", help="Start gateway service")
gateway_start.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
# gateway stop
gateway_stop = gateway_subparsers.add_parser("stop", help="Stop gateway service")
gateway_stop.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
# gateway restart
gateway_restart = gateway_subparsers.add_parser("restart", help="Restart gateway service")
gateway_restart.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
# gateway status
gateway_status = gateway_subparsers.add_parser("status", help="Show gateway status")
gateway_status.add_argument("--deep", action="store_true", help="Deep status check")
gateway_status.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
# gateway install
gateway_install = gateway_subparsers.add_parser("install", help="Install gateway as service")
gateway_install.add_argument("--force", action="store_true", help="Force reinstall")
gateway_install.add_argument("--system", action="store_true", help="Install as a Linux system-level service (starts at boot)")
gateway_install.add_argument("--run-as-user", dest="run_as_user", help="User account the Linux system service should run as")
# gateway uninstall
gateway_uninstall = gateway_subparsers.add_parser("uninstall", help="Uninstall gateway service")
gateway_uninstall.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
# gateway setup
gateway_setup = gateway_subparsers.add_parser("setup", help="Configure messaging platforms")
gateway_parser.set_defaults(func=cmd_gateway)
# =========================================================================
# setup command
# =========================================================================
setup_parser = subparsers.add_parser(
"setup",
help="Interactive setup wizard",
description="Configure Hermes Agent with an interactive wizard. "
"Run a specific section: hermes setup model|terminal|gateway|tools|agent"
)
setup_parser.add_argument(
"section",
nargs="?",
choices=["model", "terminal", "gateway", "tools", "agent"],
default=None,
help="Run a specific setup section instead of the full wizard"
)
setup_parser.add_argument(
"--non-interactive",
action="store_true",
help="Non-interactive mode (use defaults/env vars)"
)
setup_parser.add_argument(
"--reset",
action="store_true",
help="Reset configuration to defaults"
)
setup_parser.set_defaults(func=cmd_setup)
# =========================================================================
# whatsapp command
# =========================================================================
whatsapp_parser = subparsers.add_parser(
"whatsapp",
help="Set up WhatsApp integration",
description="Configure WhatsApp and pair via QR code"
)
whatsapp_parser.set_defaults(func=cmd_whatsapp)
# =========================================================================
# login command
# =========================================================================
login_parser = subparsers.add_parser(
"login",
help="Authenticate with an inference provider",
description="Run OAuth device authorization flow for Hermes CLI"
)
login_parser.add_argument(
"--provider",
choices=["nous", "openai-codex"],
default=None,
help="Provider to authenticate with (default: nous)"
)
login_parser.add_argument(
"--portal-url",
help="Portal base URL (default: production portal)"
)
login_parser.add_argument(
"--inference-url",
help="Inference API base URL (default: production inference API)"
)
login_parser.add_argument(
"--client-id",
default=None,
help="OAuth client id to use (default: hermes-cli)"
)
login_parser.add_argument(
"--scope",
default=None,
help="OAuth scope to request"
)
login_parser.add_argument(
"--no-browser",
action="store_true",
help="Do not attempt to open the browser automatically"
)
login_parser.add_argument(
"--timeout",
type=float,
default=15.0,
help="HTTP request timeout in seconds (default: 15)"
)
login_parser.add_argument(
"--ca-bundle",
help="Path to CA bundle PEM file for TLS verification"
)
login_parser.add_argument(
"--insecure",
action="store_true",
help="Disable TLS verification (testing only)"
)
login_parser.set_defaults(func=cmd_login)
# =========================================================================
# logout command
# =========================================================================
logout_parser = subparsers.add_parser(
"logout",
help="Clear authentication for an inference provider",
description="Remove stored credentials and reset provider config"
)
logout_parser.add_argument(
"--provider",
choices=["nous", "openai-codex"],
default=None,
help="Provider to log out from (default: active provider)"
)
logout_parser.set_defaults(func=cmd_logout)
# =========================================================================
# status command
# =========================================================================
status_parser = subparsers.add_parser(
"status",
help="Show status of all components",
description="Display status of Hermes Agent components"
)
status_parser.add_argument(
"--all",
action="store_true",
help="Show all details (redacted for sharing)"
)
status_parser.add_argument(
"--deep",
action="store_true",
help="Run deep checks (may take longer)"
)
status_parser.set_defaults(func=cmd_status)
# =========================================================================
# cron command
# =========================================================================
cron_parser = subparsers.add_parser(
"cron",
help="Cron job management",
description="Manage scheduled tasks"
)
cron_subparsers = cron_parser.add_subparsers(dest="cron_command")
# cron list
cron_list = cron_subparsers.add_parser("list", help="List scheduled jobs")
cron_list.add_argument("--all", action="store_true", help="Include disabled jobs")
# cron create/add
cron_create = cron_subparsers.add_parser("create", aliases=["add"], help="Create a scheduled job")
cron_create.add_argument("schedule", help="Schedule like '30m', 'every 2h', or '0 9 * * *'")
cron_create.add_argument("prompt", nargs="?", help="Optional self-contained prompt or task instruction")
cron_create.add_argument("--name", help="Optional human-friendly job name")
cron_create.add_argument("--deliver", help="Delivery target: origin, local, telegram, discord, signal, or platform:chat_id")
cron_create.add_argument("--repeat", type=int, help="Optional repeat count")
cron_create.add_argument("--skill", dest="skills", action="append", help="Attach a skill. Repeat to add multiple skills.")
# cron edit
cron_edit = cron_subparsers.add_parser("edit", help="Edit an existing scheduled job")
cron_edit.add_argument("job_id", help="Job ID to edit")
cron_edit.add_argument("--schedule", help="New schedule")
cron_edit.add_argument("--prompt", help="New prompt/task instruction")
cron_edit.add_argument("--name", help="New job name")
cron_edit.add_argument("--deliver", help="New delivery target")
cron_edit.add_argument("--repeat", type=int, help="New repeat count")
cron_edit.add_argument("--skill", dest="skills", action="append", help="Replace the job's skills with this set. Repeat to attach multiple skills.")
cron_edit.add_argument("--add-skill", dest="add_skills", action="append", help="Append a skill without replacing the existing list. Repeatable.")
cron_edit.add_argument("--remove-skill", dest="remove_skills", action="append", help="Remove a specific attached skill. Repeatable.")
cron_edit.add_argument("--clear-skills", action="store_true", help="Remove all attached skills from the job")
# lifecycle actions
cron_pause = cron_subparsers.add_parser("pause", help="Pause a scheduled job")
cron_pause.add_argument("job_id", help="Job ID to pause")
cron_resume = cron_subparsers.add_parser("resume", help="Resume a paused job")
cron_resume.add_argument("job_id", help="Job ID to resume")
cron_run = cron_subparsers.add_parser("run", help="Run a job on the next scheduler tick")
cron_run.add_argument("job_id", help="Job ID to trigger")
cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job")
cron_remove.add_argument("job_id", help="Job ID to remove")
# cron status
cron_subparsers.add_parser("status", help="Check if cron scheduler is running")
# cron tick (mostly for debugging)
cron_subparsers.add_parser("tick", help="Run due jobs once and exit")
cron_parser.set_defaults(func=cmd_cron)
# =========================================================================
# doctor command
# =========================================================================
doctor_parser = subparsers.add_parser(
"doctor",
help="Check configuration and dependencies",
description="Diagnose issues with Hermes Agent setup"
)
doctor_parser.add_argument(
"--fix",
action="store_true",
help="Attempt to fix issues automatically"
)
doctor_parser.set_defaults(func=cmd_doctor)
# =========================================================================
# config command
# =========================================================================
config_parser = subparsers.add_parser(
"config",
help="View and edit configuration",
description="Manage Hermes Agent configuration"
)
config_subparsers = config_parser.add_subparsers(dest="config_command")
# config show (default)
config_show = config_subparsers.add_parser("show", help="Show current configuration")
# config edit
config_edit = config_subparsers.add_parser("edit", help="Open config file in editor")
# config set
config_set = config_subparsers.add_parser("set", help="Set a configuration value")
config_set.add_argument("key", nargs="?", help="Configuration key (e.g., model, terminal.backend)")
config_set.add_argument("value", nargs="?", help="Value to set")
# config path
config_path = config_subparsers.add_parser("path", help="Print config file path")
# config env-path
config_env = config_subparsers.add_parser("env-path", help="Print .env file path")
# config check
config_check = config_subparsers.add_parser("check", help="Check for missing/outdated config")
# config migrate
config_migrate = config_subparsers.add_parser("migrate", help="Update config with new options")
config_parser.set_defaults(func=cmd_config)
# =========================================================================
# pairing command
# =========================================================================
pairing_parser = subparsers.add_parser(
"pairing",
help="Manage DM pairing codes for user authorization",
description="Approve or revoke user access via pairing codes"
)
pairing_sub = pairing_parser.add_subparsers(dest="pairing_action")
pairing_list_parser = pairing_sub.add_parser("list", help="Show pending + approved users")
pairing_approve_parser = pairing_sub.add_parser("approve", help="Approve a pairing code")
pairing_approve_parser.add_argument("platform", help="Platform name (telegram, discord, slack, whatsapp)")
pairing_approve_parser.add_argument("code", help="Pairing code to approve")
pairing_revoke_parser = pairing_sub.add_parser("revoke", help="Revoke user access")
pairing_revoke_parser.add_argument("platform", help="Platform name")
pairing_revoke_parser.add_argument("user_id", help="User ID to revoke")
pairing_clear_parser = pairing_sub.add_parser("clear-pending", help="Clear all pending codes")
def cmd_pairing(args):
from hermes_cli.pairing import pairing_command
pairing_command(args)
pairing_parser.set_defaults(func=cmd_pairing)
# =========================================================================
# skills command
# =========================================================================
skills_parser = subparsers.add_parser(
"skills",
help="Search, install, configure, and manage skills",
description="Search, install, inspect, audit, configure, and manage skills from skills.sh, well-known agent skill endpoints, GitHub, ClawHub, and other registries."
)
skills_subparsers = skills_parser.add_subparsers(dest="skills_action")
skills_browse = skills_subparsers.add_parser("browse", help="Browse all available skills (paginated)")
skills_browse.add_argument("--page", type=int, default=1, help="Page number (default: 1)")
skills_browse.add_argument("--size", type=int, default=20, help="Results per page (default: 20)")
skills_browse.add_argument("--source", default="all",
choices=["all", "official", "skills-sh", "well-known", "github", "clawhub", "lobehub"],
help="Filter by source (default: all)")
skills_search = skills_subparsers.add_parser("search", help="Search skill registries")
skills_search.add_argument("query", help="Search query")
skills_search.add_argument("--source", default="all", choices=["all", "official", "skills-sh", "well-known", "github", "clawhub", "lobehub"])
skills_search.add_argument("--limit", type=int, default=10, help="Max results")
skills_install = skills_subparsers.add_parser("install", help="Install a skill")
skills_install.add_argument("identifier", help="Skill identifier (e.g. openai/skills/skill-creator)")
skills_install.add_argument("--category", default="", help="Category folder to install into")
skills_install.add_argument("--force", action="store_true", help="Install despite blocked scan verdict")
skills_install.add_argument("--yes", "-y", action="store_true", help="Skip confirmation prompt (needed in TUI mode)")
skills_inspect = skills_subparsers.add_parser("inspect", help="Preview a skill without installing")
skills_inspect.add_argument("identifier", help="Skill identifier")
skills_list = skills_subparsers.add_parser("list", help="List installed skills")
skills_list.add_argument("--source", default="all", choices=["all", "hub", "builtin", "local"])
skills_check = skills_subparsers.add_parser("check", help="Check installed hub skills for updates")
skills_check.add_argument("name", nargs="?", help="Specific skill to check (default: all)")
skills_update = skills_subparsers.add_parser("update", help="Update installed hub skills")
skills_update.add_argument("name", nargs="?", help="Specific skill to update (default: all outdated skills)")
skills_audit = skills_subparsers.add_parser("audit", help="Re-scan installed hub skills")
skills_audit.add_argument("name", nargs="?", help="Specific skill to audit (default: all)")
skills_uninstall = skills_subparsers.add_parser("uninstall", help="Remove a hub-installed skill")
skills_uninstall.add_argument("name", help="Skill name to remove")
skills_publish = skills_subparsers.add_parser("publish", help="Publish a skill to a registry")
skills_publish.add_argument("skill_path", help="Path to skill directory")
skills_publish.add_argument("--to", default="github", choices=["github", "clawhub"], help="Target registry")
skills_publish.add_argument("--repo", default="", help="Target GitHub repo (e.g. openai/skills)")
skills_snapshot = skills_subparsers.add_parser("snapshot", help="Export/import skill configurations")
snapshot_subparsers = skills_snapshot.add_subparsers(dest="snapshot_action")
snap_export = snapshot_subparsers.add_parser("export", help="Export installed skills to a file")
snap_export.add_argument("output", help="Output JSON file path")
snap_import = snapshot_subparsers.add_parser("import", help="Import and install skills from a file")
snap_import.add_argument("input", help="Input JSON file path")
snap_import.add_argument("--force", action="store_true", help="Force install despite caution verdict")
skills_tap = skills_subparsers.add_parser("tap", help="Manage skill sources")
tap_subparsers = skills_tap.add_subparsers(dest="tap_action")
tap_subparsers.add_parser("list", help="List configured taps")
tap_add = tap_subparsers.add_parser("add", help="Add a GitHub repo as skill source")
tap_add.add_argument("repo", help="GitHub repo (e.g. owner/repo)")
tap_rm = tap_subparsers.add_parser("remove", help="Remove a tap")
tap_rm.add_argument("name", help="Tap name to remove")
# config sub-action: interactive enable/disable
skills_subparsers.add_parser("config", help="Interactive skill configuration — enable/disable individual skills")
def cmd_skills(args):
# Route 'config' action to skills_config module
if getattr(args, 'skills_action', None) == 'config':
from hermes_cli.skills_config import skills_command as skills_config_command
skills_config_command(args)
else:
from hermes_cli.skills_hub import skills_command
skills_command(args)
skills_parser.set_defaults(func=cmd_skills)
# =========================================================================
# honcho command
# =========================================================================
honcho_parser = subparsers.add_parser(
"honcho",
help="Manage Honcho AI memory integration",
description=(
"Honcho is a memory layer that persists across sessions.\n\n"
"Each conversation is stored as a peer interaction in a workspace. "
"Honcho builds a representation of the user over time — conclusions, "
"patterns, context — and surfaces the relevant slice at the start of "
"each turn so Hermes knows who you are without you having to repeat yourself.\n\n"
"Modes: hybrid (Honcho + local MEMORY.md), honcho (Honcho only), "
"local (MEMORY.md only). Write frequency is configurable so memory "
"writes never block the response."
),
formatter_class=__import__("argparse").RawDescriptionHelpFormatter,
)
honcho_subparsers = honcho_parser.add_subparsers(dest="honcho_command")
honcho_subparsers.add_parser("setup", help="Interactive setup wizard for Honcho integration")
honcho_subparsers.add_parser("status", help="Show current Honcho config and connection status")
honcho_subparsers.add_parser("sessions", help="List known Honcho session mappings")
honcho_map = honcho_subparsers.add_parser(
"map", help="Map current directory to a Honcho session name (no arg = list mappings)"
)
honcho_map.add_argument(
"session_name", nargs="?", default=None,
help="Session name to associate with this directory. Omit to list current mappings.",
)
honcho_peer = honcho_subparsers.add_parser(
"peer", help="Show or update peer names and dialectic reasoning level"
)
honcho_peer.add_argument("--user", metavar="NAME", help="Set user peer name")
honcho_peer.add_argument("--ai", metavar="NAME", help="Set AI peer name")
honcho_peer.add_argument(
"--reasoning",
metavar="LEVEL",
choices=("minimal", "low", "medium", "high", "max"),
help="Set default dialectic reasoning level (minimal/low/medium/high/max)",
)
honcho_mode = honcho_subparsers.add_parser(
"mode", help="Show or set memory mode (hybrid/honcho/local)"
)
honcho_mode.add_argument(
"mode", nargs="?", metavar="MODE",
choices=("hybrid", "honcho", "local"),
help="Memory mode to set (hybrid/honcho/local). Omit to show current.",
)
honcho_tokens = honcho_subparsers.add_parser(
"tokens", help="Show or set token budget for context and dialectic"
)
honcho_tokens.add_argument(
"--context", type=int, metavar="N",
help="Max tokens Honcho returns from session.context() per turn",
)
honcho_tokens.add_argument(
"--dialectic", type=int, metavar="N",
help="Max chars of dialectic result to inject into system prompt",
)
honcho_identity = honcho_subparsers.add_parser(
"identity", help="Seed or show the AI peer's Honcho identity representation"
)
honcho_identity.add_argument(
"file", nargs="?", default=None,
help="Path to file to seed from (e.g. SOUL.md). Omit to show usage.",
)
honcho_identity.add_argument(
"--show", action="store_true",
help="Show current AI peer representation from Honcho",
)
honcho_subparsers.add_parser(
"migrate",
help="Step-by-step migration guide from openclaw-honcho to Hermes Honcho",
)
def cmd_honcho(args):
from honcho_integration.cli import honcho_command
honcho_command(args)
honcho_parser.set_defaults(func=cmd_honcho)
# =========================================================================
# tools command
# =========================================================================
tools_parser = subparsers.add_parser(
"tools",
help="Configure which tools are enabled per platform",
description=(
"Enable, disable, or list tools for CLI, Telegram, Discord, etc.\n\n"
"Built-in toolsets use plain names (e.g. web, memory).\n"
"MCP tools use server:tool notation (e.g. github:create_issue).\n\n"
"Run 'hermes tools' with no subcommand for the interactive configuration UI."
),
)
tools_parser.add_argument(
"--summary",
action="store_true",
help="Print a summary of enabled tools per platform and exit"
)
tools_sub = tools_parser.add_subparsers(dest="tools_action")
# hermes tools list [--platform cli]
tools_list_p = tools_sub.add_parser(
"list",
help="Show all tools and their enabled/disabled status",
)
tools_list_p.add_argument(
"--platform", default="cli",
help="Platform to show (default: cli)",
)
# hermes tools disable <name...> [--platform cli]
tools_disable_p = tools_sub.add_parser(
"disable",
help="Disable toolsets or MCP tools",
)
tools_disable_p.add_argument(
"names", nargs="+", metavar="NAME",
help="Toolset name (e.g. web) or MCP tool in server:tool form",
)
tools_disable_p.add_argument(
"--platform", default="cli",
help="Platform to apply to (default: cli)",
)
# hermes tools enable <name...> [--platform cli]
tools_enable_p = tools_sub.add_parser(
"enable",
help="Enable toolsets or MCP tools",
)
tools_enable_p.add_argument(
"names", nargs="+", metavar="NAME",
help="Toolset name or MCP tool in server:tool form",
)
tools_enable_p.add_argument(
"--platform", default="cli",
help="Platform to apply to (default: cli)",
)
def cmd_tools(args):
action = getattr(args, "tools_action", None)
if action in ("list", "disable", "enable"):
from hermes_cli.tools_config import tools_disable_enable_command
tools_disable_enable_command(args)
else:
from hermes_cli.tools_config import tools_command
tools_command(args)
tools_parser.set_defaults(func=cmd_tools)
# =========================================================================
# sessions command
# =========================================================================
sessions_parser = subparsers.add_parser(
"sessions",
help="Manage session history (list, rename, export, prune, delete)",
description="View and manage the SQLite session store"
)
sessions_subparsers = sessions_parser.add_subparsers(dest="sessions_action")
sessions_list = sessions_subparsers.add_parser("list", help="List recent sessions")
sessions_list.add_argument("--source", help="Filter by source (cli, telegram, discord, etc.)")
sessions_list.add_argument("--limit", type=int, default=20, help="Max sessions to show")
sessions_export = sessions_subparsers.add_parser("export", help="Export sessions to a JSONL file")
sessions_export.add_argument("output", help="Output JSONL file path")
sessions_export.add_argument("--source", help="Filter by source")
sessions_export.add_argument("--session-id", help="Export a specific session")
sessions_delete = sessions_subparsers.add_parser("delete", help="Delete a specific session")
sessions_delete.add_argument("session_id", help="Session ID to delete")
sessions_delete.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
sessions_prune = sessions_subparsers.add_parser("prune", help="Delete old sessions")
sessions_prune.add_argument("--older-than", type=int, default=90, help="Delete sessions older than N days (default: 90)")
sessions_prune.add_argument("--source", help="Only prune sessions from this source")
sessions_prune.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
sessions_stats = sessions_subparsers.add_parser("stats", help="Show session store statistics")
sessions_rename = sessions_subparsers.add_parser("rename", help="Set or change a session's title")
sessions_rename.add_argument("session_id", help="Session ID to rename")
sessions_rename.add_argument("title", nargs="+", help="New title for the session")
sessions_browse = sessions_subparsers.add_parser(
"browse",
help="Interactive session picker — browse, search, and resume sessions",
)
sessions_browse.add_argument("--source", help="Filter by source (cli, telegram, discord, etc.)")
sessions_browse.add_argument("--limit", type=int, default=50, help="Max sessions to load (default: 50)")
def cmd_sessions(args):
import json as _json
try:
from hermes_state import SessionDB
db = SessionDB()
except Exception as e:
print(f"Error: Could not open session database: {e}")
return
action = args.sessions_action
if action == "list":
sessions = db.list_sessions_rich(source=args.source, limit=args.limit)
if not sessions:
print("No sessions found.")
return
has_titles = any(s.get("title") for s in sessions)
if has_titles:
print(f"{'Title':<32} {'Preview':<40} {'Last Active':<13} {'ID'}")
print("" * 110)
else:
print(f"{'Preview':<50} {'Last Active':<13} {'Src':<6} {'ID'}")
print("" * 95)
for s in sessions:
last_active = _relative_time(s.get("last_active"))
preview = s.get("preview", "")[:38] if has_titles else s.get("preview", "")[:48]
if has_titles:
title = (s.get("title") or "")[:30]
sid = s["id"]
print(f"{title:<32} {preview:<40} {last_active:<13} {sid}")
else:
sid = s["id"]
print(f"{preview:<50} {last_active:<13} {s['source']:<6} {sid}")
elif action == "export":
if args.session_id:
resolved_session_id = db.resolve_session_id(args.session_id)
if not resolved_session_id:
print(f"Session '{args.session_id}' not found.")
return
data = db.export_session(resolved_session_id)
if not data:
print(f"Session '{args.session_id}' not found.")
return
with open(args.output, "w", encoding="utf-8") as f:
f.write(_json.dumps(data, ensure_ascii=False) + "\n")
print(f"Exported 1 session to {args.output}")
else:
sessions = db.export_all(source=args.source)
with open(args.output, "w", encoding="utf-8") as f:
for s in sessions:
f.write(_json.dumps(s, ensure_ascii=False) + "\n")
print(f"Exported {len(sessions)} sessions to {args.output}")
elif action == "delete":
resolved_session_id = db.resolve_session_id(args.session_id)
if not resolved_session_id:
print(f"Session '{args.session_id}' not found.")
return
if not args.yes:
confirm = input(f"Delete session '{resolved_session_id}' and all its messages? [y/N] ")
if confirm.lower() not in ("y", "yes"):
print("Cancelled.")
return
if db.delete_session(resolved_session_id):
print(f"Deleted session '{resolved_session_id}'.")
else:
print(f"Session '{args.session_id}' not found.")
elif action == "prune":
days = args.older_than
source_msg = f" from '{args.source}'" if args.source else ""
if not args.yes:
confirm = input(f"Delete all ended sessions older than {days} days{source_msg}? [y/N] ")
if confirm.lower() not in ("y", "yes"):
print("Cancelled.")
return
count = db.prune_sessions(older_than_days=days, source=args.source)
print(f"Pruned {count} session(s).")
elif action == "rename":
resolved_session_id = db.resolve_session_id(args.session_id)
if not resolved_session_id:
print(f"Session '{args.session_id}' not found.")
return
title = " ".join(args.title)
try:
if db.set_session_title(resolved_session_id, title):
print(f"Session '{resolved_session_id}' renamed to: {title}")
else:
print(f"Session '{args.session_id}' not found.")
except ValueError as e:
print(f"Error: {e}")
elif action == "browse":
limit = getattr(args, "limit", 50) or 50
source = getattr(args, "source", None)
sessions = db.list_sessions_rich(source=source, limit=limit)
db.close()
if not sessions:
print("No sessions found.")
return
selected_id = _session_browse_picker(sessions)
if not selected_id:
print("Cancelled.")
return
# Launch hermes --resume <id> by replacing the current process
print(f"Resuming session: {selected_id}")
import shutil
hermes_bin = shutil.which("hermes")
if hermes_bin:
os.execvp(hermes_bin, ["hermes", "--resume", selected_id])
else:
# Fallback: re-invoke via python -m
os.execvp(
sys.executable,
[sys.executable, "-m", "hermes_cli.main", "--resume", selected_id],
)
return # won't reach here after execvp
elif action == "stats":
total = db.session_count()
msgs = db.message_count()
print(f"Total sessions: {total}")
print(f"Total messages: {msgs}")
for src in ["cli", "telegram", "discord", "whatsapp", "slack"]:
c = db.session_count(source=src)
if c > 0:
print(f" {src}: {c} sessions")
db_path = db.db_path
if db_path.exists():
size_mb = os.path.getsize(db_path) / (1024 * 1024)
print(f"Database size: {size_mb:.1f} MB")
else:
sessions_parser.print_help()
db.close()
sessions_parser.set_defaults(func=cmd_sessions)
# =========================================================================
# insights command
# =========================================================================
insights_parser = subparsers.add_parser(
"insights",
help="Show usage insights and analytics",
description="Analyze session history to show token usage, costs, tool patterns, and activity trends"
)
insights_parser.add_argument("--days", type=int, default=30, help="Number of days to analyze (default: 30)")
insights_parser.add_argument("--source", help="Filter by platform (cli, telegram, discord, etc.)")
def cmd_insights(args):
try:
from hermes_state import SessionDB
from agent.insights import InsightsEngine
db = SessionDB()
engine = InsightsEngine(db)
report = engine.generate(days=args.days, source=args.source)
print(engine.format_terminal(report))
db.close()
except Exception as e:
print(f"Error generating insights: {e}")
insights_parser.set_defaults(func=cmd_insights)
# =========================================================================
# claw command (OpenClaw migration)
# =========================================================================
claw_parser = subparsers.add_parser(
"claw",
help="OpenClaw migration tools",
description="Migrate settings, memories, skills, and API keys from OpenClaw to Hermes"
)
claw_subparsers = claw_parser.add_subparsers(dest="claw_action")
# claw migrate
claw_migrate = claw_subparsers.add_parser(
"migrate",
help="Migrate from OpenClaw to Hermes",
description="Import settings, memories, skills, and API keys from an OpenClaw installation"
)
claw_migrate.add_argument(
"--source",
help="Path to OpenClaw directory (default: ~/.openclaw)"
)
claw_migrate.add_argument(
"--dry-run",
action="store_true",
help="Preview what would be migrated without making changes"
)
claw_migrate.add_argument(
"--preset",
choices=["user-data", "full"],
default="full",
help="Migration preset (default: full). 'user-data' excludes secrets"
)
claw_migrate.add_argument(
"--overwrite",
action="store_true",
help="Overwrite existing files (default: skip conflicts)"
)
claw_migrate.add_argument(
"--migrate-secrets",
action="store_true",
help="Include allowlisted secrets (TELEGRAM_BOT_TOKEN, API keys, etc.)"
)
claw_migrate.add_argument(
"--workspace-target",
help="Absolute path to copy workspace instructions into"
)
claw_migrate.add_argument(
"--skill-conflict",
choices=["skip", "overwrite", "rename"],
default="skip",
help="How to handle skill name conflicts (default: skip)"
)
claw_migrate.add_argument(
"--yes", "-y",
action="store_true",
help="Skip confirmation prompts"
)
def cmd_claw(args):
from hermes_cli.claw import claw_command
claw_command(args)
claw_parser.set_defaults(func=cmd_claw)
# =========================================================================
# version command
# =========================================================================
version_parser = subparsers.add_parser(
"version",
help="Show version information"
)
version_parser.set_defaults(func=cmd_version)
# =========================================================================
# update command
# =========================================================================
update_parser = subparsers.add_parser(
"update",
help="Update Hermes Agent to the latest version",
description="Pull the latest changes from git and reinstall dependencies"
)
update_parser.set_defaults(func=cmd_update)
# =========================================================================
# uninstall command
# =========================================================================
uninstall_parser = subparsers.add_parser(
"uninstall",
help="Uninstall Hermes Agent",
description="Remove Hermes Agent from your system. Can keep configs/data for reinstall."
)
uninstall_parser.add_argument(
"--full",
action="store_true",
help="Full uninstall - remove everything including configs and data"
)
uninstall_parser.add_argument(
"--yes", "-y",
action="store_true",
help="Skip confirmation prompts"
)
uninstall_parser.set_defaults(func=cmd_uninstall)
# =========================================================================
# acp command
# =========================================================================
acp_parser = subparsers.add_parser(
"acp",
help="Run Hermes Agent as an ACP (Agent Client Protocol) server",
description="Start Hermes Agent in ACP mode for editor integration (VS Code, Zed, JetBrains)",
)
def cmd_acp(args):
"""Launch Hermes Agent as an ACP server."""
try:
from acp_adapter.entry import main as acp_main
acp_main()
except ImportError:
print("ACP dependencies not installed.")
print("Install them with: pip install -e '.[acp]'")
sys.exit(1)
acp_parser.set_defaults(func=cmd_acp)
# =========================================================================
# Parse and execute
# =========================================================================
# Pre-process argv so unquoted multi-word session names after -c / -r
# are merged into a single token before argparse sees them.
# e.g. ``hermes -c Pokemon Agent Dev`` → ``hermes -c 'Pokemon Agent Dev'``
_processed_argv = _coalesce_session_name_args(sys.argv[1:])
args = parser.parse_args(_processed_argv)
# Handle --version flag
if args.version:
cmd_version(args)
return
# Handle top-level --resume / --continue as shortcut to chat
if (args.resume or args.continue_last) and args.command is None:
args.command = "chat"
args.query = None
args.model = None
args.provider = None
args.toolsets = None
args.verbose = False
if not hasattr(args, "worktree"):
args.worktree = False
cmd_chat(args)
return
# Default to chat if no command specified
if args.command is None:
args.query = None
args.model = None
args.provider = None
args.toolsets = None
args.verbose = False
args.resume = None
args.continue_last = None
if not hasattr(args, "worktree"):
args.worktree = False
cmd_chat(args)
return
# Execute the command
if hasattr(args, 'func'):
args.func(args)
else:
parser.print_help()
if __name__ == "__main__":
main()