372 lines
12 KiB
Python
Executable File
372 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
"""
deploy-validate — pre-flight configuration checker for Hermes deployments.

Catches common configuration errors BEFORE they cause runtime failures.
Safe to run at any time: it only reads files and makes lightweight network
checks — it never writes state or sends messages.

Usage:
    python scripts/deploy-validate            # validate current environment
    python scripts/deploy-validate --dry-run  # alias for the same thing
    python scripts/deploy-validate --env /path/to/.env

Exit codes:
    0   All checks passed (or only warnings).
    1   One or more blocking errors found.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import socket
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# ANSI SGR escape sequences used by the output helpers below.
# Foreground colours (bright variants).
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
# Text attributes.
BOLD = "\033[1m"
RESET = "\033[0m"  # clears every attribute set by the codes above
|
|
|
|
|
|
def _color(text: str, code: str) -> str:
|
|
if sys.stdout.isatty():
|
|
return f"{code}{text}{RESET}"
|
|
return text
|
|
|
|
|
|
def ok(msg: str) -> None:
    """Print *msg* as a passed check, prefixed with a green check mark."""
    marker = _color("✔", GREEN)
    print(f" {marker} {msg}")
|
|
|
|
|
|
def warn(msg: str) -> None:
    """Print *msg* as a non-blocking warning, prefixed with a yellow sign."""
    marker = _color("⚠", YELLOW)
    print(f" {marker} {msg}")
|
|
|
|
|
|
def error(msg: str) -> None:
    """Print *msg* as a blocking error, prefixed with a red cross."""
    marker = _color("✘", RED)
    print(f" {marker} {msg}")
|
|
|
|
|
|
def section(title: str) -> None:
    """Print a section heading, preceded by a blank line.

    The heading is rendered bold on a terminal. The bold code is passed to
    ``_color`` as the *code* argument only (not concatenated into the text),
    so that no raw escape sequence is emitted when stdout is not a TTY.
    """
    print(f"\n{_color(title, BOLD)}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# .env loader (minimal — avoids dependency on python-dotenv for portability)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _load_env_file(path: Path) -> dict[str, str]:
|
|
"""Parse a .env file and return a dict of key→value pairs."""
|
|
result: dict[str, str] = {}
|
|
if not path.exists():
|
|
return result
|
|
for line in path.read_text(encoding="utf-8").splitlines():
|
|
line = line.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, _, value = line.partition("=")
|
|
key = key.strip()
|
|
# Strip inline comments and surrounding quotes.
|
|
value = value.split("#")[0].strip().strip("\"'")
|
|
if key:
|
|
result[key] = value
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Individual checks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def check_env_file(env_path: Path) -> dict[str, str]:
    """Verify that the .env file exists and flag placeholder-looking values.

    Args:
        env_path: Path of the .env file to inspect.

    Returns:
        The parsed key→value mapping, or an empty dict when the file is
        missing (in which case two errors are reported).
    """
    section("Environment file")
    if env_path.exists():
        ok(f".env found at {env_path}")
    else:
        error(f".env not found at {env_path}")
        error("Copy .env.example → .env and fill in your API keys.")
        return {}

    parsed = _load_env_file(env_path)

    # Substrings (compared case-insensitively) that suggest a template value
    # was never replaced with a real one.
    markers = ("your_", "xxxx", "changeme", "todo", "replace_me")
    for name, val in parsed.items():
        if not val:
            continue
        lowered = val.lower()
        if any(marker in lowered for marker in markers):
            warn(f"{name} looks like a placeholder: {val!r}")

    return parsed
|
|
|
|
|
|
def check_llm_key(env: dict[str, str]) -> bool:
    """Check that at least one known LLM provider API key is set in *env*.

    Args:
        env: Parsed .env mapping.

    Returns:
        True when at least one provider variable has a non-blank value;
        False (after reporting an error) otherwise.
    """
    section("LLM provider")
    # (environment variable, human-readable provider name), in report order.
    known = (
        ("OPENROUTER_API_KEY", "OpenRouter"),
        ("ANTHROPIC_API_KEY", "Anthropic"),
        ("OPENAI_API_KEY", "OpenAI"),
        ("GLM_API_KEY", "z.ai / GLM"),
        ("KIMI_API_KEY", "Kimi / Moonshot"),
        ("MINIMAX_API_KEY", "MiniMax"),
        ("NOUS_API_KEY", "Nous Portal"),
        ("HF_TOKEN", "Hugging Face"),
        ("KILOCODE_API_KEY", "KiloCode"),
        ("OPENCODE_ZEN_API_KEY", "OpenCode Zen"),
    )
    detected = []
    for variable, label in known:
        if env.get(variable, "").strip():
            detected.append(label)

    if detected:
        ok(f"LLM provider key present: {', '.join(detected)}")
        return True

    error("No LLM API key detected. Set at least one (e.g. OPENROUTER_API_KEY).")
    return False
|
|
|
|
|
|
def check_hermes_home(env: dict[str, str]) -> Optional[Path]:
    """Inspect the HERMES_HOME data directory and report on its layout.

    The directory is taken from ``HERMES_HOME`` in *env*, then from the
    process environment, and finally defaults to ``~/.hermes``.

    Args:
        env: Parsed .env mapping.

    Returns:
        The resolved data-directory path (whether or not it exists).
    """
    section("HERMES_HOME data directory")
    configured = env.get("HERMES_HOME") or os.environ.get("HERMES_HOME") or ""
    home = Path(configured).expanduser() if configured else Path.home() / ".hermes"

    if not home.exists():
        warn(f"HERMES_HOME does not exist yet: {home} (will be created on first run)")
        return home

    ok(f"HERMES_HOME exists: {home}")

    for name in ("logs", "sessions", "cron", "memories", "skills"):
        subdir = home / name
        if not subdir.is_dir():
            warn(f"Expected subdirectory missing: {subdir} (created automatically at runtime)")

    data_env = home / ".env"
    if not data_env.exists():
        warn(f"No .env in HERMES_HOME ({home}). "
             "The Docker entrypoint copies .env.example on first run; "
             "for bare-metal installs copy it manually.")
    else:
        ok(f"Data-directory .env present: {data_env}")

    return home
|
|
|
|
|
|
def check_gateway_platforms(env: dict[str, str]) -> None:
    """Report which messaging platforms have their credentials configured.

    A platform counts as configured only when every one of its variables has
    a non-blank value. A platform with some, but not all, of its variables
    set is reported with a warning that names the missing ones, instead of
    being silently treated as absent. A final warning is emitted when no
    platform is fully configured.

    Args:
        env: Parsed .env mapping.
    """
    section("Messaging platform tokens")
    platforms: dict[str, list[str]] = {
        "Telegram": ["TELEGRAM_BOT_TOKEN"],
        "Discord": ["DISCORD_BOT_TOKEN"],
        "Slack": ["SLACK_BOT_TOKEN", "SLACK_APP_TOKEN"],
        "WhatsApp": [],  # pairing-based, no env key required
        "Email": ["EMAIL_ADDRESS", "EMAIL_PASSWORD"],
    }
    any_found = False
    for platform, keys in platforms.items():
        if not keys:
            continue  # WhatsApp — no key check
        missing = [k for k in keys if not env.get(k, "").strip()]
        if not missing:
            ok(f"{platform}: configured ({', '.join(keys)})")
            any_found = True
        elif len(missing) < len(keys):
            # Some variables are set: almost certainly an incomplete setup.
            warn(f"{platform}: partially configured — missing {', '.join(missing)}")
    if not any_found:
        warn("No messaging platform tokens found. "
             "The gateway will start but accept no inbound messages. "
             "Set at least one platform token (e.g. TELEGRAM_BOT_TOKEN).")
|
|
|
|
|
|
def check_api_server_reachable(host: str = "127.0.0.1", port: int = 8642) -> None:
    """Probe ``http://host:port/health`` and report the result.

    The server is reported healthy when the response body is a JSON object
    whose ``status`` field equals ``"ok"``. Any connection problem is
    reported as a warning, never as an error, because the server is not
    expected to be running during a pre-flight check.

    Args:
        host: Host name or address of the API server.
        port: TCP port of the API server.
    """
    import http.client
    import json

    section("API server health check")
    url = f"http://{host}:{port}/health"
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            # errors="replace": a non-UTF-8 body must not crash the validator.
            body = resp.read().decode("utf-8", errors="replace")
    except urllib.error.URLError as exc:
        # Not a failure — the server may not be running in --dry-run mode.
        # (HTTPError for non-2xx responses is a URLError subclass.)
        warn(f"API server not reachable at {url}: {exc.reason} "
             "(expected if gateway is not running)")
        return
    except (OSError, http.client.HTTPException) as exc:
        # HTTPException covers a non-HTTP service answering on the port.
        warn(f"API server not reachable at {url}: {exc}")
        return

    try:
        payload = json.loads(body)
    except ValueError:
        payload = None

    if isinstance(payload, dict) and payload.get("status") == "ok":
        ok(f"API server healthy: {url}")
    else:
        warn(f"Unexpected /health response from {url}: {body[:200]}")
|
|
|
|
|
|
def check_gateway_status(hermes_home: Optional[Path]) -> None:
    """Report the gateway's last recorded runtime state.

    Reads ``gateway_state.json`` (and checks for ``gateway.pid``) inside
    *hermes_home*. A recorded state of ``stopped`` or ``startup_failed`` is
    reported as an error; everything else is informational or a warning.

    Args:
        hermes_home: Data directory, or None to skip the check.
    """
    section("Gateway runtime status")
    if hermes_home is None:
        warn("HERMES_HOME unknown — skipping runtime status check.")
        return

    state_path = hermes_home / "gateway_state.json"
    pid_path = hermes_home / "gateway.pid"

    if not state_path.exists():
        if pid_path.exists():
            warn("State file missing; only PID file found. Gateway may be starting.")
        else:
            warn("Gateway does not appear to be running (no PID or state file). "
                 "This is expected before the first start.")
        return

    import json

    try:
        snapshot = json.loads(state_path.read_text())
        gw_state = snapshot.get("gateway_state", "unknown")
        updated = snapshot.get("updated_at", "?")

        if gw_state in ("stopped", "startup_failed"):
            error(f"Gateway state: {gw_state} — {snapshot.get('exit_reason', 'no reason recorded')}")
        elif gw_state != "running":
            warn(f"Gateway state: {gw_state}")
        else:
            ok(f"Gateway state: {gw_state} (updated {updated})")
            for plat, pdata in snapshot.get("platforms", {}).items():
                pstate = pdata.get("state", "unknown")
                if pstate not in ("connected", "running", "ok"):
                    warn(f" Platform {plat}: {pstate} — {pdata.get('error_message', '')}")
                else:
                    ok(f" Platform {plat}: {pstate}")
    except Exception as exc:
        # Deliberately broad: any unreadable or unexpectedly shaped state
        # file is downgraded to a warning.
        warn(f"Could not parse {state_path}: {exc}")
|
|
|
|
|
|
def check_docker_available() -> None:
    """Report whether ``docker`` and a Compose implementation are available.

    Finding the ``docker`` binary does not prove the Compose plugin is
    installed, so ``docker compose version`` is executed to confirm it. A
    standalone ``docker-compose`` binary is accepted as a fallback. Missing
    tools are warnings only, since they are needed for Docker deployments
    only.
    """
    import shutil
    import subprocess

    section("Docker / compose availability")
    docker_present = _check_command("docker", "docker")

    compose_plugin = False
    if docker_present:
        try:
            probe = subprocess.run(
                ["docker", "compose", "version"],
                capture_output=True,
                timeout=10,
            )
            compose_plugin = probe.returncode == 0
        except (OSError, subprocess.SubprocessError):
            # Not executable, or timed out: treat as "plugin unavailable".
            compose_plugin = False

    if compose_plugin:
        ok("docker compose found")
    elif shutil.which("docker-compose"):
        ok("docker-compose (standalone) found")
    else:
        warn("docker compose not found in PATH (only required for Docker deployments)")
|
|
|
|
|
|
def _check_command(name: str, display: str) -> bool:
    """Report whether executable *name* is on PATH, labelled as *display*.

    Returns:
        True when the executable was found, False (after a warning) otherwise.
    """
    import shutil

    located = shutil.which(name)
    if not located:
        warn(f"{display} not found in PATH (only required for Docker deployments)")
        return False
    ok(f"{display} found")
    return True
|
|
|
|
|
|
def check_ports_free(ports: list[int] = None) -> None:
    """Warn about local TCP ports that already have a listener.

    Each port is probed with a one-second connection attempt to 127.0.0.1;
    a successful connection means the port is in use.

    Args:
        ports: Ports to probe. Defaults to ``[8642]`` when None.
    """
    section("Port availability")
    candidates = [8642] if ports is None else ports
    for port in candidates:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(1)
            status = probe.connect_ex(("127.0.0.1", port))
        # connect_ex returns 0 only when the connection succeeded.
        if status != 0:
            ok(f"Port {port} is free")
        else:
            warn(f"Port {port} is already in use. "
                 "The API server will fail to bind unless you change its port.")
|
|
|
|
|
|
def _gitignore_entries(content: str) -> list[str]:
    """Return the basename-level ignore patterns found in .gitignore text.

    Comments, blank lines and negations (``!pattern``) are skipped, leading
    ``/`` and ``**/`` anchors are removed, and directory-only or nested-path
    entries are dropped because they cannot cover a file by name alone.
    """
    entries: list[str] = []
    for line in content.splitlines():
        entry = line.strip()
        if not entry or entry.startswith(("#", "!")):
            continue
        while entry.startswith("**/"):
            entry = entry[3:]
        entry = entry.lstrip("/")
        if not entry or "/" in entry:
            continue
        entries.append(entry)
    return entries


def check_no_secrets_in_repo(repo_root: Path) -> None:
    """Check that common secret files are git-ignored and .env is untracked.

    Each .gitignore entry is matched as a glob against a representative file
    name, rather than by substring search, so that unrelated entries such as
    ``venv/`` or ``keyring`` are not mistaken for coverage of ``.env`` or
    ``*.key``. Negated entries are not taken into account.

    Args:
        repo_root: Root directory of the repository.
    """
    import fnmatch
    import subprocess

    section("Secret hygiene")
    gitignore = repo_root / ".gitignore"
    if gitignore.is_file():
        entries = _gitignore_entries(
            gitignore.read_text(encoding="utf-8", errors="replace")
        )
        # (pattern shown to the user, sample file name that must be ignored)
        expectations = (
            (".env", ".env"),
            ("*.pem", "secret.pem"),
            ("*.key", "secret.key"),
        )
        for pattern, sample in expectations:
            if any(fnmatch.fnmatchcase(sample, entry) for entry in entries):
                ok(f".gitignore covers {pattern}")
            else:
                warn(f".gitignore does not mention {pattern}. "
                     "Ensure secrets are never committed.")
    else:
        warn("No .gitignore found. Secrets could accidentally be committed.")

    # Check the env file itself isn't tracked.
    env_file = repo_root / ".env"
    if env_file.exists():
        try:
            out = subprocess.run(
                ["git", "ls-files", "--error-unmatch", ".env"],
                cwd=repo_root,
                capture_output=True,
                check=False,
            )
        except FileNotFoundError:
            warn("git not found — cannot verify .env tracking status")
        else:
            # Exit status 0 means git knows the path, i.e. it is tracked.
            if out.returncode == 0:
                error(".env IS tracked by git! Remove it immediately: git rm --cached .env")
            else:
                ok(".env is not tracked by git")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main(argv: Optional[list[str]] = None) -> int:
    """Run all pre-flight checks and return the process exit code.

    Args:
        argv: Command-line arguments without the program name. Defaults to
            ``sys.argv[1:]`` when None.

    Returns:
        0 when no check reported an error (warnings are allowed), 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Pre-flight configuration validator for Hermes deployments.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Alias for the default mode (no state is written regardless).",
    )
    parser.add_argument(
        "--env", metavar="PATH",
        help="Path to .env file (default: .env in repo root).",
    )
    parser.add_argument(
        "--check-ports", action="store_true",
        help="Also verify that required ports are free (useful before first start).",
    )
    parser.add_argument(
        "--skip-health", action="store_true",
        help="Skip the live /health HTTP check (use when gateway is not running).",
    )
    args = parser.parse_args(argv)

    # The bold code is passed as the colour argument only, so that no raw
    # escape sequence is printed when stdout is not a terminal.
    print(f"\n{_color('Hermes Deploy Validator', BOLD)}")
    print("=" * 50)

    repo_root = Path(__file__).resolve().parent.parent
    env_path = Path(args.env).expanduser() if args.env else repo_root / ".env"

    # Temporarily replace the module-level error() with a counting wrapper.
    # The check functions look the name up at call time, so they pick up the
    # wrapper; the original is restored afterwards so that a second call to
    # main() starts from a clean state.
    module_globals = globals()
    original_error = module_globals["error"]
    error_count = 0

    def counting_error(msg: str) -> None:
        nonlocal error_count
        error_count += 1
        original_error(msg)

    module_globals["error"] = counting_error
    try:
        env = check_env_file(env_path)
        check_no_secrets_in_repo(repo_root)
        check_llm_key(env)
        hermes_home = check_hermes_home(env)
        check_gateway_platforms(env)
        if args.check_ports:
            check_ports_free()
        if not args.skip_health:
            check_api_server_reachable()
        check_gateway_status(hermes_home)
    finally:
        module_globals["error"] = original_error

    # Summary.
    print(f"\n{'=' * 50}")
    if error_count == 0:
        print(_color("All checks passed (0 errors).", GREEN))
        return 0
    print(_color(f"{error_count} error(s) found. Fix them before deploying.", RED))
    return 1
|
|
|
|
|
|
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|