feat: add profiles — run multiple isolated Hermes instances (#3681)
Each profile is a fully independent HERMES_HOME with its own config,
API keys, memory, sessions, skills, gateway, cron, and state.db.
Core module: hermes_cli/profiles.py (~900 lines)
- Profile CRUD: create, delete, list, show, rename
- Three clone levels: blank, --clone (config), --clone-all (everything)
- Export/import: tar.gz archive for backup and migration
- Wrapper alias scripts (~/.local/bin/<name>)
- Collision detection for alias names
- Sticky default via ~/.hermes/active_profile
- Skill seeding via subprocess (handles module-level caching)
- Auto-stop gateway on delete with disable-before-stop for services
- Tab completion generation for bash and zsh
CLI integration (hermes_cli/main.py):
- _apply_profile_override(): pre-import -p/--profile flag + sticky default
- Full 'hermes profile' subcommand: list, use, create, delete, show,
alias, rename, export, import
- 'hermes completion bash/zsh' command
- Multi-profile skill sync in hermes update
Display (cli.py, banner.py, gateway/run.py):
- CLI prompt: 'coder ❯' when using a non-default profile
- Banner shows profile name
- Gateway startup log includes profile name
Gateway safety:
- Token locks: Discord, Slack, WhatsApp, Signal (extends Telegram pattern)
- Port conflict detection: API server, webhook adapter
Diagnostics (hermes_cli/doctor.py):
- Profile health section: lists profiles, checks config, .env, aliases
- Orphan alias detection: warns when wrapper points to deleted profile
Tests (tests/hermes_cli/test_profiles.py):
- 71 automated tests covering: validation, CRUD, clone levels, rename,
export/import, active profile, isolation, alias collision, completion
- Full suite: 6760 passed, 0 new failures
Documentation:
- website/docs/user-guide/profiles.md: full user guide (12 sections)
- website/docs/reference/profile-commands.md: command reference (12 commands)
- website/docs/reference/faq.md: 6 profile FAQ entries
- website/sidebars.ts: navigation updated
This commit is contained in:
@@ -403,6 +403,15 @@ def build_welcome_banner(console: Console, model: str, cwd: str,
|
||||
if mcp_connected:
|
||||
summary_parts.append(f"{mcp_connected} MCP servers")
|
||||
summary_parts.append("/help for commands")
|
||||
# Show active profile name when not 'default'
|
||||
try:
|
||||
from hermes_cli.profiles import get_active_profile_name
|
||||
_profile_name = get_active_profile_name()
|
||||
if _profile_name and _profile_name != "default":
|
||||
right_lines.append(f"[bold {accent}]Profile:[/] [{text}]{_profile_name}[/]")
|
||||
except Exception:
|
||||
pass # Never break the banner over a profiles.py bug
|
||||
|
||||
right_lines.append(f"[dim {dim}]{' · '.join(summary_parts)}[/]")
|
||||
|
||||
# Update check — use prefetched result if available
|
||||
|
||||
@@ -730,6 +730,53 @@ def run_doctor(args):
|
||||
except Exception as _e:
|
||||
check_warn("Honcho check failed", str(_e))
|
||||
|
||||
# =========================================================================
|
||||
# Profiles
|
||||
# =========================================================================
|
||||
try:
|
||||
from hermes_cli.profiles import list_profiles, _get_wrapper_dir, profile_exists
|
||||
import re as _re
|
||||
|
||||
named_profiles = [p for p in list_profiles() if not p.is_default]
|
||||
if named_profiles:
|
||||
print()
|
||||
print(color("◆ Profiles", Colors.CYAN, Colors.BOLD))
|
||||
check_ok(f"{len(named_profiles)} profile(s) found")
|
||||
wrapper_dir = _get_wrapper_dir()
|
||||
for p in named_profiles:
|
||||
parts = []
|
||||
if p.gateway_running:
|
||||
parts.append("gateway running")
|
||||
if p.model:
|
||||
parts.append(p.model[:30])
|
||||
if not (p.path / "config.yaml").exists():
|
||||
parts.append("⚠ missing config")
|
||||
if not (p.path / ".env").exists():
|
||||
parts.append("no .env")
|
||||
wrapper = wrapper_dir / p.name
|
||||
if not wrapper.exists():
|
||||
parts.append("no alias")
|
||||
status = ", ".join(parts) if parts else "configured"
|
||||
check_ok(f" {p.name}: {status}")
|
||||
|
||||
# Check for orphan wrappers
|
||||
if wrapper_dir.is_dir():
|
||||
for wrapper in wrapper_dir.iterdir():
|
||||
if not wrapper.is_file():
|
||||
continue
|
||||
try:
|
||||
content = wrapper.read_text()
|
||||
if "hermes -p" in content:
|
||||
_m = _re.search(r"hermes -p (\S+)", content)
|
||||
if _m and not profile_exists(_m.group(1)):
|
||||
check_warn(f"Orphan alias: {wrapper.name} → profile '{_m.group(1)}' no longer exists")
|
||||
except Exception:
|
||||
pass
|
||||
except ImportError:
|
||||
pass
|
||||
except Exception as _e:
|
||||
logger.debug("Profile health check failed: %s", _e)
|
||||
|
||||
# =========================================================================
|
||||
# Summary
|
||||
# =========================================================================
|
||||
|
||||
@@ -54,6 +54,71 @@ from typing import Optional
|
||||
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
|
||||
sys.path.insert(0, str(PROJECT_ROOT))
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Profile override — MUST happen before any hermes module import.
|
||||
#
|
||||
# Many modules cache HERMES_HOME at import time (module-level constants).
|
||||
# We intercept --profile/-p from sys.argv here and set the env var so that
|
||||
# every subsequent ``os.getenv("HERMES_HOME", ...)`` resolves correctly.
|
||||
# The flag is stripped from sys.argv so argparse never sees it.
|
||||
# Falls back to ~/.hermes/active_profile for sticky default.
|
||||
# ---------------------------------------------------------------------------
|
||||
def _apply_profile_override() -> None:
|
||||
"""Pre-parse --profile/-p and set HERMES_HOME before module imports."""
|
||||
argv = sys.argv[1:]
|
||||
profile_name = None
|
||||
consume = 0
|
||||
|
||||
# 1. Check for explicit -p / --profile flag
|
||||
for i, arg in enumerate(argv):
|
||||
if arg in ("--profile", "-p") and i + 1 < len(argv):
|
||||
profile_name = argv[i + 1]
|
||||
consume = 2
|
||||
break
|
||||
elif arg.startswith("--profile="):
|
||||
profile_name = arg.split("=", 1)[1]
|
||||
consume = 1
|
||||
break
|
||||
|
||||
# 2. If no flag, check ~/.hermes/active_profile
|
||||
if profile_name is None:
|
||||
try:
|
||||
active_path = Path.home() / ".hermes" / "active_profile"
|
||||
if active_path.exists():
|
||||
name = active_path.read_text().strip()
|
||||
if name and name != "default":
|
||||
profile_name = name
|
||||
consume = 0 # don't strip anything from argv
|
||||
except (UnicodeDecodeError, OSError):
|
||||
pass # corrupted file, skip
|
||||
|
||||
# 3. If we found a profile, resolve and set HERMES_HOME
|
||||
if profile_name is not None:
|
||||
try:
|
||||
from hermes_cli.profiles import resolve_profile_env
|
||||
hermes_home = resolve_profile_env(profile_name)
|
||||
except (ValueError, FileNotFoundError) as exc:
|
||||
print(f"Error: {exc}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except Exception as exc:
|
||||
# A bug in profiles.py must NEVER prevent hermes from starting
|
||||
print(f"Warning: profile override failed ({exc}), using default", file=sys.stderr)
|
||||
return
|
||||
os.environ["HERMES_HOME"] = hermes_home
|
||||
# Strip the flag from argv so argparse doesn't choke
|
||||
if consume > 0:
|
||||
for i, arg in enumerate(argv):
|
||||
if arg in ("--profile", "-p"):
|
||||
start = i + 1 # +1 because argv is sys.argv[1:]
|
||||
sys.argv = sys.argv[:start] + sys.argv[start + consume:]
|
||||
break
|
||||
elif arg.startswith("--profile="):
|
||||
start = i + 1
|
||||
sys.argv = sys.argv[:start] + sys.argv[start + 1:]
|
||||
break
|
||||
|
||||
_apply_profile_override()
|
||||
|
||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
|
||||
# User-managed env files should override stale shell exports on restart.
|
||||
from hermes_cli.config import get_hermes_home
|
||||
@@ -2924,7 +2989,35 @@ def cmd_update(args):
|
||||
print(" ✓ Skills are up to date")
|
||||
except Exception as e:
|
||||
logger.debug("Skills sync during update failed: %s", e)
|
||||
|
||||
|
||||
# Sync bundled skills to all other profiles
|
||||
try:
|
||||
from hermes_cli.profiles import list_profiles, get_active_profile_name, seed_profile_skills
|
||||
active = get_active_profile_name()
|
||||
other_profiles = [p for p in list_profiles() if not p.is_default and p.name != active]
|
||||
if other_profiles:
|
||||
print()
|
||||
print("→ Syncing bundled skills to other profiles...")
|
||||
for p in other_profiles:
|
||||
try:
|
||||
r = seed_profile_skills(p.path, quiet=True)
|
||||
if r:
|
||||
copied = len(r.get("copied", []))
|
||||
updated = len(r.get("updated", []))
|
||||
modified = len(r.get("user_modified", []))
|
||||
parts = []
|
||||
if copied: parts.append(f"+{copied} new")
|
||||
if updated: parts.append(f"↑{updated} updated")
|
||||
if modified: parts.append(f"~{modified} user-modified")
|
||||
status = ", ".join(parts) if parts else "up to date"
|
||||
else:
|
||||
status = "sync failed"
|
||||
print(f" {p.name}: {status}")
|
||||
except Exception as pe:
|
||||
print(f" {p.name}: error ({pe})")
|
||||
except Exception:
|
||||
pass # profiles module not available or no profiles
|
||||
|
||||
# Check for config migrations
|
||||
print()
|
||||
print("→ Checking configuration for new options...")
|
||||
@@ -3122,6 +3215,7 @@ def _coalesce_session_name_args(argv: list) -> list:
|
||||
"chat", "model", "gateway", "setup", "whatsapp", "login", "logout",
|
||||
"status", "cron", "doctor", "config", "pairing", "skills", "tools",
|
||||
"mcp", "sessions", "insights", "version", "update", "uninstall",
|
||||
"profile",
|
||||
}
|
||||
_SESSION_FLAGS = {"-c", "--continue", "-r", "--resume"}
|
||||
|
||||
@@ -3145,6 +3239,253 @@ def _coalesce_session_name_args(argv: list) -> list:
|
||||
return result
|
||||
|
||||
|
||||
def cmd_profile(args):
|
||||
"""Profile management — create, delete, list, switch, alias."""
|
||||
from hermes_cli.profiles import (
|
||||
list_profiles, create_profile, delete_profile, seed_profile_skills,
|
||||
get_active_profile, set_active_profile, get_active_profile_name,
|
||||
check_alias_collision, create_wrapper_script, remove_wrapper_script,
|
||||
_is_wrapper_dir_in_path, _get_wrapper_dir,
|
||||
)
|
||||
from hermes_constants import display_hermes_home
|
||||
|
||||
action = getattr(args, "profile_action", None)
|
||||
|
||||
if action is None:
|
||||
# Bare `hermes profile` — show current profile status
|
||||
profile_name = get_active_profile_name()
|
||||
dhh = display_hermes_home()
|
||||
print(f"\nActive profile: {profile_name}")
|
||||
print(f"Path: {dhh}")
|
||||
|
||||
profiles = list_profiles()
|
||||
for p in profiles:
|
||||
if p.name == profile_name or (profile_name == "default" and p.is_default):
|
||||
if p.model:
|
||||
print(f"Model: {p.model}" + (f" ({p.provider})" if p.provider else ""))
|
||||
print(f"Gateway: {'running' if p.gateway_running else 'stopped'}")
|
||||
print(f"Skills: {p.skill_count} installed")
|
||||
if p.alias_path:
|
||||
print(f"Alias: {p.name} → hermes -p {p.name}")
|
||||
break
|
||||
print()
|
||||
return
|
||||
|
||||
if action == "list":
|
||||
profiles = list_profiles()
|
||||
active = get_active_profile_name()
|
||||
|
||||
if not profiles:
|
||||
print("No profiles found.")
|
||||
return
|
||||
|
||||
# Header
|
||||
print(f"\n {'Profile':<16} {'Model':<28} {'Gateway':<12} {'Alias'}")
|
||||
print(f" {'─' * 15} {'─' * 27} {'─' * 11} {'─' * 12}")
|
||||
|
||||
for p in profiles:
|
||||
marker = " ◆" if (p.name == active or (active == "default" and p.is_default)) else " "
|
||||
name = p.name
|
||||
model = (p.model or "—")[:26]
|
||||
gw = "running" if p.gateway_running else "stopped"
|
||||
alias = p.name if p.alias_path else "—"
|
||||
if p.is_default:
|
||||
alias = "—"
|
||||
print(f"{marker}{name:<15} {model:<28} {gw:<12} {alias}")
|
||||
print()
|
||||
|
||||
elif action == "use":
|
||||
name = args.profile_name
|
||||
try:
|
||||
set_active_profile(name)
|
||||
if name == "default":
|
||||
print(f"Switched to: default (~/.hermes)")
|
||||
else:
|
||||
print(f"Switched to: {name}")
|
||||
except (ValueError, FileNotFoundError) as e:
|
||||
print(f"Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
elif action == "create":
|
||||
name = args.profile_name
|
||||
clone = getattr(args, "clone", False)
|
||||
clone_all = getattr(args, "clone_all", False)
|
||||
no_alias = getattr(args, "no_alias", False)
|
||||
|
||||
try:
|
||||
clone_from = getattr(args, "clone_from", None)
|
||||
|
||||
profile_dir = create_profile(
|
||||
name=name,
|
||||
clone_from=clone_from,
|
||||
clone_all=clone_all,
|
||||
clone_config=clone,
|
||||
no_alias=no_alias,
|
||||
)
|
||||
print(f"\nProfile '{name}' created at {profile_dir}")
|
||||
|
||||
if clone or clone_all:
|
||||
source_label = getattr(args, "clone_from", None) or get_active_profile_name()
|
||||
if clone_all:
|
||||
print(f"Full copy from {source_label}.")
|
||||
else:
|
||||
print(f"Cloned config, .env, SOUL.md from {source_label}.")
|
||||
|
||||
# Seed bundled skills (skip if --clone-all already copied them)
|
||||
if not clone_all:
|
||||
result = seed_profile_skills(profile_dir)
|
||||
if result:
|
||||
copied = len(result.get("copied", []))
|
||||
print(f"{copied} bundled skills synced.")
|
||||
else:
|
||||
print("⚠ Skills could not be seeded. Run `{} update` to retry.".format(name))
|
||||
|
||||
# Create wrapper alias
|
||||
if not no_alias:
|
||||
collision = check_alias_collision(name)
|
||||
if collision:
|
||||
print(f"\n⚠ Cannot create alias '{name}' — {collision}")
|
||||
print(f" Choose a custom alias: hermes profile alias {name} --name <custom>")
|
||||
print(f" Or access via flag: hermes -p {name} chat")
|
||||
else:
|
||||
wrapper_path = create_wrapper_script(name)
|
||||
if wrapper_path:
|
||||
print(f"Wrapper created: {wrapper_path}")
|
||||
if not _is_wrapper_dir_in_path():
|
||||
print(f"\n⚠ {_get_wrapper_dir()} is not in your PATH.")
|
||||
print(f' Add to your shell config (~/.bashrc or ~/.zshrc):')
|
||||
print(f' export PATH="$HOME/.local/bin:$PATH"')
|
||||
|
||||
# Next steps
|
||||
print(f"\nNext steps:")
|
||||
print(f" {name} setup Configure API keys and model")
|
||||
print(f" {name} chat Start chatting")
|
||||
print(f" {name} gateway start Start the messaging gateway")
|
||||
if clone or clone_all:
|
||||
from hermes_constants import get_hermes_home
|
||||
profile_dir_display = f"~/.hermes/profiles/{name}"
|
||||
print(f"\n Edit {profile_dir_display}/.env for different API keys")
|
||||
print(f" Edit {profile_dir_display}/SOUL.md for different personality")
|
||||
print()
|
||||
|
||||
except (ValueError, FileExistsError, FileNotFoundError) as e:
|
||||
print(f"Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
elif action == "delete":
|
||||
name = args.profile_name
|
||||
yes = getattr(args, "yes", False)
|
||||
try:
|
||||
delete_profile(name, yes=yes)
|
||||
except (ValueError, FileNotFoundError) as e:
|
||||
print(f"Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
elif action == "show":
|
||||
name = args.profile_name
|
||||
from hermes_cli.profiles import get_profile_dir, profile_exists, _read_config_model, _check_gateway_running, _count_skills
|
||||
if not profile_exists(name):
|
||||
print(f"Error: Profile '{name}' does not exist.")
|
||||
sys.exit(1)
|
||||
profile_dir = get_profile_dir(name)
|
||||
model, provider = _read_config_model(profile_dir)
|
||||
gw = _check_gateway_running(profile_dir)
|
||||
skills = _count_skills(profile_dir)
|
||||
wrapper = _get_wrapper_dir() / name
|
||||
|
||||
print(f"\nProfile: {name}")
|
||||
print(f"Path: {profile_dir}")
|
||||
if model:
|
||||
print(f"Model: {model}" + (f" ({provider})" if provider else ""))
|
||||
print(f"Gateway: {'running' if gw else 'stopped'}")
|
||||
print(f"Skills: {skills}")
|
||||
print(f".env: {'exists' if (profile_dir / '.env').exists() else 'not configured'}")
|
||||
print(f"SOUL.md: {'exists' if (profile_dir / 'SOUL.md').exists() else 'not configured'}")
|
||||
if wrapper.exists():
|
||||
print(f"Alias: {wrapper}")
|
||||
print()
|
||||
|
||||
elif action == "alias":
|
||||
name = args.profile_name
|
||||
remove = getattr(args, "remove", False)
|
||||
custom_name = getattr(args, "alias_name", None)
|
||||
|
||||
from hermes_cli.profiles import profile_exists
|
||||
if not profile_exists(name):
|
||||
print(f"Error: Profile '{name}' does not exist.")
|
||||
sys.exit(1)
|
||||
|
||||
alias_name = custom_name or name
|
||||
|
||||
if remove:
|
||||
if remove_wrapper_script(alias_name):
|
||||
print(f"✓ Removed alias '{alias_name}'")
|
||||
else:
|
||||
print(f"No alias '{alias_name}' found to remove.")
|
||||
else:
|
||||
collision = check_alias_collision(alias_name)
|
||||
if collision:
|
||||
print(f"Error: {collision}")
|
||||
sys.exit(1)
|
||||
wrapper_path = create_wrapper_script(alias_name)
|
||||
if wrapper_path:
|
||||
# If custom name, write the profile name into the wrapper
|
||||
if custom_name:
|
||||
wrapper_path.write_text(f'#!/bin/sh\nexec hermes -p {name} "$@"\n')
|
||||
print(f"✓ Alias created: {wrapper_path}")
|
||||
if not _is_wrapper_dir_in_path():
|
||||
print(f"⚠ {_get_wrapper_dir()} is not in your PATH.")
|
||||
|
||||
elif action == "rename":
|
||||
from hermes_cli.profiles import rename_profile
|
||||
try:
|
||||
new_dir = rename_profile(args.old_name, args.new_name)
|
||||
print(f"\nProfile renamed: {args.old_name} → {args.new_name}")
|
||||
print(f"Path: {new_dir}\n")
|
||||
except (ValueError, FileExistsError, FileNotFoundError) as e:
|
||||
print(f"Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
elif action == "export":
|
||||
from hermes_cli.profiles import export_profile
|
||||
name = args.profile_name
|
||||
output = args.output or f"{name}.tar.gz"
|
||||
try:
|
||||
result_path = export_profile(name, output)
|
||||
print(f"✓ Exported '{name}' to {result_path}")
|
||||
except (ValueError, FileNotFoundError) as e:
|
||||
print(f"Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
elif action == "import":
|
||||
from hermes_cli.profiles import import_profile
|
||||
try:
|
||||
profile_dir = import_profile(args.archive, name=getattr(args, "import_name", None))
|
||||
name = profile_dir.name
|
||||
print(f"✓ Imported profile '{name}' at {profile_dir}")
|
||||
|
||||
# Offer to create alias
|
||||
collision = check_alias_collision(name)
|
||||
if not collision:
|
||||
wrapper_path = create_wrapper_script(name)
|
||||
if wrapper_path:
|
||||
print(f" Wrapper created: {wrapper_path}")
|
||||
print()
|
||||
except (ValueError, FileExistsError, FileNotFoundError) as e:
|
||||
print(f"Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def cmd_completion(args):
|
||||
"""Print shell completion script."""
|
||||
from hermes_cli.profiles import generate_bash_completion, generate_zsh_completion
|
||||
shell = getattr(args, "shell", "bash")
|
||||
if shell == "zsh":
|
||||
print(generate_zsh_completion())
|
||||
else:
|
||||
print(generate_bash_completion())
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point for hermes CLI."""
|
||||
parser = argparse.ArgumentParser(
|
||||
@@ -4342,7 +4683,75 @@ For more help on a command:
|
||||
sys.exit(1)
|
||||
|
||||
acp_parser.set_defaults(func=cmd_acp)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# profile command
|
||||
# =========================================================================
|
||||
profile_parser = subparsers.add_parser(
|
||||
"profile",
|
||||
help="Manage profiles — multiple isolated Hermes instances",
|
||||
)
|
||||
profile_subparsers = profile_parser.add_subparsers(dest="profile_action")
|
||||
|
||||
profile_list = profile_subparsers.add_parser("list", help="List all profiles")
|
||||
profile_use = profile_subparsers.add_parser("use", help="Set sticky default profile")
|
||||
profile_use.add_argument("profile_name", help="Profile name (or 'default')")
|
||||
|
||||
profile_create = profile_subparsers.add_parser("create", help="Create a new profile")
|
||||
profile_create.add_argument("profile_name", help="Profile name (lowercase, alphanumeric)")
|
||||
profile_create.add_argument("--clone", action="store_true",
|
||||
help="Copy config.yaml, .env, SOUL.md from active profile")
|
||||
profile_create.add_argument("--clone-all", action="store_true",
|
||||
help="Full copy of active profile (all state)")
|
||||
profile_create.add_argument("--clone-from", metavar="SOURCE",
|
||||
help="Source profile to clone from (default: active)")
|
||||
profile_create.add_argument("--no-alias", action="store_true",
|
||||
help="Skip wrapper script creation")
|
||||
|
||||
profile_delete = profile_subparsers.add_parser("delete", help="Delete a profile")
|
||||
profile_delete.add_argument("profile_name", help="Profile to delete")
|
||||
profile_delete.add_argument("-y", "--yes", action="store_true",
|
||||
help="Skip confirmation prompt")
|
||||
|
||||
profile_show = profile_subparsers.add_parser("show", help="Show profile details")
|
||||
profile_show.add_argument("profile_name", help="Profile to show")
|
||||
|
||||
profile_alias = profile_subparsers.add_parser("alias", help="Manage wrapper scripts")
|
||||
profile_alias.add_argument("profile_name", help="Profile name")
|
||||
profile_alias.add_argument("--remove", action="store_true",
|
||||
help="Remove the wrapper script")
|
||||
profile_alias.add_argument("--name", dest="alias_name", metavar="NAME",
|
||||
help="Custom alias name (default: profile name)")
|
||||
|
||||
profile_rename = profile_subparsers.add_parser("rename", help="Rename a profile")
|
||||
profile_rename.add_argument("old_name", help="Current profile name")
|
||||
profile_rename.add_argument("new_name", help="New profile name")
|
||||
|
||||
profile_export = profile_subparsers.add_parser("export", help="Export a profile to archive")
|
||||
profile_export.add_argument("profile_name", help="Profile to export")
|
||||
profile_export.add_argument("-o", "--output", default=None,
|
||||
help="Output file (default: <name>.tar.gz)")
|
||||
|
||||
profile_import = profile_subparsers.add_parser("import", help="Import a profile from archive")
|
||||
profile_import.add_argument("archive", help="Path to .tar.gz archive")
|
||||
profile_import.add_argument("--name", dest="import_name", metavar="NAME",
|
||||
help="Profile name (default: inferred from archive)")
|
||||
|
||||
profile_parser.set_defaults(func=cmd_profile)
|
||||
|
||||
# =========================================================================
|
||||
# completion command
|
||||
# =========================================================================
|
||||
completion_parser = subparsers.add_parser(
|
||||
"completion",
|
||||
help="Print shell completion script (bash or zsh)",
|
||||
)
|
||||
completion_parser.add_argument(
|
||||
"shell", nargs="?", default="bash", choices=["bash", "zsh"],
|
||||
help="Shell type (default: bash)",
|
||||
)
|
||||
completion_parser.set_defaults(func=cmd_completion)
|
||||
|
||||
# =========================================================================
|
||||
# Parse and execute
|
||||
# =========================================================================
|
||||
|
||||
906
hermes_cli/profiles.py
Normal file
906
hermes_cli/profiles.py
Normal file
@@ -0,0 +1,906 @@
|
||||
"""
|
||||
Profile management for multiple isolated Hermes instances.
|
||||
|
||||
Each profile is a fully independent HERMES_HOME directory with its own
|
||||
config.yaml, .env, memory, sessions, skills, gateway, cron, and logs.
|
||||
Profiles live under ``~/.hermes/profiles/<name>/`` by default.
|
||||
|
||||
The "default" profile is ``~/.hermes`` itself — backward compatible,
|
||||
zero migration needed.
|
||||
|
||||
Usage::
|
||||
|
||||
hermes profile create coder # fresh profile + bundled skills
|
||||
hermes profile create coder --clone # also copy config, .env, SOUL.md
|
||||
hermes profile create coder --clone-all # full copy of source profile
|
||||
coder chat # use via wrapper alias
|
||||
hermes -p coder chat # or via flag
|
||||
hermes profile use coder # set as sticky default
|
||||
hermes profile delete coder # remove profile + alias + service
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import stat
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
_PROFILE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
|
||||
|
||||
# Directories bootstrapped inside every new profile
|
||||
_PROFILE_DIRS = [
|
||||
"memories",
|
||||
"sessions",
|
||||
"skills",
|
||||
"skins",
|
||||
"logs",
|
||||
"plans",
|
||||
"workspace",
|
||||
"cron",
|
||||
]
|
||||
|
||||
# Files copied during --clone (if they exist in the source)
|
||||
_CLONE_CONFIG_FILES = [
|
||||
"config.yaml",
|
||||
".env",
|
||||
"SOUL.md",
|
||||
]
|
||||
|
||||
# Runtime files stripped after --clone-all (shouldn't carry over)
|
||||
_CLONE_ALL_STRIP = [
|
||||
"gateway.pid",
|
||||
"gateway_state.json",
|
||||
"processes.json",
|
||||
]
|
||||
|
||||
# Names that cannot be used as profile aliases
|
||||
_RESERVED_NAMES = frozenset({
|
||||
"hermes", "default", "test", "tmp", "root", "sudo",
|
||||
})
|
||||
|
||||
# Hermes subcommands that cannot be used as profile names/aliases
|
||||
_HERMES_SUBCOMMANDS = frozenset({
|
||||
"chat", "model", "gateway", "setup", "whatsapp", "login", "logout",
|
||||
"status", "cron", "doctor", "config", "pairing", "skills", "tools",
|
||||
"mcp", "sessions", "insights", "version", "update", "uninstall",
|
||||
"profile", "plugins", "honcho", "acp",
|
||||
})
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Path helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _get_profiles_root() -> Path:
|
||||
"""Return the directory where named profiles are stored.
|
||||
|
||||
Always ``~/.hermes/profiles/`` — anchored to the user's home,
|
||||
NOT to the current HERMES_HOME (which may itself be a profile).
|
||||
This ensures ``coder profile list`` can see all profiles.
|
||||
"""
|
||||
return Path.home() / ".hermes" / "profiles"
|
||||
|
||||
|
||||
def _get_default_hermes_home() -> Path:
|
||||
"""Return the default (pre-profile) HERMES_HOME path."""
|
||||
return Path.home() / ".hermes"
|
||||
|
||||
|
||||
def _get_active_profile_path() -> Path:
|
||||
"""Return the path to the sticky active_profile file."""
|
||||
return _get_default_hermes_home() / "active_profile"
|
||||
|
||||
|
||||
def _get_wrapper_dir() -> Path:
|
||||
"""Return the directory for wrapper scripts."""
|
||||
return Path.home() / ".local" / "bin"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def validate_profile_name(name: str) -> None:
|
||||
"""Raise ``ValueError`` if *name* is not a valid profile identifier."""
|
||||
if name == "default":
|
||||
return # special alias for ~/.hermes
|
||||
if not _PROFILE_ID_RE.match(name):
|
||||
raise ValueError(
|
||||
f"Invalid profile name {name!r}. Must match "
|
||||
f"[a-z0-9][a-z0-9_-]{{0,63}}"
|
||||
)
|
||||
|
||||
|
||||
def get_profile_dir(name: str) -> Path:
|
||||
"""Resolve a profile name to its HERMES_HOME directory."""
|
||||
if name == "default":
|
||||
return _get_default_hermes_home()
|
||||
return _get_profiles_root() / name
|
||||
|
||||
|
||||
def profile_exists(name: str) -> bool:
|
||||
"""Check whether a profile directory exists."""
|
||||
if name == "default":
|
||||
return True
|
||||
return get_profile_dir(name).is_dir()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Alias / wrapper script management
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def check_alias_collision(name: str) -> Optional[str]:
|
||||
"""Return a human-readable collision message, or None if the name is safe.
|
||||
|
||||
Checks: reserved names, hermes subcommands, existing binaries in PATH.
|
||||
"""
|
||||
if name in _RESERVED_NAMES:
|
||||
return f"'{name}' is a reserved name"
|
||||
if name in _HERMES_SUBCOMMANDS:
|
||||
return f"'{name}' conflicts with a hermes subcommand"
|
||||
|
||||
# Check existing commands in PATH
|
||||
wrapper_dir = _get_wrapper_dir()
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["which", name], capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
if result.returncode == 0:
|
||||
existing_path = result.stdout.strip()
|
||||
# Allow overwriting our own wrappers
|
||||
if existing_path == str(wrapper_dir / name):
|
||||
try:
|
||||
content = (wrapper_dir / name).read_text()
|
||||
if "hermes -p" in content:
|
||||
return None # it's our wrapper, safe to overwrite
|
||||
except Exception:
|
||||
pass
|
||||
return f"'{name}' conflicts with an existing command ({existing_path})"
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
pass
|
||||
|
||||
return None # safe
|
||||
|
||||
|
||||
def _is_wrapper_dir_in_path() -> bool:
|
||||
"""Check if ~/.local/bin is in PATH."""
|
||||
wrapper_dir = str(_get_wrapper_dir())
|
||||
return wrapper_dir in os.environ.get("PATH", "").split(os.pathsep)
|
||||
|
||||
|
||||
def create_wrapper_script(name: str) -> Optional[Path]:
|
||||
"""Create a shell wrapper script at ~/.local/bin/<name>.
|
||||
|
||||
Returns the path to the created wrapper, or None if creation failed.
|
||||
"""
|
||||
wrapper_dir = _get_wrapper_dir()
|
||||
try:
|
||||
wrapper_dir.mkdir(parents=True, exist_ok=True)
|
||||
except OSError as e:
|
||||
print(f"⚠ Could not create {wrapper_dir}: {e}")
|
||||
return None
|
||||
|
||||
wrapper_path = wrapper_dir / name
|
||||
try:
|
||||
wrapper_path.write_text(f'#!/bin/sh\nexec hermes -p {name} "$@"\n')
|
||||
wrapper_path.chmod(wrapper_path.stat().st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
|
||||
return wrapper_path
|
||||
except OSError as e:
|
||||
print(f"⚠ Could not create wrapper at {wrapper_path}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def remove_wrapper_script(name: str) -> bool:
|
||||
"""Remove the wrapper script for a profile. Returns True if removed."""
|
||||
wrapper_path = _get_wrapper_dir() / name
|
||||
if wrapper_path.exists():
|
||||
try:
|
||||
# Verify it's our wrapper before removing
|
||||
content = wrapper_path.read_text()
|
||||
if "hermes -p" in content:
|
||||
wrapper_path.unlink()
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ProfileInfo
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class ProfileInfo:
|
||||
"""Summary information about a profile."""
|
||||
name: str
|
||||
path: Path
|
||||
is_default: bool
|
||||
gateway_running: bool
|
||||
model: Optional[str] = None
|
||||
provider: Optional[str] = None
|
||||
has_env: bool = False
|
||||
skill_count: int = 0
|
||||
alias_path: Optional[Path] = None
|
||||
|
||||
|
||||
def _read_config_model(profile_dir: Path) -> tuple:
|
||||
"""Read model/provider from a profile's config.yaml. Returns (model, provider)."""
|
||||
config_path = profile_dir / "config.yaml"
|
||||
if not config_path.exists():
|
||||
return None, None
|
||||
try:
|
||||
import yaml
|
||||
with open(config_path, "r") as f:
|
||||
cfg = yaml.safe_load(f) or {}
|
||||
model_cfg = cfg.get("model", {})
|
||||
if isinstance(model_cfg, str):
|
||||
return model_cfg, None
|
||||
if isinstance(model_cfg, dict):
|
||||
return model_cfg.get("model"), model_cfg.get("provider")
|
||||
return None, None
|
||||
except Exception:
|
||||
return None, None
|
||||
|
||||
|
||||
def _check_gateway_running(profile_dir: Path) -> bool:
|
||||
"""Check if a gateway is running for a given profile directory."""
|
||||
pid_file = profile_dir / "gateway.pid"
|
||||
if not pid_file.exists():
|
||||
return False
|
||||
try:
|
||||
raw = pid_file.read_text().strip()
|
||||
if not raw:
|
||||
return False
|
||||
data = json.loads(raw) if raw.startswith("{") else {"pid": int(raw)}
|
||||
pid = int(data["pid"])
|
||||
os.kill(pid, 0) # existence check
|
||||
return True
|
||||
except (json.JSONDecodeError, KeyError, ValueError, TypeError,
|
||||
ProcessLookupError, PermissionError, OSError):
|
||||
return False
|
||||
|
||||
|
||||
def _count_skills(profile_dir: Path) -> int:
|
||||
"""Count installed skills in a profile."""
|
||||
skills_dir = profile_dir / "skills"
|
||||
if not skills_dir.is_dir():
|
||||
return 0
|
||||
count = 0
|
||||
for md in skills_dir.rglob("SKILL.md"):
|
||||
if "/.hub/" not in str(md) and "/.git/" not in str(md):
|
||||
count += 1
|
||||
return count
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CRUD operations
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def list_profiles() -> List[ProfileInfo]:
|
||||
"""Return info for all profiles, including the default."""
|
||||
profiles = []
|
||||
wrapper_dir = _get_wrapper_dir()
|
||||
|
||||
# Default profile
|
||||
default_home = _get_default_hermes_home()
|
||||
if default_home.is_dir():
|
||||
model, provider = _read_config_model(default_home)
|
||||
profiles.append(ProfileInfo(
|
||||
name="default",
|
||||
path=default_home,
|
||||
is_default=True,
|
||||
gateway_running=_check_gateway_running(default_home),
|
||||
model=model,
|
||||
provider=provider,
|
||||
has_env=(default_home / ".env").exists(),
|
||||
skill_count=_count_skills(default_home),
|
||||
))
|
||||
|
||||
# Named profiles
|
||||
profiles_root = _get_profiles_root()
|
||||
if profiles_root.is_dir():
|
||||
for entry in sorted(profiles_root.iterdir()):
|
||||
if not entry.is_dir():
|
||||
continue
|
||||
name = entry.name
|
||||
if not _PROFILE_ID_RE.match(name):
|
||||
continue
|
||||
model, provider = _read_config_model(entry)
|
||||
alias_path = wrapper_dir / name
|
||||
profiles.append(ProfileInfo(
|
||||
name=name,
|
||||
path=entry,
|
||||
is_default=False,
|
||||
gateway_running=_check_gateway_running(entry),
|
||||
model=model,
|
||||
provider=provider,
|
||||
has_env=(entry / ".env").exists(),
|
||||
skill_count=_count_skills(entry),
|
||||
alias_path=alias_path if alias_path.exists() else None,
|
||||
))
|
||||
|
||||
return profiles
|
||||
|
||||
|
||||
def create_profile(
|
||||
name: str,
|
||||
clone_from: Optional[str] = None,
|
||||
clone_all: bool = False,
|
||||
clone_config: bool = False,
|
||||
no_alias: bool = False,
|
||||
) -> Path:
|
||||
"""Create a new profile directory.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name:
|
||||
Profile identifier (lowercase, alphanumeric, hyphens, underscores).
|
||||
clone_from:
|
||||
Source profile to clone from. If ``None`` and clone_config/clone_all
|
||||
is True, defaults to the currently active profile.
|
||||
clone_all:
|
||||
If True, do a full copytree of the source (all state).
|
||||
clone_config:
|
||||
If True, copy only config files (config.yaml, .env, SOUL.md).
|
||||
no_alias:
|
||||
If True, skip wrapper script creation.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Path
|
||||
The newly created profile directory.
|
||||
"""
|
||||
validate_profile_name(name)
|
||||
|
||||
if name == "default":
|
||||
raise ValueError(
|
||||
"Cannot create a profile named 'default' — it is the built-in profile (~/.hermes)."
|
||||
)
|
||||
|
||||
profile_dir = get_profile_dir(name)
|
||||
if profile_dir.exists():
|
||||
raise FileExistsError(f"Profile '{name}' already exists at {profile_dir}")
|
||||
|
||||
# Resolve clone source
|
||||
source_dir = None
|
||||
if clone_from is not None or clone_all or clone_config:
|
||||
if clone_from is None:
|
||||
# Default: clone from active profile
|
||||
from hermes_constants import get_hermes_home
|
||||
source_dir = get_hermes_home()
|
||||
else:
|
||||
validate_profile_name(clone_from)
|
||||
source_dir = get_profile_dir(clone_from)
|
||||
if not source_dir.is_dir():
|
||||
raise FileNotFoundError(
|
||||
f"Source profile '{clone_from or 'active'}' does not exist at {source_dir}"
|
||||
)
|
||||
|
||||
if clone_all and source_dir:
|
||||
# Full copy of source profile
|
||||
shutil.copytree(source_dir, profile_dir)
|
||||
# Strip runtime files
|
||||
for stale in _CLONE_ALL_STRIP:
|
||||
(profile_dir / stale).unlink(missing_ok=True)
|
||||
else:
|
||||
# Bootstrap directory structure
|
||||
profile_dir.mkdir(parents=True, exist_ok=True)
|
||||
for subdir in _PROFILE_DIRS:
|
||||
(profile_dir / subdir).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Clone config files from source
|
||||
if source_dir is not None:
|
||||
for filename in _CLONE_CONFIG_FILES:
|
||||
src = source_dir / filename
|
||||
if src.exists():
|
||||
shutil.copy2(src, profile_dir / filename)
|
||||
|
||||
return profile_dir
|
||||
|
||||
|
||||
def seed_profile_skills(profile_dir: Path, quiet: bool = False) -> Optional[dict]:
|
||||
"""Seed bundled skills into a profile via subprocess.
|
||||
|
||||
Uses subprocess because sync_skills() caches HERMES_HOME at module level.
|
||||
Returns the sync result dict, or None on failure.
|
||||
"""
|
||||
project_root = Path(__file__).parent.parent.resolve()
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[sys.executable, "-c",
|
||||
"import json; from tools.skills_sync import sync_skills; "
|
||||
"r = sync_skills(quiet=True); print(json.dumps(r))"],
|
||||
env={**os.environ, "HERMES_HOME": str(profile_dir)},
|
||||
cwd=str(project_root),
|
||||
capture_output=True, text=True, timeout=60,
|
||||
)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
return json.loads(result.stdout.strip())
|
||||
if not quiet:
|
||||
print(f"⚠ Skill seeding returned exit code {result.returncode}")
|
||||
if result.stderr.strip():
|
||||
print(f" {result.stderr.strip()[:200]}")
|
||||
return None
|
||||
except subprocess.TimeoutExpired:
|
||||
if not quiet:
|
||||
print("⚠ Skill seeding timed out (60s)")
|
||||
return None
|
||||
except Exception as e:
|
||||
if not quiet:
|
||||
print(f"⚠ Skill seeding failed: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def delete_profile(name: str, yes: bool = False) -> Path:
|
||||
"""Delete a profile, its wrapper script, and its gateway service.
|
||||
|
||||
Stops the gateway if running. Disables systemd/launchd service first
|
||||
to prevent auto-restart.
|
||||
|
||||
Returns the path that was removed.
|
||||
"""
|
||||
validate_profile_name(name)
|
||||
|
||||
if name == "default":
|
||||
raise ValueError(
|
||||
"Cannot delete the default profile (~/.hermes).\n"
|
||||
"To remove everything, use: hermes uninstall"
|
||||
)
|
||||
|
||||
profile_dir = get_profile_dir(name)
|
||||
if not profile_dir.is_dir():
|
||||
raise FileNotFoundError(f"Profile '{name}' does not exist.")
|
||||
|
||||
# Show what will be deleted
|
||||
model, provider = _read_config_model(profile_dir)
|
||||
gw_running = _check_gateway_running(profile_dir)
|
||||
skill_count = _count_skills(profile_dir)
|
||||
|
||||
print(f"\nProfile: {name}")
|
||||
print(f"Path: {profile_dir}")
|
||||
if model:
|
||||
print(f"Model: {model}" + (f" ({provider})" if provider else ""))
|
||||
if skill_count:
|
||||
print(f"Skills: {skill_count}")
|
||||
|
||||
items = [
|
||||
"All config, API keys, memories, sessions, skills, cron jobs",
|
||||
]
|
||||
|
||||
# Check for service
|
||||
from hermes_cli.gateway import _profile_suffix, get_service_name
|
||||
wrapper_path = _get_wrapper_dir() / name
|
||||
has_wrapper = wrapper_path.exists()
|
||||
if has_wrapper:
|
||||
items.append(f"Command alias ({wrapper_path})")
|
||||
|
||||
print(f"\nThis will permanently delete:")
|
||||
for item in items:
|
||||
print(f" • {item}")
|
||||
if gw_running:
|
||||
print(f" ⚠ Gateway is running — it will be stopped.")
|
||||
|
||||
# Confirmation
|
||||
if not yes:
|
||||
print()
|
||||
try:
|
||||
confirm = input(f"Type '{name}' to confirm: ").strip()
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
print("\nCancelled.")
|
||||
return profile_dir
|
||||
if confirm != name:
|
||||
print("Cancelled.")
|
||||
return profile_dir
|
||||
|
||||
# 1. Disable service (prevents auto-restart)
|
||||
_cleanup_gateway_service(name, profile_dir)
|
||||
|
||||
# 2. Stop running gateway
|
||||
if gw_running:
|
||||
_stop_gateway_process(profile_dir)
|
||||
|
||||
# 3. Remove wrapper script
|
||||
if has_wrapper:
|
||||
if remove_wrapper_script(name):
|
||||
print(f"✓ Removed {wrapper_path}")
|
||||
|
||||
# 4. Remove profile directory
|
||||
try:
|
||||
shutil.rmtree(profile_dir)
|
||||
print(f"✓ Removed {profile_dir}")
|
||||
except Exception as e:
|
||||
print(f"⚠ Could not remove {profile_dir}: {e}")
|
||||
|
||||
# 5. Clear active_profile if it pointed to this profile
|
||||
try:
|
||||
active = get_active_profile()
|
||||
if active == name:
|
||||
set_active_profile("default")
|
||||
print("✓ Active profile reset to default")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
print(f"\nProfile '{name}' deleted.")
|
||||
return profile_dir
|
||||
|
||||
|
||||
def _cleanup_gateway_service(name: str, profile_dir: Path) -> None:
|
||||
"""Disable and remove systemd/launchd service for a profile."""
|
||||
import platform as _platform
|
||||
|
||||
# Derive service name for this profile
|
||||
# Temporarily set HERMES_HOME so _profile_suffix resolves correctly
|
||||
old_home = os.environ.get("HERMES_HOME")
|
||||
try:
|
||||
os.environ["HERMES_HOME"] = str(profile_dir)
|
||||
from hermes_cli.gateway import get_service_name, get_launchd_plist_path
|
||||
|
||||
if _platform.system() == "Linux":
|
||||
svc_name = get_service_name()
|
||||
svc_file = Path.home() / ".config" / "systemd" / "user" / f"{svc_name}.service"
|
||||
if svc_file.exists():
|
||||
subprocess.run(
|
||||
["systemctl", "--user", "disable", svc_name],
|
||||
capture_output=True, check=False, timeout=10,
|
||||
)
|
||||
subprocess.run(
|
||||
["systemctl", "--user", "stop", svc_name],
|
||||
capture_output=True, check=False, timeout=10,
|
||||
)
|
||||
svc_file.unlink(missing_ok=True)
|
||||
subprocess.run(
|
||||
["systemctl", "--user", "daemon-reload"],
|
||||
capture_output=True, check=False, timeout=10,
|
||||
)
|
||||
print(f"✓ Service {svc_name} removed")
|
||||
|
||||
elif _platform.system() == "Darwin":
|
||||
plist_path = get_launchd_plist_path()
|
||||
if plist_path.exists():
|
||||
subprocess.run(
|
||||
["launchctl", "unload", str(plist_path)],
|
||||
capture_output=True, check=False, timeout=10,
|
||||
)
|
||||
plist_path.unlink(missing_ok=True)
|
||||
print(f"✓ Launchd service removed")
|
||||
except Exception as e:
|
||||
print(f"⚠ Service cleanup: {e}")
|
||||
finally:
|
||||
if old_home is not None:
|
||||
os.environ["HERMES_HOME"] = old_home
|
||||
elif "HERMES_HOME" in os.environ:
|
||||
del os.environ["HERMES_HOME"]
|
||||
|
||||
|
||||
def _stop_gateway_process(profile_dir: Path) -> None:
|
||||
"""Stop a running gateway process via its PID file."""
|
||||
import signal as _signal
|
||||
import time as _time
|
||||
|
||||
pid_file = profile_dir / "gateway.pid"
|
||||
if not pid_file.exists():
|
||||
return
|
||||
|
||||
try:
|
||||
raw = pid_file.read_text().strip()
|
||||
data = json.loads(raw) if raw.startswith("{") else {"pid": int(raw)}
|
||||
pid = int(data["pid"])
|
||||
os.kill(pid, _signal.SIGTERM)
|
||||
# Wait up to 10s for graceful shutdown
|
||||
for _ in range(20):
|
||||
_time.sleep(0.5)
|
||||
try:
|
||||
os.kill(pid, 0)
|
||||
except ProcessLookupError:
|
||||
print(f"✓ Gateway stopped (PID {pid})")
|
||||
return
|
||||
# Force kill
|
||||
try:
|
||||
os.kill(pid, _signal.SIGKILL)
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
print(f"✓ Gateway force-stopped (PID {pid})")
|
||||
except (ProcessLookupError, PermissionError):
|
||||
print("✓ Gateway already stopped")
|
||||
except Exception as e:
|
||||
print(f"⚠ Could not stop gateway: {e}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Active profile (sticky default)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_active_profile() -> str:
|
||||
"""Read the sticky active profile name.
|
||||
|
||||
Returns ``"default"`` if no active_profile file exists or it's empty.
|
||||
"""
|
||||
path = _get_active_profile_path()
|
||||
try:
|
||||
name = path.read_text().strip()
|
||||
if not name:
|
||||
return "default"
|
||||
return name
|
||||
except (FileNotFoundError, UnicodeDecodeError, OSError):
|
||||
return "default"
|
||||
|
||||
|
||||
def set_active_profile(name: str) -> None:
|
||||
"""Set the sticky active profile.
|
||||
|
||||
Writes to ``~/.hermes/active_profile``. Use ``"default"`` to clear.
|
||||
"""
|
||||
validate_profile_name(name)
|
||||
if name != "default" and not profile_exists(name):
|
||||
raise FileNotFoundError(
|
||||
f"Profile '{name}' does not exist. "
|
||||
f"Create it with: hermes profile create {name}"
|
||||
)
|
||||
|
||||
path = _get_active_profile_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
if name == "default":
|
||||
# Remove the file to indicate default
|
||||
path.unlink(missing_ok=True)
|
||||
else:
|
||||
# Atomic write
|
||||
tmp = path.with_suffix(".tmp")
|
||||
tmp.write_text(name + "\n")
|
||||
tmp.replace(path)
|
||||
|
||||
|
||||
def get_active_profile_name() -> str:
|
||||
"""Infer the current profile name from HERMES_HOME.
|
||||
|
||||
Returns ``"default"`` if HERMES_HOME is not set or points to ``~/.hermes``.
|
||||
Returns the profile name if HERMES_HOME points into ``~/.hermes/profiles/<name>``.
|
||||
Returns ``"custom"`` if HERMES_HOME is set to an unrecognized path.
|
||||
"""
|
||||
from hermes_constants import get_hermes_home
|
||||
hermes_home = get_hermes_home()
|
||||
resolved = hermes_home.resolve()
|
||||
|
||||
default_resolved = _get_default_hermes_home().resolve()
|
||||
if resolved == default_resolved:
|
||||
return "default"
|
||||
|
||||
profiles_root = _get_profiles_root().resolve()
|
||||
try:
|
||||
rel = resolved.relative_to(profiles_root)
|
||||
parts = rel.parts
|
||||
if len(parts) == 1 and _PROFILE_ID_RE.match(parts[0]):
|
||||
return parts[0]
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return "custom"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Export / Import
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def export_profile(name: str, output_path: str) -> Path:
|
||||
"""Export a profile to a tar.gz archive.
|
||||
|
||||
Returns the output file path.
|
||||
"""
|
||||
validate_profile_name(name)
|
||||
profile_dir = get_profile_dir(name)
|
||||
if not profile_dir.is_dir():
|
||||
raise FileNotFoundError(f"Profile '{name}' does not exist.")
|
||||
|
||||
output = Path(output_path)
|
||||
# shutil.make_archive wants the base name without extension
|
||||
base = str(output).removesuffix(".tar.gz").removesuffix(".tgz")
|
||||
result = shutil.make_archive(base, "gztar", str(profile_dir.parent), name)
|
||||
return Path(result)
|
||||
|
||||
|
||||
def import_profile(archive_path: str, name: Optional[str] = None) -> Path:
|
||||
"""Import a profile from a tar.gz archive.
|
||||
|
||||
If *name* is not given, infers it from the archive's top-level directory.
|
||||
Returns the imported profile directory.
|
||||
"""
|
||||
import tarfile
|
||||
|
||||
archive = Path(archive_path)
|
||||
if not archive.exists():
|
||||
raise FileNotFoundError(f"Archive not found: {archive}")
|
||||
|
||||
# Peek at the archive to find the top-level directory name
|
||||
with tarfile.open(archive, "r:gz") as tf:
|
||||
top_dirs = {m.name.split("/")[0] for m in tf.getmembers() if "/" in m.name}
|
||||
if not top_dirs:
|
||||
top_dirs = {m.name for m in tf.getmembers() if m.isdir()}
|
||||
|
||||
inferred_name = name or (top_dirs.pop() if len(top_dirs) == 1 else None)
|
||||
if not inferred_name:
|
||||
raise ValueError(
|
||||
"Cannot determine profile name from archive. "
|
||||
"Specify it explicitly: hermes profile import <archive> --name <name>"
|
||||
)
|
||||
|
||||
validate_profile_name(inferred_name)
|
||||
profile_dir = get_profile_dir(inferred_name)
|
||||
if profile_dir.exists():
|
||||
raise FileExistsError(f"Profile '{inferred_name}' already exists at {profile_dir}")
|
||||
|
||||
profiles_root = _get_profiles_root()
|
||||
profiles_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
shutil.unpack_archive(str(archive), str(profiles_root))
|
||||
|
||||
# If the archive extracted under a different name, rename
|
||||
extracted = profiles_root / (top_dirs.pop() if top_dirs else inferred_name)
|
||||
if extracted != profile_dir and extracted.exists():
|
||||
extracted.rename(profile_dir)
|
||||
|
||||
return profile_dir
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rename
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def rename_profile(old_name: str, new_name: str) -> Path:
|
||||
"""Rename a profile: directory, wrapper script, service, active_profile.
|
||||
|
||||
Returns the new profile directory.
|
||||
"""
|
||||
validate_profile_name(old_name)
|
||||
validate_profile_name(new_name)
|
||||
|
||||
if old_name == "default":
|
||||
raise ValueError("Cannot rename the default profile.")
|
||||
if new_name == "default":
|
||||
raise ValueError("Cannot rename to 'default' — it is reserved.")
|
||||
|
||||
old_dir = get_profile_dir(old_name)
|
||||
new_dir = get_profile_dir(new_name)
|
||||
|
||||
if not old_dir.is_dir():
|
||||
raise FileNotFoundError(f"Profile '{old_name}' does not exist.")
|
||||
if new_dir.exists():
|
||||
raise FileExistsError(f"Profile '{new_name}' already exists.")
|
||||
|
||||
# 1. Stop gateway if running
|
||||
if _check_gateway_running(old_dir):
|
||||
_cleanup_gateway_service(old_name, old_dir)
|
||||
_stop_gateway_process(old_dir)
|
||||
|
||||
# 2. Rename directory
|
||||
old_dir.rename(new_dir)
|
||||
print(f"✓ Renamed {old_dir.name} → {new_dir.name}")
|
||||
|
||||
# 3. Update wrapper script
|
||||
remove_wrapper_script(old_name)
|
||||
collision = check_alias_collision(new_name)
|
||||
if not collision:
|
||||
create_wrapper_script(new_name)
|
||||
print(f"✓ Alias updated: {new_name}")
|
||||
else:
|
||||
print(f"⚠ Cannot create alias '{new_name}' — {collision}")
|
||||
|
||||
# 4. Update active_profile if it pointed to old name
|
||||
try:
|
||||
if get_active_profile() == old_name:
|
||||
set_active_profile(new_name)
|
||||
print(f"✓ Active profile updated: {new_name}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return new_dir
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tab completion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def generate_bash_completion() -> str:
|
||||
"""Generate a bash completion script for hermes profile names."""
|
||||
return '''# Hermes Agent profile completion
|
||||
# Add to ~/.bashrc: eval "$(hermes completion bash)"
|
||||
|
||||
_hermes_profiles() {
|
||||
local profiles_dir="$HOME/.hermes/profiles"
|
||||
local profiles="default"
|
||||
if [ -d "$profiles_dir" ]; then
|
||||
profiles="$profiles $(ls "$profiles_dir" 2>/dev/null)"
|
||||
fi
|
||||
echo "$profiles"
|
||||
}
|
||||
|
||||
_hermes_completion() {
|
||||
local cur prev
|
||||
cur="${COMP_WORDS[COMP_CWORD]}"
|
||||
prev="${COMP_WORDS[COMP_CWORD-1]}"
|
||||
|
||||
# Complete profile names after -p / --profile
|
||||
if [[ "$prev" == "-p" || "$prev" == "--profile" ]]; then
|
||||
COMPREPLY=($(compgen -W "$(_hermes_profiles)" -- "$cur"))
|
||||
return
|
||||
fi
|
||||
|
||||
# Complete profile subcommands
|
||||
if [[ "${COMP_WORDS[1]}" == "profile" ]]; then
|
||||
case "$prev" in
|
||||
profile)
|
||||
COMPREPLY=($(compgen -W "list use create delete show alias rename export import" -- "$cur"))
|
||||
return
|
||||
;;
|
||||
use|delete|show|alias|rename|export)
|
||||
COMPREPLY=($(compgen -W "$(_hermes_profiles)" -- "$cur"))
|
||||
return
|
||||
;;
|
||||
esac
|
||||
fi
|
||||
|
||||
# Top-level subcommands
|
||||
if [[ "$COMP_CWORD" == 1 ]]; then
|
||||
local commands="chat model gateway setup status cron doctor config skills tools mcp sessions profile update version"
|
||||
COMPREPLY=($(compgen -W "$commands" -- "$cur"))
|
||||
fi
|
||||
}
|
||||
|
||||
complete -F _hermes_completion hermes
|
||||
'''
|
||||
|
||||
|
||||
def generate_zsh_completion() -> str:
|
||||
"""Generate a zsh completion script for hermes profile names."""
|
||||
return '''#compdef hermes
|
||||
# Hermes Agent profile completion
|
||||
# Add to ~/.zshrc: eval "$(hermes completion zsh)"
|
||||
|
||||
_hermes() {
|
||||
local -a profiles
|
||||
profiles=(default)
|
||||
if [[ -d "$HOME/.hermes/profiles" ]]; then
|
||||
profiles+=("${(@f)$(ls $HOME/.hermes/profiles 2>/dev/null)}")
|
||||
fi
|
||||
|
||||
_arguments \\
|
||||
'-p[Profile name]:profile:($profiles)' \\
|
||||
'--profile[Profile name]:profile:($profiles)' \\
|
||||
'1:command:(chat model gateway setup status cron doctor config skills tools mcp sessions profile update version)' \\
|
||||
'*::arg:->args'
|
||||
|
||||
case $words[1] in
|
||||
profile)
|
||||
_arguments '1:action:(list use create delete show alias rename export import)' \\
|
||||
'2:profile:($profiles)'
|
||||
;;
|
||||
esac
|
||||
}
|
||||
|
||||
_hermes "$@"
|
||||
'''
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Profile env resolution (called from _apply_profile_override)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def resolve_profile_env(profile_name: str) -> str:
|
||||
"""Resolve a profile name to a HERMES_HOME path string.
|
||||
|
||||
Called early in the CLI entry point, before any hermes modules
|
||||
are imported, to set the HERMES_HOME environment variable.
|
||||
"""
|
||||
validate_profile_name(profile_name)
|
||||
profile_dir = get_profile_dir(profile_name)
|
||||
|
||||
if profile_name != "default" and not profile_dir.is_dir():
|
||||
raise FileNotFoundError(
|
||||
f"Profile '{profile_name}' does not exist. "
|
||||
f"Create it with: hermes profile create {profile_name}"
|
||||
)
|
||||
|
||||
return str(profile_dir)
|
||||
Reference in New Issue
Block a user