When interactive TUI commands are invoked non-interactively (e.g. via the agent's terminal() tool through a subprocess pipe), curses loops spin at 100% CPU and input() calls hang indefinitely. Defense in depth — two layers: 1. Source-level guard in curses_checklist() (curses_ui.py + checklist.py): Returns cancel_returns immediately when stdin is not a TTY. This catches ALL callers automatically, including future code. 2. Command-level guards with clear error messages: - hermes tools (interactive checklist, not list/disable/enable) - hermes setup (interactive wizard) - hermes model (provider/model picker) - hermes whatsapp (pairing setup) - hermes skills config (skill toggle) - hermes mcp configure (tool selection) - hermes uninstall (confirmation prompt) Non-interactive subcommands (hermes tools list, hermes tools enable, hermes mcp add/remove/list/test, hermes skills search/install/browse) remain unaffected.
4890 lines
193 KiB
Python
4890 lines
193 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Hermes CLI - Main entry point.
|
||
|
||
Usage:
|
||
hermes # Interactive chat (default)
|
||
hermes chat # Interactive chat
|
||
hermes gateway # Run gateway in foreground
|
||
hermes gateway start # Start gateway as service
|
||
hermes gateway stop # Stop gateway service
|
||
hermes gateway status # Show gateway status
|
||
hermes gateway install # Install gateway service
|
||
hermes gateway uninstall # Uninstall gateway service
|
||
hermes setup # Interactive setup wizard
|
||
hermes logout # Clear stored authentication
|
||
hermes status # Show status of all components
|
||
hermes cron # Manage cron jobs
|
||
hermes cron list # List cron jobs
|
||
hermes cron status # Check if cron scheduler is running
|
||
hermes doctor # Check configuration and dependencies
|
||
hermes honcho setup # Configure Honcho AI memory integration
|
||
hermes honcho status # Show Honcho config and connection status
|
||
hermes honcho sessions # List directory → session name mappings
|
||
hermes honcho map <name> # Map current directory to a session name
|
||
hermes honcho peer # Show peer names and dialectic settings
|
||
hermes honcho peer --user NAME # Set user peer name
|
||
hermes honcho peer --ai NAME # Set AI peer name
|
||
hermes honcho peer --reasoning LEVEL # Set dialectic reasoning level
|
||
hermes honcho mode # Show current memory mode
|
||
hermes honcho mode [hybrid|honcho|local] # Set memory mode
|
||
hermes honcho tokens # Show token budget settings
|
||
hermes honcho tokens --context N # Set session.context() token cap
|
||
hermes honcho tokens --dialectic N # Set dialectic result char cap
|
||
hermes honcho identity # Show AI peer identity representation
|
||
hermes honcho identity <file> # Seed AI peer identity from a file (SOUL.md etc.)
|
||
hermes honcho migrate # Step-by-step migration guide: OpenClaw native → Hermes + Honcho
|
||
hermes version Show version
|
||
hermes update Update to latest version
|
||
hermes uninstall Uninstall Hermes Agent
|
||
hermes acp Run as an ACP server for editor integration
|
||
hermes sessions browse Interactive session picker with search
|
||
|
||
hermes claw migrate --dry-run # Preview migration without changes
|
||
"""
|
||
|
||
import argparse
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
def _require_tty(command_name: str) -> None:
|
||
"""Exit with a clear error if stdin is not a terminal.
|
||
|
||
Interactive TUI commands (hermes tools, hermes setup, hermes model) use
|
||
curses or input() prompts that spin at 100% CPU when stdin is a pipe.
|
||
This guard prevents accidental non-interactive invocation.
|
||
"""
|
||
if not sys.stdin.isatty():
|
||
print(
|
||
f"Error: 'hermes {command_name}' requires an interactive terminal.\n"
|
||
f"It cannot be run through a pipe or non-interactive subprocess.\n"
|
||
f"Run it directly in your terminal instead.",
|
||
file=sys.stderr,
|
||
)
|
||
sys.exit(1)
|
||
|
||
|
||
# Add project root to path
|
||
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
|
||
sys.path.insert(0, str(PROJECT_ROOT))
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Profile override — MUST happen before any hermes module import.
|
||
#
|
||
# Many modules cache HERMES_HOME at import time (module-level constants).
|
||
# We intercept --profile/-p from sys.argv here and set the env var so that
|
||
# every subsequent ``os.getenv("HERMES_HOME", ...)`` resolves correctly.
|
||
# The flag is stripped from sys.argv so argparse never sees it.
|
||
# Falls back to ~/.hermes/active_profile for sticky default.
|
||
# ---------------------------------------------------------------------------
|
||
def _apply_profile_override() -> None:
|
||
"""Pre-parse --profile/-p and set HERMES_HOME before module imports."""
|
||
argv = sys.argv[1:]
|
||
profile_name = None
|
||
consume = 0
|
||
|
||
# 1. Check for explicit -p / --profile flag
|
||
for i, arg in enumerate(argv):
|
||
if arg in ("--profile", "-p") and i + 1 < len(argv):
|
||
profile_name = argv[i + 1]
|
||
consume = 2
|
||
break
|
||
elif arg.startswith("--profile="):
|
||
profile_name = arg.split("=", 1)[1]
|
||
consume = 1
|
||
break
|
||
|
||
# 2. If no flag, check ~/.hermes/active_profile
|
||
if profile_name is None:
|
||
try:
|
||
active_path = Path.home() / ".hermes" / "active_profile"
|
||
if active_path.exists():
|
||
name = active_path.read_text().strip()
|
||
if name and name != "default":
|
||
profile_name = name
|
||
consume = 0 # don't strip anything from argv
|
||
except (UnicodeDecodeError, OSError):
|
||
pass # corrupted file, skip
|
||
|
||
# 3. If we found a profile, resolve and set HERMES_HOME
|
||
if profile_name is not None:
|
||
try:
|
||
from hermes_cli.profiles import resolve_profile_env
|
||
hermes_home = resolve_profile_env(profile_name)
|
||
except (ValueError, FileNotFoundError) as exc:
|
||
print(f"Error: {exc}", file=sys.stderr)
|
||
sys.exit(1)
|
||
except Exception as exc:
|
||
# A bug in profiles.py must NEVER prevent hermes from starting
|
||
print(f"Warning: profile override failed ({exc}), using default", file=sys.stderr)
|
||
return
|
||
os.environ["HERMES_HOME"] = hermes_home
|
||
# Strip the flag from argv so argparse doesn't choke
|
||
if consume > 0:
|
||
for i, arg in enumerate(argv):
|
||
if arg in ("--profile", "-p"):
|
||
start = i + 1 # +1 because argv is sys.argv[1:]
|
||
sys.argv = sys.argv[:start] + sys.argv[start + consume:]
|
||
break
|
||
elif arg.startswith("--profile="):
|
||
start = i + 1
|
||
sys.argv = sys.argv[:start] + sys.argv[start + 1:]
|
||
break
|
||
|
||
_apply_profile_override()
|
||
|
||
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
|
||
# User-managed env files should override stale shell exports on restart.
|
||
from hermes_cli.config import get_hermes_home
|
||
from hermes_cli.env_loader import load_hermes_dotenv
|
||
load_hermes_dotenv(project_env=PROJECT_ROOT / '.env')
|
||
|
||
|
||
import logging
|
||
import time as _time
|
||
from datetime import datetime
|
||
|
||
from hermes_cli import __version__, __release_date__
|
||
from hermes_constants import OPENROUTER_BASE_URL
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _relative_time(ts) -> str:
|
||
"""Format a timestamp as relative time (e.g., '2h ago', 'yesterday')."""
|
||
if not ts:
|
||
return "?"
|
||
delta = _time.time() - ts
|
||
if delta < 60:
|
||
return "just now"
|
||
if delta < 3600:
|
||
return f"{int(delta / 60)}m ago"
|
||
if delta < 86400:
|
||
return f"{int(delta / 3600)}h ago"
|
||
if delta < 172800:
|
||
return "yesterday"
|
||
if delta < 604800:
|
||
return f"{int(delta / 86400)}d ago"
|
||
return datetime.fromtimestamp(ts).strftime("%Y-%m-%d")
|
||
|
||
|
||
def _has_any_provider_configured() -> bool:
|
||
"""Check if at least one inference provider is usable."""
|
||
from hermes_cli.config import get_env_path, get_hermes_home
|
||
from hermes_cli.auth import get_auth_status
|
||
|
||
# Check env vars (may be set by .env or shell).
|
||
# OPENAI_BASE_URL alone counts — local models (vLLM, llama.cpp, etc.)
|
||
# often don't require an API key.
|
||
from hermes_cli.auth import PROVIDER_REGISTRY
|
||
|
||
# Collect all provider env vars
|
||
provider_env_vars = {"OPENROUTER_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN", "OPENAI_BASE_URL"}
|
||
for pconfig in PROVIDER_REGISTRY.values():
|
||
if pconfig.auth_type == "api_key":
|
||
provider_env_vars.update(pconfig.api_key_env_vars)
|
||
if any(os.getenv(v) for v in provider_env_vars):
|
||
return True
|
||
|
||
# Check .env file for keys
|
||
env_file = get_env_path()
|
||
if env_file.exists():
|
||
try:
|
||
for line in env_file.read_text().splitlines():
|
||
line = line.strip()
|
||
if line.startswith("#") or "=" not in line:
|
||
continue
|
||
key, _, val = line.partition("=")
|
||
val = val.strip().strip("'\"")
|
||
if key.strip() in provider_env_vars and val:
|
||
return True
|
||
except Exception:
|
||
pass
|
||
|
||
# Check provider-specific auth fallbacks (for example, Copilot via gh auth).
|
||
try:
|
||
for provider_id, pconfig in PROVIDER_REGISTRY.items():
|
||
if pconfig.auth_type != "api_key":
|
||
continue
|
||
status = get_auth_status(provider_id)
|
||
if status.get("logged_in"):
|
||
return True
|
||
except Exception:
|
||
pass
|
||
|
||
# Check for Nous Portal OAuth credentials
|
||
auth_file = get_hermes_home() / "auth.json"
|
||
if auth_file.exists():
|
||
try:
|
||
import json
|
||
auth = json.loads(auth_file.read_text())
|
||
active = auth.get("active_provider")
|
||
if active:
|
||
status = get_auth_status(active)
|
||
if status.get("logged_in"):
|
||
return True
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
# Check for Claude Code OAuth credentials (~/.claude/.credentials.json)
|
||
# These are used by resolve_anthropic_token() at runtime but were missing
|
||
# from this startup gate check.
|
||
try:
|
||
from agent.anthropic_adapter import read_claude_code_credentials, is_claude_code_token_valid
|
||
creds = read_claude_code_credentials()
|
||
if creds and (is_claude_code_token_valid(creds) or creds.get("refreshToken")):
|
||
return True
|
||
except Exception:
|
||
pass
|
||
|
||
return False
|
||
|
||
|
||
def _session_browse_picker(sessions: list) -> Optional[str]:
|
||
"""Interactive curses-based session browser with live search filtering.
|
||
|
||
Returns the selected session ID, or None if cancelled.
|
||
Uses curses (not simple_term_menu) to avoid the ghost-duplication rendering
|
||
bug in tmux/iTerm when arrow keys are used.
|
||
"""
|
||
if not sessions:
|
||
print("No sessions found.")
|
||
return None
|
||
|
||
# Try curses-based picker first
|
||
try:
|
||
import curses
|
||
|
||
result_holder = [None]
|
||
|
||
def _format_row(s, max_x):
|
||
"""Format a session row for display."""
|
||
title = (s.get("title") or "").strip()
|
||
preview = (s.get("preview") or "").strip()
|
||
source = s.get("source", "")[:6]
|
||
last_active = _relative_time(s.get("last_active"))
|
||
sid = s["id"][:18]
|
||
|
||
# Adaptive column widths based on terminal width
|
||
# Layout: [arrow 3] [title/preview flexible] [active 12] [src 6] [id 18]
|
||
fixed_cols = 3 + 12 + 6 + 18 + 6 # arrow + active + src + id + padding
|
||
name_width = max(20, max_x - fixed_cols)
|
||
|
||
if title:
|
||
name = title[:name_width]
|
||
elif preview:
|
||
name = preview[:name_width]
|
||
else:
|
||
name = sid
|
||
|
||
return f"{name:<{name_width}} {last_active:<10} {source:<5} {sid}"
|
||
|
||
def _match(s, query):
|
||
"""Check if a session matches the search query (case-insensitive)."""
|
||
q = query.lower()
|
||
return (
|
||
q in (s.get("title") or "").lower()
|
||
or q in (s.get("preview") or "").lower()
|
||
or q in s.get("id", "").lower()
|
||
or q in (s.get("source") or "").lower()
|
||
)
|
||
|
||
def _curses_browse(stdscr):
|
||
curses.curs_set(0)
|
||
if curses.has_colors():
|
||
curses.start_color()
|
||
curses.use_default_colors()
|
||
curses.init_pair(1, curses.COLOR_GREEN, -1) # selected
|
||
curses.init_pair(2, curses.COLOR_YELLOW, -1) # header
|
||
curses.init_pair(3, curses.COLOR_CYAN, -1) # search
|
||
curses.init_pair(4, 8, -1) # dim
|
||
|
||
cursor = 0
|
||
scroll_offset = 0
|
||
search_text = ""
|
||
filtered = list(sessions)
|
||
|
||
while True:
|
||
stdscr.clear()
|
||
max_y, max_x = stdscr.getmaxyx()
|
||
if max_y < 5 or max_x < 40:
|
||
# Terminal too small
|
||
try:
|
||
stdscr.addstr(0, 0, "Terminal too small")
|
||
except curses.error:
|
||
pass
|
||
stdscr.refresh()
|
||
stdscr.getch()
|
||
return
|
||
|
||
# Header line
|
||
if search_text:
|
||
header = f" Browse sessions — filter: {search_text}█"
|
||
header_attr = curses.A_BOLD
|
||
if curses.has_colors():
|
||
header_attr |= curses.color_pair(3)
|
||
else:
|
||
header = " Browse sessions — ↑↓ navigate Enter select Type to filter Esc quit"
|
||
header_attr = curses.A_BOLD
|
||
if curses.has_colors():
|
||
header_attr |= curses.color_pair(2)
|
||
try:
|
||
stdscr.addnstr(0, 0, header, max_x - 1, header_attr)
|
||
except curses.error:
|
||
pass
|
||
|
||
# Column header line
|
||
fixed_cols = 3 + 12 + 6 + 18 + 6
|
||
name_width = max(20, max_x - fixed_cols)
|
||
col_header = f" {'Title / Preview':<{name_width}} {'Active':<10} {'Src':<5} {'ID'}"
|
||
try:
|
||
dim_attr = curses.color_pair(4) if curses.has_colors() else curses.A_DIM
|
||
stdscr.addnstr(1, 0, col_header, max_x - 1, dim_attr)
|
||
except curses.error:
|
||
pass
|
||
|
||
# Compute visible area
|
||
visible_rows = max_y - 4 # header + col header + blank + footer
|
||
if visible_rows < 1:
|
||
visible_rows = 1
|
||
|
||
# Clamp cursor and scroll
|
||
if not filtered:
|
||
try:
|
||
msg = " No sessions match the filter."
|
||
stdscr.addnstr(3, 0, msg, max_x - 1, curses.A_DIM)
|
||
except curses.error:
|
||
pass
|
||
else:
|
||
if cursor >= len(filtered):
|
||
cursor = len(filtered) - 1
|
||
if cursor < 0:
|
||
cursor = 0
|
||
if cursor < scroll_offset:
|
||
scroll_offset = cursor
|
||
elif cursor >= scroll_offset + visible_rows:
|
||
scroll_offset = cursor - visible_rows + 1
|
||
|
||
for draw_i, i in enumerate(range(
|
||
scroll_offset,
|
||
min(len(filtered), scroll_offset + visible_rows)
|
||
)):
|
||
y = draw_i + 3
|
||
if y >= max_y - 1:
|
||
break
|
||
s = filtered[i]
|
||
arrow = " → " if i == cursor else " "
|
||
row = arrow + _format_row(s, max_x - 3)
|
||
attr = curses.A_NORMAL
|
||
if i == cursor:
|
||
attr = curses.A_BOLD
|
||
if curses.has_colors():
|
||
attr |= curses.color_pair(1)
|
||
try:
|
||
stdscr.addnstr(y, 0, row, max_x - 1, attr)
|
||
except curses.error:
|
||
pass
|
||
|
||
# Footer
|
||
footer_y = max_y - 1
|
||
if filtered:
|
||
footer = f" {cursor + 1}/{len(filtered)} sessions"
|
||
if len(filtered) < len(sessions):
|
||
footer += f" (filtered from {len(sessions)})"
|
||
else:
|
||
footer = f" 0/{len(sessions)} sessions"
|
||
try:
|
||
stdscr.addnstr(footer_y, 0, footer, max_x - 1,
|
||
curses.color_pair(4) if curses.has_colors() else curses.A_DIM)
|
||
except curses.error:
|
||
pass
|
||
|
||
stdscr.refresh()
|
||
key = stdscr.getch()
|
||
|
||
if key in (curses.KEY_UP, ):
|
||
if filtered:
|
||
cursor = (cursor - 1) % len(filtered)
|
||
elif key in (curses.KEY_DOWN, ):
|
||
if filtered:
|
||
cursor = (cursor + 1) % len(filtered)
|
||
elif key in (curses.KEY_ENTER, 10, 13):
|
||
if filtered:
|
||
result_holder[0] = filtered[cursor]["id"]
|
||
return
|
||
elif key == 27: # Esc
|
||
if search_text:
|
||
# First Esc clears the search
|
||
search_text = ""
|
||
filtered = list(sessions)
|
||
cursor = 0
|
||
scroll_offset = 0
|
||
else:
|
||
# Second Esc exits
|
||
return
|
||
elif key in (curses.KEY_BACKSPACE, 127, 8):
|
||
if search_text:
|
||
search_text = search_text[:-1]
|
||
if search_text:
|
||
filtered = [s for s in sessions if _match(s, search_text)]
|
||
else:
|
||
filtered = list(sessions)
|
||
cursor = 0
|
||
scroll_offset = 0
|
||
elif key == ord('q') and not search_text:
|
||
return
|
||
elif 32 <= key <= 126:
|
||
# Printable character → add to search filter
|
||
search_text += chr(key)
|
||
filtered = [s for s in sessions if _match(s, search_text)]
|
||
cursor = 0
|
||
scroll_offset = 0
|
||
|
||
curses.wrapper(_curses_browse)
|
||
return result_holder[0]
|
||
|
||
except Exception:
|
||
pass
|
||
|
||
# Fallback: numbered list (Windows without curses, etc.)
|
||
print("\n Browse sessions (enter number to resume, q to cancel)\n")
|
||
for i, s in enumerate(sessions):
|
||
title = (s.get("title") or "").strip()
|
||
preview = (s.get("preview") or "").strip()
|
||
label = title or preview or s["id"]
|
||
if len(label) > 50:
|
||
label = label[:47] + "..."
|
||
last_active = _relative_time(s.get("last_active"))
|
||
src = s.get("source", "")[:6]
|
||
print(f" {i + 1:>3}. {label:<50} {last_active:<10} {src}")
|
||
|
||
while True:
|
||
try:
|
||
val = input(f"\n Select [1-{len(sessions)}]: ").strip()
|
||
if not val or val.lower() in ("q", "quit", "exit"):
|
||
return None
|
||
idx = int(val) - 1
|
||
if 0 <= idx < len(sessions):
|
||
return sessions[idx]["id"]
|
||
print(f" Invalid selection. Enter 1-{len(sessions)} or q to cancel.")
|
||
except ValueError:
|
||
print(" Invalid input. Enter a number or q to cancel.")
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return None
|
||
|
||
|
||
def _resolve_last_cli_session() -> Optional[str]:
|
||
"""Look up the most recent CLI session ID from SQLite. Returns None if unavailable."""
|
||
try:
|
||
from hermes_state import SessionDB
|
||
db = SessionDB()
|
||
sessions = db.search_sessions(source="cli", limit=1)
|
||
db.close()
|
||
if sessions:
|
||
return sessions[0]["id"]
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
|
||
def _resolve_session_by_name_or_id(name_or_id: str) -> Optional[str]:
|
||
"""Resolve a session name (title) or ID to a session ID.
|
||
|
||
- If it looks like a session ID (contains underscore + hex), try direct lookup first.
|
||
- Otherwise, treat it as a title and use resolve_session_by_title (auto-latest).
|
||
- Falls back to the other method if the first doesn't match.
|
||
"""
|
||
try:
|
||
from hermes_state import SessionDB
|
||
db = SessionDB()
|
||
|
||
# Try as exact session ID first
|
||
session = db.get_session(name_or_id)
|
||
if session:
|
||
db.close()
|
||
return session["id"]
|
||
|
||
# Try as title (with auto-latest for lineage)
|
||
session_id = db.resolve_session_by_title(name_or_id)
|
||
db.close()
|
||
return session_id
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
|
||
def cmd_chat(args):
|
||
"""Run interactive chat CLI."""
|
||
# Resolve --continue into --resume with the latest CLI session or by name
|
||
continue_val = getattr(args, "continue_last", None)
|
||
if continue_val and not getattr(args, "resume", None):
|
||
if isinstance(continue_val, str):
|
||
# -c "session name" — resolve by title or ID
|
||
resolved = _resolve_session_by_name_or_id(continue_val)
|
||
if resolved:
|
||
args.resume = resolved
|
||
else:
|
||
print(f"No session found matching '{continue_val}'.")
|
||
print("Use 'hermes sessions list' to see available sessions.")
|
||
sys.exit(1)
|
||
else:
|
||
# -c with no argument — continue the most recent session
|
||
last_id = _resolve_last_cli_session()
|
||
if last_id:
|
||
args.resume = last_id
|
||
else:
|
||
print("No previous CLI session found to continue.")
|
||
sys.exit(1)
|
||
|
||
# Resolve --resume by title if it's not a direct session ID
|
||
resume_val = getattr(args, "resume", None)
|
||
if resume_val:
|
||
resolved = _resolve_session_by_name_or_id(resume_val)
|
||
if resolved:
|
||
args.resume = resolved
|
||
# If resolution fails, keep the original value — _init_agent will
|
||
# report "Session not found" with the original input
|
||
|
||
# First-run guard: check if any provider is configured before launching
|
||
if not _has_any_provider_configured():
|
||
print()
|
||
print("It looks like Hermes isn't configured yet -- no API keys or providers found.")
|
||
print()
|
||
print(" Run: hermes setup")
|
||
print()
|
||
|
||
from hermes_cli.setup import is_interactive_stdin, print_noninteractive_setup_guidance
|
||
|
||
if not is_interactive_stdin():
|
||
print_noninteractive_setup_guidance(
|
||
"No interactive TTY detected for the first-run setup prompt."
|
||
)
|
||
sys.exit(1)
|
||
|
||
try:
|
||
reply = input("Run setup now? [Y/n] ").strip().lower()
|
||
except (EOFError, KeyboardInterrupt):
|
||
reply = "n"
|
||
if reply in ("", "y", "yes"):
|
||
cmd_setup(args)
|
||
return
|
||
print()
|
||
print("You can run 'hermes setup' at any time to configure.")
|
||
sys.exit(1)
|
||
|
||
# Start update check in background (runs while other init happens)
|
||
try:
|
||
from hermes_cli.banner import prefetch_update_check
|
||
prefetch_update_check()
|
||
except Exception:
|
||
pass
|
||
|
||
# Sync bundled skills on every CLI launch (fast -- skips unchanged skills)
|
||
try:
|
||
from tools.skills_sync import sync_skills
|
||
sync_skills(quiet=True)
|
||
except Exception:
|
||
pass
|
||
|
||
# --yolo: bypass all dangerous command approvals
|
||
if getattr(args, "yolo", False):
|
||
os.environ["HERMES_YOLO_MODE"] = "1"
|
||
|
||
# --source: tag session source for filtering (e.g. 'tool' for third-party integrations)
|
||
if getattr(args, "source", None):
|
||
os.environ["HERMES_SESSION_SOURCE"] = args.source
|
||
|
||
# Import and run the CLI
|
||
from cli import main as cli_main
|
||
|
||
# Build kwargs from args
|
||
kwargs = {
|
||
"model": args.model,
|
||
"provider": getattr(args, "provider", None),
|
||
"toolsets": args.toolsets,
|
||
"skills": getattr(args, "skills", None),
|
||
"verbose": args.verbose,
|
||
"quiet": getattr(args, "quiet", False),
|
||
"query": args.query,
|
||
"resume": getattr(args, "resume", None),
|
||
"worktree": getattr(args, "worktree", False),
|
||
"checkpoints": getattr(args, "checkpoints", False),
|
||
"pass_session_id": getattr(args, "pass_session_id", False),
|
||
}
|
||
# Filter out None values
|
||
kwargs = {k: v for k, v in kwargs.items() if v is not None}
|
||
|
||
try:
|
||
cli_main(**kwargs)
|
||
except ValueError as e:
|
||
print(f"Error: {e}")
|
||
sys.exit(1)
|
||
|
||
|
||
def cmd_gateway(args):
|
||
"""Gateway management commands."""
|
||
from hermes_cli.gateway import gateway_command
|
||
gateway_command(args)
|
||
|
||
|
||
def cmd_whatsapp(args):
|
||
"""Set up WhatsApp: choose mode, configure, install bridge, pair via QR."""
|
||
_require_tty("whatsapp")
|
||
import subprocess
|
||
from pathlib import Path
|
||
from hermes_cli.config import get_env_value, save_env_value
|
||
|
||
print()
|
||
print("⚕ WhatsApp Setup")
|
||
print("=" * 50)
|
||
|
||
# ── Step 1: Choose mode ──────────────────────────────────────────────
|
||
current_mode = get_env_value("WHATSAPP_MODE") or ""
|
||
if not current_mode:
|
||
print()
|
||
print("How will you use WhatsApp with Hermes?")
|
||
print()
|
||
print(" 1. Separate bot number (recommended)")
|
||
print(" People message the bot's number directly — cleanest experience.")
|
||
print(" Requires a second phone number with WhatsApp installed on a device.")
|
||
print()
|
||
print(" 2. Personal number (self-chat)")
|
||
print(" You message yourself to talk to the agent.")
|
||
print(" Quick to set up, but the UX is less intuitive.")
|
||
print()
|
||
try:
|
||
choice = input(" Choose [1/2]: ").strip()
|
||
except (EOFError, KeyboardInterrupt):
|
||
print("\nSetup cancelled.")
|
||
return
|
||
|
||
if choice == "1":
|
||
save_env_value("WHATSAPP_MODE", "bot")
|
||
wa_mode = "bot"
|
||
print(" ✓ Mode: separate bot number")
|
||
print()
|
||
print(" ┌─────────────────────────────────────────────────┐")
|
||
print(" │ Getting a second number for the bot: │")
|
||
print(" │ │")
|
||
print(" │ Easiest: Install WhatsApp Business (free app) │")
|
||
print(" │ on your phone with a second number: │")
|
||
print(" │ • Dual-SIM: use your 2nd SIM slot │")
|
||
print(" │ • Google Voice: free US number (voice.google) │")
|
||
print(" │ • Prepaid SIM: $3-10, verify once │")
|
||
print(" │ │")
|
||
print(" │ WhatsApp Business runs alongside your personal │")
|
||
print(" │ WhatsApp — no second phone needed. │")
|
||
print(" └─────────────────────────────────────────────────┘")
|
||
else:
|
||
save_env_value("WHATSAPP_MODE", "self-chat")
|
||
wa_mode = "self-chat"
|
||
print(" ✓ Mode: personal number (self-chat)")
|
||
else:
|
||
wa_mode = current_mode
|
||
mode_label = "separate bot number" if wa_mode == "bot" else "personal number (self-chat)"
|
||
print(f"\n✓ Mode: {mode_label}")
|
||
|
||
# ── Step 2: Enable WhatsApp ──────────────────────────────────────────
|
||
print()
|
||
current = get_env_value("WHATSAPP_ENABLED")
|
||
if current and current.lower() == "true":
|
||
print("✓ WhatsApp is already enabled")
|
||
else:
|
||
save_env_value("WHATSAPP_ENABLED", "true")
|
||
print("✓ WhatsApp enabled")
|
||
|
||
# ── Step 3: Allowed users ────────────────────────────────────────────
|
||
current_users = get_env_value("WHATSAPP_ALLOWED_USERS") or ""
|
||
if current_users:
|
||
print(f"✓ Allowed users: {current_users}")
|
||
try:
|
||
response = input("\n Update allowed users? [y/N] ").strip()
|
||
except (EOFError, KeyboardInterrupt):
|
||
response = "n"
|
||
if response.lower() in ("y", "yes"):
|
||
if wa_mode == "bot":
|
||
phone = input(" Phone numbers that can message the bot (comma-separated): ").strip()
|
||
else:
|
||
phone = input(" Your phone number (e.g. 15551234567): ").strip()
|
||
if phone:
|
||
save_env_value("WHATSAPP_ALLOWED_USERS", phone.replace(" ", ""))
|
||
print(f" ✓ Updated to: {phone}")
|
||
else:
|
||
print()
|
||
if wa_mode == "bot":
|
||
print(" Who should be allowed to message the bot?")
|
||
phone = input(" Phone numbers (comma-separated, or * for anyone): ").strip()
|
||
else:
|
||
phone = input(" Your phone number (e.g. 15551234567): ").strip()
|
||
if phone:
|
||
save_env_value("WHATSAPP_ALLOWED_USERS", phone.replace(" ", ""))
|
||
print(f" ✓ Allowed users set: {phone}")
|
||
else:
|
||
print(" ⚠ No allowlist — the agent will respond to ALL incoming messages")
|
||
|
||
# ── Step 4: Install bridge dependencies ──────────────────────────────
|
||
project_root = Path(__file__).resolve().parents[1]
|
||
bridge_dir = project_root / "scripts" / "whatsapp-bridge"
|
||
bridge_script = bridge_dir / "bridge.js"
|
||
|
||
if not bridge_script.exists():
|
||
print(f"\n✗ Bridge script not found at {bridge_script}")
|
||
return
|
||
|
||
if not (bridge_dir / "node_modules").exists():
|
||
print("\n→ Installing WhatsApp bridge dependencies...")
|
||
result = subprocess.run(
|
||
["npm", "install"],
|
||
cwd=str(bridge_dir),
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=120,
|
||
)
|
||
if result.returncode != 0:
|
||
print(f" ✗ npm install failed: {result.stderr}")
|
||
return
|
||
print(" ✓ Dependencies installed")
|
||
else:
|
||
print("✓ Bridge dependencies already installed")
|
||
|
||
# ── Step 5: Check for existing session ───────────────────────────────
|
||
session_dir = get_hermes_home() / "whatsapp" / "session"
|
||
session_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
if (session_dir / "creds.json").exists():
|
||
print("✓ Existing WhatsApp session found")
|
||
try:
|
||
response = input("\n Re-pair? This will clear the existing session. [y/N] ").strip()
|
||
except (EOFError, KeyboardInterrupt):
|
||
response = "n"
|
||
if response.lower() in ("y", "yes"):
|
||
import shutil
|
||
shutil.rmtree(session_dir, ignore_errors=True)
|
||
session_dir.mkdir(parents=True, exist_ok=True)
|
||
print(" ✓ Session cleared")
|
||
else:
|
||
print("\n✓ WhatsApp is configured and paired!")
|
||
print(" Start the gateway with: hermes gateway")
|
||
return
|
||
|
||
# ── Step 6: QR code pairing ──────────────────────────────────────────
|
||
print()
|
||
print("─" * 50)
|
||
if wa_mode == "bot":
|
||
print("📱 Open WhatsApp (or WhatsApp Business) on the")
|
||
print(" phone with the BOT's number, then scan:")
|
||
else:
|
||
print("📱 Open WhatsApp on your phone, then scan:")
|
||
print()
|
||
print(" Settings → Linked Devices → Link a Device")
|
||
print("─" * 50)
|
||
print()
|
||
|
||
try:
|
||
subprocess.run(
|
||
["node", str(bridge_script), "--pair-only", "--session", str(session_dir)],
|
||
cwd=str(bridge_dir),
|
||
)
|
||
except KeyboardInterrupt:
|
||
pass
|
||
|
||
# ── Step 7: Post-pairing ─────────────────────────────────────────────
|
||
print()
|
||
if (session_dir / "creds.json").exists():
|
||
print("✓ WhatsApp paired successfully!")
|
||
print()
|
||
if wa_mode == "bot":
|
||
print(" Next steps:")
|
||
print(" 1. Start the gateway: hermes gateway")
|
||
print(" 2. Send a message to the bot's WhatsApp number")
|
||
print(" 3. The agent will reply automatically")
|
||
print()
|
||
print(" Tip: Agent responses are prefixed with '⚕ Hermes Agent'")
|
||
else:
|
||
print(" Next steps:")
|
||
print(" 1. Start the gateway: hermes gateway")
|
||
print(" 2. Open WhatsApp → Message Yourself")
|
||
print(" 3. Type a message — the agent will reply")
|
||
print()
|
||
print(" Tip: Agent responses are prefixed with '⚕ Hermes Agent'")
|
||
print(" so you can tell them apart from your own messages.")
|
||
print()
|
||
print(" Or install as a service: hermes gateway install")
|
||
else:
|
||
print("⚠ Pairing may not have completed. Run 'hermes whatsapp' to try again.")
|
||
|
||
|
||
def cmd_setup(args):
|
||
"""Interactive setup wizard."""
|
||
_require_tty("setup")
|
||
from hermes_cli.setup import run_setup_wizard
|
||
run_setup_wizard(args)
|
||
|
||
|
||
def cmd_model(args):
|
||
"""Select default model — starts with provider selection, then model picker."""
|
||
_require_tty("model")
|
||
from hermes_cli.auth import (
|
||
resolve_provider, AuthError, format_auth_error,
|
||
)
|
||
from hermes_cli.config import load_config, get_env_value
|
||
|
||
config = load_config()
|
||
current_model = config.get("model")
|
||
if isinstance(current_model, dict):
|
||
current_model = current_model.get("default", "")
|
||
current_model = current_model or "(not set)"
|
||
|
||
# Read effective provider the same way the CLI does at startup:
|
||
# config.yaml model.provider > env var > auto-detect
|
||
import os
|
||
config_provider = None
|
||
model_cfg = config.get("model")
|
||
if isinstance(model_cfg, dict):
|
||
config_provider = model_cfg.get("provider")
|
||
|
||
effective_provider = (
|
||
config_provider
|
||
or os.getenv("HERMES_INFERENCE_PROVIDER")
|
||
or "auto"
|
||
)
|
||
try:
|
||
active = resolve_provider(effective_provider)
|
||
except AuthError as exc:
|
||
warning = format_auth_error(exc)
|
||
print(f"Warning: {warning} Falling back to auto provider detection.")
|
||
active = resolve_provider("auto")
|
||
|
||
# Detect custom endpoint
|
||
if active == "openrouter" and get_env_value("OPENAI_BASE_URL"):
|
||
active = "custom"
|
||
|
||
provider_labels = {
|
||
"openrouter": "OpenRouter",
|
||
"nous": "Nous Portal",
|
||
"openai-codex": "OpenAI Codex",
|
||
"copilot-acp": "GitHub Copilot ACP",
|
||
"copilot": "GitHub Copilot",
|
||
"anthropic": "Anthropic",
|
||
"zai": "Z.AI / GLM",
|
||
"kimi-coding": "Kimi / Moonshot",
|
||
"minimax": "MiniMax",
|
||
"minimax-cn": "MiniMax (China)",
|
||
"opencode-zen": "OpenCode Zen",
|
||
"opencode-go": "OpenCode Go",
|
||
"ai-gateway": "AI Gateway",
|
||
"kilocode": "Kilo Code",
|
||
"alibaba": "Alibaba Cloud (DashScope)",
|
||
"huggingface": "Hugging Face",
|
||
"custom": "Custom endpoint",
|
||
}
|
||
active_label = provider_labels.get(active, active)
|
||
|
||
print()
|
||
print(f" Current model: {current_model}")
|
||
print(f" Active provider: {active_label}")
|
||
print()
|
||
|
||
# Step 1: Provider selection — put active provider first with marker
|
||
providers = [
|
||
("openrouter", "OpenRouter (100+ models, pay-per-use)"),
|
||
("nous", "Nous Portal (Nous Research subscription)"),
|
||
("openai-codex", "OpenAI Codex"),
|
||
("copilot-acp", "GitHub Copilot ACP (spawns `copilot --acp --stdio`)"),
|
||
("copilot", "GitHub Copilot (uses GITHUB_TOKEN or gh auth token)"),
|
||
("anthropic", "Anthropic (Claude models — API key or Claude Code)"),
|
||
("zai", "Z.AI / GLM (Zhipu AI direct API)"),
|
||
("kimi-coding", "Kimi / Moonshot (Moonshot AI direct API)"),
|
||
("minimax", "MiniMax (global direct API)"),
|
||
("minimax-cn", "MiniMax China (domestic direct API)"),
|
||
("kilocode", "Kilo Code (Kilo Gateway API)"),
|
||
("opencode-zen", "OpenCode Zen (35+ curated models, pay-as-you-go)"),
|
||
("opencode-go", "OpenCode Go (open models, $10/month subscription)"),
|
||
("ai-gateway", "AI Gateway (Vercel — 200+ models, pay-per-use)"),
|
||
("alibaba", "Alibaba Cloud / DashScope Coding (Qwen + multi-provider)"),
|
||
("huggingface", "Hugging Face Inference Providers (20+ open models)"),
|
||
]
|
||
|
||
# Add user-defined custom providers from config.yaml
|
||
custom_providers_cfg = config.get("custom_providers") or []
|
||
_custom_provider_map = {} # key → {name, base_url, api_key}
|
||
if isinstance(custom_providers_cfg, list):
|
||
for entry in custom_providers_cfg:
|
||
if not isinstance(entry, dict):
|
||
continue
|
||
name = (entry.get("name") or "").strip()
|
||
base_url = (entry.get("base_url") or "").strip()
|
||
if not name or not base_url:
|
||
continue
|
||
# Generate a stable key from the name
|
||
key = "custom:" + name.lower().replace(" ", "-")
|
||
short_url = base_url.replace("https://", "").replace("http://", "").rstrip("/")
|
||
saved_model = entry.get("model", "")
|
||
model_hint = f" — {saved_model}" if saved_model else ""
|
||
providers.append((key, f"{name} ({short_url}){model_hint}"))
|
||
_custom_provider_map[key] = {
|
||
"name": name,
|
||
"base_url": base_url,
|
||
"api_key": entry.get("api_key", ""),
|
||
"model": saved_model,
|
||
}
|
||
|
||
# Always add the manual custom endpoint option last
|
||
providers.append(("custom", "Custom endpoint (enter URL manually)"))
|
||
|
||
# Add removal option if there are saved custom providers
|
||
if _custom_provider_map:
|
||
providers.append(("remove-custom", "Remove a saved custom provider"))
|
||
|
||
# Reorder so the active provider is at the top
|
||
known_keys = {k for k, _ in providers}
|
||
active_key = active if active in known_keys else "custom"
|
||
ordered = []
|
||
for key, label in providers:
|
||
if key == active_key:
|
||
ordered.insert(0, (key, f"{label} ← currently active"))
|
||
else:
|
||
ordered.append((key, label))
|
||
ordered.append(("cancel", "Cancel"))
|
||
|
||
provider_idx = _prompt_provider_choice([label for _, label in ordered])
|
||
if provider_idx is None or ordered[provider_idx][0] == "cancel":
|
||
print("No change.")
|
||
return
|
||
|
||
selected_provider = ordered[provider_idx][0]
|
||
|
||
# Step 2: Provider-specific setup + model selection
|
||
if selected_provider == "openrouter":
|
||
_model_flow_openrouter(config, current_model)
|
||
elif selected_provider == "nous":
|
||
_model_flow_nous(config, current_model)
|
||
elif selected_provider == "openai-codex":
|
||
_model_flow_openai_codex(config, current_model)
|
||
elif selected_provider == "copilot-acp":
|
||
_model_flow_copilot_acp(config, current_model)
|
||
elif selected_provider == "copilot":
|
||
_model_flow_copilot(config, current_model)
|
||
elif selected_provider == "custom":
|
||
_model_flow_custom(config)
|
||
elif selected_provider.startswith("custom:") and selected_provider in _custom_provider_map:
|
||
_model_flow_named_custom(config, _custom_provider_map[selected_provider])
|
||
elif selected_provider == "remove-custom":
|
||
_remove_custom_provider(config)
|
||
elif selected_provider == "anthropic":
|
||
_model_flow_anthropic(config, current_model)
|
||
elif selected_provider == "kimi-coding":
|
||
_model_flow_kimi(config, current_model)
|
||
elif selected_provider in ("zai", "minimax", "minimax-cn", "kilocode", "opencode-zen", "opencode-go", "ai-gateway", "alibaba", "huggingface"):
|
||
_model_flow_api_key_provider(config, selected_provider, current_model)
|
||
|
||
|
||
def _prompt_provider_choice(choices):
|
||
"""Show provider selection menu. Returns index or None."""
|
||
try:
|
||
from simple_term_menu import TerminalMenu
|
||
menu_items = [f" {c}" for c in choices]
|
||
menu = TerminalMenu(
|
||
menu_items, cursor_index=0,
|
||
menu_cursor="-> ", menu_cursor_style=("fg_green", "bold"),
|
||
menu_highlight_style=("fg_green",),
|
||
cycle_cursor=True, clear_screen=False,
|
||
title="Select provider:",
|
||
)
|
||
idx = menu.show()
|
||
print()
|
||
return idx
|
||
except (ImportError, NotImplementedError):
|
||
pass
|
||
|
||
# Fallback: numbered list
|
||
print("Select provider:")
|
||
for i, c in enumerate(choices, 1):
|
||
print(f" {i}. {c}")
|
||
print()
|
||
while True:
|
||
try:
|
||
val = input(f"Choice [1-{len(choices)}]: ").strip()
|
||
if not val:
|
||
return None
|
||
idx = int(val) - 1
|
||
if 0 <= idx < len(choices):
|
||
return idx
|
||
print(f"Please enter 1-{len(choices)}")
|
||
except ValueError:
|
||
print("Please enter a number")
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return None
|
||
|
||
|
||
def _model_flow_openrouter(config, current_model=""):
|
||
"""OpenRouter provider: ensure API key, then pick model."""
|
||
from hermes_cli.auth import _prompt_model_selection, _save_model_choice, deactivate_provider
|
||
from hermes_cli.config import get_env_value, save_env_value
|
||
|
||
api_key = get_env_value("OPENROUTER_API_KEY")
|
||
if not api_key:
|
||
print("No OpenRouter API key configured.")
|
||
print("Get one at: https://openrouter.ai/keys")
|
||
print()
|
||
try:
|
||
key = input("OpenRouter API key (or Enter to cancel): ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return
|
||
if not key:
|
||
print("Cancelled.")
|
||
return
|
||
save_env_value("OPENROUTER_API_KEY", key)
|
||
print("API key saved.")
|
||
print()
|
||
|
||
from hermes_cli.models import model_ids
|
||
openrouter_models = model_ids()
|
||
|
||
selected = _prompt_model_selection(openrouter_models, current_model=current_model)
|
||
if selected:
|
||
# Clear any custom endpoint and set provider to openrouter
|
||
if get_env_value("OPENAI_BASE_URL"):
|
||
save_env_value("OPENAI_BASE_URL", "")
|
||
save_env_value("OPENAI_API_KEY", "")
|
||
_save_model_choice(selected)
|
||
|
||
# Update config provider and deactivate any OAuth provider
|
||
from hermes_cli.config import load_config, save_config
|
||
cfg = load_config()
|
||
model = cfg.get("model")
|
||
if not isinstance(model, dict):
|
||
model = {"default": model} if model else {}
|
||
cfg["model"] = model
|
||
model["provider"] = "openrouter"
|
||
model["base_url"] = OPENROUTER_BASE_URL
|
||
model["api_mode"] = "chat_completions"
|
||
save_config(cfg)
|
||
deactivate_provider()
|
||
print(f"Default model set to: {selected} (via OpenRouter)")
|
||
else:
|
||
print("No change.")
|
||
|
||
|
||
def _model_flow_nous(config, current_model=""):
|
||
"""Nous Portal provider: ensure logged in, then pick model."""
|
||
from hermes_cli.auth import (
|
||
get_provider_auth_state, _prompt_model_selection, _save_model_choice,
|
||
_update_config_for_provider, resolve_nous_runtime_credentials,
|
||
fetch_nous_models, AuthError, format_auth_error,
|
||
_login_nous, PROVIDER_REGISTRY,
|
||
)
|
||
from hermes_cli.config import get_env_value, save_env_value
|
||
import argparse
|
||
|
||
state = get_provider_auth_state("nous")
|
||
if not state or not state.get("access_token"):
|
||
print("Not logged into Nous Portal. Starting login...")
|
||
print()
|
||
try:
|
||
mock_args = argparse.Namespace(
|
||
portal_url=None, inference_url=None, client_id=None,
|
||
scope=None, no_browser=False, timeout=15.0,
|
||
ca_bundle=None, insecure=False,
|
||
)
|
||
_login_nous(mock_args, PROVIDER_REGISTRY["nous"])
|
||
except SystemExit:
|
||
print("Login cancelled or failed.")
|
||
return
|
||
except Exception as exc:
|
||
print(f"Login failed: {exc}")
|
||
return
|
||
# login_nous already handles model selection + config update
|
||
return
|
||
|
||
# Already logged in — use curated model list (same as OpenRouter defaults).
|
||
# The live /models endpoint returns hundreds of models; the curated list
|
||
# shows only agentic models users recognize from OpenRouter.
|
||
from hermes_cli.models import _PROVIDER_MODELS
|
||
model_ids = _PROVIDER_MODELS.get("nous", [])
|
||
if not model_ids:
|
||
print("No curated models available for Nous Portal.")
|
||
return
|
||
|
||
print(f"Showing {len(model_ids)} curated models — use \"Enter custom model name\" for others.")
|
||
|
||
# Verify credentials are still valid (catches expired sessions early)
|
||
try:
|
||
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=5 * 60)
|
||
except Exception as exc:
|
||
relogin = isinstance(exc, AuthError) and exc.relogin_required
|
||
msg = format_auth_error(exc) if isinstance(exc, AuthError) else str(exc)
|
||
if relogin:
|
||
print(f"Session expired: {msg}")
|
||
print("Re-authenticating with Nous Portal...\n")
|
||
try:
|
||
mock_args = argparse.Namespace(
|
||
portal_url=None, inference_url=None, client_id=None,
|
||
scope=None, no_browser=False, timeout=15.0,
|
||
ca_bundle=None, insecure=False,
|
||
)
|
||
_login_nous(mock_args, PROVIDER_REGISTRY["nous"])
|
||
except Exception as login_exc:
|
||
print(f"Re-login failed: {login_exc}")
|
||
return
|
||
print(f"Could not verify credentials: {msg}")
|
||
return
|
||
|
||
selected = _prompt_model_selection(model_ids, current_model=current_model)
|
||
if selected:
|
||
_save_model_choice(selected)
|
||
# Reactivate Nous as the provider and update config
|
||
inference_url = creds.get("base_url", "")
|
||
_update_config_for_provider("nous", inference_url)
|
||
# Clear any custom endpoint that might conflict
|
||
if get_env_value("OPENAI_BASE_URL"):
|
||
save_env_value("OPENAI_BASE_URL", "")
|
||
save_env_value("OPENAI_API_KEY", "")
|
||
print(f"Default model set to: {selected} (via Nous Portal)")
|
||
else:
|
||
print("No change.")
|
||
|
||
|
||
def _model_flow_openai_codex(config, current_model=""):
|
||
"""OpenAI Codex provider: ensure logged in, then pick model."""
|
||
from hermes_cli.auth import (
|
||
get_codex_auth_status, _prompt_model_selection, _save_model_choice,
|
||
_update_config_for_provider, _login_openai_codex,
|
||
PROVIDER_REGISTRY, DEFAULT_CODEX_BASE_URL,
|
||
)
|
||
from hermes_cli.codex_models import get_codex_model_ids
|
||
from hermes_cli.config import get_env_value, save_env_value
|
||
import argparse
|
||
|
||
status = get_codex_auth_status()
|
||
if not status.get("logged_in"):
|
||
print("Not logged into OpenAI Codex. Starting login...")
|
||
print()
|
||
try:
|
||
mock_args = argparse.Namespace()
|
||
_login_openai_codex(mock_args, PROVIDER_REGISTRY["openai-codex"])
|
||
except SystemExit:
|
||
print("Login cancelled or failed.")
|
||
return
|
||
except Exception as exc:
|
||
print(f"Login failed: {exc}")
|
||
return
|
||
|
||
_codex_token = None
|
||
try:
|
||
from hermes_cli.auth import resolve_codex_runtime_credentials
|
||
_codex_creds = resolve_codex_runtime_credentials()
|
||
_codex_token = _codex_creds.get("api_key")
|
||
except Exception:
|
||
pass
|
||
|
||
codex_models = get_codex_model_ids(access_token=_codex_token)
|
||
|
||
selected = _prompt_model_selection(codex_models, current_model=current_model)
|
||
if selected:
|
||
_save_model_choice(selected)
|
||
_update_config_for_provider("openai-codex", DEFAULT_CODEX_BASE_URL)
|
||
# Clear custom endpoint env vars that would otherwise override Codex.
|
||
if get_env_value("OPENAI_BASE_URL"):
|
||
save_env_value("OPENAI_BASE_URL", "")
|
||
save_env_value("OPENAI_API_KEY", "")
|
||
print(f"Default model set to: {selected} (via OpenAI Codex)")
|
||
else:
|
||
print("No change.")
|
||
|
||
|
||
|
||
def _model_flow_custom(config):
|
||
"""Custom endpoint: collect URL, API key, and model name.
|
||
|
||
Automatically saves the endpoint to ``custom_providers`` in config.yaml
|
||
so it appears in the provider menu on subsequent runs.
|
||
"""
|
||
from hermes_cli.auth import _save_model_choice, deactivate_provider
|
||
from hermes_cli.config import get_env_value, save_env_value, load_config, save_config
|
||
|
||
current_url = get_env_value("OPENAI_BASE_URL") or ""
|
||
current_key = get_env_value("OPENAI_API_KEY") or ""
|
||
|
||
print("Custom OpenAI-compatible endpoint configuration:")
|
||
if current_url:
|
||
print(f" Current URL: {current_url}")
|
||
if current_key:
|
||
print(f" Current key: {current_key[:8]}...")
|
||
print()
|
||
|
||
try:
|
||
base_url = input(f"API base URL [{current_url or 'e.g. https://api.example.com/v1'}]: ").strip()
|
||
api_key = input(f"API key [{current_key[:8] + '...' if current_key else 'optional'}]: ").strip()
|
||
model_name = input("Model name (e.g. gpt-4, llama-3-70b): ").strip()
|
||
context_length_str = input("Context length in tokens [leave blank for auto-detect]: ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print("\nCancelled.")
|
||
return
|
||
|
||
context_length = None
|
||
if context_length_str:
|
||
try:
|
||
context_length = int(context_length_str.replace(",", "").replace("k", "000").replace("K", "000"))
|
||
if context_length <= 0:
|
||
context_length = None
|
||
except ValueError:
|
||
print(f"Invalid context length: {context_length_str} — will auto-detect.")
|
||
context_length = None
|
||
|
||
if not base_url and not current_url:
|
||
print("No URL provided. Cancelled.")
|
||
return
|
||
|
||
# Validate URL format
|
||
effective_url = base_url or current_url
|
||
if not effective_url.startswith(("http://", "https://")):
|
||
print(f"Invalid URL: {effective_url} (must start with http:// or https://)")
|
||
return
|
||
|
||
effective_key = api_key or current_key
|
||
|
||
from hermes_cli.models import probe_api_models
|
||
|
||
probe = probe_api_models(effective_key, effective_url)
|
||
if probe.get("used_fallback") and probe.get("resolved_base_url"):
|
||
print(
|
||
f"Warning: endpoint verification worked at {probe['resolved_base_url']}/models, "
|
||
f"not the exact URL you entered. Saving the working base URL instead."
|
||
)
|
||
effective_url = probe["resolved_base_url"]
|
||
if base_url:
|
||
base_url = effective_url
|
||
elif probe.get("models") is not None:
|
||
print(
|
||
f"Verified endpoint via {probe.get('probed_url')} "
|
||
f"({len(probe.get('models') or [])} model(s) visible)"
|
||
)
|
||
else:
|
||
print(
|
||
f"Warning: could not verify this endpoint via {probe.get('probed_url')}. "
|
||
f"Hermes will still save it."
|
||
)
|
||
if probe.get("suggested_base_url"):
|
||
print(f" If this server expects /v1, try base URL: {probe['suggested_base_url']}")
|
||
|
||
if base_url:
|
||
save_env_value("OPENAI_BASE_URL", effective_url)
|
||
if api_key:
|
||
save_env_value("OPENAI_API_KEY", api_key)
|
||
|
||
if model_name:
|
||
_save_model_choice(model_name)
|
||
|
||
# Update config and deactivate any OAuth provider
|
||
cfg = load_config()
|
||
model = cfg.get("model")
|
||
if not isinstance(model, dict):
|
||
model = {"default": model} if model else {}
|
||
cfg["model"] = model
|
||
model["provider"] = "custom"
|
||
model["base_url"] = effective_url
|
||
model.pop("api_mode", None) # let runtime auto-detect from URL
|
||
save_config(cfg)
|
||
deactivate_provider()
|
||
|
||
print(f"Default model set to: {model_name} (via {effective_url})")
|
||
else:
|
||
if base_url or api_key:
|
||
deactivate_provider()
|
||
print("Endpoint saved. Use `/model` in chat or `hermes model` to set a model.")
|
||
|
||
# Auto-save to custom_providers so it appears in the menu next time
|
||
_save_custom_provider(effective_url, effective_key, model_name or "", context_length=context_length)
|
||
|
||
|
||
def _save_custom_provider(base_url, api_key="", model="", context_length=None):
|
||
"""Save a custom endpoint to custom_providers in config.yaml.
|
||
|
||
Deduplicates by base_url — if the URL already exists, updates the
|
||
model name and context_length but doesn't add a duplicate entry.
|
||
Auto-generates a display name from the URL hostname.
|
||
"""
|
||
from hermes_cli.config import load_config, save_config
|
||
|
||
cfg = load_config()
|
||
providers = cfg.get("custom_providers") or []
|
||
if not isinstance(providers, list):
|
||
providers = []
|
||
|
||
# Check if this URL is already saved — update model/context_length if so
|
||
for entry in providers:
|
||
if isinstance(entry, dict) and entry.get("base_url", "").rstrip("/") == base_url.rstrip("/"):
|
||
changed = False
|
||
if model and entry.get("model") != model:
|
||
entry["model"] = model
|
||
changed = True
|
||
if model and context_length:
|
||
models_cfg = entry.get("models", {})
|
||
if not isinstance(models_cfg, dict):
|
||
models_cfg = {}
|
||
models_cfg[model] = {"context_length": context_length}
|
||
entry["models"] = models_cfg
|
||
changed = True
|
||
if changed:
|
||
cfg["custom_providers"] = providers
|
||
save_config(cfg)
|
||
return # already saved, updated if needed
|
||
|
||
# Auto-generate a name from the URL
|
||
import re
|
||
clean = base_url.replace("https://", "").replace("http://", "").rstrip("/")
|
||
# Remove /v1 suffix for cleaner names
|
||
clean = re.sub(r"/v1/?$", "", clean)
|
||
# Use hostname:port as the name
|
||
name = clean.split("/")[0]
|
||
# Capitalize for readability
|
||
if "localhost" in name or "127.0.0.1" in name:
|
||
name = f"Local ({name})"
|
||
elif "runpod" in name.lower():
|
||
name = f"RunPod ({name})"
|
||
else:
|
||
name = name.capitalize()
|
||
|
||
entry = {"name": name, "base_url": base_url}
|
||
if api_key:
|
||
entry["api_key"] = api_key
|
||
if model:
|
||
entry["model"] = model
|
||
if model and context_length:
|
||
entry["models"] = {model: {"context_length": context_length}}
|
||
|
||
providers.append(entry)
|
||
cfg["custom_providers"] = providers
|
||
save_config(cfg)
|
||
print(f" 💾 Saved to custom providers as \"{name}\" (edit in config.yaml)")
|
||
|
||
|
||
def _remove_custom_provider(config):
|
||
"""Let the user remove a saved custom provider from config.yaml."""
|
||
from hermes_cli.config import load_config, save_config
|
||
|
||
cfg = load_config()
|
||
providers = cfg.get("custom_providers") or []
|
||
if not isinstance(providers, list) or not providers:
|
||
print("No custom providers configured.")
|
||
return
|
||
|
||
print("Remove a custom provider:\n")
|
||
|
||
choices = []
|
||
for entry in providers:
|
||
if isinstance(entry, dict):
|
||
name = entry.get("name", "unnamed")
|
||
url = entry.get("base_url", "")
|
||
short_url = url.replace("https://", "").replace("http://", "").rstrip("/")
|
||
choices.append(f"{name} ({short_url})")
|
||
else:
|
||
choices.append(str(entry))
|
||
choices.append("Cancel")
|
||
|
||
try:
|
||
from simple_term_menu import TerminalMenu
|
||
menu = TerminalMenu(
|
||
[f" {c}" for c in choices], cursor_index=0,
|
||
menu_cursor="-> ", menu_cursor_style=("fg_red", "bold"),
|
||
menu_highlight_style=("fg_red",),
|
||
cycle_cursor=True, clear_screen=False,
|
||
title="Select provider to remove:",
|
||
)
|
||
idx = menu.show()
|
||
print()
|
||
except (ImportError, NotImplementedError):
|
||
for i, c in enumerate(choices, 1):
|
||
print(f" {i}. {c}")
|
||
print()
|
||
try:
|
||
val = input(f"Choice [1-{len(choices)}]: ").strip()
|
||
idx = int(val) - 1 if val else None
|
||
except (ValueError, KeyboardInterrupt, EOFError):
|
||
idx = None
|
||
|
||
if idx is None or idx >= len(providers):
|
||
print("No change.")
|
||
return
|
||
|
||
removed = providers.pop(idx)
|
||
cfg["custom_providers"] = providers
|
||
save_config(cfg)
|
||
removed_name = removed.get("name", "unnamed") if isinstance(removed, dict) else str(removed)
|
||
print(f"✅ Removed \"{removed_name}\" from custom providers.")
|
||
|
||
|
||
def _model_flow_named_custom(config, provider_info):
|
||
"""Handle a named custom provider from config.yaml custom_providers list.
|
||
|
||
If the entry has a saved model name, activates it immediately.
|
||
Otherwise probes the endpoint's /models API to let the user pick one.
|
||
"""
|
||
from hermes_cli.auth import _save_model_choice, deactivate_provider
|
||
from hermes_cli.config import save_env_value, load_config, save_config
|
||
from hermes_cli.models import fetch_api_models
|
||
|
||
name = provider_info["name"]
|
||
base_url = provider_info["base_url"]
|
||
api_key = provider_info.get("api_key", "")
|
||
saved_model = provider_info.get("model", "")
|
||
|
||
# If a model is saved, just activate immediately — no probing needed
|
||
if saved_model:
|
||
save_env_value("OPENAI_BASE_URL", base_url)
|
||
if api_key:
|
||
save_env_value("OPENAI_API_KEY", api_key)
|
||
_save_model_choice(saved_model)
|
||
|
||
cfg = load_config()
|
||
model = cfg.get("model")
|
||
if not isinstance(model, dict):
|
||
model = {"default": model} if model else {}
|
||
cfg["model"] = model
|
||
model["provider"] = "custom"
|
||
model["base_url"] = base_url
|
||
save_config(cfg)
|
||
deactivate_provider()
|
||
|
||
print(f"✅ Switched to: {saved_model}")
|
||
print(f" Provider: {name} ({base_url})")
|
||
return
|
||
|
||
# No saved model — probe endpoint and let user pick
|
||
print(f" Provider: {name}")
|
||
print(f" URL: {base_url}")
|
||
print()
|
||
print("No model saved for this provider. Fetching available models...")
|
||
models = fetch_api_models(api_key, base_url, timeout=8.0)
|
||
|
||
if models:
|
||
print(f"Found {len(models)} model(s):\n")
|
||
try:
|
||
from simple_term_menu import TerminalMenu
|
||
menu_items = [f" {m}" for m in models] + [" Cancel"]
|
||
menu = TerminalMenu(
|
||
menu_items, cursor_index=0,
|
||
menu_cursor="-> ", menu_cursor_style=("fg_green", "bold"),
|
||
menu_highlight_style=("fg_green",),
|
||
cycle_cursor=True, clear_screen=False,
|
||
title=f"Select model from {name}:",
|
||
)
|
||
idx = menu.show()
|
||
print()
|
||
if idx is None or idx >= len(models):
|
||
print("Cancelled.")
|
||
return
|
||
model_name = models[idx]
|
||
except (ImportError, NotImplementedError):
|
||
for i, m in enumerate(models, 1):
|
||
print(f" {i}. {m}")
|
||
print(f" {len(models) + 1}. Cancel")
|
||
print()
|
||
try:
|
||
val = input(f"Choice [1-{len(models) + 1}]: ").strip()
|
||
if not val:
|
||
print("Cancelled.")
|
||
return
|
||
idx = int(val) - 1
|
||
if idx < 0 or idx >= len(models):
|
||
print("Cancelled.")
|
||
return
|
||
model_name = models[idx]
|
||
except (ValueError, KeyboardInterrupt, EOFError):
|
||
print("\nCancelled.")
|
||
return
|
||
else:
|
||
print("Could not fetch models from endpoint. Enter model name manually.")
|
||
try:
|
||
model_name = input("Model name: ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print("\nCancelled.")
|
||
return
|
||
if not model_name:
|
||
print("No model specified. Cancelled.")
|
||
return
|
||
|
||
# Activate and save the model to the custom_providers entry
|
||
save_env_value("OPENAI_BASE_URL", base_url)
|
||
if api_key:
|
||
save_env_value("OPENAI_API_KEY", api_key)
|
||
_save_model_choice(model_name)
|
||
|
||
cfg = load_config()
|
||
model = cfg.get("model")
|
||
if not isinstance(model, dict):
|
||
model = {"default": model} if model else {}
|
||
cfg["model"] = model
|
||
model["provider"] = "custom"
|
||
model["base_url"] = base_url
|
||
save_config(cfg)
|
||
deactivate_provider()
|
||
|
||
# Save model name to the custom_providers entry for next time
|
||
_save_custom_provider(base_url, api_key, model_name)
|
||
|
||
print(f"\n✅ Model set to: {model_name}")
|
||
print(f" Provider: {name} ({base_url})")
|
||
|
||
|
||
# Curated model lists for direct API-key providers
|
||
_PROVIDER_MODELS = {
|
||
"copilot-acp": [
|
||
"copilot-acp",
|
||
],
|
||
"copilot": [
|
||
"gpt-5.4",
|
||
"gpt-5.4-mini",
|
||
"gpt-5-mini",
|
||
"gpt-5.3-codex",
|
||
"gpt-5.2-codex",
|
||
"gpt-4.1",
|
||
"gpt-4o",
|
||
"gpt-4o-mini",
|
||
"claude-opus-4.6",
|
||
"claude-sonnet-4.6",
|
||
"claude-sonnet-4.5",
|
||
"claude-haiku-4.5",
|
||
"gemini-2.5-pro",
|
||
"grok-code-fast-1",
|
||
],
|
||
"zai": [
|
||
"glm-5",
|
||
"glm-4.7",
|
||
"glm-4.5",
|
||
"glm-4.5-flash",
|
||
],
|
||
"kimi-coding": [
|
||
"kimi-for-coding",
|
||
"kimi-k2.5",
|
||
"kimi-k2-thinking",
|
||
"kimi-k2-thinking-turbo",
|
||
"kimi-k2-turbo-preview",
|
||
"kimi-k2-0905-preview",
|
||
],
|
||
"moonshot": [
|
||
"kimi-k2.5",
|
||
"kimi-k2-thinking",
|
||
"kimi-k2-turbo-preview",
|
||
"kimi-k2-0905-preview",
|
||
],
|
||
"minimax": [
|
||
"MiniMax-M2.5",
|
||
"MiniMax-M2.5-highspeed",
|
||
"MiniMax-M2.1",
|
||
],
|
||
"minimax-cn": [
|
||
"MiniMax-M2.5",
|
||
"MiniMax-M2.5-highspeed",
|
||
"MiniMax-M2.1",
|
||
],
|
||
"kilocode": [
|
||
"anthropic/claude-opus-4.6",
|
||
"anthropic/claude-sonnet-4.6",
|
||
"openai/gpt-5.4",
|
||
"google/gemini-3-pro-preview",
|
||
"google/gemini-3-flash-preview",
|
||
],
|
||
# Curated HF model list — only agentic models that map to OpenRouter defaults.
|
||
# Format: HF model ID → OpenRouter equivalent noted in comment
|
||
"huggingface": [
|
||
"Qwen/Qwen3.5-397B-A17B", # ↔ qwen/qwen3.5-plus
|
||
"Qwen/Qwen3.5-35B-A3B", # ↔ qwen/qwen3.5-35b-a3b
|
||
"deepseek-ai/DeepSeek-V3.2", # ↔ deepseek/deepseek-chat
|
||
"moonshotai/Kimi-K2.5", # ↔ moonshotai/kimi-k2.5
|
||
"MiniMaxAI/MiniMax-M2.5", # ↔ minimax/minimax-m2.5
|
||
"zai-org/GLM-5", # ↔ z-ai/glm-5
|
||
"XiaomiMiMo/MiMo-V2-Flash", # ↔ xiaomi/mimo-v2-pro
|
||
"moonshotai/Kimi-K2-Thinking", # ↔ moonshotai/kimi-k2-thinking
|
||
],
|
||
}
|
||
|
||
|
||
def _current_reasoning_effort(config) -> str:
|
||
agent_cfg = config.get("agent")
|
||
if isinstance(agent_cfg, dict):
|
||
return str(agent_cfg.get("reasoning_effort") or "").strip().lower()
|
||
return ""
|
||
|
||
|
||
def _set_reasoning_effort(config, effort: str) -> None:
|
||
agent_cfg = config.get("agent")
|
||
if not isinstance(agent_cfg, dict):
|
||
agent_cfg = {}
|
||
config["agent"] = agent_cfg
|
||
agent_cfg["reasoning_effort"] = effort
|
||
|
||
|
||
def _prompt_reasoning_effort_selection(efforts, current_effort=""):
|
||
"""Prompt for a reasoning effort. Returns effort, 'none', or None to keep current."""
|
||
ordered = list(dict.fromkeys(str(effort).strip().lower() for effort in efforts if str(effort).strip()))
|
||
if not ordered:
|
||
return None
|
||
|
||
def _label(effort):
|
||
if effort == current_effort:
|
||
return f"{effort} ← currently in use"
|
||
return effort
|
||
|
||
disable_label = "Disable reasoning"
|
||
skip_label = "Skip (keep current)"
|
||
|
||
if current_effort == "none":
|
||
default_idx = len(ordered)
|
||
elif current_effort in ordered:
|
||
default_idx = ordered.index(current_effort)
|
||
elif "medium" in ordered:
|
||
default_idx = ordered.index("medium")
|
||
else:
|
||
default_idx = 0
|
||
|
||
try:
|
||
from simple_term_menu import TerminalMenu
|
||
|
||
choices = [f" {_label(effort)}" for effort in ordered]
|
||
choices.append(f" {disable_label}")
|
||
choices.append(f" {skip_label}")
|
||
menu = TerminalMenu(
|
||
choices,
|
||
cursor_index=default_idx,
|
||
menu_cursor="-> ",
|
||
menu_cursor_style=("fg_green", "bold"),
|
||
menu_highlight_style=("fg_green",),
|
||
cycle_cursor=True,
|
||
clear_screen=False,
|
||
title="Select reasoning effort:",
|
||
)
|
||
idx = menu.show()
|
||
if idx is None:
|
||
return None
|
||
print()
|
||
if idx < len(ordered):
|
||
return ordered[idx]
|
||
if idx == len(ordered):
|
||
return "none"
|
||
return None
|
||
except (ImportError, NotImplementedError):
|
||
pass
|
||
|
||
print("Select reasoning effort:")
|
||
for i, effort in enumerate(ordered, 1):
|
||
print(f" {i}. {_label(effort)}")
|
||
n = len(ordered)
|
||
print(f" {n + 1}. {disable_label}")
|
||
print(f" {n + 2}. {skip_label}")
|
||
print()
|
||
|
||
while True:
|
||
try:
|
||
choice = input(f"Choice [1-{n + 2}] (default: keep current): ").strip()
|
||
if not choice:
|
||
return None
|
||
idx = int(choice)
|
||
if 1 <= idx <= n:
|
||
return ordered[idx - 1]
|
||
if idx == n + 1:
|
||
return "none"
|
||
if idx == n + 2:
|
||
return None
|
||
print(f"Please enter 1-{n + 2}")
|
||
except ValueError:
|
||
print("Please enter a number")
|
||
except (KeyboardInterrupt, EOFError):
|
||
return None
|
||
|
||
|
||
def _model_flow_copilot(config, current_model=""):
|
||
"""GitHub Copilot flow using env vars, gh CLI, or OAuth device code."""
|
||
from hermes_cli.auth import (
|
||
PROVIDER_REGISTRY,
|
||
_prompt_model_selection,
|
||
_save_model_choice,
|
||
deactivate_provider,
|
||
resolve_api_key_provider_credentials,
|
||
)
|
||
from hermes_cli.config import get_env_value, save_env_value, load_config, save_config
|
||
from hermes_cli.models import (
|
||
fetch_api_models,
|
||
fetch_github_model_catalog,
|
||
github_model_reasoning_efforts,
|
||
copilot_model_api_mode,
|
||
normalize_copilot_model_id,
|
||
)
|
||
|
||
provider_id = "copilot"
|
||
pconfig = PROVIDER_REGISTRY[provider_id]
|
||
|
||
creds = resolve_api_key_provider_credentials(provider_id)
|
||
api_key = creds.get("api_key", "")
|
||
source = creds.get("source", "")
|
||
|
||
if not api_key:
|
||
print("No GitHub token configured for GitHub Copilot.")
|
||
print()
|
||
print(" Supported token types:")
|
||
print(" → OAuth token (gho_*) via `copilot login` or device code flow")
|
||
print(" → Fine-grained PAT (github_pat_*) with Copilot Requests permission")
|
||
print(" → GitHub App token (ghu_*) via environment variable")
|
||
print(" ✗ Classic PAT (ghp_*) NOT supported by Copilot API")
|
||
print()
|
||
print(" Options:")
|
||
print(" 1. Login with GitHub (OAuth device code flow)")
|
||
print(" 2. Enter a token manually")
|
||
print(" 3. Cancel")
|
||
print()
|
||
try:
|
||
choice = input(" Choice [1-3]: ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return
|
||
|
||
if choice == "1":
|
||
try:
|
||
from hermes_cli.copilot_auth import copilot_device_code_login
|
||
token = copilot_device_code_login()
|
||
if token:
|
||
save_env_value("COPILOT_GITHUB_TOKEN", token)
|
||
print(" Copilot token saved.")
|
||
print()
|
||
else:
|
||
print(" Login cancelled or failed.")
|
||
return
|
||
except Exception as exc:
|
||
print(f" Login failed: {exc}")
|
||
return
|
||
elif choice == "2":
|
||
try:
|
||
new_key = input(" Token (COPILOT_GITHUB_TOKEN): ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return
|
||
if not new_key:
|
||
print(" Cancelled.")
|
||
return
|
||
# Validate token type
|
||
try:
|
||
from hermes_cli.copilot_auth import validate_copilot_token
|
||
valid, msg = validate_copilot_token(new_key)
|
||
if not valid:
|
||
print(f" ✗ {msg}")
|
||
return
|
||
except ImportError:
|
||
pass
|
||
save_env_value("COPILOT_GITHUB_TOKEN", new_key)
|
||
print(" Token saved.")
|
||
print()
|
||
else:
|
||
print(" Cancelled.")
|
||
return
|
||
|
||
creds = resolve_api_key_provider_credentials(provider_id)
|
||
api_key = creds.get("api_key", "")
|
||
source = creds.get("source", "")
|
||
else:
|
||
if source in ("GITHUB_TOKEN", "GH_TOKEN"):
|
||
print(f" GitHub token: {api_key[:8]}... ✓ ({source})")
|
||
elif source == "gh auth token":
|
||
print(" GitHub token: ✓ (from `gh auth token`)")
|
||
else:
|
||
print(" GitHub token: ✓")
|
||
print()
|
||
|
||
effective_base = pconfig.inference_base_url
|
||
|
||
catalog = fetch_github_model_catalog(api_key)
|
||
live_models = [item.get("id", "") for item in catalog if item.get("id")] if catalog else fetch_api_models(api_key, effective_base)
|
||
normalized_current_model = normalize_copilot_model_id(
|
||
current_model,
|
||
catalog=catalog,
|
||
api_key=api_key,
|
||
) or current_model
|
||
if live_models:
|
||
model_list = [model_id for model_id in live_models if model_id]
|
||
print(f" Found {len(model_list)} model(s) from GitHub Copilot")
|
||
else:
|
||
model_list = _PROVIDER_MODELS.get(provider_id, [])
|
||
if model_list:
|
||
print(" ⚠ Could not auto-detect models from GitHub Copilot — showing defaults.")
|
||
print(' Use "Enter custom model name" if you do not see your model.')
|
||
|
||
if model_list:
|
||
selected = _prompt_model_selection(model_list, current_model=normalized_current_model)
|
||
else:
|
||
try:
|
||
selected = input("Model name: ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
selected = None
|
||
|
||
if selected:
|
||
selected = normalize_copilot_model_id(
|
||
selected,
|
||
catalog=catalog,
|
||
api_key=api_key,
|
||
) or selected
|
||
# Clear stale custom-endpoint overrides so the Copilot provider wins cleanly.
|
||
if get_env_value("OPENAI_BASE_URL"):
|
||
save_env_value("OPENAI_BASE_URL", "")
|
||
save_env_value("OPENAI_API_KEY", "")
|
||
|
||
initial_cfg = load_config()
|
||
current_effort = _current_reasoning_effort(initial_cfg)
|
||
reasoning_efforts = github_model_reasoning_efforts(
|
||
selected,
|
||
catalog=catalog,
|
||
api_key=api_key,
|
||
)
|
||
selected_effort = None
|
||
if reasoning_efforts:
|
||
print(f" {selected} supports reasoning controls.")
|
||
selected_effort = _prompt_reasoning_effort_selection(
|
||
reasoning_efforts, current_effort=current_effort
|
||
)
|
||
|
||
_save_model_choice(selected)
|
||
|
||
cfg = load_config()
|
||
model = cfg.get("model")
|
||
if not isinstance(model, dict):
|
||
model = {"default": model} if model else {}
|
||
cfg["model"] = model
|
||
model["provider"] = provider_id
|
||
model["base_url"] = effective_base
|
||
model["api_mode"] = copilot_model_api_mode(
|
||
selected,
|
||
catalog=catalog,
|
||
api_key=api_key,
|
||
)
|
||
if selected_effort is not None:
|
||
_set_reasoning_effort(cfg, selected_effort)
|
||
save_config(cfg)
|
||
deactivate_provider()
|
||
|
||
print(f"Default model set to: {selected} (via {pconfig.name})")
|
||
if reasoning_efforts:
|
||
if selected_effort == "none":
|
||
print("Reasoning disabled for this model.")
|
||
elif selected_effort:
|
||
print(f"Reasoning effort set to: {selected_effort}")
|
||
else:
|
||
print("No change.")
|
||
|
||
|
||
def _model_flow_copilot_acp(config, current_model=""):
|
||
"""GitHub Copilot ACP flow using the local Copilot CLI."""
|
||
from hermes_cli.auth import (
|
||
PROVIDER_REGISTRY,
|
||
_prompt_model_selection,
|
||
_save_model_choice,
|
||
deactivate_provider,
|
||
get_external_process_provider_status,
|
||
resolve_api_key_provider_credentials,
|
||
resolve_external_process_provider_credentials,
|
||
)
|
||
from hermes_cli.models import (
|
||
fetch_github_model_catalog,
|
||
normalize_copilot_model_id,
|
||
)
|
||
from hermes_cli.config import load_config, save_config
|
||
|
||
del config
|
||
|
||
provider_id = "copilot-acp"
|
||
pconfig = PROVIDER_REGISTRY[provider_id]
|
||
|
||
status = get_external_process_provider_status(provider_id)
|
||
resolved_command = status.get("resolved_command") or status.get("command") or "copilot"
|
||
effective_base = status.get("base_url") or pconfig.inference_base_url
|
||
|
||
print(" GitHub Copilot ACP delegates Hermes turns to `copilot --acp`.")
|
||
print(" Hermes currently starts its own ACP subprocess for each request.")
|
||
print(" Hermes uses your selected model as a hint for the Copilot ACP session.")
|
||
print(f" Command: {resolved_command}")
|
||
print(f" Backend marker: {effective_base}")
|
||
print()
|
||
|
||
try:
|
||
creds = resolve_external_process_provider_credentials(provider_id)
|
||
except Exception as exc:
|
||
print(f" ⚠ {exc}")
|
||
print(" Set HERMES_COPILOT_ACP_COMMAND or COPILOT_CLI_PATH if Copilot CLI is installed elsewhere.")
|
||
return
|
||
|
||
effective_base = creds.get("base_url") or effective_base
|
||
|
||
catalog_api_key = ""
|
||
try:
|
||
catalog_creds = resolve_api_key_provider_credentials("copilot")
|
||
catalog_api_key = catalog_creds.get("api_key", "")
|
||
except Exception:
|
||
pass
|
||
|
||
catalog = fetch_github_model_catalog(catalog_api_key)
|
||
normalized_current_model = normalize_copilot_model_id(
|
||
current_model,
|
||
catalog=catalog,
|
||
api_key=catalog_api_key,
|
||
) or current_model
|
||
|
||
if catalog:
|
||
model_list = [item.get("id", "") for item in catalog if item.get("id")]
|
||
print(f" Found {len(model_list)} model(s) from GitHub Copilot")
|
||
else:
|
||
model_list = _PROVIDER_MODELS.get("copilot", [])
|
||
if model_list:
|
||
print(" ⚠ Could not auto-detect models from GitHub Copilot — showing defaults.")
|
||
print(' Use "Enter custom model name" if you do not see your model.')
|
||
|
||
if model_list:
|
||
selected = _prompt_model_selection(
|
||
model_list,
|
||
current_model=normalized_current_model,
|
||
)
|
||
else:
|
||
try:
|
||
selected = input("Model name: ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
selected = None
|
||
|
||
if not selected:
|
||
print("No change.")
|
||
return
|
||
|
||
selected = normalize_copilot_model_id(
|
||
selected,
|
||
catalog=catalog,
|
||
api_key=catalog_api_key,
|
||
) or selected
|
||
_save_model_choice(selected)
|
||
|
||
cfg = load_config()
|
||
model = cfg.get("model")
|
||
if not isinstance(model, dict):
|
||
model = {"default": model} if model else {}
|
||
cfg["model"] = model
|
||
model["provider"] = provider_id
|
||
model["base_url"] = effective_base
|
||
model["api_mode"] = "chat_completions"
|
||
save_config(cfg)
|
||
deactivate_provider()
|
||
|
||
print(f"Default model set to: {selected} (via {pconfig.name})")
|
||
|
||
|
||
def _model_flow_kimi(config, current_model=""):
|
||
"""Kimi / Moonshot model selection with automatic endpoint routing.
|
||
|
||
- sk-kimi-* keys → api.kimi.com/coding/v1 (Kimi Coding Plan)
|
||
- Other keys → api.moonshot.ai/v1 (legacy Moonshot)
|
||
|
||
No manual base URL prompt — endpoint is determined by key prefix.
|
||
"""
|
||
from hermes_cli.auth import (
|
||
PROVIDER_REGISTRY, KIMI_CODE_BASE_URL, _prompt_model_selection,
|
||
_save_model_choice, deactivate_provider,
|
||
)
|
||
from hermes_cli.config import get_env_value, save_env_value, load_config, save_config
|
||
|
||
provider_id = "kimi-coding"
|
||
pconfig = PROVIDER_REGISTRY[provider_id]
|
||
key_env = pconfig.api_key_env_vars[0] if pconfig.api_key_env_vars else ""
|
||
base_url_env = pconfig.base_url_env_var or ""
|
||
|
||
# Step 1: Check / prompt for API key
|
||
existing_key = ""
|
||
for ev in pconfig.api_key_env_vars:
|
||
existing_key = get_env_value(ev) or os.getenv(ev, "")
|
||
if existing_key:
|
||
break
|
||
|
||
if not existing_key:
|
||
print(f"No {pconfig.name} API key configured.")
|
||
if key_env:
|
||
try:
|
||
new_key = input(f"{key_env} (or Enter to cancel): ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return
|
||
if not new_key:
|
||
print("Cancelled.")
|
||
return
|
||
save_env_value(key_env, new_key)
|
||
existing_key = new_key
|
||
print("API key saved.")
|
||
print()
|
||
else:
|
||
print(f" {pconfig.name} API key: {existing_key[:8]}... ✓")
|
||
print()
|
||
|
||
# Step 2: Auto-detect endpoint from key prefix
|
||
is_coding_plan = existing_key.startswith("sk-kimi-")
|
||
if is_coding_plan:
|
||
effective_base = KIMI_CODE_BASE_URL
|
||
print(f" Detected Kimi Coding Plan key → {effective_base}")
|
||
else:
|
||
effective_base = pconfig.inference_base_url
|
||
print(f" Using Moonshot endpoint → {effective_base}")
|
||
# Clear any manual base URL override so auto-detection works at runtime
|
||
if base_url_env and get_env_value(base_url_env):
|
||
save_env_value(base_url_env, "")
|
||
print()
|
||
|
||
# Step 3: Model selection — show appropriate models for the endpoint
|
||
if is_coding_plan:
|
||
# Coding Plan models (kimi-for-coding first)
|
||
model_list = [
|
||
"kimi-for-coding",
|
||
"kimi-k2.5",
|
||
"kimi-k2-thinking",
|
||
"kimi-k2-thinking-turbo",
|
||
]
|
||
else:
|
||
# Legacy Moonshot models (excludes Coding Plan-only models)
|
||
model_list = _PROVIDER_MODELS.get("moonshot", [])
|
||
|
||
if model_list:
|
||
selected = _prompt_model_selection(model_list, current_model=current_model)
|
||
else:
|
||
try:
|
||
selected = input("Enter model name: ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
selected = None
|
||
|
||
if selected:
|
||
# Clear custom endpoint if set (avoid confusion)
|
||
if get_env_value("OPENAI_BASE_URL"):
|
||
save_env_value("OPENAI_BASE_URL", "")
|
||
save_env_value("OPENAI_API_KEY", "")
|
||
|
||
_save_model_choice(selected)
|
||
|
||
# Update config with provider and base URL
|
||
cfg = load_config()
|
||
model = cfg.get("model")
|
||
if not isinstance(model, dict):
|
||
model = {"default": model} if model else {}
|
||
cfg["model"] = model
|
||
model["provider"] = provider_id
|
||
model["base_url"] = effective_base
|
||
model.pop("api_mode", None) # let runtime auto-detect from URL
|
||
save_config(cfg)
|
||
deactivate_provider()
|
||
|
||
endpoint_label = "Kimi Coding" if is_coding_plan else "Moonshot"
|
||
print(f"Default model set to: {selected} (via {endpoint_label})")
|
||
else:
|
||
print("No change.")
|
||
|
||
|
||
def _model_flow_api_key_provider(config, provider_id, current_model=""):
|
||
"""Generic flow for API-key providers (z.ai, MiniMax)."""
|
||
from hermes_cli.auth import (
|
||
PROVIDER_REGISTRY, _prompt_model_selection, _save_model_choice,
|
||
deactivate_provider,
|
||
)
|
||
from hermes_cli.config import get_env_value, save_env_value, load_config, save_config
|
||
|
||
pconfig = PROVIDER_REGISTRY[provider_id]
|
||
key_env = pconfig.api_key_env_vars[0] if pconfig.api_key_env_vars else ""
|
||
base_url_env = pconfig.base_url_env_var or ""
|
||
|
||
# Check / prompt for API key
|
||
existing_key = ""
|
||
for ev in pconfig.api_key_env_vars:
|
||
existing_key = get_env_value(ev) or os.getenv(ev, "")
|
||
if existing_key:
|
||
break
|
||
|
||
if not existing_key:
|
||
print(f"No {pconfig.name} API key configured.")
|
||
if key_env:
|
||
try:
|
||
new_key = input(f"{key_env} (or Enter to cancel): ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return
|
||
if not new_key:
|
||
print("Cancelled.")
|
||
return
|
||
save_env_value(key_env, new_key)
|
||
print("API key saved.")
|
||
print()
|
||
else:
|
||
print(f" {pconfig.name} API key: {existing_key[:8]}... ✓")
|
||
print()
|
||
|
||
# Optional base URL override
|
||
current_base = ""
|
||
if base_url_env:
|
||
current_base = get_env_value(base_url_env) or os.getenv(base_url_env, "")
|
||
effective_base = current_base or pconfig.inference_base_url
|
||
|
||
try:
|
||
override = input(f"Base URL [{effective_base}]: ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
override = ""
|
||
if override and base_url_env:
|
||
save_env_value(base_url_env, override)
|
||
effective_base = override
|
||
|
||
# Model selection — try live /models endpoint first, fall back to defaults.
|
||
# Providers with large live catalogs (100+ models) use a curated list instead
|
||
# so users see familiar model names rather than an overwhelming dump.
|
||
curated = _PROVIDER_MODELS.get(provider_id, [])
|
||
if curated and len(curated) >= 8:
|
||
# Curated list is substantial — use it directly, skip live probe
|
||
live_models = None
|
||
else:
|
||
from hermes_cli.models import fetch_api_models
|
||
api_key_for_probe = existing_key or (get_env_value(key_env) if key_env else "")
|
||
live_models = fetch_api_models(api_key_for_probe, effective_base)
|
||
|
||
if live_models and len(live_models) >= len(curated):
|
||
model_list = live_models
|
||
print(f" Found {len(model_list)} model(s) from {pconfig.name} API")
|
||
else:
|
||
model_list = curated
|
||
if model_list:
|
||
print(f" Showing {len(model_list)} curated models — use \"Enter custom model name\" for others.")
|
||
# else: no defaults either, will fall through to raw input
|
||
|
||
if model_list:
|
||
selected = _prompt_model_selection(model_list, current_model=current_model)
|
||
else:
|
||
try:
|
||
selected = input("Model name: ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
selected = None
|
||
|
||
if selected:
|
||
# Clear custom endpoint if set (avoid confusion)
|
||
if get_env_value("OPENAI_BASE_URL"):
|
||
save_env_value("OPENAI_BASE_URL", "")
|
||
save_env_value("OPENAI_API_KEY", "")
|
||
|
||
_save_model_choice(selected)
|
||
|
||
# Update config with provider and base URL
|
||
cfg = load_config()
|
||
model = cfg.get("model")
|
||
if not isinstance(model, dict):
|
||
model = {"default": model} if model else {}
|
||
cfg["model"] = model
|
||
model["provider"] = provider_id
|
||
model["base_url"] = effective_base
|
||
model.pop("api_mode", None) # let runtime auto-detect from URL
|
||
save_config(cfg)
|
||
deactivate_provider()
|
||
|
||
print(f"Default model set to: {selected} (via {pconfig.name})")
|
||
else:
|
||
print("No change.")
|
||
|
||
|
||
def _run_anthropic_oauth_flow(save_env_value):
|
||
"""Run the Claude OAuth setup-token flow. Returns True if credentials were saved."""
|
||
from agent.anthropic_adapter import (
|
||
run_oauth_setup_token,
|
||
read_claude_code_credentials,
|
||
is_claude_code_token_valid,
|
||
)
|
||
from hermes_cli.config import (
|
||
save_anthropic_oauth_token,
|
||
use_anthropic_claude_code_credentials,
|
||
)
|
||
|
||
def _activate_claude_code_credentials_if_available() -> bool:
|
||
try:
|
||
creds = read_claude_code_credentials()
|
||
except Exception:
|
||
creds = None
|
||
if creds and (
|
||
is_claude_code_token_valid(creds)
|
||
or bool(creds.get("refreshToken"))
|
||
):
|
||
use_anthropic_claude_code_credentials(save_fn=save_env_value)
|
||
print(" ✓ Claude Code credentials linked.")
|
||
from hermes_constants import display_hermes_home as _dhh_fn
|
||
print(f" Hermes will use Claude's credential store directly instead of copying a setup-token into {_dhh_fn()}/.env.")
|
||
return True
|
||
return False
|
||
|
||
try:
|
||
print()
|
||
print(" Running 'claude setup-token' — follow the prompts below.")
|
||
print(" A browser window will open for you to authorize access.")
|
||
print()
|
||
token = run_oauth_setup_token()
|
||
if token:
|
||
if _activate_claude_code_credentials_if_available():
|
||
return True
|
||
save_anthropic_oauth_token(token, save_fn=save_env_value)
|
||
print(" ✓ OAuth credentials saved.")
|
||
return True
|
||
|
||
# Subprocess completed but no token auto-detected — ask user to paste
|
||
print()
|
||
print(" If the setup-token was displayed above, paste it here:")
|
||
print()
|
||
try:
|
||
manual_token = input(" Paste setup-token (or Enter to cancel): ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return False
|
||
if manual_token:
|
||
save_anthropic_oauth_token(manual_token, save_fn=save_env_value)
|
||
print(" ✓ Setup-token saved.")
|
||
return True
|
||
|
||
print(" ⚠ Could not detect saved credentials.")
|
||
return False
|
||
|
||
except FileNotFoundError:
|
||
# Claude CLI not installed — guide user through manual setup
|
||
print()
|
||
print(" The 'claude' CLI is required for OAuth login.")
|
||
print()
|
||
print(" To install and authenticate:")
|
||
print()
|
||
print(" 1. Install Claude Code: npm install -g @anthropic-ai/claude-code")
|
||
print(" 2. Run: claude setup-token")
|
||
print(" 3. Follow the browser prompts to authorize")
|
||
print(" 4. Re-run: hermes model")
|
||
print()
|
||
print(" Or paste an existing setup-token now (sk-ant-oat-...):")
|
||
print()
|
||
try:
|
||
token = input(" Setup-token (or Enter to cancel): ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return False
|
||
if token:
|
||
save_anthropic_oauth_token(token, save_fn=save_env_value)
|
||
print(" ✓ Setup-token saved.")
|
||
return True
|
||
print(" Cancelled — install Claude Code and try again.")
|
||
return False
|
||
|
||
|
||
def _model_flow_anthropic(config, current_model=""):
|
||
"""Flow for Anthropic provider — OAuth subscription, API key, or Claude Code creds."""
|
||
import os
|
||
from hermes_cli.auth import (
|
||
PROVIDER_REGISTRY, _prompt_model_selection, _save_model_choice,
|
||
deactivate_provider,
|
||
)
|
||
from hermes_cli.config import (
|
||
get_env_value, save_env_value, load_config, save_config,
|
||
save_anthropic_api_key,
|
||
)
|
||
from hermes_cli.models import _PROVIDER_MODELS
|
||
|
||
pconfig = PROVIDER_REGISTRY["anthropic"]
|
||
|
||
# Check ALL credential sources
|
||
existing_key = (
|
||
get_env_value("ANTHROPIC_TOKEN")
|
||
or os.getenv("ANTHROPIC_TOKEN", "")
|
||
or get_env_value("ANTHROPIC_API_KEY")
|
||
or os.getenv("ANTHROPIC_API_KEY", "")
|
||
or os.getenv("CLAUDE_CODE_OAUTH_TOKEN", "")
|
||
)
|
||
cc_available = False
|
||
try:
|
||
from agent.anthropic_adapter import read_claude_code_credentials, is_claude_code_token_valid
|
||
cc_creds = read_claude_code_credentials()
|
||
if cc_creds and is_claude_code_token_valid(cc_creds):
|
||
cc_available = True
|
||
except Exception:
|
||
pass
|
||
|
||
has_creds = bool(existing_key) or cc_available
|
||
needs_auth = not has_creds
|
||
|
||
if has_creds:
|
||
# Show what we found
|
||
if existing_key:
|
||
print(f" Anthropic credentials: {existing_key[:12]}... ✓")
|
||
elif cc_available:
|
||
print(" Claude Code credentials: ✓ (auto-detected)")
|
||
print()
|
||
print(" 1. Use existing credentials")
|
||
print(" 2. Reauthenticate (new OAuth login)")
|
||
print(" 3. Cancel")
|
||
print()
|
||
try:
|
||
choice = input(" Choice [1/2/3]: ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
choice = "1"
|
||
|
||
if choice == "2":
|
||
needs_auth = True
|
||
elif choice == "3":
|
||
return
|
||
# choice == "1" or default: use existing, proceed to model selection
|
||
|
||
if needs_auth:
|
||
# Show auth method choice
|
||
print()
|
||
print(" Choose authentication method:")
|
||
print()
|
||
print(" 1. Claude Pro/Max subscription (OAuth login)")
|
||
print(" 2. Anthropic API key (pay-per-token)")
|
||
print(" 3. Cancel")
|
||
print()
|
||
try:
|
||
choice = input(" Choice [1/2/3]: ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return
|
||
|
||
if choice == "1":
|
||
if not _run_anthropic_oauth_flow(save_env_value):
|
||
return
|
||
|
||
elif choice == "2":
|
||
print()
|
||
print(" Get an API key at: https://console.anthropic.com/settings/keys")
|
||
print()
|
||
try:
|
||
api_key = input(" API key (sk-ant-...): ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
print()
|
||
return
|
||
if not api_key:
|
||
print(" Cancelled.")
|
||
return
|
||
save_anthropic_api_key(api_key, save_fn=save_env_value)
|
||
print(" ✓ API key saved.")
|
||
|
||
else:
|
||
print(" No change.")
|
||
return
|
||
print()
|
||
|
||
# Model selection
|
||
model_list = _PROVIDER_MODELS.get("anthropic", [])
|
||
if model_list:
|
||
selected = _prompt_model_selection(model_list, current_model=current_model)
|
||
else:
|
||
try:
|
||
selected = input("Model name (e.g., claude-sonnet-4-20250514): ").strip()
|
||
except (KeyboardInterrupt, EOFError):
|
||
selected = None
|
||
|
||
if selected:
|
||
# Clear custom endpoint if set
|
||
if get_env_value("OPENAI_BASE_URL"):
|
||
save_env_value("OPENAI_BASE_URL", "")
|
||
save_env_value("OPENAI_API_KEY", "")
|
||
|
||
_save_model_choice(selected)
|
||
|
||
# Update config with provider — clear base_url since
|
||
# resolve_runtime_provider() always hardcodes Anthropic's URL.
|
||
# Leaving a stale base_url in config can contaminate other
|
||
# providers if the user switches without running 'hermes model'.
|
||
cfg = load_config()
|
||
model = cfg.get("model")
|
||
if not isinstance(model, dict):
|
||
model = {"default": model} if model else {}
|
||
cfg["model"] = model
|
||
model["provider"] = "anthropic"
|
||
model.pop("base_url", None)
|
||
save_config(cfg)
|
||
deactivate_provider()
|
||
|
||
print(f"Default model set to: {selected} (via Anthropic)")
|
||
else:
|
||
print("No change.")
|
||
|
||
|
||
def cmd_login(args):
|
||
"""Authenticate Hermes CLI with a provider."""
|
||
from hermes_cli.auth import login_command
|
||
login_command(args)
|
||
|
||
|
||
def cmd_logout(args):
|
||
"""Clear provider authentication."""
|
||
from hermes_cli.auth import logout_command
|
||
logout_command(args)
|
||
|
||
|
||
def cmd_status(args):
|
||
"""Show status of all components."""
|
||
from hermes_cli.status import show_status
|
||
show_status(args)
|
||
|
||
|
||
def cmd_cron(args):
|
||
"""Cron job management."""
|
||
from hermes_cli.cron import cron_command
|
||
cron_command(args)
|
||
|
||
|
||
def cmd_webhook(args):
|
||
"""Webhook subscription management."""
|
||
from hermes_cli.webhook import webhook_command
|
||
webhook_command(args)
|
||
|
||
|
||
def cmd_doctor(args):
|
||
"""Check configuration and dependencies."""
|
||
from hermes_cli.doctor import run_doctor
|
||
run_doctor(args)
|
||
|
||
|
||
def cmd_config(args):
|
||
"""Configuration management."""
|
||
from hermes_cli.config import config_command
|
||
config_command(args)
|
||
|
||
|
||
def cmd_version(args):
|
||
"""Show version."""
|
||
print(f"Hermes Agent v{__version__} ({__release_date__})")
|
||
print(f"Project: {PROJECT_ROOT}")
|
||
|
||
# Show Python version
|
||
print(f"Python: {sys.version.split()[0]}")
|
||
|
||
# Check for key dependencies
|
||
try:
|
||
import openai
|
||
print(f"OpenAI SDK: {openai.__version__}")
|
||
except ImportError:
|
||
print("OpenAI SDK: Not installed")
|
||
|
||
# Show update status (synchronous — acceptable since user asked for version info)
|
||
try:
|
||
from hermes_cli.banner import check_for_updates
|
||
behind = check_for_updates()
|
||
if behind and behind > 0:
|
||
commits_word = "commit" if behind == 1 else "commits"
|
||
print(f"Update available: {behind} {commits_word} behind — run 'hermes update'")
|
||
elif behind == 0:
|
||
print("Up to date")
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def cmd_uninstall(args):
|
||
"""Uninstall Hermes Agent."""
|
||
_require_tty("uninstall")
|
||
from hermes_cli.uninstall import run_uninstall
|
||
run_uninstall(args)
|
||
|
||
|
||
def _clear_bytecode_cache(root: Path) -> int:
|
||
"""Remove all __pycache__ directories under *root*.
|
||
|
||
Stale .pyc files can cause ImportError after code updates when Python
|
||
loads a cached bytecode file that references names that no longer exist
|
||
(or don't yet exist) in the updated source. Clearing them forces Python
|
||
to recompile from the .py source on next import.
|
||
|
||
Returns the number of directories removed.
|
||
"""
|
||
removed = 0
|
||
for dirpath, dirnames, _ in os.walk(root):
|
||
# Skip venv / node_modules / .git entirely
|
||
dirnames[:] = [
|
||
d for d in dirnames
|
||
if d not in ("venv", ".venv", "node_modules", ".git", ".worktrees")
|
||
]
|
||
if os.path.basename(dirpath) == "__pycache__":
|
||
try:
|
||
import shutil as _shutil
|
||
_shutil.rmtree(dirpath)
|
||
removed += 1
|
||
except OSError:
|
||
pass
|
||
dirnames.clear() # nothing left to recurse into
|
||
return removed
|
||
|
||
|
||
def _update_via_zip(args):
|
||
"""Update Hermes Agent by downloading a ZIP archive.
|
||
|
||
Used on Windows when git file I/O is broken (antivirus, NTFS filter
|
||
drivers causing 'Invalid argument' errors on file creation).
|
||
"""
|
||
import shutil
|
||
import tempfile
|
||
import zipfile
|
||
from urllib.request import urlretrieve
|
||
|
||
branch = "main"
|
||
zip_url = f"https://github.com/NousResearch/hermes-agent/archive/refs/heads/{branch}.zip"
|
||
|
||
print("→ Downloading latest version...")
|
||
try:
|
||
tmp_dir = tempfile.mkdtemp(prefix="hermes-update-")
|
||
zip_path = os.path.join(tmp_dir, f"hermes-agent-{branch}.zip")
|
||
urlretrieve(zip_url, zip_path)
|
||
|
||
print("→ Extracting...")
|
||
with zipfile.ZipFile(zip_path, 'r') as zf:
|
||
# Validate paths to prevent zip-slip (path traversal)
|
||
tmp_dir_real = os.path.realpath(tmp_dir)
|
||
for member in zf.infolist():
|
||
member_path = os.path.realpath(os.path.join(tmp_dir, member.filename))
|
||
if not member_path.startswith(tmp_dir_real + os.sep) and member_path != tmp_dir_real:
|
||
raise ValueError(f"Zip-slip detected: {member.filename} escapes extraction directory")
|
||
zf.extractall(tmp_dir)
|
||
|
||
# GitHub ZIPs extract to hermes-agent-<branch>/
|
||
extracted = os.path.join(tmp_dir, f"hermes-agent-{branch}")
|
||
if not os.path.isdir(extracted):
|
||
# Try to find it
|
||
for d in os.listdir(tmp_dir):
|
||
candidate = os.path.join(tmp_dir, d)
|
||
if os.path.isdir(candidate) and d != "__MACOSX":
|
||
extracted = candidate
|
||
break
|
||
|
||
# Copy updated files over existing installation, preserving venv/node_modules/.git
|
||
preserve = {'venv', 'node_modules', '.git', '.env'}
|
||
update_count = 0
|
||
for item in os.listdir(extracted):
|
||
if item in preserve:
|
||
continue
|
||
src = os.path.join(extracted, item)
|
||
dst = os.path.join(str(PROJECT_ROOT), item)
|
||
if os.path.isdir(src):
|
||
if os.path.exists(dst):
|
||
shutil.rmtree(dst)
|
||
shutil.copytree(src, dst)
|
||
else:
|
||
shutil.copy2(src, dst)
|
||
update_count += 1
|
||
|
||
print(f"✓ Updated {update_count} items from ZIP")
|
||
|
||
# Cleanup
|
||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||
|
||
except Exception as e:
|
||
print(f"✗ ZIP update failed: {e}")
|
||
sys.exit(1)
|
||
|
||
# Clear stale bytecode after ZIP extraction
|
||
removed = _clear_bytecode_cache(PROJECT_ROOT)
|
||
if removed:
|
||
print(f" ✓ Cleared {removed} stale __pycache__ director{'y' if removed == 1 else 'ies'}")
|
||
|
||
# Reinstall Python dependencies (try .[all] first for optional extras,
|
||
# fall back to . if extras fail — mirrors the install script behavior)
|
||
print("→ Updating Python dependencies...")
|
||
import subprocess
|
||
uv_bin = shutil.which("uv")
|
||
if uv_bin:
|
||
uv_env = {**os.environ, "VIRTUAL_ENV": str(PROJECT_ROOT / "venv")}
|
||
try:
|
||
subprocess.run(
|
||
[uv_bin, "pip", "install", "-e", ".[all]", "--quiet"],
|
||
cwd=PROJECT_ROOT, check=True, env=uv_env,
|
||
)
|
||
except subprocess.CalledProcessError:
|
||
print(" ⚠ Optional extras failed, installing base dependencies...")
|
||
subprocess.run(
|
||
[uv_bin, "pip", "install", "-e", ".", "--quiet"],
|
||
cwd=PROJECT_ROOT, check=True, env=uv_env,
|
||
)
|
||
else:
|
||
# Use sys.executable to explicitly call the venv's pip module,
|
||
# avoiding PEP 668 'externally-managed-environment' errors on Debian/Ubuntu.
|
||
# Some environments lose pip inside the venv; bootstrap it back with
|
||
# ensurepip before trying the editable install.
|
||
pip_cmd = [sys.executable, "-m", "pip"]
|
||
try:
|
||
subprocess.run(pip_cmd + ["--version"], cwd=PROJECT_ROOT, check=True, capture_output=True)
|
||
except subprocess.CalledProcessError:
|
||
subprocess.run(
|
||
[sys.executable, "-m", "ensurepip", "--upgrade", "--default-pip"],
|
||
cwd=PROJECT_ROOT,
|
||
check=True,
|
||
)
|
||
try:
|
||
subprocess.run(pip_cmd + ["install", "-e", ".[all]", "--quiet"], cwd=PROJECT_ROOT, check=True)
|
||
except subprocess.CalledProcessError:
|
||
print(" ⚠ Optional extras failed, installing base dependencies...")
|
||
subprocess.run(pip_cmd + ["install", "-e", ".", "--quiet"], cwd=PROJECT_ROOT, check=True)
|
||
|
||
# Sync skills
|
||
try:
|
||
from tools.skills_sync import sync_skills
|
||
print("→ Syncing bundled skills...")
|
||
result = sync_skills(quiet=True)
|
||
if result["copied"]:
|
||
print(f" + {len(result['copied'])} new: {', '.join(result['copied'])}")
|
||
if result.get("updated"):
|
||
print(f" ↑ {len(result['updated'])} updated: {', '.join(result['updated'])}")
|
||
if result.get("user_modified"):
|
||
print(f" ~ {len(result['user_modified'])} user-modified (kept)")
|
||
if result.get("cleaned"):
|
||
print(f" − {len(result['cleaned'])} removed from manifest")
|
||
if not result["copied"] and not result.get("updated"):
|
||
print(" ✓ Skills are up to date")
|
||
except Exception:
|
||
pass
|
||
|
||
print()
|
||
print("✓ Update complete!")
|
||
|
||
|
||
def _stash_local_changes_if_needed(git_cmd: list[str], cwd: Path) -> Optional[str]:
|
||
status = subprocess.run(
|
||
git_cmd + ["status", "--porcelain"],
|
||
cwd=cwd,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True,
|
||
)
|
||
if not status.stdout.strip():
|
||
return None
|
||
|
||
from datetime import datetime, timezone
|
||
|
||
stash_name = datetime.now(timezone.utc).strftime("hermes-update-autostash-%Y%m%d-%H%M%S")
|
||
print("→ Local changes detected — stashing before update...")
|
||
subprocess.run(
|
||
git_cmd + ["stash", "push", "--include-untracked", "-m", stash_name],
|
||
cwd=cwd,
|
||
check=True,
|
||
)
|
||
stash_ref = subprocess.run(
|
||
git_cmd + ["rev-parse", "--verify", "refs/stash"],
|
||
cwd=cwd,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True,
|
||
).stdout.strip()
|
||
return stash_ref
|
||
|
||
|
||
|
||
def _resolve_stash_selector(git_cmd: list[str], cwd: Path, stash_ref: str) -> Optional[str]:
|
||
stash_list = subprocess.run(
|
||
git_cmd + ["stash", "list", "--format=%gd %H"],
|
||
cwd=cwd,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True,
|
||
)
|
||
for line in stash_list.stdout.splitlines():
|
||
selector, _, commit = line.partition(" ")
|
||
if commit.strip() == stash_ref:
|
||
return selector.strip()
|
||
return None
|
||
|
||
|
||
|
||
def _print_stash_cleanup_guidance(stash_ref: str, stash_selector: Optional[str] = None) -> None:
|
||
print(" Check `git status` first so you don't accidentally reapply the same change twice.")
|
||
print(" Find the saved entry with: git stash list --format='%gd %H %s'")
|
||
if stash_selector:
|
||
print(f" Remove it with: git stash drop {stash_selector}")
|
||
else:
|
||
print(f" Look for commit {stash_ref}, then drop its selector with: git stash drop stash@{{N}}")
|
||
|
||
|
||
|
||
def _restore_stashed_changes(
|
||
git_cmd: list[str],
|
||
cwd: Path,
|
||
stash_ref: str,
|
||
prompt_user: bool = False,
|
||
) -> bool:
|
||
if prompt_user:
|
||
print()
|
||
print("⚠ Local changes were stashed before updating.")
|
||
print(" Restoring them may reapply local customizations onto the updated codebase.")
|
||
print(" Review the result afterward if Hermes behaves unexpectedly.")
|
||
print("Restore local changes now? [Y/n]")
|
||
response = input().strip().lower()
|
||
if response not in ("", "y", "yes"):
|
||
print("Skipped restoring local changes.")
|
||
print("Your changes are still preserved in git stash.")
|
||
print(f"Restore manually with: git stash apply {stash_ref}")
|
||
return False
|
||
|
||
print("→ Restoring local changes...")
|
||
restore = subprocess.run(
|
||
git_cmd + ["stash", "apply", stash_ref],
|
||
cwd=cwd,
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
|
||
# Check for unmerged (conflicted) files — can happen even when returncode is 0
|
||
unmerged = subprocess.run(
|
||
git_cmd + ["diff", "--name-only", "--diff-filter=U"],
|
||
cwd=cwd,
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
has_conflicts = bool(unmerged.stdout.strip())
|
||
|
||
if restore.returncode != 0 or has_conflicts:
|
||
print("✗ Update pulled new code, but restoring local changes hit conflicts.")
|
||
if restore.stdout.strip():
|
||
print(restore.stdout.strip())
|
||
if restore.stderr.strip():
|
||
print(restore.stderr.strip())
|
||
|
||
# Show which files conflicted
|
||
conflicted_files = unmerged.stdout.strip()
|
||
if conflicted_files:
|
||
print("\nConflicted files:")
|
||
for f in conflicted_files.splitlines():
|
||
print(f" • {f}")
|
||
|
||
print("\nYour stashed changes are preserved — nothing is lost.")
|
||
print(f" Stash ref: {stash_ref}")
|
||
|
||
# Ask before resetting (if interactive)
|
||
do_reset = True
|
||
if prompt_user:
|
||
print("\nReset working tree to clean state so Hermes can run?")
|
||
print(" (You can re-apply your changes later with: git stash apply)")
|
||
print("[Y/n] ", end="", flush=True)
|
||
response = input().strip().lower()
|
||
if response not in ("", "y", "yes"):
|
||
do_reset = False
|
||
|
||
if do_reset:
|
||
subprocess.run(
|
||
git_cmd + ["reset", "--hard", "HEAD"],
|
||
cwd=cwd,
|
||
capture_output=True,
|
||
)
|
||
print("Working tree reset to clean state.")
|
||
else:
|
||
print("Working tree left as-is (may have conflict markers).")
|
||
print("Resolve conflicts manually, then run: git stash drop")
|
||
|
||
print(f"Restore your changes with: git stash apply {stash_ref}")
|
||
# In non-interactive mode (gateway /update), don't abort — the code
|
||
# update itself succeeded, only the stash restore had conflicts.
|
||
# Aborting would report the entire update as failed.
|
||
if prompt_user:
|
||
sys.exit(1)
|
||
return False
|
||
|
||
stash_selector = _resolve_stash_selector(git_cmd, cwd, stash_ref)
|
||
if stash_selector is None:
|
||
print("⚠ Local changes were restored, but Hermes couldn't find the stash entry to drop.")
|
||
print(" The stash was left in place. You can remove it manually after checking the result.")
|
||
_print_stash_cleanup_guidance(stash_ref)
|
||
else:
|
||
drop = subprocess.run(
|
||
git_cmd + ["stash", "drop", stash_selector],
|
||
cwd=cwd,
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
if drop.returncode != 0:
|
||
print("⚠ Local changes were restored, but Hermes couldn't drop the saved stash entry.")
|
||
if drop.stdout.strip():
|
||
print(drop.stdout.strip())
|
||
if drop.stderr.strip():
|
||
print(drop.stderr.strip())
|
||
print(" The stash was left in place. You can remove it manually after checking the result.")
|
||
_print_stash_cleanup_guidance(stash_ref, stash_selector)
|
||
|
||
print("⚠ Local changes were restored on top of the updated codebase.")
|
||
print(" Review `git diff` / `git status` if Hermes behaves unexpectedly.")
|
||
return True
|
||
|
||
def _invalidate_update_cache():
|
||
"""Delete the update-check cache so ``hermes --version`` doesn't
|
||
report a stale "commits behind" count after a successful update."""
|
||
try:
|
||
cache_file = Path(os.getenv(
|
||
"HERMES_HOME", Path.home() / ".hermes"
|
||
)) / ".update_check"
|
||
if cache_file.exists():
|
||
cache_file.unlink()
|
||
except Exception:
|
||
pass
|
||
|
||
def cmd_update(args):
|
||
"""Update Hermes Agent to the latest version."""
|
||
import shutil
|
||
|
||
print("⚕ Updating Hermes Agent...")
|
||
print()
|
||
|
||
# Try git-based update first, fall back to ZIP download on Windows
|
||
# when git file I/O is broken (antivirus, NTFS filter drivers, etc.)
|
||
use_zip_update = False
|
||
git_dir = PROJECT_ROOT / '.git'
|
||
|
||
if not git_dir.exists():
|
||
if sys.platform == "win32":
|
||
use_zip_update = True
|
||
else:
|
||
print("✗ Not a git repository. Please reinstall:")
|
||
print(" curl -fsSL https://raw.githubusercontent.com/NousResearch/hermes-agent/main/scripts/install.sh | bash")
|
||
sys.exit(1)
|
||
|
||
# On Windows, git can fail with "unable to write loose object file: Invalid argument"
|
||
# due to filesystem atomicity issues. Set the recommended workaround.
|
||
if sys.platform == "win32" and git_dir.exists():
|
||
subprocess.run(
|
||
["git", "-c", "windows.appendAtomically=false", "config", "windows.appendAtomically", "false"],
|
||
cwd=PROJECT_ROOT, check=False, capture_output=True
|
||
)
|
||
|
||
if use_zip_update:
|
||
# ZIP-based update for Windows when git is broken
|
||
_update_via_zip(args)
|
||
return
|
||
|
||
# Fetch and pull
|
||
try:
|
||
git_cmd = ["git"]
|
||
if sys.platform == "win32":
|
||
git_cmd = ["git", "-c", "windows.appendAtomically=false"]
|
||
|
||
print("→ Fetching updates...")
|
||
fetch_result = subprocess.run(
|
||
git_cmd + ["fetch", "origin"],
|
||
cwd=PROJECT_ROOT,
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
if fetch_result.returncode != 0:
|
||
stderr = fetch_result.stderr.strip()
|
||
if "Could not resolve host" in stderr or "unable to access" in stderr:
|
||
print("✗ Network error — cannot reach the remote repository.")
|
||
print(f" {stderr.splitlines()[0]}" if stderr else "")
|
||
elif "Authentication failed" in stderr or "could not read Username" in stderr:
|
||
print("✗ Authentication failed — check your git credentials or SSH key.")
|
||
else:
|
||
print(f"✗ Failed to fetch updates from origin.")
|
||
if stderr:
|
||
print(f" {stderr.splitlines()[0]}")
|
||
sys.exit(1)
|
||
|
||
# Get current branch (returns literal "HEAD" when detached)
|
||
result = subprocess.run(
|
||
git_cmd + ["rev-parse", "--abbrev-ref", "HEAD"],
|
||
cwd=PROJECT_ROOT,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True,
|
||
)
|
||
current_branch = result.stdout.strip()
|
||
|
||
# Always update against main
|
||
branch = "main"
|
||
|
||
# If user is on a non-main branch or detached HEAD, switch to main
|
||
if current_branch != "main":
|
||
label = "detached HEAD" if current_branch == "HEAD" else f"branch '{current_branch}'"
|
||
print(f" ⚠ Currently on {label} — switching to main for update...")
|
||
# Stash before checkout so uncommitted work isn't lost
|
||
auto_stash_ref = _stash_local_changes_if_needed(git_cmd, PROJECT_ROOT)
|
||
subprocess.run(
|
||
git_cmd + ["checkout", "main"],
|
||
cwd=PROJECT_ROOT,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True,
|
||
)
|
||
else:
|
||
auto_stash_ref = _stash_local_changes_if_needed(git_cmd, PROJECT_ROOT)
|
||
|
||
prompt_for_restore = auto_stash_ref is not None and sys.stdin.isatty() and sys.stdout.isatty()
|
||
|
||
# Check if there are updates
|
||
result = subprocess.run(
|
||
git_cmd + ["rev-list", f"HEAD..origin/{branch}", "--count"],
|
||
cwd=PROJECT_ROOT,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True,
|
||
)
|
||
commit_count = int(result.stdout.strip())
|
||
|
||
if commit_count == 0:
|
||
_invalidate_update_cache()
|
||
# Restore stash and switch back to original branch if we moved
|
||
if auto_stash_ref is not None:
|
||
_restore_stashed_changes(
|
||
git_cmd, PROJECT_ROOT, auto_stash_ref,
|
||
prompt_user=prompt_for_restore,
|
||
)
|
||
if current_branch not in ("main", "HEAD"):
|
||
subprocess.run(
|
||
git_cmd + ["checkout", current_branch],
|
||
cwd=PROJECT_ROOT, capture_output=True, text=True, check=False,
|
||
)
|
||
print("✓ Already up to date!")
|
||
return
|
||
|
||
print(f"→ Found {commit_count} new commit(s)")
|
||
|
||
print("→ Pulling updates...")
|
||
update_succeeded = False
|
||
try:
|
||
pull_result = subprocess.run(
|
||
git_cmd + ["pull", "--ff-only", "origin", branch],
|
||
cwd=PROJECT_ROOT,
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
if pull_result.returncode != 0:
|
||
# ff-only failed — local and remote have diverged (e.g. upstream
|
||
# force-pushed or rebase). Since local changes are already
|
||
# stashed, reset to match the remote exactly.
|
||
print(" ⚠ Fast-forward not possible (history diverged), resetting to match remote...")
|
||
reset_result = subprocess.run(
|
||
git_cmd + ["reset", "--hard", f"origin/{branch}"],
|
||
cwd=PROJECT_ROOT,
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
if reset_result.returncode != 0:
|
||
print(f"✗ Failed to reset to origin/{branch}.")
|
||
if reset_result.stderr.strip():
|
||
print(f" {reset_result.stderr.strip()}")
|
||
print(" Try manually: git fetch origin && git reset --hard origin/main")
|
||
sys.exit(1)
|
||
update_succeeded = True
|
||
finally:
|
||
if auto_stash_ref is not None:
|
||
# Don't attempt stash restore if the code update itself failed —
|
||
# working tree is in an unknown state.
|
||
if not update_succeeded:
|
||
print(f" ℹ️ Local changes preserved in stash (ref: {auto_stash_ref})")
|
||
print(f" Restore manually with: git stash apply")
|
||
else:
|
||
_restore_stashed_changes(
|
||
git_cmd,
|
||
PROJECT_ROOT,
|
||
auto_stash_ref,
|
||
prompt_user=prompt_for_restore,
|
||
)
|
||
|
||
_invalidate_update_cache()
|
||
|
||
# Clear stale .pyc bytecode cache — prevents ImportError on gateway
|
||
# restart when updated source references names that didn't exist in
|
||
# the old bytecode (e.g. get_hermes_home added to hermes_constants).
|
||
removed = _clear_bytecode_cache(PROJECT_ROOT)
|
||
if removed:
|
||
print(f" ✓ Cleared {removed} stale __pycache__ director{'y' if removed == 1 else 'ies'}")
|
||
|
||
# Reinstall Python dependencies (try .[all] first for optional extras,
|
||
# fall back to . if extras fail — mirrors the install script behavior)
|
||
print("→ Updating Python dependencies...")
|
||
uv_bin = shutil.which("uv")
|
||
if uv_bin:
|
||
uv_env = {**os.environ, "VIRTUAL_ENV": str(PROJECT_ROOT / "venv")}
|
||
try:
|
||
subprocess.run(
|
||
[uv_bin, "pip", "install", "-e", ".[all]", "--quiet"],
|
||
cwd=PROJECT_ROOT, check=True, env=uv_env,
|
||
)
|
||
except subprocess.CalledProcessError:
|
||
print(" ⚠ Optional extras failed, installing base dependencies...")
|
||
subprocess.run(
|
||
[uv_bin, "pip", "install", "-e", ".", "--quiet"],
|
||
cwd=PROJECT_ROOT, check=True, env=uv_env,
|
||
)
|
||
else:
|
||
# Use sys.executable to explicitly call the venv's pip module,
|
||
# avoiding PEP 668 'externally-managed-environment' errors on Debian/Ubuntu.
|
||
# Some environments lose pip inside the venv; bootstrap it back with
|
||
# ensurepip before trying the editable install.
|
||
pip_cmd = [sys.executable, "-m", "pip"]
|
||
try:
|
||
subprocess.run(pip_cmd + ["--version"], cwd=PROJECT_ROOT, check=True, capture_output=True)
|
||
except subprocess.CalledProcessError:
|
||
subprocess.run(
|
||
[sys.executable, "-m", "ensurepip", "--upgrade", "--default-pip"],
|
||
cwd=PROJECT_ROOT,
|
||
check=True,
|
||
)
|
||
try:
|
||
subprocess.run(pip_cmd + ["install", "-e", ".[all]", "--quiet"], cwd=PROJECT_ROOT, check=True)
|
||
except subprocess.CalledProcessError:
|
||
print(" ⚠ Optional extras failed, installing base dependencies...")
|
||
subprocess.run(pip_cmd + ["install", "-e", ".", "--quiet"], cwd=PROJECT_ROOT, check=True)
|
||
|
||
# Check for Node.js deps
|
||
if (PROJECT_ROOT / "package.json").exists():
|
||
import shutil
|
||
if shutil.which("npm"):
|
||
print("→ Updating Node.js dependencies...")
|
||
subprocess.run(["npm", "install", "--silent"], cwd=PROJECT_ROOT, check=False)
|
||
|
||
print()
|
||
print("✓ Code updated!")
|
||
|
||
# After git pull, source files on disk are newer than cached Python
|
||
# modules in this process. Reload hermes_constants so that any lazy
|
||
# import executed below (skills sync, gateway restart) sees new
|
||
# attributes like display_hermes_home() added since the last release.
|
||
try:
|
||
import importlib
|
||
import hermes_constants as _hc
|
||
importlib.reload(_hc)
|
||
except Exception:
|
||
pass # non-fatal — worst case a lazy import fails gracefully
|
||
|
||
# Sync bundled skills (copies new, updates changed, respects user deletions)
|
||
try:
|
||
from tools.skills_sync import sync_skills
|
||
print()
|
||
print("→ Syncing bundled skills...")
|
||
result = sync_skills(quiet=True)
|
||
if result["copied"]:
|
||
print(f" + {len(result['copied'])} new: {', '.join(result['copied'])}")
|
||
if result.get("updated"):
|
||
print(f" ↑ {len(result['updated'])} updated: {', '.join(result['updated'])}")
|
||
if result.get("user_modified"):
|
||
print(f" ~ {len(result['user_modified'])} user-modified (kept)")
|
||
if result.get("cleaned"):
|
||
print(f" − {len(result['cleaned'])} removed from manifest")
|
||
if not result["copied"] and not result.get("updated"):
|
||
print(" ✓ Skills are up to date")
|
||
except Exception as e:
|
||
logger.debug("Skills sync during update failed: %s", e)
|
||
|
||
# Sync bundled skills to all other profiles
|
||
try:
|
||
from hermes_cli.profiles import list_profiles, get_active_profile_name, seed_profile_skills
|
||
active = get_active_profile_name()
|
||
other_profiles = [p for p in list_profiles() if not p.is_default and p.name != active]
|
||
if other_profiles:
|
||
print()
|
||
print("→ Syncing bundled skills to other profiles...")
|
||
for p in other_profiles:
|
||
try:
|
||
r = seed_profile_skills(p.path, quiet=True)
|
||
if r:
|
||
copied = len(r.get("copied", []))
|
||
updated = len(r.get("updated", []))
|
||
modified = len(r.get("user_modified", []))
|
||
parts = []
|
||
if copied: parts.append(f"+{copied} new")
|
||
if updated: parts.append(f"↑{updated} updated")
|
||
if modified: parts.append(f"~{modified} user-modified")
|
||
status = ", ".join(parts) if parts else "up to date"
|
||
else:
|
||
status = "sync failed"
|
||
print(f" {p.name}: {status}")
|
||
except Exception as pe:
|
||
print(f" {p.name}: error ({pe})")
|
||
except Exception:
|
||
pass # profiles module not available or no profiles
|
||
|
||
# Check for config migrations
|
||
print()
|
||
print("→ Checking configuration for new options...")
|
||
|
||
from hermes_cli.config import (
|
||
get_missing_env_vars, get_missing_config_fields,
|
||
check_config_version, migrate_config
|
||
)
|
||
|
||
missing_env = get_missing_env_vars(required_only=True)
|
||
missing_config = get_missing_config_fields()
|
||
current_ver, latest_ver = check_config_version()
|
||
|
||
needs_migration = missing_env or missing_config or current_ver < latest_ver
|
||
|
||
if needs_migration:
|
||
print()
|
||
if missing_env:
|
||
print(f" ⚠️ {len(missing_env)} new required setting(s) need configuration")
|
||
if missing_config:
|
||
print(f" ℹ️ {len(missing_config)} new config option(s) available")
|
||
|
||
print()
|
||
if not (sys.stdin.isatty() and sys.stdout.isatty()):
|
||
print(" ℹ Non-interactive session — skipping config migration prompt.")
|
||
print(" Run 'hermes config migrate' later to apply any new config/env options.")
|
||
response = "n"
|
||
else:
|
||
try:
|
||
response = input("Would you like to configure them now? [Y/n]: ").strip().lower()
|
||
except EOFError:
|
||
response = "n"
|
||
|
||
if response in ('', 'y', 'yes'):
|
||
print()
|
||
results = migrate_config(interactive=True, quiet=False)
|
||
|
||
if results["env_added"] or results["config_added"]:
|
||
print()
|
||
print("✓ Configuration updated!")
|
||
else:
|
||
print()
|
||
print("Skipped. Run 'hermes config migrate' later to configure.")
|
||
else:
|
||
print(" ✓ Configuration is up to date")
|
||
|
||
print()
|
||
print("✓ Update complete!")
|
||
|
||
# Auto-restart gateway if it's running.
|
||
# Uses the PID file (scoped to HERMES_HOME) to find this
|
||
# installation's gateway — safe with multiple installations.
|
||
try:
|
||
from gateway.status import get_running_pid, remove_pid_file
|
||
from hermes_cli.gateway import (
|
||
get_service_name, get_launchd_plist_path, is_macos, is_linux,
|
||
refresh_launchd_plist_if_needed,
|
||
_ensure_user_systemd_env, get_systemd_linger_status,
|
||
)
|
||
import signal as _signal
|
||
|
||
_gw_service_name = get_service_name()
|
||
existing_pid = get_running_pid()
|
||
has_systemd_service = False
|
||
has_launchd_service = False
|
||
|
||
try:
|
||
_ensure_user_systemd_env()
|
||
check = subprocess.run(
|
||
["systemctl", "--user", "is-active", _gw_service_name],
|
||
capture_output=True, text=True, timeout=5,
|
||
)
|
||
has_systemd_service = check.stdout.strip() == "active"
|
||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||
pass
|
||
|
||
# Check for macOS launchd service
|
||
if is_macos():
|
||
try:
|
||
from hermes_cli.gateway import get_launchd_label
|
||
plist_path = get_launchd_plist_path()
|
||
if plist_path.exists():
|
||
check = subprocess.run(
|
||
["launchctl", "list", get_launchd_label()],
|
||
capture_output=True, text=True, timeout=5,
|
||
)
|
||
has_launchd_service = check.returncode == 0
|
||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||
pass
|
||
|
||
if existing_pid or has_systemd_service or has_launchd_service:
|
||
print()
|
||
|
||
# When a service manager is handling the gateway, let it
|
||
# manage the lifecycle — don't manually SIGTERM the PID
|
||
# (launchd KeepAlive would respawn immediately, causing races).
|
||
if has_systemd_service:
|
||
import time as _time
|
||
if existing_pid:
|
||
try:
|
||
os.kill(existing_pid, _signal.SIGTERM)
|
||
print(f"→ Stopped gateway process (PID {existing_pid})")
|
||
except ProcessLookupError:
|
||
pass
|
||
except PermissionError:
|
||
print(f"⚠ Permission denied killing gateway PID {existing_pid}")
|
||
remove_pid_file()
|
||
_time.sleep(1) # Brief pause for port/socket release
|
||
print("→ Restarting gateway service...")
|
||
restart = subprocess.run(
|
||
["systemctl", "--user", "restart", _gw_service_name],
|
||
capture_output=True, text=True, timeout=15,
|
||
)
|
||
if restart.returncode == 0:
|
||
print("✓ Gateway restarted.")
|
||
else:
|
||
print(f"⚠ Gateway restart failed: {restart.stderr.strip()}")
|
||
# Check if linger is the issue
|
||
if is_linux():
|
||
linger_ok, _detail = get_systemd_linger_status()
|
||
if linger_ok is not True:
|
||
import getpass
|
||
_username = getpass.getuser()
|
||
print()
|
||
print(" Linger must be enabled for the gateway user service to function.")
|
||
print(f" Run: sudo loginctl enable-linger {_username}")
|
||
print()
|
||
print(" Then restart the gateway:")
|
||
print(" hermes gateway restart")
|
||
else:
|
||
print(" Try manually: hermes gateway restart")
|
||
elif has_launchd_service:
|
||
# Refresh the plist first (picks up --replace and other
|
||
# changes from the update we just pulled).
|
||
refresh_launchd_plist_if_needed()
|
||
# Explicit stop+start — don't rely on KeepAlive respawn
|
||
# after a manual SIGTERM, which would race with the
|
||
# PID file cleanup.
|
||
print("→ Restarting gateway service...")
|
||
_launchd_label = get_launchd_label()
|
||
stop = subprocess.run(
|
||
["launchctl", "stop", _launchd_label],
|
||
capture_output=True, text=True, timeout=10,
|
||
)
|
||
start = subprocess.run(
|
||
["launchctl", "start", _launchd_label],
|
||
capture_output=True, text=True, timeout=10,
|
||
)
|
||
if start.returncode == 0:
|
||
print("✓ Gateway restarted via launchd.")
|
||
else:
|
||
print(f"⚠ Gateway restart failed: {start.stderr.strip()}")
|
||
print(" Try manually: hermes gateway restart")
|
||
elif existing_pid:
|
||
try:
|
||
os.kill(existing_pid, _signal.SIGTERM)
|
||
print(f"→ Stopped gateway process (PID {existing_pid})")
|
||
except ProcessLookupError:
|
||
pass # Already gone
|
||
except PermissionError:
|
||
print(f"⚠ Permission denied killing gateway PID {existing_pid}")
|
||
remove_pid_file()
|
||
print(" ℹ️ Gateway was running manually (not as a service).")
|
||
print(" Restart it with: hermes gateway run")
|
||
except Exception as e:
|
||
logger.debug("Gateway restart during update failed: %s", e)
|
||
|
||
print()
|
||
print("Tip: You can now select a provider and model:")
|
||
print(" hermes model # Select provider and model")
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
if sys.platform == "win32":
|
||
print(f"⚠ Git update failed: {e}")
|
||
print("→ Falling back to ZIP download...")
|
||
print()
|
||
_update_via_zip(args)
|
||
else:
|
||
print(f"✗ Update failed: {e}")
|
||
sys.exit(1)
|
||
|
||
|
||
def _coalesce_session_name_args(argv: list) -> list:
|
||
"""Join unquoted multi-word session names after -c/--continue and -r/--resume.
|
||
|
||
When a user types ``hermes -c Pokemon Agent Dev`` without quoting the
|
||
session name, argparse sees three separate tokens. This function merges
|
||
them into a single argument so argparse receives
|
||
``['-c', 'Pokemon Agent Dev']`` instead.
|
||
|
||
Tokens are collected after the flag until we hit another flag (``-*``)
|
||
or a known top-level subcommand.
|
||
"""
|
||
_SUBCOMMANDS = {
|
||
"chat", "model", "gateway", "setup", "whatsapp", "login", "logout",
|
||
"status", "cron", "doctor", "config", "pairing", "skills", "tools",
|
||
"mcp", "sessions", "insights", "version", "update", "uninstall",
|
||
"profile",
|
||
}
|
||
_SESSION_FLAGS = {"-c", "--continue", "-r", "--resume"}
|
||
|
||
result = []
|
||
i = 0
|
||
while i < len(argv):
|
||
token = argv[i]
|
||
if token in _SESSION_FLAGS:
|
||
result.append(token)
|
||
i += 1
|
||
# Collect subsequent non-flag, non-subcommand tokens as one name
|
||
parts: list = []
|
||
while i < len(argv) and not argv[i].startswith("-") and argv[i] not in _SUBCOMMANDS:
|
||
parts.append(argv[i])
|
||
i += 1
|
||
if parts:
|
||
result.append(" ".join(parts))
|
||
else:
|
||
result.append(token)
|
||
i += 1
|
||
return result
|
||
|
||
|
||
def cmd_profile(args):
|
||
"""Profile management — create, delete, list, switch, alias."""
|
||
from hermes_cli.profiles import (
|
||
list_profiles, create_profile, delete_profile, seed_profile_skills,
|
||
get_active_profile, set_active_profile, get_active_profile_name,
|
||
check_alias_collision, create_wrapper_script, remove_wrapper_script,
|
||
_is_wrapper_dir_in_path, _get_wrapper_dir,
|
||
)
|
||
from hermes_constants import display_hermes_home
|
||
|
||
action = getattr(args, "profile_action", None)
|
||
|
||
if action is None:
|
||
# Bare `hermes profile` — show current profile status
|
||
profile_name = get_active_profile_name()
|
||
dhh = display_hermes_home()
|
||
print(f"\nActive profile: {profile_name}")
|
||
print(f"Path: {dhh}")
|
||
|
||
profiles = list_profiles()
|
||
for p in profiles:
|
||
if p.name == profile_name or (profile_name == "default" and p.is_default):
|
||
if p.model:
|
||
print(f"Model: {p.model}" + (f" ({p.provider})" if p.provider else ""))
|
||
print(f"Gateway: {'running' if p.gateway_running else 'stopped'}")
|
||
print(f"Skills: {p.skill_count} installed")
|
||
if p.alias_path:
|
||
print(f"Alias: {p.name} → hermes -p {p.name}")
|
||
break
|
||
print()
|
||
return
|
||
|
||
if action == "list":
|
||
profiles = list_profiles()
|
||
active = get_active_profile_name()
|
||
|
||
if not profiles:
|
||
print("No profiles found.")
|
||
return
|
||
|
||
# Header
|
||
print(f"\n {'Profile':<16} {'Model':<28} {'Gateway':<12} {'Alias'}")
|
||
print(f" {'─' * 15} {'─' * 27} {'─' * 11} {'─' * 12}")
|
||
|
||
for p in profiles:
|
||
marker = " ◆" if (p.name == active or (active == "default" and p.is_default)) else " "
|
||
name = p.name
|
||
model = (p.model or "—")[:26]
|
||
gw = "running" if p.gateway_running else "stopped"
|
||
alias = p.name if p.alias_path else "—"
|
||
if p.is_default:
|
||
alias = "—"
|
||
print(f"{marker}{name:<15} {model:<28} {gw:<12} {alias}")
|
||
print()
|
||
|
||
elif action == "use":
|
||
name = args.profile_name
|
||
try:
|
||
set_active_profile(name)
|
||
if name == "default":
|
||
print(f"Switched to: default (~/.hermes)")
|
||
else:
|
||
print(f"Switched to: {name}")
|
||
except (ValueError, FileNotFoundError) as e:
|
||
print(f"Error: {e}")
|
||
sys.exit(1)
|
||
|
||
elif action == "create":
|
||
name = args.profile_name
|
||
clone = getattr(args, "clone", False)
|
||
clone_all = getattr(args, "clone_all", False)
|
||
no_alias = getattr(args, "no_alias", False)
|
||
|
||
try:
|
||
clone_from = getattr(args, "clone_from", None)
|
||
|
||
profile_dir = create_profile(
|
||
name=name,
|
||
clone_from=clone_from,
|
||
clone_all=clone_all,
|
||
clone_config=clone,
|
||
no_alias=no_alias,
|
||
)
|
||
print(f"\nProfile '{name}' created at {profile_dir}")
|
||
|
||
if clone or clone_all:
|
||
source_label = getattr(args, "clone_from", None) or get_active_profile_name()
|
||
if clone_all:
|
||
print(f"Full copy from {source_label}.")
|
||
else:
|
||
print(f"Cloned config, .env, SOUL.md from {source_label}.")
|
||
|
||
# Seed bundled skills (skip if --clone-all already copied them)
|
||
if not clone_all:
|
||
result = seed_profile_skills(profile_dir)
|
||
if result:
|
||
copied = len(result.get("copied", []))
|
||
print(f"{copied} bundled skills synced.")
|
||
else:
|
||
print("⚠ Skills could not be seeded. Run `{} update` to retry.".format(name))
|
||
|
||
# Create wrapper alias
|
||
if not no_alias:
|
||
collision = check_alias_collision(name)
|
||
if collision:
|
||
print(f"\n⚠ Cannot create alias '{name}' — {collision}")
|
||
print(f" Choose a custom alias: hermes profile alias {name} --name <custom>")
|
||
print(f" Or access via flag: hermes -p {name} chat")
|
||
else:
|
||
wrapper_path = create_wrapper_script(name)
|
||
if wrapper_path:
|
||
print(f"Wrapper created: {wrapper_path}")
|
||
if not _is_wrapper_dir_in_path():
|
||
print(f"\n⚠ {_get_wrapper_dir()} is not in your PATH.")
|
||
print(f' Add to your shell config (~/.bashrc or ~/.zshrc):')
|
||
print(f' export PATH="$HOME/.local/bin:$PATH"')
|
||
|
||
# Next steps
|
||
print(f"\nNext steps:")
|
||
print(f" {name} setup Configure API keys and model")
|
||
print(f" {name} chat Start chatting")
|
||
print(f" {name} gateway start Start the messaging gateway")
|
||
if clone or clone_all:
|
||
from hermes_constants import get_hermes_home
|
||
profile_dir_display = f"~/.hermes/profiles/{name}"
|
||
print(f"\n Edit {profile_dir_display}/.env for different API keys")
|
||
print(f" Edit {profile_dir_display}/SOUL.md for different personality")
|
||
print()
|
||
|
||
except (ValueError, FileExistsError, FileNotFoundError) as e:
|
||
print(f"Error: {e}")
|
||
sys.exit(1)
|
||
|
||
elif action == "delete":
|
||
name = args.profile_name
|
||
yes = getattr(args, "yes", False)
|
||
try:
|
||
delete_profile(name, yes=yes)
|
||
except (ValueError, FileNotFoundError) as e:
|
||
print(f"Error: {e}")
|
||
sys.exit(1)
|
||
|
||
elif action == "show":
|
||
name = args.profile_name
|
||
from hermes_cli.profiles import get_profile_dir, profile_exists, _read_config_model, _check_gateway_running, _count_skills
|
||
if not profile_exists(name):
|
||
print(f"Error: Profile '{name}' does not exist.")
|
||
sys.exit(1)
|
||
profile_dir = get_profile_dir(name)
|
||
model, provider = _read_config_model(profile_dir)
|
||
gw = _check_gateway_running(profile_dir)
|
||
skills = _count_skills(profile_dir)
|
||
wrapper = _get_wrapper_dir() / name
|
||
|
||
print(f"\nProfile: {name}")
|
||
print(f"Path: {profile_dir}")
|
||
if model:
|
||
print(f"Model: {model}" + (f" ({provider})" if provider else ""))
|
||
print(f"Gateway: {'running' if gw else 'stopped'}")
|
||
print(f"Skills: {skills}")
|
||
print(f".env: {'exists' if (profile_dir / '.env').exists() else 'not configured'}")
|
||
print(f"SOUL.md: {'exists' if (profile_dir / 'SOUL.md').exists() else 'not configured'}")
|
||
if wrapper.exists():
|
||
print(f"Alias: {wrapper}")
|
||
print()
|
||
|
||
elif action == "alias":
|
||
name = args.profile_name
|
||
remove = getattr(args, "remove", False)
|
||
custom_name = getattr(args, "alias_name", None)
|
||
|
||
from hermes_cli.profiles import profile_exists
|
||
if not profile_exists(name):
|
||
print(f"Error: Profile '{name}' does not exist.")
|
||
sys.exit(1)
|
||
|
||
alias_name = custom_name or name
|
||
|
||
if remove:
|
||
if remove_wrapper_script(alias_name):
|
||
print(f"✓ Removed alias '{alias_name}'")
|
||
else:
|
||
print(f"No alias '{alias_name}' found to remove.")
|
||
else:
|
||
collision = check_alias_collision(alias_name)
|
||
if collision:
|
||
print(f"Error: {collision}")
|
||
sys.exit(1)
|
||
wrapper_path = create_wrapper_script(alias_name)
|
||
if wrapper_path:
|
||
# If custom name, write the profile name into the wrapper
|
||
if custom_name:
|
||
wrapper_path.write_text(f'#!/bin/sh\nexec hermes -p {name} "$@"\n')
|
||
print(f"✓ Alias created: {wrapper_path}")
|
||
if not _is_wrapper_dir_in_path():
|
||
print(f"⚠ {_get_wrapper_dir()} is not in your PATH.")
|
||
|
||
elif action == "rename":
|
||
from hermes_cli.profiles import rename_profile
|
||
try:
|
||
new_dir = rename_profile(args.old_name, args.new_name)
|
||
print(f"\nProfile renamed: {args.old_name} → {args.new_name}")
|
||
print(f"Path: {new_dir}\n")
|
||
except (ValueError, FileExistsError, FileNotFoundError) as e:
|
||
print(f"Error: {e}")
|
||
sys.exit(1)
|
||
|
||
elif action == "export":
|
||
from hermes_cli.profiles import export_profile
|
||
name = args.profile_name
|
||
output = args.output or f"{name}.tar.gz"
|
||
try:
|
||
result_path = export_profile(name, output)
|
||
print(f"✓ Exported '{name}' to {result_path}")
|
||
except (ValueError, FileNotFoundError) as e:
|
||
print(f"Error: {e}")
|
||
sys.exit(1)
|
||
|
||
elif action == "import":
|
||
from hermes_cli.profiles import import_profile
|
||
try:
|
||
profile_dir = import_profile(args.archive, name=getattr(args, "import_name", None))
|
||
name = profile_dir.name
|
||
print(f"✓ Imported profile '{name}' at {profile_dir}")
|
||
|
||
# Offer to create alias
|
||
collision = check_alias_collision(name)
|
||
if not collision:
|
||
wrapper_path = create_wrapper_script(name)
|
||
if wrapper_path:
|
||
print(f" Wrapper created: {wrapper_path}")
|
||
print()
|
||
except (ValueError, FileExistsError, FileNotFoundError) as e:
|
||
print(f"Error: {e}")
|
||
sys.exit(1)
|
||
|
||
|
||
def cmd_completion(args):
|
||
"""Print shell completion script."""
|
||
from hermes_cli.profiles import generate_bash_completion, generate_zsh_completion
|
||
shell = getattr(args, "shell", "bash")
|
||
if shell == "zsh":
|
||
print(generate_zsh_completion())
|
||
else:
|
||
print(generate_bash_completion())
|
||
|
||
|
||
def main():
|
||
"""Main entry point for hermes CLI."""
|
||
parser = argparse.ArgumentParser(
|
||
prog="hermes",
|
||
description="Hermes Agent - AI assistant with tool-calling capabilities",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
Examples:
|
||
hermes Start interactive chat
|
||
hermes chat -q "Hello" Single query mode
|
||
hermes -c Resume the most recent session
|
||
hermes -c "my project" Resume a session by name (latest in lineage)
|
||
hermes --resume <session_id> Resume a specific session by ID
|
||
hermes setup Run setup wizard
|
||
hermes logout Clear stored authentication
|
||
hermes model Select default model
|
||
hermes config View configuration
|
||
hermes config edit Edit config in $EDITOR
|
||
hermes config set model gpt-4 Set a config value
|
||
hermes gateway Run messaging gateway
|
||
hermes -s hermes-agent-dev,github-auth
|
||
hermes -w Start in isolated git worktree
|
||
hermes gateway install Install gateway background service
|
||
hermes sessions list List past sessions
|
||
hermes sessions browse Interactive session picker
|
||
hermes sessions rename ID T Rename/title a session
|
||
hermes update Update to latest version
|
||
|
||
For more help on a command:
|
||
hermes <command> --help
|
||
"""
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--version", "-V",
|
||
action="store_true",
|
||
help="Show version and exit"
|
||
)
|
||
parser.add_argument(
|
||
"--resume", "-r",
|
||
metavar="SESSION",
|
||
default=None,
|
||
help="Resume a previous session by ID or title"
|
||
)
|
||
parser.add_argument(
|
||
"--continue", "-c",
|
||
dest="continue_last",
|
||
nargs="?",
|
||
const=True,
|
||
default=None,
|
||
metavar="SESSION_NAME",
|
||
help="Resume a session by name, or the most recent if no name given"
|
||
)
|
||
parser.add_argument(
|
||
"--worktree", "-w",
|
||
action="store_true",
|
||
default=False,
|
||
help="Run in an isolated git worktree (for parallel agents)"
|
||
)
|
||
parser.add_argument(
|
||
"--skills", "-s",
|
||
action="append",
|
||
default=None,
|
||
help="Preload one or more skills for the session (repeat flag or comma-separate)"
|
||
)
|
||
parser.add_argument(
|
||
"--yolo",
|
||
action="store_true",
|
||
default=False,
|
||
help="Bypass all dangerous command approval prompts (use at your own risk)"
|
||
)
|
||
parser.add_argument(
|
||
"--pass-session-id",
|
||
action="store_true",
|
||
default=False,
|
||
help="Include the session ID in the agent's system prompt"
|
||
)
|
||
|
||
subparsers = parser.add_subparsers(dest="command", help="Command to run")
|
||
|
||
# =========================================================================
|
||
# chat command
|
||
# =========================================================================
|
||
chat_parser = subparsers.add_parser(
|
||
"chat",
|
||
help="Interactive chat with the agent",
|
||
description="Start an interactive chat session with Hermes Agent"
|
||
)
|
||
chat_parser.add_argument(
|
||
"-q", "--query",
|
||
help="Single query (non-interactive mode)"
|
||
)
|
||
chat_parser.add_argument(
|
||
"-m", "--model",
|
||
help="Model to use (e.g., anthropic/claude-sonnet-4)"
|
||
)
|
||
chat_parser.add_argument(
|
||
"-t", "--toolsets",
|
||
help="Comma-separated toolsets to enable"
|
||
)
|
||
chat_parser.add_argument(
|
||
"-s", "--skills",
|
||
action="append",
|
||
default=None,
|
||
help="Preload one or more skills for the session (repeat flag or comma-separate)"
|
||
)
|
||
chat_parser.add_argument(
|
||
"--provider",
|
||
choices=["auto", "openrouter", "nous", "openai-codex", "copilot-acp", "copilot", "anthropic", "huggingface", "zai", "kimi-coding", "minimax", "minimax-cn", "kilocode"],
|
||
default=None,
|
||
help="Inference provider (default: auto)"
|
||
)
|
||
chat_parser.add_argument(
|
||
"-v", "--verbose",
|
||
action="store_true",
|
||
help="Verbose output"
|
||
)
|
||
chat_parser.add_argument(
|
||
"-Q", "--quiet",
|
||
action="store_true",
|
||
help="Quiet mode for programmatic use: suppress banner, spinner, and tool previews. Only output the final response and session info."
|
||
)
|
||
chat_parser.add_argument(
|
||
"--resume", "-r",
|
||
metavar="SESSION_ID",
|
||
help="Resume a previous session by ID (shown on exit)"
|
||
)
|
||
chat_parser.add_argument(
|
||
"--continue", "-c",
|
||
dest="continue_last",
|
||
nargs="?",
|
||
const=True,
|
||
default=None,
|
||
metavar="SESSION_NAME",
|
||
help="Resume a session by name, or the most recent if no name given"
|
||
)
|
||
chat_parser.add_argument(
|
||
"--worktree", "-w",
|
||
action="store_true",
|
||
default=False,
|
||
help="Run in an isolated git worktree (for parallel agents on the same repo)"
|
||
)
|
||
chat_parser.add_argument(
|
||
"--checkpoints",
|
||
action="store_true",
|
||
default=False,
|
||
help="Enable filesystem checkpoints before destructive file operations (use /rollback to restore)"
|
||
)
|
||
chat_parser.add_argument(
|
||
"--yolo",
|
||
action="store_true",
|
||
default=False,
|
||
help="Bypass all dangerous command approval prompts (use at your own risk)"
|
||
)
|
||
chat_parser.add_argument(
|
||
"--pass-session-id",
|
||
action="store_true",
|
||
default=False,
|
||
help="Include the session ID in the agent's system prompt"
|
||
)
|
||
chat_parser.add_argument(
|
||
"--source",
|
||
default=None,
|
||
help="Session source tag for filtering (default: cli). Use 'tool' for third-party integrations that should not appear in user session lists."
|
||
)
|
||
chat_parser.set_defaults(func=cmd_chat)
|
||
|
||
# =========================================================================
|
||
# model command
|
||
# =========================================================================
|
||
model_parser = subparsers.add_parser(
|
||
"model",
|
||
help="Select default model and provider",
|
||
description="Interactively select your inference provider and default model"
|
||
)
|
||
model_parser.set_defaults(func=cmd_model)
|
||
|
||
# =========================================================================
|
||
# gateway command
|
||
# =========================================================================
|
||
gateway_parser = subparsers.add_parser(
|
||
"gateway",
|
||
help="Messaging gateway management",
|
||
description="Manage the messaging gateway (Telegram, Discord, WhatsApp)"
|
||
)
|
||
gateway_subparsers = gateway_parser.add_subparsers(dest="gateway_command")
|
||
|
||
# gateway run (default)
|
||
gateway_run = gateway_subparsers.add_parser("run", help="Run gateway in foreground")
|
||
gateway_run.add_argument("-v", "--verbose", action="store_true")
|
||
gateway_run.add_argument("--replace", action="store_true",
|
||
help="Replace any existing gateway instance (useful for systemd)")
|
||
|
||
# gateway start
|
||
gateway_start = gateway_subparsers.add_parser("start", help="Start gateway service")
|
||
gateway_start.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
|
||
|
||
# gateway stop
|
||
gateway_stop = gateway_subparsers.add_parser("stop", help="Stop gateway service")
|
||
gateway_stop.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
|
||
|
||
# gateway restart
|
||
gateway_restart = gateway_subparsers.add_parser("restart", help="Restart gateway service")
|
||
gateway_restart.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
|
||
|
||
# gateway status
|
||
gateway_status = gateway_subparsers.add_parser("status", help="Show gateway status")
|
||
gateway_status.add_argument("--deep", action="store_true", help="Deep status check")
|
||
gateway_status.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
|
||
|
||
# gateway install
|
||
gateway_install = gateway_subparsers.add_parser("install", help="Install gateway as service")
|
||
gateway_install.add_argument("--force", action="store_true", help="Force reinstall")
|
||
gateway_install.add_argument("--system", action="store_true", help="Install as a Linux system-level service (starts at boot)")
|
||
gateway_install.add_argument("--run-as-user", dest="run_as_user", help="User account the Linux system service should run as")
|
||
|
||
# gateway uninstall
|
||
gateway_uninstall = gateway_subparsers.add_parser("uninstall", help="Uninstall gateway service")
|
||
gateway_uninstall.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
|
||
|
||
# gateway setup
|
||
gateway_setup = gateway_subparsers.add_parser("setup", help="Configure messaging platforms")
|
||
|
||
gateway_parser.set_defaults(func=cmd_gateway)
|
||
|
||
# =========================================================================
|
||
# setup command
|
||
# =========================================================================
|
||
setup_parser = subparsers.add_parser(
|
||
"setup",
|
||
help="Interactive setup wizard",
|
||
description="Configure Hermes Agent with an interactive wizard. "
|
||
"Run a specific section: hermes setup model|terminal|gateway|tools|agent"
|
||
)
|
||
setup_parser.add_argument(
|
||
"section",
|
||
nargs="?",
|
||
choices=["model", "terminal", "gateway", "tools", "agent"],
|
||
default=None,
|
||
help="Run a specific setup section instead of the full wizard"
|
||
)
|
||
setup_parser.add_argument(
|
||
"--non-interactive",
|
||
action="store_true",
|
||
help="Non-interactive mode (use defaults/env vars)"
|
||
)
|
||
setup_parser.add_argument(
|
||
"--reset",
|
||
action="store_true",
|
||
help="Reset configuration to defaults"
|
||
)
|
||
setup_parser.set_defaults(func=cmd_setup)
|
||
|
||
# =========================================================================
|
||
# whatsapp command
|
||
# =========================================================================
|
||
whatsapp_parser = subparsers.add_parser(
|
||
"whatsapp",
|
||
help="Set up WhatsApp integration",
|
||
description="Configure WhatsApp and pair via QR code"
|
||
)
|
||
whatsapp_parser.set_defaults(func=cmd_whatsapp)
|
||
|
||
# =========================================================================
|
||
# login command
|
||
# =========================================================================
|
||
login_parser = subparsers.add_parser(
|
||
"login",
|
||
help="Authenticate with an inference provider",
|
||
description="Run OAuth device authorization flow for Hermes CLI"
|
||
)
|
||
login_parser.add_argument(
|
||
"--provider",
|
||
choices=["nous", "openai-codex"],
|
||
default=None,
|
||
help="Provider to authenticate with (default: nous)"
|
||
)
|
||
login_parser.add_argument(
|
||
"--portal-url",
|
||
help="Portal base URL (default: production portal)"
|
||
)
|
||
login_parser.add_argument(
|
||
"--inference-url",
|
||
help="Inference API base URL (default: production inference API)"
|
||
)
|
||
login_parser.add_argument(
|
||
"--client-id",
|
||
default=None,
|
||
help="OAuth client id to use (default: hermes-cli)"
|
||
)
|
||
login_parser.add_argument(
|
||
"--scope",
|
||
default=None,
|
||
help="OAuth scope to request"
|
||
)
|
||
login_parser.add_argument(
|
||
"--no-browser",
|
||
action="store_true",
|
||
help="Do not attempt to open the browser automatically"
|
||
)
|
||
login_parser.add_argument(
|
||
"--timeout",
|
||
type=float,
|
||
default=15.0,
|
||
help="HTTP request timeout in seconds (default: 15)"
|
||
)
|
||
login_parser.add_argument(
|
||
"--ca-bundle",
|
||
help="Path to CA bundle PEM file for TLS verification"
|
||
)
|
||
login_parser.add_argument(
|
||
"--insecure",
|
||
action="store_true",
|
||
help="Disable TLS verification (testing only)"
|
||
)
|
||
login_parser.set_defaults(func=cmd_login)
|
||
|
||
# =========================================================================
|
||
# logout command
|
||
# =========================================================================
|
||
logout_parser = subparsers.add_parser(
|
||
"logout",
|
||
help="Clear authentication for an inference provider",
|
||
description="Remove stored credentials and reset provider config"
|
||
)
|
||
logout_parser.add_argument(
|
||
"--provider",
|
||
choices=["nous", "openai-codex"],
|
||
default=None,
|
||
help="Provider to log out from (default: active provider)"
|
||
)
|
||
logout_parser.set_defaults(func=cmd_logout)
|
||
|
||
# =========================================================================
|
||
# status command
|
||
# =========================================================================
|
||
status_parser = subparsers.add_parser(
|
||
"status",
|
||
help="Show status of all components",
|
||
description="Display status of Hermes Agent components"
|
||
)
|
||
status_parser.add_argument(
|
||
"--all",
|
||
action="store_true",
|
||
help="Show all details (redacted for sharing)"
|
||
)
|
||
status_parser.add_argument(
|
||
"--deep",
|
||
action="store_true",
|
||
help="Run deep checks (may take longer)"
|
||
)
|
||
status_parser.set_defaults(func=cmd_status)
|
||
|
||
# =========================================================================
|
||
# cron command
|
||
# =========================================================================
|
||
cron_parser = subparsers.add_parser(
|
||
"cron",
|
||
help="Cron job management",
|
||
description="Manage scheduled tasks"
|
||
)
|
||
cron_subparsers = cron_parser.add_subparsers(dest="cron_command")
|
||
|
||
# cron list
|
||
cron_list = cron_subparsers.add_parser("list", help="List scheduled jobs")
|
||
cron_list.add_argument("--all", action="store_true", help="Include disabled jobs")
|
||
|
||
# cron create/add
|
||
cron_create = cron_subparsers.add_parser("create", aliases=["add"], help="Create a scheduled job")
|
||
cron_create.add_argument("schedule", help="Schedule like '30m', 'every 2h', or '0 9 * * *'")
|
||
cron_create.add_argument("prompt", nargs="?", help="Optional self-contained prompt or task instruction")
|
||
cron_create.add_argument("--name", help="Optional human-friendly job name")
|
||
cron_create.add_argument("--deliver", help="Delivery target: origin, local, telegram, discord, signal, or platform:chat_id")
|
||
cron_create.add_argument("--repeat", type=int, help="Optional repeat count")
|
||
cron_create.add_argument("--skill", dest="skills", action="append", help="Attach a skill. Repeat to add multiple skills.")
|
||
|
||
# cron edit
|
||
cron_edit = cron_subparsers.add_parser("edit", help="Edit an existing scheduled job")
|
||
cron_edit.add_argument("job_id", help="Job ID to edit")
|
||
cron_edit.add_argument("--schedule", help="New schedule")
|
||
cron_edit.add_argument("--prompt", help="New prompt/task instruction")
|
||
cron_edit.add_argument("--name", help="New job name")
|
||
cron_edit.add_argument("--deliver", help="New delivery target")
|
||
cron_edit.add_argument("--repeat", type=int, help="New repeat count")
|
||
cron_edit.add_argument("--skill", dest="skills", action="append", help="Replace the job's skills with this set. Repeat to attach multiple skills.")
|
||
cron_edit.add_argument("--add-skill", dest="add_skills", action="append", help="Append a skill without replacing the existing list. Repeatable.")
|
||
cron_edit.add_argument("--remove-skill", dest="remove_skills", action="append", help="Remove a specific attached skill. Repeatable.")
|
||
cron_edit.add_argument("--clear-skills", action="store_true", help="Remove all attached skills from the job")
|
||
|
||
# lifecycle actions
|
||
cron_pause = cron_subparsers.add_parser("pause", help="Pause a scheduled job")
|
||
cron_pause.add_argument("job_id", help="Job ID to pause")
|
||
|
||
cron_resume = cron_subparsers.add_parser("resume", help="Resume a paused job")
|
||
cron_resume.add_argument("job_id", help="Job ID to resume")
|
||
|
||
cron_run = cron_subparsers.add_parser("run", help="Run a job on the next scheduler tick")
|
||
cron_run.add_argument("job_id", help="Job ID to trigger")
|
||
|
||
cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job")
|
||
cron_remove.add_argument("job_id", help="Job ID to remove")
|
||
|
||
# cron status
|
||
cron_subparsers.add_parser("status", help="Check if cron scheduler is running")
|
||
|
||
# cron tick (mostly for debugging)
|
||
cron_subparsers.add_parser("tick", help="Run due jobs once and exit")
|
||
|
||
cron_parser.set_defaults(func=cmd_cron)
|
||
|
||
# =========================================================================
|
||
# webhook command
|
||
# =========================================================================
|
||
webhook_parser = subparsers.add_parser(
|
||
"webhook",
|
||
help="Manage dynamic webhook subscriptions",
|
||
description="Create, list, and remove webhook subscriptions for event-driven agent activation",
|
||
)
|
||
webhook_subparsers = webhook_parser.add_subparsers(dest="webhook_action")
|
||
|
||
wh_sub = webhook_subparsers.add_parser("subscribe", aliases=["add"], help="Create a webhook subscription")
|
||
wh_sub.add_argument("name", help="Route name (used in URL: /webhooks/<name>)")
|
||
wh_sub.add_argument("--prompt", default="", help="Prompt template with {dot.notation} payload refs")
|
||
wh_sub.add_argument("--events", default="", help="Comma-separated event types to accept")
|
||
wh_sub.add_argument("--description", default="", help="What this subscription does")
|
||
wh_sub.add_argument("--skills", default="", help="Comma-separated skill names to load")
|
||
wh_sub.add_argument("--deliver", default="log", help="Delivery target: log, telegram, discord, slack, etc.")
|
||
wh_sub.add_argument("--deliver-chat-id", default="", help="Target chat ID for cross-platform delivery")
|
||
wh_sub.add_argument("--secret", default="", help="HMAC secret (auto-generated if omitted)")
|
||
|
||
webhook_subparsers.add_parser("list", aliases=["ls"], help="List all dynamic subscriptions")
|
||
|
||
wh_rm = webhook_subparsers.add_parser("remove", aliases=["rm"], help="Remove a subscription")
|
||
wh_rm.add_argument("name", help="Subscription name to remove")
|
||
|
||
wh_test = webhook_subparsers.add_parser("test", help="Send a test POST to a webhook route")
|
||
wh_test.add_argument("name", help="Subscription name to test")
|
||
wh_test.add_argument("--payload", default="", help="JSON payload to send (default: test payload)")
|
||
|
||
webhook_parser.set_defaults(func=cmd_webhook)
|
||
|
||
# =========================================================================
|
||
# doctor command
|
||
# =========================================================================
|
||
doctor_parser = subparsers.add_parser(
|
||
"doctor",
|
||
help="Check configuration and dependencies",
|
||
description="Diagnose issues with Hermes Agent setup"
|
||
)
|
||
doctor_parser.add_argument(
|
||
"--fix",
|
||
action="store_true",
|
||
help="Attempt to fix issues automatically"
|
||
)
|
||
doctor_parser.set_defaults(func=cmd_doctor)
|
||
|
||
# =========================================================================
|
||
# config command
|
||
# =========================================================================
|
||
config_parser = subparsers.add_parser(
|
||
"config",
|
||
help="View and edit configuration",
|
||
description="Manage Hermes Agent configuration"
|
||
)
|
||
config_subparsers = config_parser.add_subparsers(dest="config_command")
|
||
|
||
# config show (default)
|
||
config_show = config_subparsers.add_parser("show", help="Show current configuration")
|
||
|
||
# config edit
|
||
config_edit = config_subparsers.add_parser("edit", help="Open config file in editor")
|
||
|
||
# config set
|
||
config_set = config_subparsers.add_parser("set", help="Set a configuration value")
|
||
config_set.add_argument("key", nargs="?", help="Configuration key (e.g., model, terminal.backend)")
|
||
config_set.add_argument("value", nargs="?", help="Value to set")
|
||
|
||
# config path
|
||
config_path = config_subparsers.add_parser("path", help="Print config file path")
|
||
|
||
# config env-path
|
||
config_env = config_subparsers.add_parser("env-path", help="Print .env file path")
|
||
|
||
# config check
|
||
config_check = config_subparsers.add_parser("check", help="Check for missing/outdated config")
|
||
|
||
# config migrate
|
||
config_migrate = config_subparsers.add_parser("migrate", help="Update config with new options")
|
||
|
||
config_parser.set_defaults(func=cmd_config)
|
||
|
||
# =========================================================================
|
||
# pairing command
|
||
# =========================================================================
|
||
pairing_parser = subparsers.add_parser(
|
||
"pairing",
|
||
help="Manage DM pairing codes for user authorization",
|
||
description="Approve or revoke user access via pairing codes"
|
||
)
|
||
pairing_sub = pairing_parser.add_subparsers(dest="pairing_action")
|
||
|
||
pairing_list_parser = pairing_sub.add_parser("list", help="Show pending + approved users")
|
||
|
||
pairing_approve_parser = pairing_sub.add_parser("approve", help="Approve a pairing code")
|
||
pairing_approve_parser.add_argument("platform", help="Platform name (telegram, discord, slack, whatsapp)")
|
||
pairing_approve_parser.add_argument("code", help="Pairing code to approve")
|
||
|
||
pairing_revoke_parser = pairing_sub.add_parser("revoke", help="Revoke user access")
|
||
pairing_revoke_parser.add_argument("platform", help="Platform name")
|
||
pairing_revoke_parser.add_argument("user_id", help="User ID to revoke")
|
||
|
||
pairing_clear_parser = pairing_sub.add_parser("clear-pending", help="Clear all pending codes")
|
||
|
||
def cmd_pairing(args):
|
||
from hermes_cli.pairing import pairing_command
|
||
pairing_command(args)
|
||
|
||
pairing_parser.set_defaults(func=cmd_pairing)
|
||
|
||
# =========================================================================
|
||
# skills command
|
||
# =========================================================================
|
||
skills_parser = subparsers.add_parser(
|
||
"skills",
|
||
help="Search, install, configure, and manage skills",
|
||
description="Search, install, inspect, audit, configure, and manage skills from skills.sh, well-known agent skill endpoints, GitHub, ClawHub, and other registries."
|
||
)
|
||
skills_subparsers = skills_parser.add_subparsers(dest="skills_action")
|
||
|
||
skills_browse = skills_subparsers.add_parser("browse", help="Browse all available skills (paginated)")
|
||
skills_browse.add_argument("--page", type=int, default=1, help="Page number (default: 1)")
|
||
skills_browse.add_argument("--size", type=int, default=20, help="Results per page (default: 20)")
|
||
skills_browse.add_argument("--source", default="all",
|
||
choices=["all", "official", "skills-sh", "well-known", "github", "clawhub", "lobehub"],
|
||
help="Filter by source (default: all)")
|
||
|
||
skills_search = skills_subparsers.add_parser("search", help="Search skill registries")
|
||
skills_search.add_argument("query", help="Search query")
|
||
skills_search.add_argument("--source", default="all", choices=["all", "official", "skills-sh", "well-known", "github", "clawhub", "lobehub"])
|
||
skills_search.add_argument("--limit", type=int, default=10, help="Max results")
|
||
|
||
skills_install = skills_subparsers.add_parser("install", help="Install a skill")
|
||
skills_install.add_argument("identifier", help="Skill identifier (e.g. openai/skills/skill-creator)")
|
||
skills_install.add_argument("--category", default="", help="Category folder to install into")
|
||
skills_install.add_argument("--force", action="store_true", help="Install despite blocked scan verdict")
|
||
skills_install.add_argument("--yes", "-y", action="store_true", help="Skip confirmation prompt (needed in TUI mode)")
|
||
|
||
skills_inspect = skills_subparsers.add_parser("inspect", help="Preview a skill without installing")
|
||
skills_inspect.add_argument("identifier", help="Skill identifier")
|
||
|
||
skills_list = skills_subparsers.add_parser("list", help="List installed skills")
|
||
skills_list.add_argument("--source", default="all", choices=["all", "hub", "builtin", "local"])
|
||
|
||
skills_check = skills_subparsers.add_parser("check", help="Check installed hub skills for updates")
|
||
skills_check.add_argument("name", nargs="?", help="Specific skill to check (default: all)")
|
||
|
||
skills_update = skills_subparsers.add_parser("update", help="Update installed hub skills")
|
||
skills_update.add_argument("name", nargs="?", help="Specific skill to update (default: all outdated skills)")
|
||
|
||
skills_audit = skills_subparsers.add_parser("audit", help="Re-scan installed hub skills")
|
||
skills_audit.add_argument("name", nargs="?", help="Specific skill to audit (default: all)")
|
||
|
||
skills_uninstall = skills_subparsers.add_parser("uninstall", help="Remove a hub-installed skill")
|
||
skills_uninstall.add_argument("name", help="Skill name to remove")
|
||
|
||
skills_publish = skills_subparsers.add_parser("publish", help="Publish a skill to a registry")
|
||
skills_publish.add_argument("skill_path", help="Path to skill directory")
|
||
skills_publish.add_argument("--to", default="github", choices=["github", "clawhub"], help="Target registry")
|
||
skills_publish.add_argument("--repo", default="", help="Target GitHub repo (e.g. openai/skills)")
|
||
|
||
skills_snapshot = skills_subparsers.add_parser("snapshot", help="Export/import skill configurations")
|
||
snapshot_subparsers = skills_snapshot.add_subparsers(dest="snapshot_action")
|
||
snap_export = snapshot_subparsers.add_parser("export", help="Export installed skills to a file")
|
||
snap_export.add_argument("output", help="Output JSON file path (use - for stdout)")
|
||
snap_import = snapshot_subparsers.add_parser("import", help="Import and install skills from a file")
|
||
snap_import.add_argument("input", help="Input JSON file path")
|
||
snap_import.add_argument("--force", action="store_true", help="Force install despite caution verdict")
|
||
|
||
skills_tap = skills_subparsers.add_parser("tap", help="Manage skill sources")
|
||
tap_subparsers = skills_tap.add_subparsers(dest="tap_action")
|
||
tap_subparsers.add_parser("list", help="List configured taps")
|
||
tap_add = tap_subparsers.add_parser("add", help="Add a GitHub repo as skill source")
|
||
tap_add.add_argument("repo", help="GitHub repo (e.g. owner/repo)")
|
||
tap_rm = tap_subparsers.add_parser("remove", help="Remove a tap")
|
||
tap_rm.add_argument("name", help="Tap name to remove")
|
||
|
||
# config sub-action: interactive enable/disable
|
||
skills_subparsers.add_parser("config", help="Interactive skill configuration — enable/disable individual skills")
|
||
|
||
def cmd_skills(args):
|
||
# Route 'config' action to skills_config module
|
||
if getattr(args, 'skills_action', None) == 'config':
|
||
_require_tty("skills config")
|
||
from hermes_cli.skills_config import skills_command as skills_config_command
|
||
skills_config_command(args)
|
||
else:
|
||
from hermes_cli.skills_hub import skills_command
|
||
skills_command(args)
|
||
|
||
skills_parser.set_defaults(func=cmd_skills)
|
||
|
||
# =========================================================================
|
||
# plugins command
|
||
# =========================================================================
|
||
plugins_parser = subparsers.add_parser(
|
||
"plugins",
|
||
help="Manage plugins — install, update, remove, list",
|
||
description="Install plugins from Git repositories, update, remove, or list them.",
|
||
)
|
||
plugins_subparsers = plugins_parser.add_subparsers(dest="plugins_action")
|
||
|
||
plugins_install = plugins_subparsers.add_parser(
|
||
"install", help="Install a plugin from a Git URL or owner/repo"
|
||
)
|
||
plugins_install.add_argument(
|
||
"identifier",
|
||
help="Git URL or owner/repo shorthand (e.g. anpicasso/hermes-plugin-chrome-profiles)",
|
||
)
|
||
plugins_install.add_argument(
|
||
"--force", "-f", action="store_true",
|
||
help="Remove existing plugin and reinstall",
|
||
)
|
||
|
||
plugins_update = plugins_subparsers.add_parser(
|
||
"update", help="Pull latest changes for an installed plugin"
|
||
)
|
||
plugins_update.add_argument("name", help="Plugin name to update")
|
||
|
||
plugins_remove = plugins_subparsers.add_parser(
|
||
"remove", aliases=["rm", "uninstall"], help="Remove an installed plugin"
|
||
)
|
||
plugins_remove.add_argument("name", help="Plugin directory name to remove")
|
||
|
||
plugins_subparsers.add_parser("list", aliases=["ls"], help="List installed plugins")
|
||
|
||
plugins_enable = plugins_subparsers.add_parser(
|
||
"enable", help="Enable a disabled plugin"
|
||
)
|
||
plugins_enable.add_argument("name", help="Plugin name to enable")
|
||
|
||
plugins_disable = plugins_subparsers.add_parser(
|
||
"disable", help="Disable a plugin without removing it"
|
||
)
|
||
plugins_disable.add_argument("name", help="Plugin name to disable")
|
||
|
||
def cmd_plugins(args):
|
||
from hermes_cli.plugins_cmd import plugins_command
|
||
plugins_command(args)
|
||
|
||
plugins_parser.set_defaults(func=cmd_plugins)
|
||
|
||
# =========================================================================
|
||
# honcho command
|
||
# =========================================================================
|
||
honcho_parser = subparsers.add_parser(
|
||
"honcho",
|
||
help="Manage Honcho AI memory integration",
|
||
description=(
|
||
"Honcho is a memory layer that persists across sessions.\n\n"
|
||
"Each conversation is stored as a peer interaction in a workspace. "
|
||
"Honcho builds a representation of the user over time — conclusions, "
|
||
"patterns, context — and surfaces the relevant slice at the start of "
|
||
"each turn so Hermes knows who you are without you having to repeat yourself.\n\n"
|
||
"Modes: hybrid (Honcho + local MEMORY.md), honcho (Honcho only), "
|
||
"local (MEMORY.md only). Write frequency is configurable so memory "
|
||
"writes never block the response."
|
||
),
|
||
formatter_class=__import__("argparse").RawDescriptionHelpFormatter,
|
||
)
|
||
honcho_subparsers = honcho_parser.add_subparsers(dest="honcho_command")
|
||
|
||
honcho_subparsers.add_parser("setup", help="Interactive setup wizard for Honcho integration")
|
||
honcho_subparsers.add_parser("status", help="Show current Honcho config and connection status")
|
||
honcho_subparsers.add_parser("sessions", help="List known Honcho session mappings")
|
||
|
||
honcho_map = honcho_subparsers.add_parser(
|
||
"map", help="Map current directory to a Honcho session name (no arg = list mappings)"
|
||
)
|
||
honcho_map.add_argument(
|
||
"session_name", nargs="?", default=None,
|
||
help="Session name to associate with this directory. Omit to list current mappings.",
|
||
)
|
||
|
||
honcho_peer = honcho_subparsers.add_parser(
|
||
"peer", help="Show or update peer names and dialectic reasoning level"
|
||
)
|
||
honcho_peer.add_argument("--user", metavar="NAME", help="Set user peer name")
|
||
honcho_peer.add_argument("--ai", metavar="NAME", help="Set AI peer name")
|
||
honcho_peer.add_argument(
|
||
"--reasoning",
|
||
metavar="LEVEL",
|
||
choices=("minimal", "low", "medium", "high", "max"),
|
||
help="Set default dialectic reasoning level (minimal/low/medium/high/max)",
|
||
)
|
||
|
||
honcho_mode = honcho_subparsers.add_parser(
|
||
"mode", help="Show or set memory mode (hybrid/honcho/local)"
|
||
)
|
||
honcho_mode.add_argument(
|
||
"mode", nargs="?", metavar="MODE",
|
||
choices=("hybrid", "honcho", "local"),
|
||
help="Memory mode to set (hybrid/honcho/local). Omit to show current.",
|
||
)
|
||
|
||
honcho_tokens = honcho_subparsers.add_parser(
|
||
"tokens", help="Show or set token budget for context and dialectic"
|
||
)
|
||
honcho_tokens.add_argument(
|
||
"--context", type=int, metavar="N",
|
||
help="Max tokens Honcho returns from session.context() per turn",
|
||
)
|
||
honcho_tokens.add_argument(
|
||
"--dialectic", type=int, metavar="N",
|
||
help="Max chars of dialectic result to inject into system prompt",
|
||
)
|
||
|
||
honcho_identity = honcho_subparsers.add_parser(
|
||
"identity", help="Seed or show the AI peer's Honcho identity representation"
|
||
)
|
||
honcho_identity.add_argument(
|
||
"file", nargs="?", default=None,
|
||
help="Path to file to seed from (e.g. SOUL.md). Omit to show usage.",
|
||
)
|
||
honcho_identity.add_argument(
|
||
"--show", action="store_true",
|
||
help="Show current AI peer representation from Honcho",
|
||
)
|
||
|
||
honcho_subparsers.add_parser(
|
||
"migrate",
|
||
help="Step-by-step migration guide from openclaw-honcho to Hermes Honcho",
|
||
)
|
||
|
||
def cmd_honcho(args):
|
||
from honcho_integration.cli import honcho_command
|
||
honcho_command(args)
|
||
|
||
honcho_parser.set_defaults(func=cmd_honcho)
|
||
|
||
# =========================================================================
|
||
# tools command
|
||
# =========================================================================
|
||
tools_parser = subparsers.add_parser(
|
||
"tools",
|
||
help="Configure which tools are enabled per platform",
|
||
description=(
|
||
"Enable, disable, or list tools for CLI, Telegram, Discord, etc.\n\n"
|
||
"Built-in toolsets use plain names (e.g. web, memory).\n"
|
||
"MCP tools use server:tool notation (e.g. github:create_issue).\n\n"
|
||
"Run 'hermes tools' with no subcommand for the interactive configuration UI."
|
||
),
|
||
)
|
||
tools_parser.add_argument(
|
||
"--summary",
|
||
action="store_true",
|
||
help="Print a summary of enabled tools per platform and exit"
|
||
)
|
||
tools_sub = tools_parser.add_subparsers(dest="tools_action")
|
||
|
||
# hermes tools list [--platform cli]
|
||
tools_list_p = tools_sub.add_parser(
|
||
"list",
|
||
help="Show all tools and their enabled/disabled status",
|
||
)
|
||
tools_list_p.add_argument(
|
||
"--platform", default="cli",
|
||
help="Platform to show (default: cli)",
|
||
)
|
||
|
||
# hermes tools disable <name...> [--platform cli]
|
||
tools_disable_p = tools_sub.add_parser(
|
||
"disable",
|
||
help="Disable toolsets or MCP tools",
|
||
)
|
||
tools_disable_p.add_argument(
|
||
"names", nargs="+", metavar="NAME",
|
||
help="Toolset name (e.g. web) or MCP tool in server:tool form",
|
||
)
|
||
tools_disable_p.add_argument(
|
||
"--platform", default="cli",
|
||
help="Platform to apply to (default: cli)",
|
||
)
|
||
|
||
# hermes tools enable <name...> [--platform cli]
|
||
tools_enable_p = tools_sub.add_parser(
|
||
"enable",
|
||
help="Enable toolsets or MCP tools",
|
||
)
|
||
tools_enable_p.add_argument(
|
||
"names", nargs="+", metavar="NAME",
|
||
help="Toolset name or MCP tool in server:tool form",
|
||
)
|
||
tools_enable_p.add_argument(
|
||
"--platform", default="cli",
|
||
help="Platform to apply to (default: cli)",
|
||
)
|
||
|
||
def cmd_tools(args):
|
||
action = getattr(args, "tools_action", None)
|
||
if action in ("list", "disable", "enable"):
|
||
from hermes_cli.tools_config import tools_disable_enable_command
|
||
tools_disable_enable_command(args)
|
||
else:
|
||
_require_tty("tools")
|
||
from hermes_cli.tools_config import tools_command
|
||
tools_command(args)
|
||
|
||
tools_parser.set_defaults(func=cmd_tools)
|
||
# =========================================================================
|
||
# mcp command — manage MCP server connections
|
||
# =========================================================================
|
||
mcp_parser = subparsers.add_parser(
|
||
"mcp",
|
||
help="Manage MCP servers and run Hermes as an MCP server",
|
||
description=(
|
||
"Manage MCP server connections and run Hermes as an MCP server.\n\n"
|
||
"MCP servers provide additional tools via the Model Context Protocol.\n"
|
||
"Use 'hermes mcp add' to connect to a new server, or\n"
|
||
"'hermes mcp serve' to expose Hermes conversations over MCP."
|
||
),
|
||
)
|
||
mcp_sub = mcp_parser.add_subparsers(dest="mcp_action")
|
||
|
||
mcp_serve_p = mcp_sub.add_parser(
|
||
"serve",
|
||
help="Run Hermes as an MCP server (expose conversations to other agents)",
|
||
)
|
||
mcp_serve_p.add_argument(
|
||
"-v", "--verbose", action="store_true",
|
||
help="Enable verbose logging on stderr",
|
||
)
|
||
|
||
mcp_add_p = mcp_sub.add_parser("add", help="Add an MCP server (discovery-first install)")
|
||
mcp_add_p.add_argument("name", help="Server name (used as config key)")
|
||
mcp_add_p.add_argument("--url", help="HTTP/SSE endpoint URL")
|
||
mcp_add_p.add_argument("--command", help="Stdio command (e.g. npx)")
|
||
mcp_add_p.add_argument("--args", nargs="*", default=[], help="Arguments for stdio command")
|
||
mcp_add_p.add_argument("--auth", choices=["oauth", "header"], help="Auth method")
|
||
|
||
mcp_rm_p = mcp_sub.add_parser("remove", aliases=["rm"], help="Remove an MCP server")
|
||
mcp_rm_p.add_argument("name", help="Server name to remove")
|
||
|
||
mcp_sub.add_parser("list", aliases=["ls"], help="List configured MCP servers")
|
||
|
||
mcp_test_p = mcp_sub.add_parser("test", help="Test MCP server connection")
|
||
mcp_test_p.add_argument("name", help="Server name to test")
|
||
|
||
mcp_cfg_p = mcp_sub.add_parser("configure", aliases=["config"], help="Toggle tool selection")
|
||
mcp_cfg_p.add_argument("name", help="Server name to configure")
|
||
|
||
def cmd_mcp(args):
|
||
from hermes_cli.mcp_config import mcp_command
|
||
mcp_command(args)
|
||
|
||
mcp_parser.set_defaults(func=cmd_mcp)
|
||
|
||
# =========================================================================
|
||
# sessions command
|
||
# =========================================================================
|
||
sessions_parser = subparsers.add_parser(
|
||
"sessions",
|
||
help="Manage session history (list, rename, export, prune, delete)",
|
||
description="View and manage the SQLite session store"
|
||
)
|
||
sessions_subparsers = sessions_parser.add_subparsers(dest="sessions_action")
|
||
|
||
sessions_list = sessions_subparsers.add_parser("list", help="List recent sessions")
|
||
sessions_list.add_argument("--source", help="Filter by source (cli, telegram, discord, etc.)")
|
||
sessions_list.add_argument("--limit", type=int, default=20, help="Max sessions to show")
|
||
|
||
sessions_export = sessions_subparsers.add_parser("export", help="Export sessions to a JSONL file")
|
||
sessions_export.add_argument("output", help="Output JSONL file path (use - for stdout)")
|
||
sessions_export.add_argument("--source", help="Filter by source")
|
||
sessions_export.add_argument("--session-id", help="Export a specific session")
|
||
|
||
sessions_delete = sessions_subparsers.add_parser("delete", help="Delete a specific session")
|
||
sessions_delete.add_argument("session_id", help="Session ID to delete")
|
||
sessions_delete.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
|
||
|
||
sessions_prune = sessions_subparsers.add_parser("prune", help="Delete old sessions")
|
||
sessions_prune.add_argument("--older-than", type=int, default=90, help="Delete sessions older than N days (default: 90)")
|
||
sessions_prune.add_argument("--source", help="Only prune sessions from this source")
|
||
sessions_prune.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
|
||
|
||
sessions_stats = sessions_subparsers.add_parser("stats", help="Show session store statistics")
|
||
|
||
sessions_rename = sessions_subparsers.add_parser("rename", help="Set or change a session's title")
|
||
sessions_rename.add_argument("session_id", help="Session ID to rename")
|
||
sessions_rename.add_argument("title", nargs="+", help="New title for the session")
|
||
|
||
sessions_browse = sessions_subparsers.add_parser(
|
||
"browse",
|
||
help="Interactive session picker — browse, search, and resume sessions",
|
||
)
|
||
sessions_browse.add_argument("--source", help="Filter by source (cli, telegram, discord, etc.)")
|
||
sessions_browse.add_argument("--limit", type=int, default=50, help="Max sessions to load (default: 50)")
|
||
|
||
def _confirm_prompt(prompt: str) -> bool:
|
||
"""Prompt for y/N confirmation, safe against non-TTY environments."""
|
||
try:
|
||
return input(prompt).strip().lower() in ("y", "yes")
|
||
except (EOFError, KeyboardInterrupt):
|
||
return False
|
||
|
||
def cmd_sessions(args):
|
||
import json as _json
|
||
try:
|
||
from hermes_state import SessionDB
|
||
db = SessionDB()
|
||
except Exception as e:
|
||
print(f"Error: Could not open session database: {e}")
|
||
return
|
||
|
||
action = args.sessions_action
|
||
|
||
# Hide third-party tool sessions by default, but honour explicit --source
|
||
_source = getattr(args, "source", None)
|
||
_exclude = None if _source else ["tool"]
|
||
|
||
if action == "list":
|
||
sessions = db.list_sessions_rich(source=args.source, exclude_sources=_exclude, limit=args.limit)
|
||
if not sessions:
|
||
print("No sessions found.")
|
||
return
|
||
has_titles = any(s.get("title") for s in sessions)
|
||
if has_titles:
|
||
print(f"{'Title':<32} {'Preview':<40} {'Last Active':<13} {'ID'}")
|
||
print("─" * 110)
|
||
else:
|
||
print(f"{'Preview':<50} {'Last Active':<13} {'Src':<6} {'ID'}")
|
||
print("─" * 95)
|
||
for s in sessions:
|
||
last_active = _relative_time(s.get("last_active"))
|
||
preview = s.get("preview", "")[:38] if has_titles else s.get("preview", "")[:48]
|
||
if has_titles:
|
||
title = (s.get("title") or "—")[:30]
|
||
sid = s["id"]
|
||
print(f"{title:<32} {preview:<40} {last_active:<13} {sid}")
|
||
else:
|
||
sid = s["id"]
|
||
print(f"{preview:<50} {last_active:<13} {s['source']:<6} {sid}")
|
||
|
||
elif action == "export":
|
||
if args.session_id:
|
||
resolved_session_id = db.resolve_session_id(args.session_id)
|
||
if not resolved_session_id:
|
||
print(f"Session '{args.session_id}' not found.")
|
||
return
|
||
data = db.export_session(resolved_session_id)
|
||
if not data:
|
||
print(f"Session '{args.session_id}' not found.")
|
||
return
|
||
line = _json.dumps(data, ensure_ascii=False) + "\n"
|
||
if args.output == "-":
|
||
import sys
|
||
sys.stdout.write(line)
|
||
else:
|
||
with open(args.output, "w", encoding="utf-8") as f:
|
||
f.write(line)
|
||
print(f"Exported 1 session to {args.output}")
|
||
else:
|
||
sessions = db.export_all(source=args.source)
|
||
if args.output == "-":
|
||
import sys
|
||
for s in sessions:
|
||
sys.stdout.write(_json.dumps(s, ensure_ascii=False) + "\n")
|
||
else:
|
||
with open(args.output, "w", encoding="utf-8") as f:
|
||
for s in sessions:
|
||
f.write(_json.dumps(s, ensure_ascii=False) + "\n")
|
||
print(f"Exported {len(sessions)} sessions to {args.output}")
|
||
|
||
elif action == "delete":
|
||
resolved_session_id = db.resolve_session_id(args.session_id)
|
||
if not resolved_session_id:
|
||
print(f"Session '{args.session_id}' not found.")
|
||
return
|
||
if not args.yes:
|
||
if not _confirm_prompt(f"Delete session '{resolved_session_id}' and all its messages? [y/N] "):
|
||
print("Cancelled.")
|
||
return
|
||
if db.delete_session(resolved_session_id):
|
||
print(f"Deleted session '{resolved_session_id}'.")
|
||
else:
|
||
print(f"Session '{args.session_id}' not found.")
|
||
|
||
elif action == "prune":
|
||
days = args.older_than
|
||
source_msg = f" from '{args.source}'" if args.source else ""
|
||
if not args.yes:
|
||
if not _confirm_prompt(f"Delete all ended sessions older than {days} days{source_msg}? [y/N] "):
|
||
print("Cancelled.")
|
||
return
|
||
count = db.prune_sessions(older_than_days=days, source=args.source)
|
||
print(f"Pruned {count} session(s).")
|
||
|
||
elif action == "rename":
|
||
resolved_session_id = db.resolve_session_id(args.session_id)
|
||
if not resolved_session_id:
|
||
print(f"Session '{args.session_id}' not found.")
|
||
return
|
||
title = " ".join(args.title)
|
||
try:
|
||
if db.set_session_title(resolved_session_id, title):
|
||
print(f"Session '{resolved_session_id}' renamed to: {title}")
|
||
else:
|
||
print(f"Session '{args.session_id}' not found.")
|
||
except ValueError as e:
|
||
print(f"Error: {e}")
|
||
|
||
elif action == "browse":
|
||
limit = getattr(args, "limit", 50) or 50
|
||
source = getattr(args, "source", None)
|
||
_browse_exclude = None if source else ["tool"]
|
||
sessions = db.list_sessions_rich(source=source, exclude_sources=_browse_exclude, limit=limit)
|
||
db.close()
|
||
if not sessions:
|
||
print("No sessions found.")
|
||
return
|
||
|
||
selected_id = _session_browse_picker(sessions)
|
||
if not selected_id:
|
||
print("Cancelled.")
|
||
return
|
||
|
||
# Launch hermes --resume <id> by replacing the current process
|
||
print(f"Resuming session: {selected_id}")
|
||
import shutil
|
||
hermes_bin = shutil.which("hermes")
|
||
if hermes_bin:
|
||
os.execvp(hermes_bin, ["hermes", "--resume", selected_id])
|
||
else:
|
||
# Fallback: re-invoke via python -m
|
||
os.execvp(
|
||
sys.executable,
|
||
[sys.executable, "-m", "hermes_cli.main", "--resume", selected_id],
|
||
)
|
||
return # won't reach here after execvp
|
||
|
||
elif action == "stats":
|
||
total = db.session_count()
|
||
msgs = db.message_count()
|
||
print(f"Total sessions: {total}")
|
||
print(f"Total messages: {msgs}")
|
||
for src in ["cli", "telegram", "discord", "whatsapp", "slack"]:
|
||
c = db.session_count(source=src)
|
||
if c > 0:
|
||
print(f" {src}: {c} sessions")
|
||
db_path = db.db_path
|
||
if db_path.exists():
|
||
size_mb = os.path.getsize(db_path) / (1024 * 1024)
|
||
print(f"Database size: {size_mb:.1f} MB")
|
||
|
||
else:
|
||
sessions_parser.print_help()
|
||
|
||
db.close()
|
||
|
||
sessions_parser.set_defaults(func=cmd_sessions)
|
||
|
||
# =========================================================================
|
||
# insights command
|
||
# =========================================================================
|
||
insights_parser = subparsers.add_parser(
|
||
"insights",
|
||
help="Show usage insights and analytics",
|
||
description="Analyze session history to show token usage, costs, tool patterns, and activity trends"
|
||
)
|
||
insights_parser.add_argument("--days", type=int, default=30, help="Number of days to analyze (default: 30)")
|
||
insights_parser.add_argument("--source", help="Filter by platform (cli, telegram, discord, etc.)")
|
||
|
||
def cmd_insights(args):
|
||
try:
|
||
from hermes_state import SessionDB
|
||
from agent.insights import InsightsEngine
|
||
|
||
db = SessionDB()
|
||
engine = InsightsEngine(db)
|
||
report = engine.generate(days=args.days, source=args.source)
|
||
print(engine.format_terminal(report))
|
||
db.close()
|
||
except Exception as e:
|
||
print(f"Error generating insights: {e}")
|
||
|
||
insights_parser.set_defaults(func=cmd_insights)
|
||
|
||
# =========================================================================
|
||
# claw command (OpenClaw migration)
|
||
# =========================================================================
|
||
claw_parser = subparsers.add_parser(
|
||
"claw",
|
||
help="OpenClaw migration tools",
|
||
description="Migrate settings, memories, skills, and API keys from OpenClaw to Hermes"
|
||
)
|
||
claw_subparsers = claw_parser.add_subparsers(dest="claw_action")
|
||
|
||
# claw migrate
|
||
claw_migrate = claw_subparsers.add_parser(
|
||
"migrate",
|
||
help="Migrate from OpenClaw to Hermes",
|
||
description="Import settings, memories, skills, and API keys from an OpenClaw installation"
|
||
)
|
||
claw_migrate.add_argument(
|
||
"--source",
|
||
help="Path to OpenClaw directory (default: ~/.openclaw)"
|
||
)
|
||
claw_migrate.add_argument(
|
||
"--dry-run",
|
||
action="store_true",
|
||
help="Preview what would be migrated without making changes"
|
||
)
|
||
claw_migrate.add_argument(
|
||
"--preset",
|
||
choices=["user-data", "full"],
|
||
default="full",
|
||
help="Migration preset (default: full). 'user-data' excludes secrets"
|
||
)
|
||
claw_migrate.add_argument(
|
||
"--overwrite",
|
||
action="store_true",
|
||
help="Overwrite existing files (default: skip conflicts)"
|
||
)
|
||
claw_migrate.add_argument(
|
||
"--migrate-secrets",
|
||
action="store_true",
|
||
help="Include allowlisted secrets (TELEGRAM_BOT_TOKEN, API keys, etc.)"
|
||
)
|
||
claw_migrate.add_argument(
|
||
"--workspace-target",
|
||
help="Absolute path to copy workspace instructions into"
|
||
)
|
||
claw_migrate.add_argument(
|
||
"--skill-conflict",
|
||
choices=["skip", "overwrite", "rename"],
|
||
default="skip",
|
||
help="How to handle skill name conflicts (default: skip)"
|
||
)
|
||
claw_migrate.add_argument(
|
||
"--yes", "-y",
|
||
action="store_true",
|
||
help="Skip confirmation prompts"
|
||
)
|
||
|
||
def cmd_claw(args):
|
||
from hermes_cli.claw import claw_command
|
||
claw_command(args)
|
||
|
||
claw_parser.set_defaults(func=cmd_claw)
|
||
|
||
# =========================================================================
|
||
# version command
|
||
# =========================================================================
|
||
version_parser = subparsers.add_parser(
|
||
"version",
|
||
help="Show version information"
|
||
)
|
||
version_parser.set_defaults(func=cmd_version)
|
||
|
||
# =========================================================================
|
||
# update command
|
||
# =========================================================================
|
||
update_parser = subparsers.add_parser(
|
||
"update",
|
||
help="Update Hermes Agent to the latest version",
|
||
description="Pull the latest changes from git and reinstall dependencies"
|
||
)
|
||
update_parser.set_defaults(func=cmd_update)
|
||
|
||
# =========================================================================
|
||
# uninstall command
|
||
# =========================================================================
|
||
uninstall_parser = subparsers.add_parser(
|
||
"uninstall",
|
||
help="Uninstall Hermes Agent",
|
||
description="Remove Hermes Agent from your system. Can keep configs/data for reinstall."
|
||
)
|
||
uninstall_parser.add_argument(
|
||
"--full",
|
||
action="store_true",
|
||
help="Full uninstall - remove everything including configs and data"
|
||
)
|
||
uninstall_parser.add_argument(
|
||
"--yes", "-y",
|
||
action="store_true",
|
||
help="Skip confirmation prompts"
|
||
)
|
||
uninstall_parser.set_defaults(func=cmd_uninstall)
|
||
|
||
# =========================================================================
|
||
# acp command
|
||
# =========================================================================
|
||
acp_parser = subparsers.add_parser(
|
||
"acp",
|
||
help="Run Hermes Agent as an ACP (Agent Client Protocol) server",
|
||
description="Start Hermes Agent in ACP mode for editor integration (VS Code, Zed, JetBrains)",
|
||
)
|
||
|
||
def cmd_acp(args):
|
||
"""Launch Hermes Agent as an ACP server."""
|
||
try:
|
||
from acp_adapter.entry import main as acp_main
|
||
acp_main()
|
||
except ImportError:
|
||
print("ACP dependencies not installed.")
|
||
print("Install them with: pip install -e '.[acp]'")
|
||
sys.exit(1)
|
||
|
||
acp_parser.set_defaults(func=cmd_acp)
|
||
|
||
# =========================================================================
|
||
# profile command
|
||
# =========================================================================
|
||
profile_parser = subparsers.add_parser(
|
||
"profile",
|
||
help="Manage profiles — multiple isolated Hermes instances",
|
||
)
|
||
profile_subparsers = profile_parser.add_subparsers(dest="profile_action")
|
||
|
||
profile_list = profile_subparsers.add_parser("list", help="List all profiles")
|
||
profile_use = profile_subparsers.add_parser("use", help="Set sticky default profile")
|
||
profile_use.add_argument("profile_name", help="Profile name (or 'default')")
|
||
|
||
profile_create = profile_subparsers.add_parser("create", help="Create a new profile")
|
||
profile_create.add_argument("profile_name", help="Profile name (lowercase, alphanumeric)")
|
||
profile_create.add_argument("--clone", action="store_true",
|
||
help="Copy config.yaml, .env, SOUL.md from active profile")
|
||
profile_create.add_argument("--clone-all", action="store_true",
|
||
help="Full copy of active profile (all state)")
|
||
profile_create.add_argument("--clone-from", metavar="SOURCE",
|
||
help="Source profile to clone from (default: active)")
|
||
profile_create.add_argument("--no-alias", action="store_true",
|
||
help="Skip wrapper script creation")
|
||
|
||
profile_delete = profile_subparsers.add_parser("delete", help="Delete a profile")
|
||
profile_delete.add_argument("profile_name", help="Profile to delete")
|
||
profile_delete.add_argument("-y", "--yes", action="store_true",
|
||
help="Skip confirmation prompt")
|
||
|
||
profile_show = profile_subparsers.add_parser("show", help="Show profile details")
|
||
profile_show.add_argument("profile_name", help="Profile to show")
|
||
|
||
profile_alias = profile_subparsers.add_parser("alias", help="Manage wrapper scripts")
|
||
profile_alias.add_argument("profile_name", help="Profile name")
|
||
profile_alias.add_argument("--remove", action="store_true",
|
||
help="Remove the wrapper script")
|
||
profile_alias.add_argument("--name", dest="alias_name", metavar="NAME",
|
||
help="Custom alias name (default: profile name)")
|
||
|
||
profile_rename = profile_subparsers.add_parser("rename", help="Rename a profile")
|
||
profile_rename.add_argument("old_name", help="Current profile name")
|
||
profile_rename.add_argument("new_name", help="New profile name")
|
||
|
||
profile_export = profile_subparsers.add_parser("export", help="Export a profile to archive")
|
||
profile_export.add_argument("profile_name", help="Profile to export")
|
||
profile_export.add_argument("-o", "--output", default=None,
|
||
help="Output file (default: <name>.tar.gz)")
|
||
|
||
profile_import = profile_subparsers.add_parser("import", help="Import a profile from archive")
|
||
profile_import.add_argument("archive", help="Path to .tar.gz archive")
|
||
profile_import.add_argument("--name", dest="import_name", metavar="NAME",
|
||
help="Profile name (default: inferred from archive)")
|
||
|
||
profile_parser.set_defaults(func=cmd_profile)
|
||
|
||
# =========================================================================
|
||
# completion command
|
||
# =========================================================================
|
||
completion_parser = subparsers.add_parser(
|
||
"completion",
|
||
help="Print shell completion script (bash or zsh)",
|
||
)
|
||
completion_parser.add_argument(
|
||
"shell", nargs="?", default="bash", choices=["bash", "zsh"],
|
||
help="Shell type (default: bash)",
|
||
)
|
||
completion_parser.set_defaults(func=cmd_completion)
|
||
|
||
# =========================================================================
|
||
# Parse and execute
|
||
# =========================================================================
|
||
# Pre-process argv so unquoted multi-word session names after -c / -r
|
||
# are merged into a single token before argparse sees them.
|
||
# e.g. ``hermes -c Pokemon Agent Dev`` → ``hermes -c 'Pokemon Agent Dev'``
|
||
_processed_argv = _coalesce_session_name_args(sys.argv[1:])
|
||
args = parser.parse_args(_processed_argv)
|
||
|
||
# Handle --version flag
|
||
if args.version:
|
||
cmd_version(args)
|
||
return
|
||
|
||
# Handle top-level --resume / --continue as shortcut to chat
|
||
if (args.resume or args.continue_last) and args.command is None:
|
||
args.command = "chat"
|
||
args.query = None
|
||
args.model = None
|
||
args.provider = None
|
||
args.toolsets = None
|
||
args.verbose = False
|
||
if not hasattr(args, "worktree"):
|
||
args.worktree = False
|
||
cmd_chat(args)
|
||
return
|
||
|
||
# Default to chat if no command specified
|
||
if args.command is None:
|
||
args.query = None
|
||
args.model = None
|
||
args.provider = None
|
||
args.toolsets = None
|
||
args.verbose = False
|
||
args.resume = None
|
||
args.continue_last = None
|
||
if not hasattr(args, "worktree"):
|
||
args.worktree = False
|
||
cmd_chat(args)
|
||
return
|
||
|
||
# Execute the command
|
||
if hasattr(args, 'func'):
|
||
args.func(args)
|
||
else:
|
||
parser.print_help()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|