diff --git a/devkit/README.md b/devkit/README.md new file mode 100644 index 000000000..40db3e664 --- /dev/null +++ b/devkit/README.md @@ -0,0 +1,56 @@ +# Bezalel's Devkit — Shared Tools for the Wizard Fleet + +This directory contains reusable CLI tools and Python modules for CI, testing, deployment, observability, and Gitea automation. Any wizard can invoke them via `python -m devkit.`. + +## Tools + +### `gitea_client` — Gitea API Client +List issues/PRs, post comments, create PRs, update issues. + +```bash +python -m devkit.gitea_client issues --state open --limit 20 +python -m devkit.gitea_client create-comment --number 142 --body "Update from Bezalel" +python -m devkit.gitea_client prs --state open +``` + +### `health` — Fleet Health Monitor +Checks system load, disk, memory, running processes, and key package versions. + +```bash +python -m devkit.health --threshold-load 1.0 --threshold-disk 90.0 --fail-on-critical +``` + +### `notebook_runner` — Notebook Execution Wrapper +Parameterizes and executes Jupyter notebooks via Papermill with structured JSON reporting. + +```bash +python -m devkit.notebook_runner task.ipynb output.ipynb -p threshold=1.0 -p hostname=forge +``` + +### `smoke_test` — Fast Smoke Test Runner +Runs core import checks, CLI entrypoint tests, and one bare green-path E2E. + +```bash +python -m devkit.smoke_test --verbose +``` + +### `secret_scan` — Secret Leak Scanner +Scans the repo for API keys, tokens, and private keys. + +```bash +python -m devkit.secret_scan --path . --fail-on-find +``` + +### `wizard_env` — Environment Validator +Checks that a wizard environment has all required binaries, env vars, Python packages, and Hermes config. + +```bash +python -m devkit.wizard_env --json --fail-on-incomplete +``` + +## Philosophy + +- **CLI-first** — Every tool is runnable as `python -m devkit.` +- **JSON output** — Easy to parse from other agents and CI pipelines +- **Zero dependencies beyond stdlib** where possible; optional heavy deps are runtime-checked +- **Fail-fast** — Exit codes are meaningful for CI gating diff --git a/devkit/__init__.py b/devkit/__init__.py new file mode 100644 index 000000000..9a16cf9a4 --- /dev/null +++ b/devkit/__init__.py @@ -0,0 +1,9 @@ +""" +Bezalel's Devkit — Shared development tools for the wizard fleet. + +A collection of CLI-accessible utilities for CI, testing, deployment, +observability, and Gitea automation. Designed to be used by any agent +via subprocess or direct Python import. +""" + +__version__ = "0.1.0" diff --git a/devkit/gitea_client.py b/devkit/gitea_client.py new file mode 100644 index 000000000..427ec3abb --- /dev/null +++ b/devkit/gitea_client.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 +""" +Shared Gitea API client for wizard fleet automation. + +Usage as CLI: + python -m devkit.gitea_client issues --repo Timmy_Foundation/hermes-agent --state open + python -m devkit.gitea_client issue --repo Timmy_Foundation/hermes-agent --number 142 + python -m devkit.gitea_client create-comment --repo Timmy_Foundation/hermes-agent --number 142 --body "Update from Bezalel" + python -m devkit.gitea_client prs --repo Timmy_Foundation/hermes-agent --state open + +Usage as module: + from devkit.gitea_client import GiteaClient + client = GiteaClient() + issues = client.list_issues("Timmy_Foundation/hermes-agent", state="open") +""" + +import argparse +import json +import os +import sys +from typing import Any, Dict, List, Optional + +import urllib.request + + +DEFAULT_BASE_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com") +DEFAULT_TOKEN = os.getenv("GITEA_TOKEN", "") + + +class GiteaClient: + def __init__(self, base_url: str = DEFAULT_BASE_URL, token: str = DEFAULT_TOKEN): + self.base_url = base_url.rstrip("/") + self.token = token or "" + + def _request( + self, + method: str, + path: str, + data: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> Any: + url = f"{self.base_url}/api/v1{path}" + req_headers = {"Content-Type": "application/json", "Accept": "application/json"} + if self.token: + req_headers["Authorization"] = f"token {self.token}" + if headers: + req_headers.update(headers) + + body = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=body, headers=req_headers, method=method) + + try: + with urllib.request.urlopen(req) as resp: + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + return {"error": True, "status": e.code, "body": e.read().decode()} + + def list_issues(self, repo: str, state: str = "open", limit: int = 50) -> List[Dict]: + return self._request("GET", f"/repos/{repo}/issues?state={state}&limit={limit}") or [] + + def get_issue(self, repo: str, number: int) -> Dict: + return self._request("GET", f"/repos/{repo}/issues/{number}") or {} + + def create_comment(self, repo: str, number: int, body: str) -> Dict: + return self._request( + "POST", f"/repos/{repo}/issues/{number}/comments", {"body": body} + ) + + def update_issue(self, repo: str, number: int, **fields) -> Dict: + return self._request("PATCH", f"/repos/{repo}/issues/{number}", fields) + + def list_prs(self, repo: str, state: str = "open", limit: int = 50) -> List[Dict]: + return self._request("GET", f"/repos/{repo}/pulls?state={state}&limit={limit}") or [] + + def get_pr(self, repo: str, number: int) -> Dict: + return self._request("GET", f"/repos/{repo}/pulls/{number}") or {} + + def create_pr(self, repo: str, title: str, head: str, base: str, body: str = "") -> Dict: + return self._request( + "POST", + f"/repos/{repo}/pulls", + {"title": title, "head": head, "base": base, "body": body}, + ) + + +def _fmt_json(obj: Any) -> str: + return json.dumps(obj, indent=2, ensure_ascii=False) + + +def main(argv: List[str] = None) -> int: + argv = argv or sys.argv[1:] + parser = argparse.ArgumentParser(description="Gitea CLI for wizard fleet") + parser.add_argument("--repo", default="Timmy_Foundation/hermes-agent", help="Repository full name") + parser.add_argument("--token", default=DEFAULT_TOKEN, help="Gitea API token") + parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Gitea base URL") + sub = parser.add_subparsers(dest="cmd") + + p_issues = sub.add_parser("issues", help="List issues") + p_issues.add_argument("--state", default="open") + p_issues.add_argument("--limit", type=int, default=50) + + p_issue = sub.add_parser("issue", help="Get single issue") + p_issue.add_argument("--number", type=int, required=True) + + p_prs = sub.add_parser("prs", help="List PRs") + p_prs.add_argument("--state", default="open") + p_prs.add_argument("--limit", type=int, default=50) + + p_pr = sub.add_parser("pr", help="Get single PR") + p_pr.add_argument("--number", type=int, required=True) + + p_comment = sub.add_parser("create-comment", help="Post comment on issue/PR") + p_comment.add_argument("--number", type=int, required=True) + p_comment.add_argument("--body", required=True) + + p_update = sub.add_parser("update-issue", help="Update issue fields") + p_update.add_argument("--number", type=int, required=True) + p_update.add_argument("--title", default=None) + p_update.add_argument("--body", default=None) + p_update.add_argument("--state", default=None) + + p_create_pr = sub.add_parser("create-pr", help="Create a PR") + p_create_pr.add_argument("--title", required=True) + p_create_pr.add_argument("--head", required=True) + p_create_pr.add_argument("--base", default="main") + p_create_pr.add_argument("--body", default="") + + args = parser.parse_args(argv) + client = GiteaClient(base_url=args.base_url, token=args.token) + + if args.cmd == "issues": + print(_fmt_json(client.list_issues(args.repo, args.state, args.limit))) + elif args.cmd == "issue": + print(_fmt_json(client.get_issue(args.repo, args.number))) + elif args.cmd == "prs": + print(_fmt_json(client.list_prs(args.repo, args.state, args.limit))) + elif args.cmd == "pr": + print(_fmt_json(client.get_pr(args.repo, args.number))) + elif args.cmd == "create-comment": + print(_fmt_json(client.create_comment(args.repo, args.number, args.body))) + elif args.cmd == "update-issue": + fields = {k: v for k, v in {"title": args.title, "body": args.body, "state": args.state}.items() if v is not None} + print(_fmt_json(client.update_issue(args.repo, args.number, **fields))) + elif args.cmd == "create-pr": + print(_fmt_json(client.create_pr(args.repo, args.title, args.head, args.base, args.body))) + else: + parser.print_help() + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/devkit/health.py b/devkit/health.py new file mode 100644 index 000000000..a5ebfa929 --- /dev/null +++ b/devkit/health.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +""" +Fleet health monitor for wizard agents. +Checks local system state and reports structured health metrics. + +Usage as CLI: + python -m devkit.health + python -m devkit.health --threshold-load 1.0 --check-disk + +Usage as module: + from devkit.health import check_health + report = check_health() +""" + +import argparse +import json +import os +import shutil +import subprocess +import sys +import time +from typing import Any, Dict, List + + +def _run(cmd: List[str]) -> str: + try: + return subprocess.check_output(cmd, stderr=subprocess.DEVNULL).decode().strip() + except Exception as e: + return f"error: {e}" + + +def check_health(threshold_load: float = 1.0, threshold_disk_percent: float = 90.0) -> Dict[str, Any]: + gather_time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + + # Load average + load_raw = _run(["cat", "/proc/loadavg"]) + load_values = [] + avg_load = None + if load_raw.startswith("error:"): + load_status = load_raw + else: + try: + load_values = [float(x) for x in load_raw.split()[:3]] + avg_load = sum(load_values) / len(load_values) + load_status = "critical" if avg_load > threshold_load else "ok" + except Exception as e: + load_status = f"error parsing load: {e}" + + # Disk usage + disk = shutil.disk_usage("/") + disk_percent = (disk.used / disk.total) * 100 if disk.total else 0.0 + disk_status = "critical" if disk_percent > threshold_disk_percent else "ok" + + # Memory + meminfo = _run(["cat", "/proc/meminfo"]) + mem_stats = {} + for line in meminfo.splitlines(): + if ":" in line: + key, val = line.split(":", 1) + mem_stats[key.strip()] = val.strip() + + # Running processes + hermes_pids = [] + try: + ps_out = subprocess.check_output(["pgrep", "-a", "-f", "hermes"]).decode().strip() + hermes_pids = [line.split(None, 1) for line in ps_out.splitlines() if line.strip()] + except subprocess.CalledProcessError: + hermes_pids = [] + + # Python package versions (key ones) + key_packages = ["jupyterlab", "papermill", "requests"] + pkg_versions = {} + for pkg in key_packages: + try: + out = subprocess.check_output([sys.executable, "-m", "pip", "show", pkg], stderr=subprocess.DEVNULL).decode() + for line in out.splitlines(): + if line.startswith("Version:"): + pkg_versions[pkg] = line.split(":", 1)[1].strip() + break + except Exception: + pkg_versions[pkg] = None + + overall = "ok" + if load_status == "critical" or disk_status == "critical": + overall = "critical" + elif not hermes_pids: + overall = "warning" + + return { + "timestamp": gather_time, + "overall": overall, + "load": { + "raw": load_raw if not load_raw.startswith("error:") else None, + "1min": load_values[0] if len(load_values) > 0 else None, + "5min": load_values[1] if len(load_values) > 1 else None, + "15min": load_values[2] if len(load_values) > 2 else None, + "avg": round(avg_load, 3) if avg_load is not None else None, + "threshold": threshold_load, + "status": load_status, + }, + "disk": { + "total_gb": round(disk.total / (1024 ** 3), 2), + "used_gb": round(disk.used / (1024 ** 3), 2), + "free_gb": round(disk.free / (1024 ** 3), 2), + "used_percent": round(disk_percent, 2), + "threshold_percent": threshold_disk_percent, + "status": disk_status, + }, + "memory": mem_stats, + "processes": { + "hermes_count": len(hermes_pids), + "hermes_pids": hermes_pids[:10], + }, + "packages": pkg_versions, + } + + +def main(argv: List[str] = None) -> int: + argv = argv or sys.argv[1:] + parser = argparse.ArgumentParser(description="Fleet health monitor") + parser.add_argument("--threshold-load", type=float, default=1.0) + parser.add_argument("--threshold-disk", type=float, default=90.0) + parser.add_argument("--fail-on-critical", action="store_true", help="Exit non-zero if overall is critical") + args = parser.parse_args(argv) + + report = check_health(args.threshold_load, args.threshold_disk) + print(json.dumps(report, indent=2)) + if args.fail_on_critical and report.get("overall") == "critical": + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/devkit/notebook_runner.py b/devkit/notebook_runner.py new file mode 100644 index 000000000..900239647 --- /dev/null +++ b/devkit/notebook_runner.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +""" +Notebook execution runner for agent tasks. +Wraps papermill with sensible defaults and structured JSON reporting. + +Usage as CLI: + python -m devkit.notebook_runner notebooks/task.ipynb output.ipynb -p threshold 1.0 + python -m devkit.notebook_runner notebooks/task.ipynb --dry-run + +Usage as module: + from devkit.notebook_runner import run_notebook + result = run_notebook("task.ipynb", "output.ipynb", parameters={"threshold": 1.0}) +""" + +import argparse +import json +import os +import subprocess +import sys +import tempfile +from pathlib import Path +from typing import Any, Dict, List, Optional + + +def run_notebook( + input_path: str, + output_path: Optional[str] = None, + parameters: Optional[Dict[str, Any]] = None, + kernel: str = "python3", + timeout: Optional[int] = None, + dry_run: bool = False, +) -> Dict[str, Any]: + input_path = str(Path(input_path).expanduser().resolve()) + if output_path is None: + fd, output_path = tempfile.mkstemp(suffix=".ipynb") + os.close(fd) + else: + output_path = str(Path(output_path).expanduser().resolve()) + + if dry_run: + return { + "status": "dry_run", + "input": input_path, + "output": output_path, + "parameters": parameters or {}, + "kernel": kernel, + } + + cmd = ["papermill", input_path, output_path, "--kernel", kernel] + if timeout is not None: + cmd.extend(["--execution-timeout", str(timeout)]) + for key, value in (parameters or {}).items(): + cmd.extend(["-p", key, str(value)]) + + start = os.times() + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + check=True, + ) + end = os.times() + return { + "status": "ok", + "input": input_path, + "output": output_path, + "parameters": parameters or {}, + "kernel": kernel, + "elapsed_seconds": round((end.elapsed - start.elapsed), 2), + "stdout": proc.stdout[-2000:] if proc.stdout else "", + } + except subprocess.CalledProcessError as e: + end = os.times() + return { + "status": "error", + "input": input_path, + "output": output_path, + "parameters": parameters or {}, + "kernel": kernel, + "elapsed_seconds": round((end.elapsed - start.elapsed), 2), + "stdout": e.stdout[-2000:] if e.stdout else "", + "stderr": e.stderr[-2000:] if e.stderr else "", + "returncode": e.returncode, + } + except FileNotFoundError: + return { + "status": "error", + "message": "papermill not found. Install with: uv tool install papermill", + } + + +def main(argv: List[str] = None) -> int: + argv = argv or sys.argv[1:] + parser = argparse.ArgumentParser(description="Notebook runner for agents") + parser.add_argument("input", help="Input notebook path") + parser.add_argument("output", nargs="?", default=None, help="Output notebook path") + parser.add_argument("-p", "--parameter", action="append", default=[], help="Parameters as key=value") + parser.add_argument("--kernel", default="python3") + parser.add_argument("--timeout", type=int, default=None) + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args(argv) + + parameters = {} + for raw in args.parameter: + if "=" not in raw: + print(f"Invalid parameter (expected key=value): {raw}", file=sys.stderr) + return 1 + k, v = raw.split("=", 1) + # Best-effort type inference + if v.lower() in ("true", "false"): + v = v.lower() == "true" + else: + try: + v = int(v) + except ValueError: + try: + v = float(v) + except ValueError: + pass + parameters[k] = v + + result = run_notebook( + args.input, + args.output, + parameters=parameters, + kernel=args.kernel, + timeout=args.timeout, + dry_run=args.dry_run, + ) + print(json.dumps(result, indent=2)) + return 0 if result.get("status") == "ok" else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/devkit/secret_scan.py b/devkit/secret_scan.py new file mode 100644 index 000000000..f776aa316 --- /dev/null +++ b/devkit/secret_scan.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +""" +Fast secret leak scanner for the repository. +Checks for common patterns that should never be committed. + +Usage as CLI: + python -m devkit.secret_scan + python -m devkit.secret_scan --path /some/repo --fail-on-find + +Usage as module: + from devkit.secret_scan import scan + findings = scan("/path/to/repo") +""" + +import argparse +import json +import os +import re +import sys +from pathlib import Path +from typing import Any, Dict, List + +# Patterns to flag +PATTERNS = { + "aws_access_key_id": re.compile(r"AKIA[0-9A-Z]{16}"), + "aws_secret_key": re.compile(r"['\"\s][0-9a-zA-Z/+]{40}['\"\s]"), + "generic_api_key": re.compile(r"api[_-]?key\s*[:=]\s*['\"][a-zA-Z0-9_\-]{20,}['\"]", re.IGNORECASE), + "private_key": re.compile(r"-----BEGIN (RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----"), + "github_token": re.compile(r"gh[pousr]_[A-Za-z0-9_]{36,}"), + "gitea_token": re.compile(r"[0-9a-f]{40}"), # heuristic for long hex strings after "token" + "telegram_bot_token": re.compile(r"[0-9]{9,}:[A-Za-z0-9_-]{35,}"), +} + +# Files and paths to skip +SKIP_PATHS = [ + ".git", + "__pycache__", + ".pytest_cache", + "node_modules", + "venv", + ".env", + ".agent-skills", +] + +# Max file size to scan (bytes) +MAX_FILE_SIZE = 1024 * 1024 + + +def _should_skip(path: Path) -> bool: + for skip in SKIP_PATHS: + if skip in path.parts: + return True + return False + + +def scan(root: str = ".") -> List[Dict[str, Any]]: + root_path = Path(root).resolve() + findings = [] + for file_path in root_path.rglob("*"): + if not file_path.is_file(): + continue + if _should_skip(file_path): + continue + if file_path.stat().st_size > MAX_FILE_SIZE: + continue + try: + text = file_path.read_text(encoding="utf-8", errors="ignore") + except Exception: + continue + for pattern_name, pattern in PATTERNS.items(): + for match in pattern.finditer(text): + # Simple context: line around match + start = max(0, match.start() - 40) + end = min(len(text), match.end() + 40) + context = text[start:end].replace("\n", " ") + findings.append({ + "file": str(file_path.relative_to(root_path)), + "pattern": pattern_name, + "line": text[:match.start()].count("\n") + 1, + "context": context, + }) + return findings + + +def main(argv: List[str] = None) -> int: + argv = argv or sys.argv[1:] + parser = argparse.ArgumentParser(description="Secret leak scanner") + parser.add_argument("--path", default=".", help="Repository root to scan") + parser.add_argument("--fail-on-find", action="store_true", help="Exit non-zero if secrets found") + parser.add_argument("--json", action="store_true", help="Output as JSON") + args = parser.parse_args(argv) + + findings = scan(args.path) + if args.json: + print(json.dumps({"findings": findings, "count": len(findings)}, indent=2)) + else: + print(f"Scanned {args.path}") + print(f"Findings: {len(findings)}") + for f in findings: + print(f" [{f['pattern']}] {f['file']}:{f['line']} -> ...{f['context']}...") + + if args.fail_on_find and findings: + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/devkit/smoke_test.py b/devkit/smoke_test.py new file mode 100644 index 000000000..830c0190b --- /dev/null +++ b/devkit/smoke_test.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +""" +Shared smoke test runner for hermes-agent. +Fast checks that catch obvious breakage without maintenance burden. + +Usage as CLI: + python -m devkit.smoke_test + python -m devkit.smoke_test --verbose + +Usage as module: + from devkit.smoke_test import run_smoke_tests + results = run_smoke_tests() +""" + +import argparse +import importlib +import json +import subprocess +import sys +from pathlib import Path +from typing import Any, Dict, List + + +HERMES_ROOT = Path(__file__).resolve().parent.parent + + +def _test_imports() -> Dict[str, Any]: + modules = [ + "hermes_constants", + "hermes_state", + "cli", + "tools.skills_sync", + "tools.skills_hub", + ] + errors = [] + for mod in modules: + try: + importlib.import_module(mod) + except Exception as e: + errors.append({"module": mod, "error": str(e)}) + return { + "name": "core_imports", + "status": "ok" if not errors else "fail", + "errors": errors, + } + + +def _test_cli_entrypoints() -> Dict[str, Any]: + entrypoints = [ + [sys.executable, "-m", "cli", "--help"], + ] + errors = [] + for cmd in entrypoints: + try: + subprocess.run(cmd, capture_output=True, text=True, check=True, cwd=HERMES_ROOT) + except subprocess.CalledProcessError as e: + errors.append({"cmd": cmd, "error": f"exit {e.returncode}"}) + except Exception as e: + errors.append({"cmd": cmd, "error": str(e)}) + return { + "name": "cli_entrypoints", + "status": "ok" if not errors else "fail", + "errors": errors, + } + + +def _test_green_path_e2e() -> Dict[str, Any]: + """One bare green-path E2E: terminal_tool echo hello.""" + try: + from tools.terminal_tool import terminal + result = terminal(command="echo hello") + output = result.get("output", "") + if "hello" in output.lower(): + return {"name": "green_path_e2e", "status": "ok", "output": output.strip()} + return {"name": "green_path_e2e", "status": "fail", "error": f"Unexpected output: {output}"} + except Exception as e: + return {"name": "green_path_e2e", "status": "fail", "error": str(e)} + + +def run_smoke_tests(verbose: bool = False) -> Dict[str, Any]: + tests = [ + _test_imports(), + _test_cli_entrypoints(), + _test_green_path_e2e(), + ] + failed = [t for t in tests if t["status"] != "ok"] + result = { + "overall": "ok" if not failed else "fail", + "tests": tests, + "failed_count": len(failed), + } + if verbose: + print(json.dumps(result, indent=2)) + return result + + +def main(argv: List[str] = None) -> int: + argv = argv or sys.argv[1:] + parser = argparse.ArgumentParser(description="Smoke test runner") + parser.add_argument("--verbose", action="store_true") + args = parser.parse_args(argv) + + result = run_smoke_tests(verbose=True) + return 0 if result["overall"] == "ok" else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/devkit/wizard_env.py b/devkit/wizard_env.py new file mode 100644 index 000000000..f4c8bf47b --- /dev/null +++ b/devkit/wizard_env.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +""" +Wizard environment validator. +Checks that a new wizard environment is ready for duty. + +Usage as CLI: + python -m devkit.wizard_env + python -m devkit.wizard_env --fix + +Usage as module: + from devkit.wizard_env import validate + report = validate() +""" + +import argparse +import json +import os +import shutil +import subprocess +import sys +from typing import Any, Dict, List + + +def _has_cmd(name: str) -> bool: + return shutil.which(name) is not None + + +def _check_env_var(name: str) -> Dict[str, Any]: + value = os.getenv(name) + return { + "name": name, + "status": "ok" if value else "missing", + "value": value[:10] + "..." if value and len(value) > 20 else value, + } + + +def _check_python_pkg(name: str) -> Dict[str, Any]: + try: + __import__(name) + return {"name": name, "status": "ok"} + except ImportError: + return {"name": name, "status": "missing"} + + +def validate() -> Dict[str, Any]: + checks = { + "binaries": [ + {"name": "python3", "status": "ok" if _has_cmd("python3") else "missing"}, + {"name": "git", "status": "ok" if _has_cmd("git") else "missing"}, + {"name": "curl", "status": "ok" if _has_cmd("curl") else "missing"}, + {"name": "jupyter-lab", "status": "ok" if _has_cmd("jupyter-lab") else "missing"}, + {"name": "papermill", "status": "ok" if _has_cmd("papermill") else "missing"}, + {"name": "jupytext", "status": "ok" if _has_cmd("jupytext") else "missing"}, + ], + "env_vars": [ + _check_env_var("GITEA_URL"), + _check_env_var("GITEA_TOKEN"), + _check_env_var("TELEGRAM_BOT_TOKEN"), + ], + "python_packages": [ + _check_python_pkg("requests"), + _check_python_pkg("jupyter_server"), + _check_python_pkg("nbformat"), + ], + } + + all_ok = all( + c["status"] == "ok" + for group in checks.values() + for c in group + ) + + # Hermes-specific checks + hermes_home = os.path.expanduser("~/.hermes") + checks["hermes"] = [ + {"name": "config.yaml", "status": "ok" if os.path.exists(f"{hermes_home}/config.yaml") else "missing"}, + {"name": "skills_dir", "status": "ok" if os.path.exists(f"{hermes_home}/skills") else "missing"}, + ] + + all_ok = all_ok and all(c["status"] == "ok" for c in checks["hermes"]) + + return { + "overall": "ok" if all_ok else "incomplete", + "checks": checks, + } + + +def main(argv: List[str] = None) -> int: + argv = argv or sys.argv[1:] + parser = argparse.ArgumentParser(description="Wizard environment validator") + parser.add_argument("--json", action="store_true") + parser.add_argument("--fail-on-incomplete", action="store_true") + args = parser.parse_args(argv) + + report = validate() + if args.json: + print(json.dumps(report, indent=2)) + else: + print(f"Wizard Environment: {report['overall']}") + for group, items in report["checks"].items(): + print(f"\n[{group}]") + for item in items: + status_icon = "✅" if item["status"] == "ok" else "❌" + print(f" {status_icon} {item['name']}: {item['status']}") + + if args.fail_on_incomplete and report["overall"] != "ok": + return 1 + return 0 + + +if __name__ == "__main__": + sys.exit(main())