Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
1e9c5fc458 feat: add wizard-bootstrap shared tooling & environment validation (Epic-004)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Failing after 14s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Failing after 1s
Tests / test (pull_request) Failing after 3s
Implements the Wizard Council Automation shared infrastructure:

- wizard-bootstrap/wizard_bootstrap.py — validates Python version, core deps,
  hermes_constants import, HERMES_HOME, LLM provider key, Gitea auth, and
  Telegram connectivity; exits 0 if forge-ready, 1 if any check fails
- wizard-bootstrap/skills_audit.py — compares repo-bundled skills against
  installed ~/.hermes/skills/, reporting MISSING / EXTRA / OUTDATED / OK;
  --fix flag syncs missing/outdated skills automatically
- wizard-bootstrap/dependency_checker.py — reads 'dependencies.binaries' and
  'dependencies.env_vars' from SKILL.md frontmatter and verifies each is
  satisfied in the current environment
- wizard-bootstrap/monthly_audit.py — runs all three checks and generates a
  Markdown report saved to ~/.hermes/wizard-council/audit-YYYY-MM.md;
  --post-telegram flag delivers the summary to the configured channel
- wizard-bootstrap/WIZARD_ENVIRONMENT_CONTRACT.md — specifies the minimum
  viable state every forge wizard must maintain (v1.0.0)
- skills/devops/wizard-council-automation/SKILL.md — skill entry so the
  toolset is discoverable and invocable from any wizard
- tests/test_wizard_bootstrap.py — 21 tests covering all three tools

Fixes #148

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-06 21:55:02 -04:00
17 changed files with 1790 additions and 1880 deletions

13
.github/CODEOWNERS vendored
View File

@@ -1,13 +0,0 @@
# Default owners for all files
* @Timmy
# Critical paths require explicit review
/gateway/ @Timmy
/tools/ @Timmy
/agent/ @Timmy
/config/ @Timmy
/scripts/ @Timmy
/.github/workflows/ @Timmy
/pyproject.toml @Timmy
/requirements.txt @Timmy
/Dockerfile @Timmy

View File

@@ -1,99 +0,0 @@
name: "🔒 Security PR Checklist"
description: "Use this when your PR touches authentication, file I/O, external API calls, or other sensitive paths."
title: "[Security Review]: "
labels: ["security", "needs-review"]
body:
- type: markdown
attributes:
value: |
## Security Pre-Merge Review
Complete this checklist before requesting review on PRs that touch **authentication, file I/O, external API calls, or secrets handling**.
- type: input
id: pr-link
attributes:
label: Pull Request
description: Link to the PR being reviewed
placeholder: "https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/pulls/XXX"
validations:
required: true
- type: dropdown
id: change-type
attributes:
label: Change Category
description: What kind of sensitive change does this PR make?
multiple: true
options:
- Authentication / Authorization
- File I/O (read/write/delete)
- External API calls (outbound HTTP/network)
- Secret / credential handling
- Command execution (subprocess/shell)
- Dependency addition or update
- Configuration changes
- CI/CD pipeline changes
validations:
required: true
- type: checkboxes
id: secrets-checklist
attributes:
label: Secrets & Credentials
options:
- label: No secrets, API keys, or credentials are hardcoded
required: true
- label: All sensitive values are loaded from environment variables or a secrets manager
required: true
- label: Test fixtures use fake/placeholder values, not real credentials
required: true
- type: checkboxes
id: input-validation-checklist
attributes:
label: Input Validation
options:
- label: All external input (user, API, file) is validated before use
required: true
- label: File paths are validated against path traversal (`../`, null bytes, absolute paths)
- label: URLs are validated for SSRF (blocked private/metadata IPs)
- label: Shell commands do not use `shell=True` with user-controlled input
- type: checkboxes
id: auth-checklist
attributes:
label: Authentication & Authorization (if applicable)
options:
- label: Authentication tokens are not logged or exposed in error messages
- label: Authorization checks happen server-side, not just client-side
- label: Session tokens are properly scoped and have expiry
- type: checkboxes
id: supply-chain-checklist
attributes:
label: Supply Chain
options:
- label: New dependencies are pinned to a specific version range
- label: Dependencies come from trusted sources (PyPI, npm, official repos)
- label: No `.pth` files or install hooks that execute arbitrary code
- label: "`pip-audit` passes (no known CVEs in added dependencies)"
- type: textarea
id: threat-model
attributes:
label: Threat Model Notes
description: |
Briefly describe the attack surface this change introduces or modifies, and how it is mitigated.
placeholder: |
This PR adds a new outbound HTTP call to the OpenRouter API.
Mitigation: URL is hardcoded (no user input), response is parsed with strict schema validation.
- type: textarea
id: testing
attributes:
label: Security Testing Done
description: What security testing did you perform?
placeholder: |
- Ran validate_security.py — all checks pass
- Tested path traversal attempts manually
- Verified no secrets in git diff

View File

@@ -1,82 +0,0 @@
name: Dependency Audit
on:
pull_request:
branches: [main]
paths:
- 'requirements.txt'
- 'pyproject.toml'
- 'uv.lock'
schedule:
- cron: '0 8 * * 1' # Weekly on Monday
workflow_dispatch:
permissions:
pull-requests: write
contents: read
jobs:
audit:
name: Audit Python dependencies
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v5
- name: Set up Python
run: uv python install 3.11
- name: Install pip-audit
run: uv pip install --system pip-audit
- name: Run pip-audit
id: audit
run: |
set -euo pipefail
# Run pip-audit against the lock file/requirements
if pip-audit --requirement requirements.txt -f json -o /tmp/audit-results.json 2>/tmp/audit-stderr.txt; then
echo "found=false" >> "$GITHUB_OUTPUT"
else
echo "found=true" >> "$GITHUB_OUTPUT"
# Check severity
CRITICAL=$(python3 -c "
import json, sys
data = json.load(open('/tmp/audit-results.json'))
vulns = data.get('dependencies', [])
for d in vulns:
for v in d.get('vulns', []):
aliases = v.get('aliases', [])
# Check for critical/high CVSS
if any('CVSS' in str(a) for a in aliases):
print('true')
sys.exit(0)
print('false')
" 2>/dev/null || echo 'false')
echo "critical=${CRITICAL}" >> "$GITHUB_OUTPUT"
fi
continue-on-error: true
- name: Post results comment
if: steps.audit.outputs.found == 'true' && github.event_name == 'pull_request'
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
BODY="## ⚠️ Dependency Vulnerabilities Detected
\`pip-audit\` found vulnerable dependencies in this PR. Review and update before merging.
\`\`\`
$(cat /tmp/audit-results.json | python3 -c "
import json, sys
data = json.load(sys.stdin)
for dep in data.get('dependencies', []):
for v in dep.get('vulns', []):
print(f\" {dep['name']}=={dep['version']}: {v['id']} - {v.get('description', '')[:120]}\")
" 2>/dev/null || cat /tmp/audit-stderr.txt)
\`\`\`
---
*Automated scan by [dependency-audit](/.github/workflows/dependency-audit.yml)*"
gh pr comment "${{ github.event.pull_request.number }}" --body "$BODY"
- name: Fail on vulnerabilities
if: steps.audit.outputs.found == 'true'
run: |
echo "::error::Vulnerable dependencies detected. See PR comment for details."
cat /tmp/audit-results.json | python3 -m json.tool || true
exit 1

View File

@@ -1,114 +0,0 @@
name: Quarterly Security Audit
on:
schedule:
# Run at 08:00 UTC on the first day of each quarter (Jan, Apr, Jul, Oct)
- cron: '0 8 1 1,4,7,10 *'
workflow_dispatch:
inputs:
reason:
description: 'Reason for manual trigger'
required: false
default: 'Manual quarterly audit'
permissions:
issues: write
contents: read
jobs:
create-audit-issue:
name: Create quarterly security audit issue
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Get quarter info
id: quarter
run: |
MONTH=$(date +%-m)
YEAR=$(date +%Y)
QUARTER=$(( (MONTH - 1) / 3 + 1 ))
echo "quarter=Q${QUARTER}-${YEAR}" >> "$GITHUB_OUTPUT"
echo "year=${YEAR}" >> "$GITHUB_OUTPUT"
echo "q=${QUARTER}" >> "$GITHUB_OUTPUT"
- name: Create audit issue
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
QUARTER="${{ steps.quarter.outputs.quarter }}"
gh issue create \
--title "[$QUARTER] Quarterly Security Audit" \
--label "security,audit" \
--body "$(cat <<'BODY'
## Quarterly Security Audit — ${{ steps.quarter.outputs.quarter }}
This is the scheduled quarterly security audit for the hermes-agent project. Complete each section and close this issue when the audit is done.
**Audit Period:** ${{ steps.quarter.outputs.quarter }}
**Due:** End of quarter
**Owner:** Assign to a maintainer
---
## 1. Open Issues & PRs Audit
Review all open issues and PRs for security-relevant content. Tag any that touch attack surfaces with the `security` label.
- [ ] Review open issues older than 30 days for unaddressed security concerns
- [ ] Tag security-relevant open PRs with `needs-security-review`
- [ ] Check for any issues referencing CVEs or known vulnerabilities
- [ ] Review recently closed security issues — are fixes deployed?
## 2. Dependency Audit
- [ ] Run `pip-audit` against current `requirements.txt` / `pyproject.toml`
- [ ] Check `uv.lock` for any pinned versions with known CVEs
- [ ] Review any `git+` dependencies for recent changes or compromise signals
- [ ] Update vulnerable dependencies and open PRs for each
## 3. Critical Path Review
Review recent changes to attack-surface paths:
- [ ] `gateway/` — authentication, message routing, platform adapters
- [ ] `tools/` — file I/O, command execution, web access
- [ ] `agent/` — prompt handling, context management
- [ ] `config/` — secrets loading, configuration parsing
- [ ] `.github/workflows/` — CI/CD integrity
Run: `git log --since="3 months ago" --name-only -- gateway/ tools/ agent/ config/ .github/workflows/`
## 4. Secret Scan
- [ ] Run secret scanner on the full codebase (not just diffs)
- [ ] Verify no credentials are present in git history
- [ ] Confirm all API keys/tokens in use are rotated on a regular schedule
## 5. Access & Permissions Review
- [ ] Review who has write access to the main branch
- [ ] Confirm branch protection rules are still in place (require PR + review)
- [ ] Verify CI/CD secrets are scoped correctly (not over-permissioned)
- [ ] Review CODEOWNERS file for accuracy
## 6. Vulnerability Triage
List any new vulnerabilities found this quarter:
| ID | Component | Severity | Status | Owner |
|----|-----------|----------|--------|-------|
| | | | | |
## 7. Action Items
| Action | Owner | Due Date | Status |
|--------|-------|----------|--------|
| | | | |
---
*Auto-generated by [quarterly-security-audit](/.github/workflows/quarterly-security-audit.yml). Close this issue when the audit is complete.*
BODY
)"

View File

@@ -1,136 +0,0 @@
name: Secret Scan
on:
pull_request:
types: [opened, synchronize, reopened]
permissions:
pull-requests: write
contents: read
jobs:
scan:
name: Scan for secrets
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Fetch base branch
run: git fetch origin ${{ github.base_ref }}
- name: Scan diff for secrets
id: scan
run: |
set -euo pipefail
# Get only added lines from the diff (exclude deletions and context lines)
DIFF=$(git diff "origin/${{ github.base_ref }}"...HEAD -- \
':!*.lock' ':!uv.lock' ':!package-lock.json' ':!yarn.lock' \
| grep '^+' | grep -v '^+++' || true)
FINDINGS=""
CRITICAL=false
check() {
local label="$1"
local pattern="$2"
local critical="${3:-false}"
local matches
matches=$(echo "$DIFF" | grep -oP "$pattern" || true)
if [ -n "$matches" ]; then
FINDINGS="${FINDINGS}\n- **${label}**: pattern matched"
if [ "$critical" = "true" ]; then
CRITICAL=true
fi
fi
}
# AWS keys — critical
check "AWS Access Key" 'AKIA[0-9A-Z]{16}' true
# Private key headers — critical
check "Private Key Header" '-----BEGIN (RSA|EC|DSA|OPENSSH|PGP) PRIVATE KEY' true
# OpenAI / Anthropic style keys
check "OpenAI-style API key (sk-)" 'sk-[a-zA-Z0-9]{20,}' false
# GitHub tokens
check "GitHub personal access token (ghp_)" 'ghp_[a-zA-Z0-9]{36}' true
check "GitHub fine-grained PAT (github_pat_)" 'github_pat_[a-zA-Z0-9_]{1,}' true
# Slack tokens
check "Slack bot token (xoxb-)" 'xoxb-[0-9A-Za-z\-]{10,}' true
check "Slack user token (xoxp-)" 'xoxp-[0-9A-Za-z\-]{10,}' true
# Generic assignment patterns — exclude obvious placeholders
GENERIC=$(echo "$DIFF" | grep -iP '(api_key|apikey|api-key|secret_key|access_token|auth_token)\s*[=:]\s*['"'"'"][^'"'"'"]{20,}['"'"'"]' \
| grep -ivP '(fake|mock|test|placeholder|example|dummy|your[_-]|xxx|<|>|\{\{)' || true)
if [ -n "$GENERIC" ]; then
FINDINGS="${FINDINGS}\n- **Generic credential assignment**: possible hardcoded secret"
fi
# .env additions with long values
ENV_DIFF=$(git diff "origin/${{ github.base_ref }}"...HEAD -- '*.env' '**/.env' '.env*' \
| grep '^+' | grep -v '^+++' || true)
ENV_MATCHES=$(echo "$ENV_DIFF" | grep -P '^[A-Z_]+=.{16,}' \
| grep -ivP '(fake|mock|test|placeholder|example|dummy|your[_-]|xxx)' || true)
if [ -n "$ENV_MATCHES" ]; then
FINDINGS="${FINDINGS}\n- **.env file**: lines with potentially real secret values detected"
fi
# Write outputs
if [ -n "$FINDINGS" ]; then
echo "found=true" >> "$GITHUB_OUTPUT"
else
echo "found=false" >> "$GITHUB_OUTPUT"
fi
if [ "$CRITICAL" = "true" ]; then
echo "critical=true" >> "$GITHUB_OUTPUT"
else
echo "critical=false" >> "$GITHUB_OUTPUT"
fi
# Store findings in a file to use in comment step
printf "%b" "$FINDINGS" > /tmp/secret-findings.txt
- name: Post PR comment with findings
if: steps.scan.outputs.found == 'true' && github.event_name == 'pull_request'
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
FINDINGS=$(cat /tmp/secret-findings.txt)
SEVERITY="warning"
if [ "${{ steps.scan.outputs.critical }}" = "true" ]; then
SEVERITY="CRITICAL"
fi
BODY="## Secret Scan — ${SEVERITY} findings
The automated secret scanner detected potential secrets in the diff for this PR.
### Findings
${FINDINGS}
### What to do
1. Remove any real credentials from the diff immediately.
2. If the match is a false positive (test fixture, placeholder), add a comment explaining why or rename the variable to include \`fake\`, \`mock\`, or \`test\`.
3. Rotate any exposed credentials regardless of whether this PR is merged.
---
*Automated scan by [secret-scan](/.github/workflows/secret-scan.yml)*"
gh pr comment "${{ github.event.pull_request.number }}" --body "$BODY"
- name: Fail on critical secrets
if: steps.scan.outputs.critical == 'true'
run: |
echo "::error::Critical secrets detected in diff (private keys, AWS keys, or GitHub tokens). Remove them before merging."
exit 1
- name: Warn on non-critical findings
if: steps.scan.outputs.found == 'true' && steps.scan.outputs.critical == 'false'
run: |
echo "::warning::Potential secrets detected in diff. Review the PR comment for details."

View File

@@ -1,25 +0,0 @@
repos:
# Secret detection
- repo: https://github.com/gitleaks/gitleaks
rev: v8.21.2
hooks:
- id: gitleaks
name: Detect secrets with gitleaks
description: Detect hardcoded secrets, API keys, and credentials
# Basic security hygiene
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0
hooks:
- id: check-added-large-files
args: ['--maxkb=500']
- id: detect-private-key
name: Detect private keys
- id: check-merge-conflict
- id: check-yaml
- id: check-toml
- id: end-of-file-fixer
- id: trailing-whitespace
args: ['--markdown-linebreak-ext=md']
- id: no-commit-to-branch
args: ['--branch', 'main']

View File

@@ -1,955 +0,0 @@
"""
Observatory — Testbed Health Monitoring & Alerting for Hermes Agent
Checks running services, system resources, and connectivity.
Fires Telegram alerts when thresholds are breached.
Posts daily digest reports.
Stores 30 days of historical health data in SQLite.
Usage:
python observatory.py --check # one-shot health check (stdout)
python observatory.py --daemon # continuous monitor (60s poll)
python observatory.py --digest # print / send daily digest
python observatory.py --history N # show last N health records
python observatory.py --slo # print SLO report
Configuration (env vars, falls back to ~/.hermes/.env):
OBSERVATORY_ALERT_CHAT_ID Telegram chat ID for alerts
OBSERVATORY_DIGEST_CHAT_ID Telegram chat ID for daily digest (default: alert chat)
OBSERVATORY_POLL_INTERVAL Seconds between health polls (default: 60)
OBSERVATORY_DB_PATH SQLite path (default: ~/.hermes/observatory.db)
TELEGRAM_BOT_TOKEN Bot token used to send alerts
# Threshold overrides (all optional):
OBSERVATORY_DISK_WARN_PCT Disk usage warn threshold (default: 80)
OBSERVATORY_DISK_CRIT_PCT Disk usage critical threshold (default: 90)
OBSERVATORY_MEM_WARN_PCT Memory usage warn threshold (default: 80)
OBSERVATORY_MEM_CRIT_PCT Memory usage critical threshold (default: 90)
OBSERVATORY_CPU_WARN_PCT CPU usage warn threshold (default: 80)
OBSERVATORY_CPU_CRIT_PCT CPU usage critical threshold (default: 95)
OBSERVATORY_WEBHOOK_URL Webhook endpoint to probe (default: http://127.0.0.1:8080/health)
OBSERVATORY_API_URL API server health URL (default: http://127.0.0.1:8642/health)
OBSERVATORY_WEBHOOK_LATENCY_SLO_MS Webhook latency SLO ms (default: 2000)
OBSERVATORY_GATEWAY_UPTIME_SLO_PCT Gateway uptime SLO % (default: 99.5)
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import signal
import sqlite3
import sys
import time
import urllib.request
import urllib.error
from contextlib import contextmanager
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Optional imports
# ---------------------------------------------------------------------------
try:
import psutil
_PSUTIL = True
except ImportError:
_PSUTIL = False
try:
from dotenv import load_dotenv as _load_dotenv
_DOTENV = True
except ImportError:
_DOTENV = False
logger = logging.getLogger("observatory")
# ---------------------------------------------------------------------------
# Constants & SLO definitions
# ---------------------------------------------------------------------------
RETENTION_DAYS = 30
SLO_DEFINITIONS = {
"gateway_uptime_pct": {
"description": "Gateway process uptime over the last 24 hours",
"target": 99.5,
"unit": "%",
},
"webhook_latency_ms": {
"description": "Webhook endpoint p95 response latency",
"target": 2000,
"unit": "ms",
"direction": "lower_is_better",
},
"api_server_latency_ms": {
"description": "API server /health p95 response latency",
"target": 2000,
"unit": "ms",
"direction": "lower_is_better",
},
}
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
def _load_env() -> None:
"""Load .env from HERMES_HOME if dotenv is available."""
if not _DOTENV:
return
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
env_path = hermes_home / ".env"
if env_path.exists():
_load_dotenv(env_path, override=False)
# Project-level .env as dev fallback
project_env = Path(__file__).parent / ".env"
if project_env.exists():
_load_dotenv(project_env, override=False)
@dataclass
class ObservatoryConfig:
alert_chat_id: Optional[str] = None
digest_chat_id: Optional[str] = None
telegram_token: Optional[str] = None
poll_interval: int = 60
db_path: Path = field(default_factory=lambda: Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "observatory.db")
disk_warn_pct: float = 80.0
disk_crit_pct: float = 90.0
mem_warn_pct: float = 80.0
mem_crit_pct: float = 90.0
cpu_warn_pct: float = 80.0
cpu_crit_pct: float = 95.0
webhook_url: str = "http://127.0.0.1:8080/health"
api_url: str = "http://127.0.0.1:8642/health"
webhook_latency_slo_ms: float = 2000.0
gateway_uptime_slo_pct: float = 99.5
@classmethod
def from_env(cls) -> "ObservatoryConfig":
_load_env()
cfg = cls()
cfg.telegram_token = os.getenv("TELEGRAM_BOT_TOKEN")
cfg.alert_chat_id = os.getenv("OBSERVATORY_ALERT_CHAT_ID")
cfg.digest_chat_id = os.getenv("OBSERVATORY_DIGEST_CHAT_ID") or cfg.alert_chat_id
cfg.poll_interval = int(os.getenv("OBSERVATORY_POLL_INTERVAL", 60))
db_override = os.getenv("OBSERVATORY_DB_PATH")
if db_override:
cfg.db_path = Path(db_override)
cfg.disk_warn_pct = float(os.getenv("OBSERVATORY_DISK_WARN_PCT", 80))
cfg.disk_crit_pct = float(os.getenv("OBSERVATORY_DISK_CRIT_PCT", 90))
cfg.mem_warn_pct = float(os.getenv("OBSERVATORY_MEM_WARN_PCT", 80))
cfg.mem_crit_pct = float(os.getenv("OBSERVATORY_MEM_CRIT_PCT", 90))
cfg.cpu_warn_pct = float(os.getenv("OBSERVATORY_CPU_WARN_PCT", 80))
cfg.cpu_crit_pct = float(os.getenv("OBSERVATORY_CPU_CRIT_PCT", 95))
cfg.webhook_url = os.getenv("OBSERVATORY_WEBHOOK_URL", "http://127.0.0.1:8080/health")
cfg.api_url = os.getenv("OBSERVATORY_API_URL", "http://127.0.0.1:8642/health")
cfg.webhook_latency_slo_ms = float(os.getenv("OBSERVATORY_WEBHOOK_LATENCY_SLO_MS", 2000))
cfg.gateway_uptime_slo_pct = float(os.getenv("OBSERVATORY_GATEWAY_UPTIME_SLO_PCT", 99.5))
return cfg
# ---------------------------------------------------------------------------
# Health check models
# ---------------------------------------------------------------------------
@dataclass
class CheckResult:
name: str
status: str # "ok" | "warn" | "critical" | "error"
message: str
value: Optional[float] = None
unit: Optional[str] = None
extra: Dict[str, Any] = field(default_factory=dict)
@dataclass
class HealthSnapshot:
ts: str # ISO8601 UTC
checks: List[CheckResult] = field(default_factory=list)
@property
def overall_status(self) -> str:
statuses = {c.status for c in self.checks}
if "critical" in statuses or "error" in statuses:
return "critical"
if "warn" in statuses:
return "warn"
return "ok"
def to_dict(self) -> Dict[str, Any]:
return {
"ts": self.ts,
"overall": self.overall_status,
"checks": [asdict(c) for c in self.checks],
}
# ---------------------------------------------------------------------------
# Individual health checks
# ---------------------------------------------------------------------------
def check_gateway_liveness() -> CheckResult:
"""Check whether the Hermes gateway process is running."""
try:
from gateway.status import is_gateway_running, get_running_pid
running = is_gateway_running()
pid = get_running_pid()
if running:
return CheckResult(
name="gateway_process",
status="ok",
message=f"Gateway running (pid={pid})",
value=float(pid) if pid else None,
)
return CheckResult(
name="gateway_process",
status="critical",
message="Gateway process is NOT running",
)
except Exception as exc:
return CheckResult(
name="gateway_process",
status="error",
message=f"Could not determine gateway status: {exc}",
)
def check_api_server_http(cfg: ObservatoryConfig) -> CheckResult:
"""Check API server /health endpoint responsiveness."""
url = cfg.api_url
start = time.monotonic()
try:
req = urllib.request.Request(url, method="GET")
req.add_header("User-Agent", "hermes-observatory/1.0")
with urllib.request.urlopen(req, timeout=10) as resp:
latency_ms = (time.monotonic() - start) * 1000
body = resp.read(512).decode("utf-8", errors="replace")
status_code = resp.status
if status_code < 400:
slo_ok = latency_ms <= cfg.webhook_latency_slo_ms
return CheckResult(
name="api_server_http",
status="ok" if slo_ok else "warn",
message=f"API server OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}",
value=latency_ms,
unit="ms",
extra={"status_code": status_code, "body_preview": body[:100]},
)
return CheckResult(
name="api_server_http",
status="critical",
message=f"API server returned HTTP {status_code}",
value=latency_ms,
unit="ms",
)
except urllib.error.URLError as exc:
latency_ms = (time.monotonic() - start) * 1000
# Not running is acceptable if gateway is not configured for API
reason = str(exc.reason) if hasattr(exc, "reason") else str(exc)
if "Connection refused" in reason or "Connection reset" in reason:
return CheckResult(
name="api_server_http",
status="warn",
message=f"API server not reachable at {url} (not started?)",
value=latency_ms,
unit="ms",
)
return CheckResult(
name="api_server_http",
status="error",
message=f"API server probe error: {exc}",
value=latency_ms,
unit="ms",
)
except Exception as exc:
latency_ms = (time.monotonic() - start) * 1000
return CheckResult(
name="api_server_http",
status="error",
message=f"API server probe exception: {exc}",
value=latency_ms,
unit="ms",
)
def check_webhook_http(cfg: ObservatoryConfig) -> CheckResult:
"""Check webhook endpoint responsiveness."""
url = cfg.webhook_url
start = time.monotonic()
try:
req = urllib.request.Request(url, method="GET")
req.add_header("User-Agent", "hermes-observatory/1.0")
with urllib.request.urlopen(req, timeout=10) as resp:
latency_ms = (time.monotonic() - start) * 1000
status_code = resp.status
slo_ok = latency_ms <= cfg.webhook_latency_slo_ms
if status_code < 400:
return CheckResult(
name="webhook_http",
status="ok" if slo_ok else "warn",
message=f"Webhook OK ({latency_ms:.0f}ms){'' if slo_ok else ' — exceeds latency SLO'}",
value=latency_ms,
unit="ms",
extra={"status_code": status_code},
)
return CheckResult(
name="webhook_http",
status="critical",
message=f"Webhook returned HTTP {status_code}",
value=latency_ms,
unit="ms",
)
except urllib.error.URLError as exc:
latency_ms = (time.monotonic() - start) * 1000
reason = str(exc.reason) if hasattr(exc, "reason") else str(exc)
if "Connection refused" in reason or "Connection reset" in reason:
return CheckResult(
name="webhook_http",
status="warn",
message=f"Webhook not reachable at {url} (not started?)",
value=latency_ms,
unit="ms",
)
return CheckResult(
name="webhook_http",
status="error",
message=f"Webhook probe error: {exc}",
value=latency_ms,
unit="ms",
)
except Exception as exc:
latency_ms = (time.monotonic() - start) * 1000
return CheckResult(
name="webhook_http",
status="error",
message=f"Webhook probe exception: {exc}",
value=latency_ms,
unit="ms",
)
def check_disk(cfg: ObservatoryConfig) -> CheckResult:
"""Check disk usage on the HERMES_HOME filesystem."""
if not _PSUTIL:
return CheckResult(name="disk", status="error", message="psutil not installed")
try:
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
path = str(hermes_home) if hermes_home.exists() else "/"
usage = psutil.disk_usage(path)
pct = usage.percent
free_gb = usage.free / (1024 ** 3)
if pct >= cfg.disk_crit_pct:
status = "critical"
elif pct >= cfg.disk_warn_pct:
status = "warn"
else:
status = "ok"
return CheckResult(
name="disk",
status=status,
message=f"Disk {pct:.1f}% used ({free_gb:.1f}GB free)",
value=pct,
unit="%",
extra={"free_bytes": usage.free, "total_bytes": usage.total},
)
except Exception as exc:
return CheckResult(name="disk", status="error", message=f"Disk check error: {exc}")
def check_memory(cfg: ObservatoryConfig) -> CheckResult:
"""Check system memory usage."""
if not _PSUTIL:
return CheckResult(name="memory", status="error", message="psutil not installed")
try:
mem = psutil.virtual_memory()
pct = mem.percent
available_gb = mem.available / (1024 ** 3)
if pct >= cfg.mem_crit_pct:
status = "critical"
elif pct >= cfg.mem_warn_pct:
status = "warn"
else:
status = "ok"
return CheckResult(
name="memory",
status=status,
message=f"Memory {pct:.1f}% used ({available_gb:.1f}GB available)",
value=pct,
unit="%",
extra={"available_bytes": mem.available, "total_bytes": mem.total},
)
except Exception as exc:
return CheckResult(name="memory", status="error", message=f"Memory check error: {exc}")
def check_cpu(cfg: ObservatoryConfig) -> CheckResult:
"""Check CPU usage (1-second sample)."""
if not _PSUTIL:
return CheckResult(name="cpu", status="error", message="psutil not installed")
try:
pct = psutil.cpu_percent(interval=1)
if pct >= cfg.cpu_crit_pct:
status = "critical"
elif pct >= cfg.cpu_warn_pct:
status = "warn"
else:
status = "ok"
return CheckResult(
name="cpu",
status=status,
message=f"CPU {pct:.1f}%",
value=pct,
unit="%",
)
except Exception as exc:
return CheckResult(name="cpu", status="error", message=f"CPU check error: {exc}")
def check_database(cfg: ObservatoryConfig) -> CheckResult:
"""Check observatory SQLite DB connectivity and size."""
db_path = cfg.db_path
try:
if not db_path.exists():
return CheckResult(
name="database",
status="warn",
message=f"Observatory DB not yet created at {db_path}",
)
size_kb = db_path.stat().st_size / 1024
conn = sqlite3.connect(str(db_path), timeout=5)
conn.execute("SELECT count(*) FROM health_snapshots").fetchone()
conn.close()
return CheckResult(
name="database",
status="ok",
message=f"Observatory DB OK ({size_kb:.1f}KB)",
value=size_kb,
unit="KB",
extra={"path": str(db_path)},
)
except Exception as exc:
return CheckResult(
name="database",
status="error",
message=f"DB check error: {exc}",
)
def check_response_store_db() -> CheckResult:
"""Check the API server's SQLite response store DB if it exists."""
try:
hermes_home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
db_path = hermes_home / "response_store.db"
if not db_path.exists():
return CheckResult(
name="response_store_db",
status="ok",
message="Response store DB not present (API server not yet used)",
)
size_kb = db_path.stat().st_size / 1024
conn = sqlite3.connect(str(db_path), timeout=5)
count = conn.execute("SELECT count(*) FROM responses").fetchone()[0]
conn.close()
return CheckResult(
name="response_store_db",
status="ok",
message=f"Response store DB OK ({count} responses, {size_kb:.1f}KB)",
value=size_kb,
unit="KB",
)
except Exception as exc:
return CheckResult(
name="response_store_db",
status="error",
message=f"Response store DB error: {exc}",
)
# ---------------------------------------------------------------------------
# Snapshot collector
# ---------------------------------------------------------------------------
def collect_snapshot(cfg: ObservatoryConfig) -> HealthSnapshot:
"""Run all checks and return a HealthSnapshot."""
ts = datetime.now(timezone.utc).isoformat()
checks = [
check_gateway_liveness(),
check_api_server_http(cfg),
check_webhook_http(cfg),
check_disk(cfg),
check_memory(cfg),
check_cpu(cfg),
check_database(cfg),
check_response_store_db(),
]
return HealthSnapshot(ts=ts, checks=checks)
# ---------------------------------------------------------------------------
# SQLite persistence
# ---------------------------------------------------------------------------
@contextmanager
def _db(path: Path):
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path), timeout=10)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
try:
yield conn
conn.commit()
finally:
conn.close()
def _init_db(path: Path) -> None:
"""Create tables if they don't exist."""
with _db(path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS health_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
overall TEXT NOT NULL,
payload TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_snapshots_ts ON health_snapshots(ts)")
conn.execute("""
CREATE TABLE IF NOT EXISTS alerts_sent (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts TEXT NOT NULL,
check_name TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT NOT NULL
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_alerts_ts ON alerts_sent(ts)")
def store_snapshot(cfg: ObservatoryConfig, snapshot: HealthSnapshot) -> None:
"""Persist snapshot to SQLite."""
_init_db(cfg.db_path)
payload = json.dumps(snapshot.to_dict())
with _db(cfg.db_path) as conn:
conn.execute(
"INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)",
(snapshot.ts, snapshot.overall_status, payload),
)
# Prune records older than RETENTION_DAYS
cutoff = (datetime.now(timezone.utc) - timedelta(days=RETENTION_DAYS)).isoformat()
conn.execute("DELETE FROM health_snapshots WHERE ts < ?", (cutoff,))
def record_alert_sent(cfg: ObservatoryConfig, check_name: str, status: str, message: str) -> None:
"""Record that an alert was dispatched."""
_init_db(cfg.db_path)
with _db(cfg.db_path) as conn:
conn.execute(
"INSERT INTO alerts_sent (ts, check_name, status, message) VALUES (?, ?, ?, ?)",
(datetime.now(timezone.utc).isoformat(), check_name, status, message),
)
def load_snapshots(cfg: ObservatoryConfig, days: int = RETENTION_DAYS) -> List[Dict[str, Any]]:
"""Load snapshots from the last N days."""
if not cfg.db_path.exists():
return []
cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
with _db(cfg.db_path) as conn:
rows = conn.execute(
"SELECT ts, overall, payload FROM health_snapshots WHERE ts >= ? ORDER BY ts DESC",
(cutoff,),
).fetchall()
return [json.loads(row[2]) for row in rows]
# ---------------------------------------------------------------------------
# Alerting
# ---------------------------------------------------------------------------
def _telegram_send(token: str, chat_id: str, text: str) -> bool:
"""Send a Telegram message via the Bot API. Returns True on success."""
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = json.dumps({
"chat_id": chat_id,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": True,
}).encode("utf-8")
req = urllib.request.Request(url, data=payload, method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("User-Agent", "hermes-observatory/1.0")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
body = json.loads(resp.read())
return bool(body.get("ok"))
except Exception as exc:
logger.warning("Telegram send failed: %s", exc)
return False
def _status_emoji(status: str) -> str:
return {"ok": "", "warn": "⚠️", "critical": "🔴", "error": ""}.get(status, "")
def maybe_alert(cfg: ObservatoryConfig, snapshot: HealthSnapshot, prev_snapshot: Optional[HealthSnapshot]) -> List[str]:
"""
Fire Telegram alerts for newly degraded checks.
Returns list of alert messages sent.
"""
if not cfg.telegram_token or not cfg.alert_chat_id:
return []
alerts_sent = []
prev_statuses: Dict[str, str] = {}
if prev_snapshot:
for c in prev_snapshot.checks:
prev_statuses[c.name] = c.status
for check in snapshot.checks:
if check.status in ("critical", "error"):
prev = prev_statuses.get(check.name, "ok")
if prev not in ("critical", "error"):
# Newly degraded — alert
emoji = _status_emoji(check.status)
msg = (
f"{emoji} <b>Hermes Observatory Alert</b>\n\n"
f"<b>Check:</b> {check.name}\n"
f"<b>Status:</b> {check.status.upper()}\n"
f"<b>Message:</b> {check.message}\n"
f"<b>Time:</b> {snapshot.ts}"
)
if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg):
alerts_sent.append(msg)
record_alert_sent(cfg, check.name, check.status, check.message)
logger.info("Alert sent for %s (%s)", check.name, check.status)
elif check.status == "ok":
prev = prev_statuses.get(check.name)
if prev in ("critical", "error"):
# Recovery alert
msg = (
f"✅ <b>Hermes Observatory — Recovery</b>\n\n"
f"<b>Check:</b> {check.name} has recovered\n"
f"<b>Message:</b> {check.message}\n"
f"<b>Time:</b> {snapshot.ts}"
)
if _telegram_send(cfg.telegram_token, cfg.alert_chat_id, msg):
alerts_sent.append(msg)
record_alert_sent(cfg, check.name, "recovery", check.message)
return alerts_sent
# ---------------------------------------------------------------------------
# Daily digest
# ---------------------------------------------------------------------------
def build_digest(cfg: ObservatoryConfig) -> str:
"""Build a daily health digest from stored snapshots."""
snapshots = load_snapshots(cfg, days=1)
total = len(snapshots)
if total == 0:
return "No health data available for the last 24 hours."
# Count by overall status
status_counts: Dict[str, int] = {"ok": 0, "warn": 0, "critical": 0, "error": 0}
check_degraded_counts: Dict[str, int] = {}
latencies: Dict[str, List[float]] = {}
for snap in snapshots:
overall = snap.get("overall", "ok")
status_counts[overall] = status_counts.get(overall, 0) + 1
for check in snap.get("checks", []):
name = check["name"]
status = check["status"]
if status in ("critical", "error", "warn"):
check_degraded_counts[name] = check_degraded_counts.get(name, 0) + 1
value = check.get("value")
unit = check.get("unit")
if value is not None and unit == "ms":
if name not in latencies:
latencies[name] = []
latencies[name].append(float(value))
uptime_pct = 100.0 * status_counts["ok"] / total if total else 0.0
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
lines = [
f"📊 <b>Hermes Observatory — Daily Digest</b>",
f"<b>Generated:</b> {now}",
f"",
f"<b>Last 24h Summary</b> ({total} samples)",
f" Healthy: {status_counts['ok']} ({100*status_counts['ok']//total if total else 0}%)",
f" Warning: {status_counts.get('warn', 0)}",
f" Critical: {status_counts.get('critical', 0)}",
f" Error: {status_counts.get('error', 0)}",
f"",
]
# SLO status
lines.append("<b>SLO Status</b>")
gw_uptime_target = cfg.gateway_uptime_slo_pct
gw_snapshots = [
s for s in snapshots
if any(c["name"] == "gateway_process" and c["status"] == "ok" for c in s.get("checks", []))
]
gw_uptime = 100.0 * len(gw_snapshots) / total if total else 0.0
gw_ok = gw_uptime >= gw_uptime_target
lines.append(
f" {'' if gw_ok else ''} Gateway uptime: {gw_uptime:.1f}% (target: ≥{gw_uptime_target}%)"
)
wh_latency_target = cfg.webhook_latency_slo_ms
if "webhook_http" in latencies and latencies["webhook_http"]:
wh_vals = sorted(latencies["webhook_http"])
p95_idx = int(len(wh_vals) * 0.95)
p95 = wh_vals[min(p95_idx, len(wh_vals) - 1)]
wh_ok = p95 <= wh_latency_target
lines.append(
f" {'' if wh_ok else ''} Webhook p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)"
)
else:
lines.append(f" ⚫ Webhook latency: no data")
if "api_server_http" in latencies and latencies["api_server_http"]:
api_vals = sorted(latencies["api_server_http"])
p95_idx = int(len(api_vals) * 0.95)
p95 = api_vals[min(p95_idx, len(api_vals) - 1)]
api_ok = p95 <= wh_latency_target
lines.append(
f" {'' if api_ok else ''} API server p95 latency: {p95:.0f}ms (target: ≤{wh_latency_target:.0f}ms)"
)
# Top degraded checks
if check_degraded_counts:
lines.append("")
lines.append("<b>Degraded Checks (24h)</b>")
for name, count in sorted(check_degraded_counts.items(), key=lambda x: -x[1]):
pct = 100 * count // total if total else 0
lines.append(f"{name}: {count} incidents ({pct}%)")
lines.append("")
lines.append(f"<i>Observatory DB: {cfg.db_path}</i>")
return "\n".join(lines)
def send_digest(cfg: ObservatoryConfig) -> bool:
"""Build and send the daily digest to Telegram. Returns True on success."""
digest = build_digest(cfg)
if cfg.telegram_token and cfg.digest_chat_id:
return _telegram_send(cfg.telegram_token, cfg.digest_chat_id, digest)
return False
# ---------------------------------------------------------------------------
# Display helpers
# ---------------------------------------------------------------------------
_STATUS_COLORS = {
"ok": "\033[32m", # green
"warn": "\033[33m", # yellow
"critical": "\033[31m", # red
"error": "\033[91m", # bright red
}
_RESET = "\033[0m"
def _color_status(status: str) -> str:
c = _STATUS_COLORS.get(status, "")
return f"{c}{status.upper()}{_RESET}"
def print_snapshot(snapshot: HealthSnapshot) -> None:
overall_color = _STATUS_COLORS.get(snapshot.overall_status, "")
print(f"\n{'='*60}")
print(f" Hermes Observatory — {snapshot.ts}")
print(f" Overall: {overall_color}{snapshot.overall_status.upper()}{_RESET}")
print(f"{'='*60}")
for check in snapshot.checks:
emoji = _status_emoji(check.status)
val_str = f" [{check.value:.1f}{check.unit}]" if check.value is not None and check.unit else ""
print(f" {emoji} {check.name:<25} {_color_status(check.status):<15} {check.message}{val_str}")
print()
def print_slo_report(cfg: ObservatoryConfig) -> None:
"""Print current SLO definitions and targets."""
snapshots = load_snapshots(cfg, days=30)
total = len(snapshots)
print(f"\n{'='*60}")
print(" Hermes Observatory — SLO Report (last 30 days)")
print(f"{'='*60}")
for slo_key, slo in SLO_DEFINITIONS.items():
print(f"\n {slo['description']}")
print(f" Target: {slo['target']}{slo['unit']}")
if total == 0:
print(f" Status: no data")
continue
if slo_key == "gateway_uptime_pct":
ok_count = sum(
1 for s in snapshots
if any(c["name"] == "gateway_process" and c["status"] == "ok"
for c in s.get("checks", []))
)
actual = 100.0 * ok_count / total
met = actual >= slo["target"]
print(f" Actual: {actual:.2f}% {'✅ MET' if met else '❌ MISSED'}")
elif slo_key in ("webhook_latency_ms", "api_server_http_latency_ms"):
check_name = "webhook_http" if "webhook" in slo_key else "api_server_http"
vals = [
float(c["value"])
for s in snapshots
for c in s.get("checks", [])
if c["name"] == check_name and c.get("value") is not None
]
if vals:
vals.sort()
p95_idx = int(len(vals) * 0.95)
p95 = vals[min(p95_idx, len(vals) - 1)]
met = p95 <= slo["target"]
print(f" p95: {p95:.0f}ms {'✅ MET' if met else '❌ MISSED'}")
else:
print(f" Status: no latency data")
print()
def print_history(cfg: ObservatoryConfig, count: int = 20) -> None:
"""Print recent health records."""
snapshots = load_snapshots(cfg, days=RETENTION_DAYS)[:count]
if not snapshots:
print("No history available.")
return
print(f"\n{'='*60}")
print(f" Last {min(count, len(snapshots))} health records")
print(f"{'='*60}")
for snap in snapshots:
ts = snap.get("ts", "?")
overall = snap.get("overall", "?")
emoji = _status_emoji(overall)
degraded = [c["name"] for c in snap.get("checks", []) if c["status"] != "ok"]
degraded_str = f" — issues: {', '.join(degraded)}" if degraded else ""
print(f" {emoji} {ts} {overall.upper()}{degraded_str}")
print()
# ---------------------------------------------------------------------------
# Daemon mode
# ---------------------------------------------------------------------------
class Observatory:
"""Continuous monitoring daemon."""
def __init__(self, cfg: ObservatoryConfig):
self.cfg = cfg
self._running = False
self._prev_snapshot: Optional[HealthSnapshot] = None
def _handle_signal(self, signum: int, frame: Any) -> None:
logger.info("Received signal %d, shutting down...", signum)
self._running = False
def run_once(self) -> HealthSnapshot:
snapshot = collect_snapshot(self.cfg)
store_snapshot(self.cfg, snapshot)
alerts = maybe_alert(self.cfg, snapshot, self._prev_snapshot)
if alerts:
logger.info("Sent %d alert(s)", len(alerts))
self._prev_snapshot = snapshot
return snapshot
def run(self) -> None:
_init_db(self.cfg.db_path)
logger.info(
"Observatory starting — poll_interval=%ds db=%s",
self.cfg.poll_interval,
self.cfg.db_path,
)
self._running = True
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
while self._running:
try:
snapshot = self.run_once()
logger.info("Health check: %s", snapshot.overall_status)
except Exception as exc:
logger.error("Health check failed: %s", exc, exc_info=True)
if self._running:
time.sleep(self.cfg.poll_interval)
logger.info("Observatory stopped.")
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main(argv: Optional[List[str]] = None) -> int:
parser = argparse.ArgumentParser(
description="Hermes Observatory — health monitoring & alerting",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--check", action="store_true", help="Run one health check and print results")
parser.add_argument("--daemon", action="store_true", help="Run as continuous monitoring daemon")
parser.add_argument("--digest", action="store_true", help="Print (and optionally send) daily digest")
parser.add_argument("--history", type=int, metavar="N", help="Show last N health records")
parser.add_argument("--slo", action="store_true", help="Print SLO report")
parser.add_argument("--send-digest", action="store_true", help="Send daily digest via Telegram")
parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")
args = parser.parse_args(argv)
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)s [observatory] %(message)s",
)
cfg = ObservatoryConfig.from_env()
_init_db(cfg.db_path)
if args.check:
snapshot = collect_snapshot(cfg)
store_snapshot(cfg, snapshot)
print_snapshot(snapshot)
return 0 if snapshot.overall_status == "ok" else 1
if args.daemon:
obs = Observatory(cfg)
obs.run()
return 0
if args.digest or args.send_digest:
digest = build_digest(cfg)
print(digest)
if args.send_digest:
ok = send_digest(cfg)
if ok:
print("\n[Digest sent to Telegram]")
else:
print("\n[Telegram send skipped — token/chat_id not configured]")
return 0
if args.history is not None:
print_history(cfg, args.history)
return 0
if args.slo:
print_slo_report(cfg)
return 0
# Default: one-shot check
snapshot = collect_snapshot(cfg)
store_snapshot(cfg, snapshot)
print_snapshot(snapshot)
return 0 if snapshot.overall_status == "ok" else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -42,7 +42,6 @@ dependencies = [
modal = ["modal>=1.0.0,<2"]
daytona = ["daytona>=0.148.0,<1"]
dev = ["pytest>=9.0.2,<10", "pytest-asyncio>=1.3.0,<2", "pytest-xdist>=3.0,<4", "mcp>=1.2.0,<2"]
observatory = ["psutil>=5.9.0,<7"]
messaging = ["python-telegram-bot>=22.6,<23", "discord.py[voice]>=2.7.1,<3", "aiohttp>=3.13.3,<4", "slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]
cron = ["croniter>=6.0.0,<7"]
slack = ["slack-bolt>=1.18.0,<2", "slack-sdk>=3.27.0,<4"]

View File

@@ -0,0 +1,106 @@
---
name: wizard-council-automation
description: Run wizard environment validation, skills drift audit, and cross-wizard dependency checks — the Wizard Council shared tooling suite
version: 1.0.0
metadata:
hermes:
tags: [devops, wizards, environment, audit, bootstrap]
related_skills: []
---
# Wizard Council Automation
This skill gives you access to the shared forge tooling for environment
validation, skill drift detection, and cross-wizard dependency checking.
## Tools
All tools live in `wizard-bootstrap/` in the hermes-agent repo root.
### 1. Environment Bootstrap (`wizard_bootstrap.py`)
Validates the full wizard environment in one command:
```bash
python wizard-bootstrap/wizard_bootstrap.py
python wizard-bootstrap/wizard_bootstrap.py --json
```
Checks:
- Python version (>=3.11)
- Core dependency imports
- hermes_constants smoke test
- HERMES_HOME existence and writability
- LLM provider API key
- Gitea authentication (GITEA_TOKEN / FORGE_TOKEN)
- Telegram bot connectivity (TELEGRAM_BOT_TOKEN)
Exits 0 if all checks pass, 1 if any fail.
### 2. Skills Drift Audit (`skills_audit.py`)
Compares repo-bundled skills against installed skills:
```bash
python wizard-bootstrap/skills_audit.py # detect drift
python wizard-bootstrap/skills_audit.py --fix # sync missing/outdated
python wizard-bootstrap/skills_audit.py --diff # show diffs for outdated
python wizard-bootstrap/skills_audit.py --json # machine-readable output
```
Reports: MISSING, EXTRA, OUTDATED, OK.
### 3. Dependency Checker (`dependency_checker.py`)
Validates binary and env-var dependencies declared in SKILL.md frontmatter:
```bash
python wizard-bootstrap/dependency_checker.py
python wizard-bootstrap/dependency_checker.py --skill devops/my-skill
```
Skills declare deps in their frontmatter:
```yaml
dependencies:
binaries: [ffmpeg, imagemagick]
env_vars: [MY_API_KEY]
```
### 4. Monthly Audit (`monthly_audit.py`)
Runs all three checks and generates a Markdown report:
```bash
python wizard-bootstrap/monthly_audit.py
python wizard-bootstrap/monthly_audit.py --post-telegram
```
Report saved to `~/.hermes/wizard-council/audit-YYYY-MM.md`.
## Wizard Environment Contract
See `wizard-bootstrap/WIZARD_ENVIRONMENT_CONTRACT.md` for the full
specification of what every forge wizard must maintain.
## Workflow
### New Wizard Onboarding
1. Clone the hermes-agent repo
2. Install dependencies: `uv pip install -r requirements.txt`
3. Run: `python wizard-bootstrap/wizard_bootstrap.py`
4. Resolve all failures
5. Go online
### Ongoing Maintenance
1. Monthly audit fires automatically via cron
2. Report posted to wizard-council-automation channel
3. Wizards resolve any drift before next audit
### When Drift Is Detected
1. Run `python wizard-bootstrap/skills_audit.py` to identify drift
2. Run `python wizard-bootstrap/skills_audit.py --fix` to sync
3. Run `python wizard-bootstrap/dependency_checker.py` to check deps
4. Update SKILL.md frontmatter with any new binary/env_var requirements

View File

@@ -1,455 +0,0 @@
"""
Tests for observatory.py — health monitoring & alerting.
Refs #147
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import tempfile
import time
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
PROJECT_ROOT = Path(__file__).parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
import observatory as obs
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def cfg(tmp_path):
"""Return an ObservatoryConfig pointing at a temp directory."""
cfg = obs.ObservatoryConfig()
cfg.db_path = tmp_path / "observatory.db"
cfg.alert_chat_id = "99999"
cfg.digest_chat_id = "99999"
cfg.telegram_token = "fake-token"
cfg.webhook_url = "http://127.0.0.1:19999/health" # port never bound
cfg.api_url = "http://127.0.0.1:19998/health"
return cfg
# ---------------------------------------------------------------------------
# Config tests
# ---------------------------------------------------------------------------
class TestObservatoryConfig:
def test_defaults(self):
c = obs.ObservatoryConfig()
assert c.disk_warn_pct == 80.0
assert c.disk_crit_pct == 90.0
assert c.mem_warn_pct == 80.0
assert c.mem_crit_pct == 90.0
assert c.cpu_warn_pct == 80.0
assert c.cpu_crit_pct == 95.0
assert c.poll_interval == 60
assert c.webhook_latency_slo_ms == 2000.0
assert c.gateway_uptime_slo_pct == 99.5
def test_from_env_overrides(self, monkeypatch):
monkeypatch.setenv("OBSERVATORY_DISK_WARN_PCT", "70")
monkeypatch.setenv("OBSERVATORY_POLL_INTERVAL", "30")
monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "12345")
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "tok123")
c = obs.ObservatoryConfig.from_env()
assert c.disk_warn_pct == 70.0
assert c.poll_interval == 30
assert c.alert_chat_id == "12345"
assert c.telegram_token == "tok123"
def test_digest_chat_falls_back_to_alert(self, monkeypatch):
monkeypatch.setenv("OBSERVATORY_ALERT_CHAT_ID", "abc")
monkeypatch.delenv("OBSERVATORY_DIGEST_CHAT_ID", raising=False)
c = obs.ObservatoryConfig.from_env()
assert c.digest_chat_id == "abc"
# ---------------------------------------------------------------------------
# CheckResult / HealthSnapshot tests
# ---------------------------------------------------------------------------
class TestHealthSnapshot:
def _make_snapshot(self, statuses):
checks = [obs.CheckResult(name=f"c{i}", status=s, message="") for i, s in enumerate(statuses)]
return obs.HealthSnapshot(ts="2026-01-01T00:00:00+00:00", checks=checks)
def test_overall_ok(self):
snap = self._make_snapshot(["ok", "ok"])
assert snap.overall_status == "ok"
def test_overall_warn(self):
snap = self._make_snapshot(["ok", "warn"])
assert snap.overall_status == "warn"
def test_overall_critical(self):
snap = self._make_snapshot(["ok", "warn", "critical"])
assert snap.overall_status == "critical"
def test_overall_error(self):
snap = self._make_snapshot(["ok", "error"])
assert snap.overall_status == "critical"
def test_to_dict(self):
snap = self._make_snapshot(["ok"])
d = snap.to_dict()
assert d["overall"] == "ok"
assert isinstance(d["checks"], list)
assert d["checks"][0]["name"] == "c0"
# ---------------------------------------------------------------------------
# Individual check tests
# ---------------------------------------------------------------------------
class TestCheckGatewayLiveness:
def test_running(self):
with patch("gateway.status.is_gateway_running", return_value=True), \
patch("gateway.status.get_running_pid", return_value=12345):
result = obs.check_gateway_liveness()
assert result.status == "ok"
assert "12345" in result.message
def test_not_running(self):
with patch("gateway.status.is_gateway_running", return_value=False), \
patch("gateway.status.get_running_pid", return_value=None):
result = obs.check_gateway_liveness()
assert result.status == "critical"
def test_import_error(self):
import builtins
real_import = builtins.__import__
def mock_import(name, *args, **kwargs):
if name == "gateway.status":
raise ImportError("no module")
return real_import(name, *args, **kwargs)
with patch("builtins.__import__", side_effect=mock_import):
result = obs.check_gateway_liveness()
assert result.status in ("error", "critical", "ok") # graceful
class TestCheckDisk:
def test_ok(self, cfg):
mock_usage = MagicMock()
mock_usage.percent = 50.0
mock_usage.free = 10 * 1024 ** 3
mock_usage.total = 20 * 1024 ** 3
with patch("psutil.disk_usage", return_value=mock_usage):
result = obs.check_disk(cfg)
assert result.status == "ok"
assert result.value == 50.0
def test_warn(self, cfg):
mock_usage = MagicMock()
mock_usage.percent = 85.0
mock_usage.free = 3 * 1024 ** 3
mock_usage.total = 20 * 1024 ** 3
with patch("psutil.disk_usage", return_value=mock_usage):
result = obs.check_disk(cfg)
assert result.status == "warn"
def test_critical(self, cfg):
mock_usage = MagicMock()
mock_usage.percent = 92.0
mock_usage.free = 1 * 1024 ** 3
mock_usage.total = 20 * 1024 ** 3
with patch("psutil.disk_usage", return_value=mock_usage):
result = obs.check_disk(cfg)
assert result.status == "critical"
def test_no_psutil(self, cfg, monkeypatch):
monkeypatch.setattr(obs, "_PSUTIL", False)
result = obs.check_disk(cfg)
assert result.status == "error"
class TestCheckMemory:
def test_ok(self, cfg):
mock_mem = MagicMock()
mock_mem.percent = 60.0
mock_mem.available = 4 * 1024 ** 3
mock_mem.total = 16 * 1024 ** 3
with patch("psutil.virtual_memory", return_value=mock_mem):
result = obs.check_memory(cfg)
assert result.status == "ok"
def test_critical(self, cfg):
mock_mem = MagicMock()
mock_mem.percent = 95.0
mock_mem.available = 512 * 1024 ** 2
mock_mem.total = 16 * 1024 ** 3
with patch("psutil.virtual_memory", return_value=mock_mem):
result = obs.check_memory(cfg)
assert result.status == "critical"
class TestCheckCPU:
def test_ok(self, cfg):
with patch("psutil.cpu_percent", return_value=40.0):
result = obs.check_cpu(cfg)
assert result.status == "ok"
def test_warn(self, cfg):
with patch("psutil.cpu_percent", return_value=85.0):
result = obs.check_cpu(cfg)
assert result.status == "warn"
def test_critical(self, cfg):
with patch("psutil.cpu_percent", return_value=98.0):
result = obs.check_cpu(cfg)
assert result.status == "critical"
class TestCheckDatabase:
def test_ok(self, cfg):
obs._init_db(cfg.db_path)
result = obs.check_database(cfg)
assert result.status == "ok"
def test_not_yet_created(self, cfg):
# db_path does not exist
result = obs.check_database(cfg)
assert result.status == "warn"
class TestCheckHTTP:
def test_webhook_connection_refused(self, cfg):
result = obs.check_webhook_http(cfg)
# Port 19999 is not bound — should get a "not reachable" warn
assert result.status in ("warn", "error")
def test_api_server_connection_refused(self, cfg):
result = obs.check_api_server_http(cfg)
assert result.status in ("warn", "error")
def test_webhook_ok(self, cfg):
import urllib.error
from unittest.mock import patch, MagicMock
mock_resp = MagicMock()
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_resp.status = 200
mock_resp.read.return_value = b'{"status":"ok"}'
with patch("urllib.request.urlopen", return_value=mock_resp):
result = obs.check_webhook_http(cfg)
assert result.status in ("ok", "warn")
def test_webhook_http_error(self, cfg):
mock_resp = MagicMock()
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
mock_resp.status = 503
with patch("urllib.request.urlopen", return_value=mock_resp):
result = obs.check_webhook_http(cfg)
assert result.status == "critical"
# ---------------------------------------------------------------------------
# Persistence tests
# ---------------------------------------------------------------------------
class TestPersistence:
def test_store_and_load(self, cfg):
obs._init_db(cfg.db_path)
from datetime import datetime, timezone
ts = datetime.now(timezone.utc).isoformat()
snap = obs.HealthSnapshot(
ts=ts,
checks=[obs.CheckResult(name="test", status="ok", message="fine")],
)
obs.store_snapshot(cfg, snap)
loaded = obs.load_snapshots(cfg, days=30)
assert len(loaded) == 1
assert loaded[0]["overall"] == "ok"
def test_retention_pruning(self, cfg):
obs._init_db(cfg.db_path)
# Insert an old record directly
with obs._db(cfg.db_path) as conn:
conn.execute(
"INSERT INTO health_snapshots (ts, overall, payload) VALUES (?, ?, ?)",
("2000-01-01T00:00:00+00:00", "ok", '{"ts":"2000-01-01T00:00:00+00:00","overall":"ok","checks":[]}'),
)
snap = obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[],
)
obs.store_snapshot(cfg, snap)
# Old record should have been pruned
with obs._db(cfg.db_path) as conn:
count = conn.execute("SELECT count(*) FROM health_snapshots WHERE ts < '2001-01-01'").fetchone()[0]
assert count == 0
def test_record_alert_sent(self, cfg):
obs._init_db(cfg.db_path)
obs.record_alert_sent(cfg, "gateway_process", "critical", "not running")
with obs._db(cfg.db_path) as conn:
count = conn.execute("SELECT count(*) FROM alerts_sent").fetchone()[0]
assert count == 1
# ---------------------------------------------------------------------------
# Alerting tests
# ---------------------------------------------------------------------------
class TestAlerting:
def _snap(self, status):
return obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[obs.CheckResult(name="gateway_process", status=status, message="test")],
)
def test_no_alert_when_ok(self, cfg):
snap = self._snap("ok")
prev = self._snap("ok")
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_not_called()
assert alerts == []
def test_alert_on_new_critical(self, cfg):
snap = self._snap("critical")
prev = self._snap("ok")
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_called_once()
assert len(alerts) == 1
def test_no_duplicate_alert(self, cfg):
snap = self._snap("critical")
prev = self._snap("critical") # already critical
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_not_called()
assert alerts == []
def test_recovery_alert(self, cfg):
snap = self._snap("ok")
prev = self._snap("critical")
obs._init_db(cfg.db_path)
with patch("observatory._telegram_send", return_value=True) as mock_send:
alerts = obs.maybe_alert(cfg, snap, prev)
mock_send.assert_called_once()
def test_no_alert_without_token(self, cfg):
cfg.telegram_token = None
snap = self._snap("critical")
obs._init_db(cfg.db_path)
alerts = obs.maybe_alert(cfg, snap, None)
assert alerts == []
def test_no_alert_without_chat_id(self, cfg):
cfg.alert_chat_id = None
snap = self._snap("critical")
obs._init_db(cfg.db_path)
alerts = obs.maybe_alert(cfg, snap, None)
assert alerts == []
# ---------------------------------------------------------------------------
# Digest tests
# ---------------------------------------------------------------------------
class TestDigest:
def test_empty_digest(self, cfg):
obs._init_db(cfg.db_path)
digest = obs.build_digest(cfg)
assert "no health data" in digest.lower() or "24 hours" in digest.lower()
def test_digest_with_data(self, cfg):
obs._init_db(cfg.db_path)
from datetime import datetime, timezone, timedelta
ts = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
snap = obs.HealthSnapshot(
ts=ts,
checks=[
obs.CheckResult(name="gateway_process", status="ok", message="running"),
obs.CheckResult(name="disk", status="ok", message="50% used", value=50.0, unit="%"),
obs.CheckResult(name="webhook_http", status="ok", message="ok", value=150.0, unit="ms"),
],
)
obs.store_snapshot(cfg, snap)
digest = obs.build_digest(cfg)
assert "Daily Digest" in digest
assert "Gateway" in digest or "gateway" in digest
def test_send_digest_no_token(self, cfg):
cfg.telegram_token = None
obs._init_db(cfg.db_path)
result = obs.send_digest(cfg)
assert result is False
# ---------------------------------------------------------------------------
# SLO tests
# ---------------------------------------------------------------------------
class TestSLO:
def test_slo_definitions_complete(self):
assert "gateway_uptime_pct" in obs.SLO_DEFINITIONS
assert "webhook_latency_ms" in obs.SLO_DEFINITIONS
assert "api_server_latency_ms" in obs.SLO_DEFINITIONS
def test_slo_targets(self):
assert obs.SLO_DEFINITIONS["gateway_uptime_pct"]["target"] == 99.5
assert obs.SLO_DEFINITIONS["webhook_latency_ms"]["target"] == 2000
# ---------------------------------------------------------------------------
# CLI tests
# ---------------------------------------------------------------------------
class TestCLI:
def test_check_exits_0_on_ok(self, cfg, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
ok_snap = obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[obs.CheckResult(name="all_good", status="ok", message="fine")],
)
with patch("observatory.collect_snapshot", return_value=ok_snap), \
patch("observatory.store_snapshot"):
rc = obs.main(["--check"])
assert rc == 0
def test_check_exits_nonzero_on_critical(self, cfg, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
bad_snap = obs.HealthSnapshot(
ts="2026-01-01T00:00:00+00:00",
checks=[obs.CheckResult(name="gateway_process", status="critical", message="down")],
)
with patch("observatory.collect_snapshot", return_value=bad_snap), \
patch("observatory.store_snapshot"):
rc = obs.main(["--check"])
assert rc != 0
def test_digest_flag(self, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
rc = obs.main(["--digest"])
assert rc == 0
def test_slo_flag(self, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
rc = obs.main(["--slo"])
assert rc == 0
def test_history_flag(self, monkeypatch, tmp_path):
monkeypatch.setenv("OBSERVATORY_DB_PATH", str(tmp_path / "obs.db"))
rc = obs.main(["--history", "5"])
assert rc == 0

View File

@@ -0,0 +1,242 @@
"""
Tests for wizard-bootstrap tooling (Epic-004).
These tests exercise the bootstrap, skills audit, and dependency checker
without requiring network access or API keys.
"""
import json
import os
import sys
from pathlib import Path
from unittest import mock
import pytest
# Ensure repo root importable
REPO_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(REPO_ROOT))
sys.path.insert(0, str(REPO_ROOT / "wizard-bootstrap"))
import wizard_bootstrap as wb
import skills_audit as sa
import dependency_checker as dc
# ---------------------------------------------------------------------------
# wizard_bootstrap tests
# ---------------------------------------------------------------------------
class TestCheckPythonVersion:
def test_current_python_passes(self):
result = wb.check_python_version()
assert result.passed
assert "Python" in result.message
def test_old_python_fails(self):
# Patch version_info as a tuple (matches [:3] unpacking used in the check)
old_info = sys.version_info
try:
sys.version_info = (3, 10, 0, "final", 0) # type: ignore[assignment]
result = wb.check_python_version()
finally:
sys.version_info = old_info # type: ignore[assignment]
assert not result.passed
class TestCheckCoreDeps:
def test_passes_when_all_present(self):
result = wb.check_core_deps()
# In a healthy dev environment all packages should be importable
assert result.passed
def test_fails_when_package_missing(self):
orig = __import__
def fake_import(name, *args, **kwargs):
if name == "openai":
raise ModuleNotFoundError(name)
return orig(name, *args, **kwargs)
with mock.patch("builtins.__import__", side_effect=fake_import):
with mock.patch("importlib.import_module", side_effect=ModuleNotFoundError("openai")):
result = wb.check_core_deps()
# With mocked importlib the check should detect the missing module
assert not result.passed
assert "openai" in result.message
class TestCheckEnvVars:
def test_fails_when_no_key_set(self):
env_keys = [
"OPENROUTER_API_KEY", "ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN",
"OPENAI_API_KEY", "GLM_API_KEY", "KIMI_API_KEY", "MINIMAX_API_KEY",
]
with mock.patch.dict(os.environ, {k: "" for k in env_keys}, clear=False):
# Remove all provider keys
env = {k: v for k, v in os.environ.items() if k not in env_keys}
with mock.patch.dict(os.environ, env, clear=True):
result = wb.check_env_vars()
assert not result.passed
def test_passes_when_key_set(self):
with mock.patch.dict(os.environ, {"ANTHROPIC_API_KEY": "sk-test-key"}):
result = wb.check_env_vars()
assert result.passed
assert "ANTHROPIC_API_KEY" in result.message
class TestCheckHermesHome:
def test_passes_with_existing_writable_dir(self, tmp_path):
with mock.patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}):
result = wb.check_hermes_home()
assert result.passed
def test_fails_when_dir_missing(self, tmp_path):
missing = tmp_path / "nonexistent"
with mock.patch.dict(os.environ, {"HERMES_HOME": str(missing)}):
result = wb.check_hermes_home()
assert not result.passed
class TestBootstrapReport:
def test_passed_when_all_pass(self):
report = wb.BootstrapReport()
report.add(wb.CheckResult("a", True, "ok"))
report.add(wb.CheckResult("b", True, "ok"))
assert report.passed
assert report.failed == []
def test_failed_when_any_fail(self):
report = wb.BootstrapReport()
report.add(wb.CheckResult("a", True, "ok"))
report.add(wb.CheckResult("b", False, "bad", fix_hint="fix it"))
assert not report.passed
assert len(report.failed) == 1
# ---------------------------------------------------------------------------
# skills_audit tests
# ---------------------------------------------------------------------------
class TestSkillsAudit:
def _make_skill(self, skills_root: Path, rel_path: str, content: str = "# skill") -> Path:
"""Create a SKILL.md at skills_root/rel_path/SKILL.md."""
skill_dir = skills_root / rel_path
skill_dir.mkdir(parents=True, exist_ok=True)
skill_md = skill_dir / "SKILL.md"
skill_md.write_text(content)
return skill_md
def test_no_drift_when_identical(self, tmp_path):
# run_audit expects repo_root/skills/ and installed_root/
repo = tmp_path / "repo"
installed = tmp_path / "installed"
content = "# Same content"
self._make_skill(repo / "skills", "cat/skill-a", content)
self._make_skill(installed, "cat/skill-a", content)
report = sa.run_audit(repo, installed)
assert not report.has_drift
assert len(report.by_status("OK")) == 1
def test_detects_missing_skill(self, tmp_path):
repo = tmp_path / "repo"
installed = tmp_path / "installed"
installed.mkdir()
self._make_skill(repo / "skills", "cat/skill-a")
report = sa.run_audit(repo, installed)
assert report.has_drift
assert len(report.by_status("MISSING")) == 1
def test_detects_extra_skill(self, tmp_path):
repo = tmp_path / "repo"
(repo / "skills").mkdir(parents=True)
installed = tmp_path / "installed"
self._make_skill(installed, "cat/skill-a")
report = sa.run_audit(repo, installed)
assert report.has_drift
assert len(report.by_status("EXTRA")) == 1
def test_detects_outdated_skill(self, tmp_path):
repo = tmp_path / "repo"
installed = tmp_path / "installed"
self._make_skill(repo / "skills", "cat/skill-a", "# Repo version")
self._make_skill(installed, "cat/skill-a", "# Installed version")
report = sa.run_audit(repo, installed)
assert report.has_drift
assert len(report.by_status("OUTDATED")) == 1
def test_fix_copies_missing_skills(self, tmp_path):
repo = tmp_path / "repo"
installed = tmp_path / "installed"
installed.mkdir()
self._make_skill(repo / "skills", "cat/skill-a", "# content")
report = sa.run_audit(repo, installed)
assert len(report.by_status("MISSING")) == 1
sa.apply_fix(report)
report2 = sa.run_audit(repo, installed)
assert not report2.has_drift
# ---------------------------------------------------------------------------
# dependency_checker tests
# ---------------------------------------------------------------------------
class TestDependencyChecker:
def _make_skill(self, root: Path, rel_path: str, content: str) -> None:
skill_dir = root / rel_path
skill_dir.mkdir(parents=True, exist_ok=True)
(skill_dir / "SKILL.md").write_text(content)
def test_no_deps_when_no_frontmatter(self, tmp_path):
self._make_skill(tmp_path, "cat/plain", "# No frontmatter")
report = dc.run_dep_check(skills_dir=tmp_path)
assert report.deps == []
def test_detects_missing_binary(self, tmp_path):
content = "---\nname: test\ndependencies:\n binaries: [definitely_not_a_real_binary_xyz]\n---\n"
self._make_skill(tmp_path, "cat/skill", content)
report = dc.run_dep_check(skills_dir=tmp_path)
assert len(report.deps) == 1
assert not report.deps[0].satisfied
assert report.deps[0].binary == "definitely_not_a_real_binary_xyz"
def test_detects_present_binary(self, tmp_path):
content = "---\nname: test\ndependencies:\n binaries: [python3]\n---\n"
self._make_skill(tmp_path, "cat/skill", content)
report = dc.run_dep_check(skills_dir=tmp_path)
assert len(report.deps) == 1
assert report.deps[0].satisfied
def test_detects_missing_env_var(self, tmp_path):
content = "---\nname: test\ndependencies:\n env_vars: [DEFINITELY_NOT_SET_XYZ_123]\n---\n"
self._make_skill(tmp_path, "cat/skill", content)
env = {k: v for k, v in os.environ.items() if k != "DEFINITELY_NOT_SET_XYZ_123"}
with mock.patch.dict(os.environ, env, clear=True):
report = dc.run_dep_check(skills_dir=tmp_path)
assert len(report.deps) == 1
assert not report.deps[0].satisfied
def test_detects_present_env_var(self, tmp_path):
content = "---\nname: test\ndependencies:\n env_vars: [MY_TEST_VAR_WIZARD]\n---\n"
self._make_skill(tmp_path, "cat/skill", content)
with mock.patch.dict(os.environ, {"MY_TEST_VAR_WIZARD": "set"}):
report = dc.run_dep_check(skills_dir=tmp_path)
assert len(report.deps) == 1
assert report.deps[0].satisfied
def test_skill_filter(self, tmp_path):
content = "---\nname: test\ndependencies:\n binaries: [python3]\n---\n"
self._make_skill(tmp_path, "cat/skill-a", content)
self._make_skill(tmp_path, "cat/skill-b", content)
report = dc.run_dep_check(skills_dir=tmp_path, skill_filter="skill-a")
assert len(report.deps) == 1
assert "skill-a" in report.deps[0].skill_path

View File

@@ -0,0 +1,162 @@
# Wizard Environment Contract
> **Version:** 1.0.0
> **Owner:** Wizard Council (Bezalel Epic-004)
> **Last updated:** 2026-04-06
This document defines the minimum viable state every forge wizard must maintain.
A wizard that satisfies all requirements is considered **forge-ready**.
---
## 1. Python Runtime
| Requirement | Minimum | Notes |
|-------------|---------|-------|
| Python version | 3.11 | 3.12+ recommended |
| Virtual environment | Activated | `source venv/bin/activate` before running |
Run `python --version` to verify.
---
## 2. Core Package Dependencies
All packages in `requirements.txt` must be installed and importable.
Critical packages: `openai`, `anthropic`, `pyyaml`, `rich`, `requests`, `pydantic`, `prompt_toolkit`.
**Verify:**
```bash
python wizard-bootstrap/wizard_bootstrap.py
```
---
## 3. LLM Provider Key
At least one LLM provider API key must be set in `~/.hermes/.env`:
| Variable | Provider |
|----------|----------|
| `OPENROUTER_API_KEY` | OpenRouter (200+ models) |
| `ANTHROPIC_API_KEY` | Anthropic Claude |
| `ANTHROPIC_TOKEN` | Anthropic Claude (alt) |
| `OPENAI_API_KEY` | OpenAI |
| `GLM_API_KEY` | z.ai/GLM |
| `KIMI_API_KEY` | Moonshot/Kimi |
| `MINIMAX_API_KEY` | MiniMax |
---
## 4. Gitea Authentication
| Requirement | Details |
|-------------|---------|
| Variable | `GITEA_TOKEN` or `FORGE_TOKEN` |
| Scope | Must have repo read/write access |
| Forge URL | `https://forge.alexanderwhitestone.com` (or `FORGE_URL` env var) |
The wizard must be able to create and merge PRs on the forge.
---
## 5. Telegram Connectivity (Gateway Wizards)
Wizards that operate via the messaging gateway must also satisfy:
| Requirement | Details |
|-------------|---------|
| Variable | `TELEGRAM_BOT_TOKEN` |
| Home channel | `TELEGRAM_HOME_CHANNEL` |
| API reachability | `api.telegram.org` must be reachable |
CLI-only wizards may skip Telegram checks.
---
## 6. HERMES_HOME
| Requirement | Details |
|-------------|---------|
| Default | `~/.hermes` |
| Override | `HERMES_HOME` env var |
| Permissions | Owner-writable (700 recommended) |
The directory must exist and be writable before any hermes command runs.
---
## 7. Skill Dependencies (Per-Skill)
Each skill may declare binary and environment-variable dependencies in its
`SKILL.md` frontmatter:
```yaml
---
name: my-skill
dependencies:
binaries: [ffmpeg, imagemagick]
env_vars: [MY_API_KEY]
---
```
A wizard must satisfy all dependencies for any skill it intends to run.
**Check all skill deps:**
```bash
python wizard-bootstrap/dependency_checker.py
```
---
## 8. Enforcement
### New Wizard Onboarding
Run the bootstrap script before going online:
```bash
python wizard-bootstrap/wizard_bootstrap.py
```
Resolve all failures before beginning work.
### Ongoing Compliance
A monthly audit runs automatically (see `wizard-bootstrap/monthly_audit.py`).
The report is saved to `~/.hermes/wizard-council/audit-YYYY-MM.md` and posted
to the `wizard-council-automation` Telegram channel.
### Skill Drift
Run the skills audit to detect and fix drift:
```bash
python wizard-bootstrap/skills_audit.py # detect
python wizard-bootstrap/skills_audit.py --fix # sync
```
---
## 9. Contract Versioning
Changes to this contract require a PR reviewed by at least one wizard council
member. Bump the version number and update the date above with each change.
---
## Quick Reference
```bash
# Full environment validation
python wizard-bootstrap/wizard_bootstrap.py
# Skills drift check
python wizard-bootstrap/skills_audit.py
# Dependency check
python wizard-bootstrap/dependency_checker.py
# Full monthly audit (all three checks, saves report)
python wizard-bootstrap/monthly_audit.py
```

View File

@@ -0,0 +1 @@
# wizard-bootstrap package

View File

@@ -0,0 +1,300 @@
#!/usr/bin/env python3
"""
dependency_checker.py — Cross-Wizard Dependency Validator
Each skill may declare binary or environment-variable dependencies in its
SKILL.md frontmatter under a `dependencies` key:
---
name: my-skill
dependencies:
binaries: [ffmpeg, imagemagick]
env_vars: [MY_API_KEY, MY_SECRET]
---
This script scans all installed skills, extracts declared dependencies, and
checks whether each is satisfied in the current environment.
Usage:
python wizard-bootstrap/dependency_checker.py
python wizard-bootstrap/dependency_checker.py --json
python wizard-bootstrap/dependency_checker.py --skill software-development/code-review
"""
import argparse
import json
import os
import shutil
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass
class SkillDep:
skill_path: str
skill_name: str
binary: Optional[str] = None
env_var: Optional[str] = None
satisfied: bool = False
detail: str = ""
@dataclass
class DepReport:
deps: list[SkillDep] = field(default_factory=list)
@property
def all_satisfied(self) -> bool:
return all(d.satisfied for d in self.deps)
@property
def unsatisfied(self) -> list[SkillDep]:
return [d for d in self.deps if not d.satisfied]
# ---------------------------------------------------------------------------
# Frontmatter parser
# ---------------------------------------------------------------------------
def _parse_frontmatter(text: str) -> dict:
"""Extract YAML frontmatter from a SKILL.md file."""
if not text.startswith("---"):
return {}
end = text.find("\n---", 3)
if end == -1:
return {}
fm_text = text[3:end].strip()
if not HAS_YAML:
return {}
try:
return yaml.safe_load(fm_text) or {}
except Exception:
return {}
def _load_skill_deps(skill_md: Path) -> tuple[str, list[str], list[str]]:
"""
Returns (skill_name, binaries, env_vars) from a SKILL.md frontmatter.
"""
text = skill_md.read_text(encoding="utf-8", errors="replace")
fm = _parse_frontmatter(text)
skill_name = fm.get("name", skill_md.parent.name)
deps = fm.get("dependencies", {})
if not isinstance(deps, dict):
return skill_name, [], []
binaries = deps.get("binaries") or []
env_vars = deps.get("env_vars") or []
if isinstance(binaries, str):
binaries = [binaries]
if isinstance(env_vars, str):
env_vars = [env_vars]
return skill_name, list(binaries), list(env_vars)
# ---------------------------------------------------------------------------
# Checks
# ---------------------------------------------------------------------------
def _check_binary(binary: str) -> tuple[bool, str]:
path = shutil.which(binary)
if path:
return True, f"found at {path}"
return False, f"not found in PATH"
def _check_env_var(var: str) -> tuple[bool, str]:
val = os.environ.get(var)
if val:
return True, "set"
return False, "not set"
# ---------------------------------------------------------------------------
# Scanner
# ---------------------------------------------------------------------------
def _find_skills_dir() -> Optional[Path]:
"""Resolve skills directory: prefer repo root, fall back to HERMES_HOME."""
# Check if we're inside the repo
repo_root = Path(__file__).parent.parent
repo_skills = repo_root / "skills"
if repo_skills.exists():
return repo_skills
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
for candidate in [hermes_home / "skills", hermes_home / "hermes-agent" / "skills"]:
if candidate.exists():
return candidate
return None
def run_dep_check(skills_dir: Optional[Path] = None, skill_filter: Optional[str] = None) -> DepReport:
resolved = skills_dir or _find_skills_dir()
report = DepReport()
if resolved is None or not resolved.exists():
return report
# Load ~/.hermes/.env so env var checks work
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
env_path = hermes_home / ".env"
if env_path.exists():
try:
from dotenv import load_dotenv # noqa: PLC0415
load_dotenv(env_path, override=False)
except Exception:
pass
for skill_md in sorted(resolved.rglob("SKILL.md")):
rel = str(skill_md.parent.relative_to(resolved))
if skill_filter and skill_filter not in rel:
continue
skill_name, binaries, env_vars = _load_skill_deps(skill_md)
for binary in binaries:
ok, detail = _check_binary(binary)
report.deps.append(SkillDep(
skill_path=rel,
skill_name=skill_name,
binary=binary,
satisfied=ok,
detail=detail,
))
for var in env_vars:
ok, detail = _check_env_var(var)
report.deps.append(SkillDep(
skill_path=rel,
skill_name=skill_name,
env_var=var,
satisfied=ok,
detail=detail,
))
return report
# ---------------------------------------------------------------------------
# Rendering
# ---------------------------------------------------------------------------
_GREEN = "\033[32m"
_RED = "\033[31m"
_YELLOW = "\033[33m"
_BOLD = "\033[1m"
_RESET = "\033[0m"
def _render_terminal(report: DepReport) -> None:
print(f"\n{_BOLD}=== Cross-Wizard Dependency Check ==={_RESET}\n")
if not report.deps:
print("No skill dependencies declared. Skills use implicit deps only.\n")
print(
f"{_YELLOW}Tip:{_RESET} Declare binary/env_var deps in SKILL.md frontmatter "
"under a 'dependencies' key to make them checkable.\n"
)
return
for dep in report.deps:
icon = f"{_GREEN}{_RESET}" if dep.satisfied else f"{_RED}{_RESET}"
if dep.binary:
dep_type = "binary"
dep_name = dep.binary
else:
dep_type = "env_var"
dep_name = dep.env_var
print(f" {icon} [{dep.skill_path}] {dep_type}:{dep_name}{dep.detail}")
total = len(report.deps)
satisfied = sum(1 for d in report.deps if d.satisfied)
print()
if report.all_satisfied:
print(f"{_GREEN}{_BOLD}All {total} dependencies satisfied.{_RESET}\n")
else:
failed = total - satisfied
print(
f"{_RED}{_BOLD}{failed}/{total} dependencies unsatisfied.{_RESET} "
"Install missing binaries and set missing env vars.\n"
)
def _render_json(report: DepReport) -> None:
out = {
"all_satisfied": report.all_satisfied,
"summary": {
"total": len(report.deps),
"satisfied": sum(1 for d in report.deps if d.satisfied),
"unsatisfied": len(report.unsatisfied),
},
"deps": [
{
"skill_path": d.skill_path,
"skill_name": d.skill_name,
"type": "binary" if d.binary else "env_var",
"name": d.binary or d.env_var,
"satisfied": d.satisfied,
"detail": d.detail,
}
for d in report.deps
],
}
print(json.dumps(out, indent=2))
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
if not HAS_YAML:
print("WARNING: pyyaml not installed — cannot parse SKILL.md frontmatter. "
"Dependency declarations will be skipped.", file=sys.stderr)
parser = argparse.ArgumentParser(
description="Check cross-wizard skill dependencies (binaries, env vars)."
)
parser.add_argument(
"--skills-dir",
default=None,
help="Skills directory to scan (default: auto-detect)",
)
parser.add_argument(
"--skill",
default=None,
help="Filter to a specific skill path substring",
)
parser.add_argument(
"--json",
action="store_true",
help="Output results as JSON",
)
args = parser.parse_args()
skills_dir = Path(args.skills_dir).resolve() if args.skills_dir else None
report = run_dep_check(skills_dir=skills_dir, skill_filter=args.skill)
if args.json:
_render_json(report)
else:
_render_terminal(report)
sys.exit(0 if report.all_satisfied else 1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,259 @@
#!/usr/bin/env python3
"""
monthly_audit.py — Wizard Council Monthly Environment Audit
Runs all three checks (bootstrap, skills audit, dependency check) and
produces a combined Markdown report. Designed to be invoked by cron or
manually.
Usage:
python wizard-bootstrap/monthly_audit.py
python wizard-bootstrap/monthly_audit.py --output /path/to/report.md
python wizard-bootstrap/monthly_audit.py --post-telegram # post to configured channel
The report is also written to ~/.hermes/wizard-council/audit-YYYY-MM.md
"""
import argparse
import io
import json
import os
import sys
from contextlib import redirect_stdout
from datetime import datetime, timezone
from pathlib import Path
# Ensure repo root is importable
_REPO_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(_REPO_ROOT))
from wizard_bootstrap import run_all_checks
from skills_audit import run_audit
from dependency_checker import run_dep_check
# ---------------------------------------------------------------------------
# Report builder
# ---------------------------------------------------------------------------
def _emoji(ok: bool) -> str:
return "" if ok else ""
def build_report(repo_root: Path) -> str:
now = datetime.now(timezone.utc)
lines = [
f"# Wizard Council Environment Audit",
f"",
f"**Date:** {now.strftime('%Y-%m-%d %H:%M UTC')}",
f"",
f"---",
f"",
]
# 1. Bootstrap checks
lines.append("## 1. Environment Bootstrap")
lines.append("")
bootstrap = run_all_checks()
for check in bootstrap.checks:
icon = _emoji(check.passed)
label = check.name.replace("_", " ").title()
lines.append(f"- {icon} **{label}**: {check.message}")
if not check.passed and check.fix_hint:
lines.append(f" - _Fix_: {check.fix_hint}")
lines.append("")
if bootstrap.passed:
lines.append("**Environment: READY** ✅")
else:
failed = len(bootstrap.failed)
lines.append(f"**Environment: {failed} check(s) FAILED** ❌")
lines.append("")
lines.append("---")
lines.append("")
# 2. Skills audit
lines.append("## 2. Skills Drift Audit")
lines.append("")
skills_report = run_audit(repo_root)
missing = skills_report.by_status("MISSING")
extra = skills_report.by_status("EXTRA")
outdated = skills_report.by_status("OUTDATED")
ok_count = len(skills_report.by_status("OK"))
total = len(skills_report.drifts)
lines.append(f"| Status | Count |")
lines.append(f"|--------|-------|")
lines.append(f"| ✅ OK | {ok_count} |")
lines.append(f"| ❌ Missing | {len(missing)} |")
lines.append(f"| ⚠️ Extra | {len(extra)} |")
lines.append(f"| 🔄 Outdated | {len(outdated)} |")
lines.append(f"| **Total** | **{total}** |")
lines.append("")
if missing:
lines.append("### Missing Skills (in repo, not installed)")
for d in missing:
lines.append(f"- `{d.skill_path}`")
lines.append("")
if outdated:
lines.append("### Outdated Skills")
for d in outdated:
lines.append(f"- `{d.skill_path}` (repo: `{d.repo_hash}`, installed: `{d.installed_hash}`)")
lines.append("")
if extra:
lines.append("### Extra Skills (installed, not in repo)")
for d in extra:
lines.append(f"- `{d.skill_path}`")
lines.append("")
if not skills_report.has_drift:
lines.append("**Skills: IN SYNC** ✅")
else:
lines.append("**Skills: DRIFT DETECTED** ❌ — run `python wizard-bootstrap/skills_audit.py --fix`")
lines.append("")
lines.append("---")
lines.append("")
# 3. Dependency check
lines.append("## 3. Cross-Wizard Dependency Check")
lines.append("")
dep_report = run_dep_check()
if not dep_report.deps:
lines.append("No explicit dependencies declared in SKILL.md frontmatter.")
lines.append("")
lines.append(
"_Tip: Add a `dependencies` block to SKILL.md to make binary/env_var "
"requirements checkable automatically._"
)
else:
satisfied = sum(1 for d in dep_report.deps if d.satisfied)
total_deps = len(dep_report.deps)
lines.append(f"**{satisfied}/{total_deps} dependencies satisfied.**")
lines.append("")
if dep_report.unsatisfied:
lines.append("### Unsatisfied Dependencies")
for dep in dep_report.unsatisfied:
dep_type = "binary" if dep.binary else "env_var"
dep_name = dep.binary or dep.env_var
lines.append(f"- `[{dep.skill_path}]` {dep_type}:`{dep_name}` — {dep.detail}")
lines.append("")
if dep_report.all_satisfied:
lines.append("**Dependencies: ALL SATISFIED** ✅")
else:
lines.append("**Dependencies: ISSUES FOUND** ❌")
lines.append("")
lines.append("---")
lines.append("")
# Summary
overall_ok = bootstrap.passed and not skills_report.has_drift and dep_report.all_satisfied
lines.append("## Summary")
lines.append("")
lines.append(f"| Check | Status |")
lines.append(f"|-------|--------|")
lines.append(f"| Environment Bootstrap | {_emoji(bootstrap.passed)} |")
lines.append(f"| Skills Drift | {_emoji(not skills_report.has_drift)} |")
lines.append(f"| Dependency Check | {_emoji(dep_report.all_satisfied)} |")
lines.append("")
if overall_ok:
lines.append("**Overall: FORGE READY** ✅")
else:
lines.append("**Overall: ACTION REQUIRED** ❌")
lines.append("")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Output / delivery
# ---------------------------------------------------------------------------
def _save_report(report: str, output_path: Path) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(report, encoding="utf-8")
print(f"Report saved to: {output_path}")
def _post_telegram(report: str) -> None:
"""Post the report summary to Telegram via hermes gateway if configured."""
token = os.environ.get("TELEGRAM_BOT_TOKEN")
channel = os.environ.get("TELEGRAM_HOME_CHANNEL") or os.environ.get("TELEGRAM_CHANNEL_ID")
if not (token and channel):
print("Telegram not configured (need TELEGRAM_BOT_TOKEN + TELEGRAM_HOME_CHANNEL).", file=sys.stderr)
return
try:
import requests # noqa: PLC0415
# Extract just the summary section for Telegram (keep it brief)
summary_start = report.find("## Summary")
summary_text = report[summary_start:] if summary_start != -1 else report[-1000:]
payload = {
"chat_id": channel,
"text": f"🧙 **Wizard Council Monthly Audit**\n\n{summary_text}",
"parse_mode": "Markdown",
}
resp = requests.post(
f"https://api.telegram.org/bot{token}/sendMessage",
json=payload,
timeout=15,
)
if resp.status_code == 200:
print("Report summary posted to Telegram.")
else:
print(f"Telegram post failed: HTTP {resp.status_code}", file=sys.stderr)
except Exception as exc:
print(f"Telegram post error: {exc}", file=sys.stderr)
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="Run the monthly Wizard Council environment audit."
)
parser.add_argument(
"--output",
default=None,
help="Path to save the Markdown report (default: ~/.hermes/wizard-council/audit-YYYY-MM.md)",
)
parser.add_argument(
"--repo-root",
default=str(_REPO_ROOT),
help="Root of the hermes-agent repo",
)
parser.add_argument(
"--post-telegram",
action="store_true",
help="Post the report summary to Telegram",
)
args = parser.parse_args()
repo_root = Path(args.repo_root).resolve()
report = build_report(repo_root)
# Print to stdout
print(report)
# Save to default location
now = datetime.now(timezone.utc)
if args.output:
output_path = Path(args.output)
else:
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
output_path = hermes_home / "wizard-council" / f"audit-{now.strftime('%Y-%m')}.md"
_save_report(report, output_path)
if args.post_telegram:
_post_telegram(report)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,342 @@
#!/usr/bin/env python3
"""
skills_audit.py — Skills Drift Detector
Compares the skills bundled in the repo against those installed in
HERMES_HOME/skills/, then reports any drift:
- MISSING — skill in repo but not in installed location
- EXTRA — skill installed but not in repo (local-only)
- OUTDATED — repo skill.md differs from installed skill.md
Usage:
python wizard-bootstrap/skills_audit.py
python wizard-bootstrap/skills_audit.py --fix # copy missing skills
python wizard-bootstrap/skills_audit.py --json
python wizard-bootstrap/skills_audit.py --repo-root /path/to/hermes-agent
"""
import argparse
import difflib
import hashlib
import json
import os
import shutil
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass
class SkillDrift:
skill_path: str # e.g. "software-development/code-review"
status: str # "MISSING" | "EXTRA" | "OUTDATED" | "OK"
repo_hash: Optional[str] = None
installed_hash: Optional[str] = None
diff_lines: list[str] = field(default_factory=list)
@dataclass
class AuditReport:
drifts: list[SkillDrift] = field(default_factory=list)
repo_root: Path = Path(".")
installed_root: Path = Path(".")
@property
def has_drift(self) -> bool:
return any(d.status != "OK" for d in self.drifts)
def by_status(self, status: str) -> list[SkillDrift]:
return [d for d in self.drifts if d.status == status]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _sha256_file(path: Path) -> str:
h = hashlib.sha256()
h.update(path.read_bytes())
return h.hexdigest()[:16]
def _find_skills(root: Path) -> dict[str, Path]:
"""Return {relative_skill_path: SKILL.md path} for every skill under root."""
skills: dict[str, Path] = {}
for skill_md in root.rglob("SKILL.md"):
# skill path is relative to root, e.g. "software-development/code-review"
rel = skill_md.parent.relative_to(root)
skills[str(rel)] = skill_md
return skills
def _diff_skills(repo_md: Path, installed_md: Path) -> list[str]:
repo_lines = repo_md.read_text(encoding="utf-8", errors="replace").splitlines()
inst_lines = installed_md.read_text(encoding="utf-8", errors="replace").splitlines()
diff = list(
difflib.unified_diff(
inst_lines,
repo_lines,
fromfile="installed",
tofile="repo",
lineterm="",
)
)
return diff
# ---------------------------------------------------------------------------
# Core audit logic
# ---------------------------------------------------------------------------
def _resolve_installed_skills_root() -> Optional[Path]:
"""Return the installed skills directory, or None if not found."""
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
candidates = [
hermes_home / "skills",
hermes_home / "hermes-agent" / "skills",
]
for candidate in candidates:
if candidate.exists():
return candidate
return None
def run_audit(repo_root: Path, installed_root: Optional[Path] = None) -> AuditReport:
repo_skills_dir = repo_root / "skills"
if not repo_skills_dir.exists():
print(f"ERROR: Repo skills directory not found: {repo_skills_dir}", file=sys.stderr)
sys.exit(1)
resolved_installed = installed_root or _resolve_installed_skills_root()
report = AuditReport(
repo_root=repo_root,
installed_root=resolved_installed or Path("/not-found"),
)
repo_map = _find_skills(repo_skills_dir)
if resolved_installed is None or not resolved_installed.exists():
# All repo skills are "MISSING" from the installation
for skill_path in sorted(repo_map):
report.drifts.append(
SkillDrift(
skill_path=skill_path,
status="MISSING",
repo_hash=_sha256_file(repo_map[skill_path]),
)
)
return report
installed_map = _find_skills(resolved_installed)
all_paths = sorted(set(repo_map) | set(installed_map))
for skill_path in all_paths:
in_repo = skill_path in repo_map
in_installed = skill_path in installed_map
if in_repo and not in_installed:
report.drifts.append(
SkillDrift(
skill_path=skill_path,
status="MISSING",
repo_hash=_sha256_file(repo_map[skill_path]),
)
)
elif in_installed and not in_repo:
report.drifts.append(
SkillDrift(
skill_path=skill_path,
status="EXTRA",
installed_hash=_sha256_file(installed_map[skill_path]),
)
)
else:
rh = _sha256_file(repo_map[skill_path])
ih = _sha256_file(installed_map[skill_path])
if rh != ih:
diff = _diff_skills(repo_map[skill_path], installed_map[skill_path])
report.drifts.append(
SkillDrift(
skill_path=skill_path,
status="OUTDATED",
repo_hash=rh,
installed_hash=ih,
diff_lines=diff,
)
)
else:
report.drifts.append(
SkillDrift(skill_path=skill_path, status="OK", repo_hash=rh, installed_hash=ih)
)
return report
# ---------------------------------------------------------------------------
# Fix: copy missing skills into installed location
# ---------------------------------------------------------------------------
def apply_fix(report: AuditReport) -> None:
if report.installed_root == Path("/not-found"):
print("Cannot fix: installed skills directory not found.", file=sys.stderr)
return
repo_skills_dir = report.repo_root / "skills"
for drift in report.by_status("MISSING"):
src = repo_skills_dir / drift.skill_path / "SKILL.md"
dst = report.installed_root / drift.skill_path / "SKILL.md"
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src, dst)
print(f" Installed: {drift.skill_path}")
for drift in report.by_status("OUTDATED"):
src = repo_skills_dir / drift.skill_path / "SKILL.md"
dst = report.installed_root / drift.skill_path / "SKILL.md"
shutil.copy2(src, dst)
print(f" Updated: {drift.skill_path}")
# ---------------------------------------------------------------------------
# Rendering
# ---------------------------------------------------------------------------
_GREEN = "\033[32m"
_RED = "\033[31m"
_YELLOW = "\033[33m"
_CYAN = "\033[36m"
_BOLD = "\033[1m"
_RESET = "\033[0m"
_STATUS_COLOR = {
"OK": _GREEN,
"MISSING": _RED,
"EXTRA": _YELLOW,
"OUTDATED": _CYAN,
}
def _render_terminal(report: AuditReport, show_diff: bool = False) -> None:
print(f"\n{_BOLD}=== Wizard Skills Audit ==={_RESET}")
print(f" Repo skills: {report.repo_root / 'skills'}")
print(f" Installed skills: {report.installed_root}\n")
if not report.drifts:
print(f"{_GREEN}No skills found to compare.{_RESET}\n")
return
total = len(report.drifts)
ok = len(report.by_status("OK"))
missing = len(report.by_status("MISSING"))
extra = len(report.by_status("EXTRA"))
outdated = len(report.by_status("OUTDATED"))
for drift in sorted(report.drifts, key=lambda d: (d.status == "OK", d.skill_path)):
color = _STATUS_COLOR.get(drift.status, _RESET)
print(f" {color}{drift.status:8}{_RESET} {drift.skill_path}")
if show_diff and drift.diff_lines:
for line in drift.diff_lines[:20]:
print(f" {line}")
if len(drift.diff_lines) > 20:
print(f" ... ({len(drift.diff_lines) - 20} more lines)")
print()
print(f" Total: {total} OK: {_GREEN}{ok}{_RESET} "
f"Missing: {_RED}{missing}{_RESET} "
f"Extra: {_YELLOW}{extra}{_RESET} "
f"Outdated: {_CYAN}{outdated}{_RESET}")
print()
if not report.has_drift:
print(f"{_GREEN}{_BOLD}No drift detected. Skills are in sync.{_RESET}\n")
else:
print(f"{_YELLOW}{_BOLD}Drift detected. Run with --fix to sync missing/outdated skills.{_RESET}\n")
def _render_json(report: AuditReport) -> None:
out = {
"has_drift": report.has_drift,
"repo_skills_dir": str(report.repo_root / "skills"),
"installed_skills_dir": str(report.installed_root),
"summary": {
"total": len(report.drifts),
"ok": len(report.by_status("OK")),
"missing": len(report.by_status("MISSING")),
"extra": len(report.by_status("EXTRA")),
"outdated": len(report.by_status("OUTDATED")),
},
"drifts": [
{
"skill_path": d.skill_path,
"status": d.status,
"repo_hash": d.repo_hash,
"installed_hash": d.installed_hash,
"diff_line_count": len(d.diff_lines),
}
for d in report.drifts
if d.status != "OK"
],
}
print(json.dumps(out, indent=2))
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="Audit wizard skills for drift between repo and installed location."
)
parser.add_argument(
"--repo-root",
default=str(Path(__file__).parent.parent),
help="Root of the hermes-agent repo (default: parent of this script)",
)
parser.add_argument(
"--installed-root",
default=None,
help="Installed skills directory (default: auto-detect from HERMES_HOME)",
)
parser.add_argument(
"--fix",
action="store_true",
help="Copy missing/outdated skills from repo to installed location",
)
parser.add_argument(
"--diff",
action="store_true",
help="Show diff for outdated skills",
)
parser.add_argument(
"--json",
action="store_true",
help="Output results as JSON",
)
args = parser.parse_args()
repo_root = Path(args.repo_root).resolve()
installed_root = Path(args.installed_root).resolve() if args.installed_root else None
report = run_audit(repo_root, installed_root)
if args.fix:
apply_fix(report)
# Re-run audit after fix to show updated state
report = run_audit(repo_root, installed_root)
if args.json:
_render_json(report)
else:
_render_terminal(report, show_diff=args.diff)
sys.exit(0 if not report.has_drift else 1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,378 @@
#!/usr/bin/env python3
"""
wizard_bootstrap.py — Wizard Environment Validator
Validates that a new wizard's forge environment is ready:
1. Python version check (>=3.11)
2. Core dependencies installed
3. Gitea authentication
4. Telegram connectivity
5. Smoke test (hermes import)
Usage:
python wizard-bootstrap/wizard_bootstrap.py
python wizard-bootstrap/wizard_bootstrap.py --fix
python wizard-bootstrap/wizard_bootstrap.py --json
Exits 0 if all checks pass, 1 if any check fails.
"""
import argparse
import importlib
import json
import os
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Result model
# ---------------------------------------------------------------------------
@dataclass
class CheckResult:
name: str
passed: bool
message: str
fix_hint: Optional[str] = None
detail: Optional[str] = None
@dataclass
class BootstrapReport:
checks: list[CheckResult] = field(default_factory=list)
@property
def passed(self) -> bool:
return all(c.passed for c in self.checks)
@property
def failed(self) -> list[CheckResult]:
return [c for c in self.checks if not c.passed]
def add(self, result: CheckResult) -> None:
self.checks.append(result)
# ---------------------------------------------------------------------------
# Individual checks
# ---------------------------------------------------------------------------
def check_python_version() -> CheckResult:
"""Require Python >= 3.11."""
major, minor, micro = sys.version_info[:3]
ok = (major, minor) >= (3, 11)
return CheckResult(
name="python_version",
passed=ok,
message=f"Python {major}.{minor}.{micro}",
fix_hint="Install Python 3.11+ via uv, pyenv, or your OS package manager.",
)
def check_core_deps() -> CheckResult:
"""Verify that hermes core Python packages are importable."""
required = [
"openai",
"anthropic",
"dotenv",
"yaml",
"rich",
"requests",
"pydantic",
"prompt_toolkit",
]
missing = []
for pkg in required:
# dotenv ships as 'python-dotenv' but imports as 'dotenv'
try:
importlib.import_module(pkg)
except ModuleNotFoundError:
missing.append(pkg)
if missing:
return CheckResult(
name="core_deps",
passed=False,
message=f"Missing packages: {', '.join(missing)}",
fix_hint="Run: uv pip install -r requirements.txt (or: pip install -r requirements.txt)",
)
return CheckResult(name="core_deps", passed=True, message="All core packages importable")
def check_hermes_importable() -> CheckResult:
"""Smoke-test: import hermes_constants (no side effects)."""
# Add repo root to sys.path so we can import regardless of cwd
repo_root = str(Path(__file__).parent.parent)
if repo_root not in sys.path:
sys.path.insert(0, repo_root)
try:
import hermes_constants # noqa: F401
return CheckResult(name="hermes_smoke", passed=True, message="hermes_constants imported OK")
except Exception as exc:
return CheckResult(
name="hermes_smoke",
passed=False,
message=f"Import error: {exc}",
fix_hint="Ensure you are in the hermes-agent repo root and your venv is active.",
)
def check_gitea_auth() -> CheckResult:
"""Verify Gitea token env var is set and the API responds."""
token = os.environ.get("GITEA_TOKEN") or os.environ.get("FORGE_TOKEN")
if not token:
return CheckResult(
name="gitea_auth",
passed=False,
message="GITEA_TOKEN / FORGE_TOKEN not set",
fix_hint="Export GITEA_TOKEN=<your-token> in your shell or ~/.hermes/.env",
)
# Attempt a lightweight API call — list repos endpoint returns quickly
forge_url = os.environ.get("FORGE_URL", "https://forge.alexanderwhitestone.com")
try:
import requests # noqa: PLC0415
resp = requests.get(
f"{forge_url}/api/v1/repos/search",
headers={"Authorization": f"token {token}"},
params={"limit": 1},
timeout=10,
)
if resp.status_code == 200:
return CheckResult(name="gitea_auth", passed=True, message="Gitea API reachable and token valid")
return CheckResult(
name="gitea_auth",
passed=False,
message=f"Gitea API returned HTTP {resp.status_code}",
fix_hint="Check that your GITEA_TOKEN is correct and not expired.",
)
except Exception as exc:
return CheckResult(
name="gitea_auth",
passed=False,
message=f"Gitea API unreachable: {exc}",
fix_hint="Check network connectivity and FORGE_URL env var.",
)
def check_telegram_connectivity() -> CheckResult:
"""Verify Telegram bot token is set and the Bot API responds."""
token = os.environ.get("TELEGRAM_BOT_TOKEN")
if not token:
return CheckResult(
name="telegram",
passed=False,
message="TELEGRAM_BOT_TOKEN not set",
fix_hint="Export TELEGRAM_BOT_TOKEN=<token> in your shell or ~/.hermes/.env",
)
try:
import requests # noqa: PLC0415
resp = requests.get(
f"https://api.telegram.org/bot{token}/getMe",
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
username = data.get("result", {}).get("username", "?")
return CheckResult(
name="telegram",
passed=True,
message=f"Telegram bot @{username} reachable",
)
return CheckResult(
name="telegram",
passed=False,
message=f"Telegram API returned HTTP {resp.status_code}",
fix_hint="Check that TELEGRAM_BOT_TOKEN is valid.",
)
except Exception as exc:
return CheckResult(
name="telegram",
passed=False,
message=f"Telegram unreachable: {exc}",
fix_hint="Check network connectivity.",
)
def check_env_vars() -> CheckResult:
"""Check that at least one LLM provider key is configured."""
provider_keys = [
"OPENROUTER_API_KEY",
"ANTHROPIC_API_KEY",
"ANTHROPIC_TOKEN",
"OPENAI_API_KEY",
"GLM_API_KEY",
"KIMI_API_KEY",
"MINIMAX_API_KEY",
]
found = [k for k in provider_keys if os.environ.get(k)]
if found:
return CheckResult(
name="llm_provider",
passed=True,
message=f"LLM provider key(s) present: {', '.join(found)}",
)
return CheckResult(
name="llm_provider",
passed=False,
message="No LLM provider API key found",
fix_hint=(
"Set at least one of: OPENROUTER_API_KEY, ANTHROPIC_API_KEY, OPENAI_API_KEY "
"in ~/.hermes/.env or your shell."
),
)
def check_hermes_home() -> CheckResult:
"""Verify HERMES_HOME directory exists and is writable."""
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
if not hermes_home.exists():
return CheckResult(
name="hermes_home",
passed=False,
message=f"HERMES_HOME does not exist: {hermes_home}",
fix_hint="Run 'hermes setup' or create the directory manually.",
)
if not os.access(hermes_home, os.W_OK):
return CheckResult(
name="hermes_home",
passed=False,
message=f"HERMES_HOME not writable: {hermes_home}",
fix_hint=f"Fix permissions: chmod u+w {hermes_home}",
)
return CheckResult(
name="hermes_home",
passed=True,
message=f"HERMES_HOME OK: {hermes_home}",
)
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
def _load_dotenv_if_available() -> None:
"""Load ~/.hermes/.env so token checks work without manual export."""
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
env_path = hermes_home / ".env"
if env_path.exists():
try:
from dotenv import load_dotenv # noqa: PLC0415
load_dotenv(env_path, override=False)
except Exception:
pass # dotenv not installed yet — that's fine
def run_all_checks() -> BootstrapReport:
report = BootstrapReport()
_load_dotenv_if_available()
checks = [
check_python_version,
check_core_deps,
check_hermes_importable,
check_hermes_home,
check_env_vars,
check_gitea_auth,
check_telegram_connectivity,
]
for fn in checks:
result = fn()
report.add(result)
return report
# ---------------------------------------------------------------------------
# Rendering
# ---------------------------------------------------------------------------
_GREEN = "\033[32m"
_RED = "\033[31m"
_YELLOW = "\033[33m"
_BOLD = "\033[1m"
_RESET = "\033[0m"
def _render_terminal(report: BootstrapReport) -> None:
print(f"\n{_BOLD}=== Wizard Bootstrap — Environment Check ==={_RESET}\n")
for check in report.checks:
icon = f"{_GREEN}{_RESET}" if check.passed else f"{_RED}{_RESET}"
label = check.name.replace("_", " ").title()
print(f" {icon} {_BOLD}{label}{_RESET}: {check.message}")
if not check.passed and check.fix_hint:
print(f" {_YELLOW}{check.fix_hint}{_RESET}")
if check.detail:
print(f" {check.detail}")
total = len(report.checks)
passed = sum(1 for c in report.checks if c.passed)
print()
if report.passed:
print(f"{_GREEN}{_BOLD}All {total} checks passed. Forge is ready.{_RESET}\n")
else:
failed = total - passed
print(
f"{_RED}{_BOLD}{failed}/{total} check(s) failed.{_RESET} "
f"Resolve the issues above before going online.\n"
)
def _render_json(report: BootstrapReport) -> None:
out = {
"passed": report.passed,
"summary": {
"total": len(report.checks),
"passed": sum(1 for c in report.checks if c.passed),
"failed": sum(1 for c in report.checks if not c.passed),
},
"checks": [
{
"name": c.name,
"passed": c.passed,
"message": c.message,
"fix_hint": c.fix_hint,
"detail": c.detail,
}
for c in report.checks
],
}
print(json.dumps(out, indent=2))
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="Validate the forge wizard environment."
)
parser.add_argument(
"--json",
action="store_true",
help="Output results as JSON",
)
args = parser.parse_args()
report = run_all_checks()
if args.json:
_render_json(report)
else:
_render_terminal(report)
sys.exit(0 if report.passed else 1)
if __name__ == "__main__":
main()