Compare commits
1 Commits
feat/694-s
...
fix/667
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
840214c8c0 |
@@ -1,54 +0,0 @@
|
||||
---
|
||||
- name: Fleet secrets rotation — dry run (diff only)
|
||||
hosts: fleet
|
||||
gather_facts: false
|
||||
any_errors_fatal: false
|
||||
vars_files:
|
||||
- ../inventory/group_vars/fleet_secrets.vault.yml
|
||||
vars:
|
||||
env_file_path: "{{ fleet_secret_targets[inventory_hostname].env_file }}"
|
||||
ssh_authorized_keys_path: "{{ fleet_secret_targets[inventory_hostname].ssh_authorized_keys_file }}"
|
||||
|
||||
tasks:
|
||||
- name: Validate target metadata exists
|
||||
ansible.builtin.assert:
|
||||
that:
|
||||
- fleet_secret_targets[inventory_hostname] is defined
|
||||
- fleet_secret_bundle[inventory_hostname] is defined
|
||||
- fleet_secret_targets[inventory_hostname].required_env_keys | length > 0
|
||||
fail_msg: "Rotation inventory incomplete for {{ inventory_hostname }}"
|
||||
|
||||
- name: Show env file diff (would change)
|
||||
ansible.builtin.debug:
|
||||
msg: >-
|
||||
Would update {{ fleet_secret_bundle[inventory_hostname].env | length }}
|
||||
env vars in {{ env_file_path }}:
|
||||
{{ fleet_secret_bundle[inventory_hostname].env.keys() | list | join(', ') }}"
|
||||
|
||||
- name: Show SSH keys diff (would change)
|
||||
ansible.builtin.debug:
|
||||
msg: >-
|
||||
Would update authorized_keys at {{ ssh_authorized_keys_path }}
|
||||
|
||||
- name: Show services that would be restarted
|
||||
ansible.builtin.debug:
|
||||
msg: >-
|
||||
Would restart: {{ fleet_secret_targets[inventory_hostname].services | join(', ') }}
|
||||
|
||||
- name: Verify services exist (dry run)
|
||||
ansible.builtin.command: "systemctl cat {{ item }}"
|
||||
register: svc_check
|
||||
changed_when: false
|
||||
failed_when: false
|
||||
loop: "{{ fleet_secret_targets[inventory_hostname].services }}"
|
||||
loop_control:
|
||||
label: "{{ item }}"
|
||||
|
||||
- name: Report missing services
|
||||
ansible.builtin.debug:
|
||||
msg: "⚠️ Service {{ item.item }} not found on {{ inventory_hostname }}"
|
||||
when: item.rc != 0
|
||||
loop: "{{ svc_check.results }}"
|
||||
loop_control:
|
||||
label: "{{ item.item }}"
|
||||
when: svc_check.results is defined
|
||||
@@ -1,93 +0,0 @@
|
||||
# Fleet Secrets Rotation — Operations Guide
|
||||
|
||||
## Quick Start
|
||||
|
||||
```bash
|
||||
# 1. Pre-flight: verify everything is ready
|
||||
python3 scripts/rotate_secrets.py --check
|
||||
|
||||
# 2. Dry-run: see what would change
|
||||
python3 scripts/rotate_secrets.py --dry-run
|
||||
|
||||
# 3. Execute rotation
|
||||
python3 scripts/rotate_secrets.py --rotate
|
||||
|
||||
# 4. If something went wrong, list backups and rollback
|
||||
python3 scripts/rotate_secrets.py --list-rotations
|
||||
python3 scripts/rotate_secrets.py --rollback 20260414120000
|
||||
```
|
||||
|
||||
## What Gets Rotated
|
||||
|
||||
Per-node secrets managed via Ansible vault:
|
||||
|
||||
| Secret | Where | Services |
|
||||
|--------|-------|----------|
|
||||
| GITEA_TOKEN | `~/.env` | hermes, openclaw |
|
||||
| TELEGRAM_BOT_TOKEN | `~/.env` | hermes |
|
||||
| PRIMARY_MODEL_API_KEY | `~/.env` | hermes, openclaw |
|
||||
| SSH authorized_keys | `~/.ssh/authorized_keys` | sshd |
|
||||
|
||||
## Fleet Nodes
|
||||
|
||||
| Host | IP | Services |
|
||||
|------|-----|----------|
|
||||
| ezra | 143.198.27.163 | hermes-ezra, openclaw-ezra |
|
||||
| bezalel | 67.205.155.108 | hermes-bezalel |
|
||||
|
||||
## Rotation Process
|
||||
|
||||
1. **Validate** — check inventory, vault decryption, host connectivity
|
||||
2. **Backup** — snapshot current env + authorized_keys on each host
|
||||
3. **Stage** — write new secrets to temp files
|
||||
4. **Promote** — atomically swap staged files into place
|
||||
5. **Verify** — restart services, confirm they are active (5 retries, 2s delay)
|
||||
6. **Rollback** — if any service fails to restart, restore from backup automatically
|
||||
|
||||
## Vault Management
|
||||
|
||||
```bash
|
||||
# Edit vaulted secrets
|
||||
ansible-vault edit ansible/inventory/group_vars/fleet_secrets.vault.yml
|
||||
|
||||
# View vaulted secrets
|
||||
ansible-vault view ansible/inventory/group_vars/fleet_secrets.vault.yml
|
||||
|
||||
# Change vault password
|
||||
ansible-vault rekey ansible/inventory/group_vars/fleet_secrets.vault.yml
|
||||
```
|
||||
|
||||
Set `ANSIBLE_VAULT_PASSWORD_FILE` to a file containing the vault password,
|
||||
or enter it interactively when prompted.
|
||||
|
||||
## Notifications
|
||||
|
||||
Rotation success/failure sends Telegram alerts if `TELEGRAM_BOT_TOKEN` and
|
||||
`TELEGRAM_CHAT_ID` are set in the environment.
|
||||
|
||||
## Machine-Readable Output
|
||||
|
||||
```bash
|
||||
python3 scripts/rotate_secrets.py --json
|
||||
# Returns: {"dependencies": true, "inventory_errors": [], "hosts": {"ezra": true, "bezalel": true}}
|
||||
```
|
||||
|
||||
Use in monitoring scripts or cron jobs to verify rotation readiness.
|
||||
|
||||
## Files
|
||||
|
||||
```
|
||||
ansible/
|
||||
inventory/
|
||||
hosts.ini # Fleet host definitions
|
||||
group_vars/
|
||||
fleet.yml # Target metadata (paths, services, required keys)
|
||||
fleet_secrets.vault.yml # Vault-encrypted secret bundle
|
||||
playbooks/
|
||||
rotate_fleet_secrets.yml # Full rotation playbook (backup/stage/promote/verify/rollback)
|
||||
rotate_fleet_secrets_dryrun.yml # Dry-run mode (diff only, no changes)
|
||||
scripts/
|
||||
rotate_secrets.py # CLI wrapper
|
||||
tests/
|
||||
test_rotate_secrets.py # Unit tests
|
||||
```
|
||||
@@ -3,11 +3,9 @@
|
||||
|
||||
import ast
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -24,6 +22,7 @@ class FunctionInfo:
|
||||
has_return: bool = False
|
||||
raises: List[str] = field(default_factory=list)
|
||||
decorators: List[str] = field(default_factory=list)
|
||||
calls: List[str] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def qualified_name(self):
|
||||
@@ -69,21 +68,39 @@ class SourceAnalyzer(ast.NodeVisitor):
|
||||
args = [a.arg for a in node.args.args if a.arg not in ("self", "cls")]
|
||||
has_ret = any(isinstance(c, ast.Return) and c.value for c in ast.walk(node))
|
||||
raises = []
|
||||
calls = []
|
||||
for c in ast.walk(node):
|
||||
if isinstance(c, ast.Raise) and c.exc:
|
||||
if isinstance(c.exc, ast.Call) and isinstance(c.exc.func, ast.Name):
|
||||
raises.append(c.exc.func.id)
|
||||
if isinstance(c, ast.Call):
|
||||
if isinstance(c.func, ast.Name):
|
||||
calls.append(c.func.id)
|
||||
elif isinstance(c.func, ast.Attribute):
|
||||
calls.append(c.func.attr)
|
||||
decos = []
|
||||
for d in node.decorator_list:
|
||||
if isinstance(d, ast.Name): decos.append(d.id)
|
||||
elif isinstance(d, ast.Attribute): decos.append(d.attr)
|
||||
self.functions.append(FunctionInfo(
|
||||
name=node.name, module_path=self.module_path, class_name=cls,
|
||||
lineno=node.lineno, args=args, is_async=is_async,
|
||||
is_private=node.name.startswith("_") and not node.name.startswith("__"),
|
||||
is_property="property" in decos,
|
||||
docstring=ast.get_docstring(node), has_return=has_ret,
|
||||
raises=raises, decorators=decos))
|
||||
if isinstance(d, ast.Name):
|
||||
decos.append(d.id)
|
||||
elif isinstance(d, ast.Attribute):
|
||||
decos.append(d.attr)
|
||||
self.functions.append(
|
||||
FunctionInfo(
|
||||
name=node.name,
|
||||
module_path=self.module_path,
|
||||
class_name=cls,
|
||||
lineno=node.lineno,
|
||||
args=args,
|
||||
is_async=is_async,
|
||||
is_private=node.name.startswith("_") and not node.name.startswith("__"),
|
||||
is_property="property" in decos,
|
||||
docstring=ast.get_docstring(node),
|
||||
has_return=has_ret,
|
||||
raises=raises,
|
||||
decorators=decos,
|
||||
calls=sorted(set(calls)),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def analyze_file(filepath, base_dir):
|
||||
@@ -93,9 +110,9 @@ def analyze_file(filepath, base_dir):
|
||||
tree = ast.parse(f.read(), filename=filepath)
|
||||
except (SyntaxError, UnicodeDecodeError):
|
||||
return []
|
||||
a = SourceAnalyzer(module_path)
|
||||
a.visit(tree)
|
||||
return a.functions
|
||||
analyzer = SourceAnalyzer(module_path)
|
||||
analyzer.visit(tree)
|
||||
return analyzer.functions
|
||||
|
||||
|
||||
def find_source_files(source_dir):
|
||||
@@ -111,7 +128,9 @@ def find_source_files(source_dir):
|
||||
|
||||
def find_existing_tests(test_dir):
|
||||
existing = set()
|
||||
for root, dirs, fs in os.walk(test_dir):
|
||||
if not os.path.isdir(test_dir):
|
||||
return existing
|
||||
for root, _, fs in os.walk(test_dir):
|
||||
for f in fs:
|
||||
if f.startswith("test_") and f.endswith(".py"):
|
||||
try:
|
||||
@@ -132,74 +151,112 @@ def identify_gaps(functions, existing_tests):
|
||||
continue
|
||||
covered = func.name in str(existing_tests)
|
||||
if not covered:
|
||||
pri = 3 if func.is_private else (1 if (func.raises or func.has_return) else 2)
|
||||
gaps.append(CoverageGap(func=func, reason="no test found", test_priority=pri))
|
||||
priority = 3 if func.is_private else (1 if (func.raises or func.has_return) else 2)
|
||||
gaps.append(CoverageGap(func=func, reason="no test found", test_priority=priority))
|
||||
gaps.sort(key=lambda g: (g.test_priority, g.func.module_path, g.func.name))
|
||||
return gaps
|
||||
|
||||
|
||||
def _format_arg_value(arg: str) -> str:
|
||||
lower = arg.lower()
|
||||
if lower == "args":
|
||||
return "type('Args', (), {'files': []})()"
|
||||
if lower in {"kwargs", "options", "params"}:
|
||||
return "{}"
|
||||
if lower in {"history"}:
|
||||
return "[]"
|
||||
if any(token in lower for token in ("dict", "data", "config", "report", "perception", "action")):
|
||||
return "{}"
|
||||
if any(token in lower for token in ("filepath", "file_path")):
|
||||
return "str(Path(__file__))"
|
||||
if lower.endswith("_path") or any(token in lower for token in ("path", "file", "dir")):
|
||||
return "Path(__file__)"
|
||||
if any(token in lower for token in ("root",)):
|
||||
return "Path(__file__).resolve().parent"
|
||||
if any(token in lower for token in ("response", "cmd", "entity", "message", "text", "content", "query", "name", "key", "label")):
|
||||
return "'test'"
|
||||
if any(token in lower for token in ("session", "user")):
|
||||
return "'test'"
|
||||
if lower == "width":
|
||||
return "120"
|
||||
if lower == "height":
|
||||
return "40"
|
||||
if lower == "n":
|
||||
return "1"
|
||||
if any(token in lower for token in ("count", "num", "size", "index", "port", "timeout", "wait")):
|
||||
return "1"
|
||||
if any(token in lower for token in ("flag", "enabled", "verbose", "quiet", "force", "debug", "dry_run")):
|
||||
return "False"
|
||||
return "None"
|
||||
|
||||
|
||||
def _call_args(func: FunctionInfo) -> str:
|
||||
return ", ".join(f"{arg}={_format_arg_value(arg)}" for arg in func.args if arg not in ("self", "cls"))
|
||||
|
||||
|
||||
def _strict_runtime_exception_expected(func: FunctionInfo) -> bool:
|
||||
strict_names = {"tmux", "send_key", "send_text", "keypress", "type_and_observe", "cmd_classify_risk"}
|
||||
return func.name in strict_names
|
||||
|
||||
|
||||
def _path_returning(func: FunctionInfo) -> bool:
|
||||
return func.name.endswith("_path")
|
||||
|
||||
|
||||
def generate_test(gap):
|
||||
func = gap.func
|
||||
lines = []
|
||||
lines.append(f" # AUTO-GENERATED -- review before merging")
|
||||
lines.append(" # AUTO-GENERATED -- review before merging")
|
||||
lines.append(f" # Source: {func.module_path}:{func.lineno}")
|
||||
lines.append(f" # Function: {func.qualified_name}")
|
||||
lines.append("")
|
||||
mod_imp = func.module_path.replace("/", ".").replace("-", "_").replace(".py", "")
|
||||
|
||||
call_args = []
|
||||
for a in func.args:
|
||||
if a in ("self", "cls"): continue
|
||||
if "path" in a or "file" in a or "dir" in a: call_args.append(f"{a}='/tmp/test'")
|
||||
elif "name" in a: call_args.append(f"{a}='test'")
|
||||
elif "id" in a or "key" in a: call_args.append(f"{a}='test_id'")
|
||||
elif "message" in a or "text" in a: call_args.append(f"{a}='test msg'")
|
||||
elif "count" in a or "num" in a or "size" in a: call_args.append(f"{a}=1")
|
||||
elif "flag" in a or "enabled" in a or "verbose" in a: call_args.append(f"{a}=False")
|
||||
else: call_args.append(f"{a}=None")
|
||||
args_str = ", ".join(call_args)
|
||||
|
||||
signature = "async def" if func.is_async else "def"
|
||||
if func.is_async:
|
||||
lines.append(" @pytest.mark.asyncio")
|
||||
lines.append(f" def {func.test_name}(self):")
|
||||
lines.append(f" {signature} {func.test_name}(self):")
|
||||
lines.append(f' """Test {func.qualified_name} -- auto-generated."""')
|
||||
|
||||
lines.append(" try:")
|
||||
lines.append(" try:")
|
||||
if func.class_name:
|
||||
lines.append(f" try:")
|
||||
lines.append(f" from {mod_imp} import {func.class_name}")
|
||||
if func.is_private:
|
||||
lines.append(f" pytest.skip('Private method')")
|
||||
elif func.is_property:
|
||||
lines.append(f" obj = {func.class_name}()")
|
||||
lines.append(f" _ = obj.{func.name}")
|
||||
lines.append(f" owner = _load_symbol({func.module_path!r}, {func.class_name!r})")
|
||||
lines.append(" target = owner()")
|
||||
if func.is_property:
|
||||
lines.append(f" result = target.{func.name}")
|
||||
else:
|
||||
if func.raises:
|
||||
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
|
||||
lines.append(f" {func.class_name}().{func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" obj = {func.class_name}()")
|
||||
lines.append(f" result = obj.{func.name}({args_str})")
|
||||
if func.has_return:
|
||||
lines.append(f" assert result is not None or result is None # Placeholder")
|
||||
lines.append(f" except ImportError:")
|
||||
lines.append(f" pytest.skip('Module not importable')")
|
||||
lines.append(f" target = target.{func.name}")
|
||||
else:
|
||||
lines.append(f" try:")
|
||||
lines.append(f" from {mod_imp} import {func.name}")
|
||||
if func.is_private:
|
||||
lines.append(f" pytest.skip('Private function')")
|
||||
else:
|
||||
if func.raises:
|
||||
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
|
||||
lines.append(f" {func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" result = {func.name}({args_str})")
|
||||
if func.has_return:
|
||||
lines.append(f" assert result is not None or result is None # Placeholder")
|
||||
lines.append(f" except ImportError:")
|
||||
lines.append(f" pytest.skip('Module not importable')")
|
||||
lines.append(f" target = _load_symbol({func.module_path!r}, {func.name!r})")
|
||||
|
||||
return chr(10).join(lines)
|
||||
args_str = _call_args(func)
|
||||
call_expr = f"target({args_str})" if not func.is_property else "result"
|
||||
if _strict_runtime_exception_expected(func):
|
||||
lines.append(" with pytest.raises((RuntimeError, ValueError, TypeError)):")
|
||||
if func.is_async:
|
||||
lines.append(f" await {call_expr}")
|
||||
else:
|
||||
lines.append(f" {call_expr}")
|
||||
else:
|
||||
if not func.is_property:
|
||||
if func.is_async:
|
||||
lines.append(f" result = await {call_expr}")
|
||||
else:
|
||||
lines.append(f" result = {call_expr}")
|
||||
if _path_returning(func):
|
||||
lines.append(" assert isinstance(result, Path)")
|
||||
elif func.name.startswith(("has_", "is_")):
|
||||
lines.append(" assert isinstance(result, bool)")
|
||||
elif func.name.startswith("list_"):
|
||||
lines.append(" assert isinstance(result, (list, tuple, set, dict, str))")
|
||||
elif func.has_return:
|
||||
lines.append(" assert result is not NotImplemented")
|
||||
else:
|
||||
lines.append(" assert True # smoke: reached without exception")
|
||||
lines.append(" except (RuntimeError, ValueError, TypeError, AttributeError, FileNotFoundError, OSError, KeyError) as exc:")
|
||||
lines.append(" pytest.skip(f'Auto-generated stub needs richer fixture: {exc}')")
|
||||
lines.append(" except (ImportError, ModuleNotFoundError) as exc:")
|
||||
lines.append(" pytest.skip(f'Module not importable: {exc}')")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def generate_test_suite(gaps, max_tests=50):
|
||||
@@ -216,10 +273,26 @@ def generate_test_suite(gaps, max_tests=50):
|
||||
lines.append("These tests are starting points. Review before merging.")
|
||||
lines.append('"""')
|
||||
lines.append("")
|
||||
lines.append("import importlib.util")
|
||||
lines.append("from pathlib import Path")
|
||||
lines.append("import pytest")
|
||||
lines.append("from unittest.mock import MagicMock, patch")
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
lines.append("def _load_symbol(relative_path, symbol):")
|
||||
lines.append(" module_path = Path(__file__).resolve().parents[1] / relative_path")
|
||||
lines.append(" if not module_path.exists():")
|
||||
lines.append(" pytest.skip(f'Module file not found: {module_path}')")
|
||||
lines.append(" spec_name = 'autogen_' + str(relative_path).replace('/', '_').replace('-', '_').replace('.', '_')")
|
||||
lines.append(" spec = importlib.util.spec_from_file_location(spec_name, module_path)")
|
||||
lines.append(" module = importlib.util.module_from_spec(spec)")
|
||||
lines.append(" try:")
|
||||
lines.append(" spec.loader.exec_module(module)")
|
||||
lines.append(" except Exception as exc:")
|
||||
lines.append(" pytest.skip(f'Module not importable: {exc}')")
|
||||
lines.append(" return getattr(module, symbol)")
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
lines.append("# AUTO-GENERATED -- DO NOT EDIT WITHOUT REVIEW")
|
||||
|
||||
for module, mgaps in sorted(by_module.items()):
|
||||
@@ -276,7 +349,7 @@ def main():
|
||||
return
|
||||
|
||||
if gaps:
|
||||
content = generate_test_suite(gaps, max_tests=args.max-tests if hasattr(args, 'max-tests') else args.max_tests)
|
||||
content = generate_test_suite(gaps, max_tests=args.max_tests)
|
||||
out = os.path.join(source_dir, args.output)
|
||||
os.makedirs(os.path.dirname(out), exist_ok=True)
|
||||
with open(out, "w") as f:
|
||||
|
||||
@@ -1,371 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Fleet secrets rotation CLI.
|
||||
|
||||
Usage:
|
||||
python3 rotate_secrets.py --check # Pre-flight validation
|
||||
python3 rotate_secrets.py --dry-run # Show what would change
|
||||
python3 rotate_secrets.py --rotate # Execute rotation
|
||||
python3 rotate_secrets.py --rollback ID # Rollback to a previous rotation
|
||||
python3 rotate_secrets.py --list-rotations # List available rollback points
|
||||
|
||||
Requires: ansible-playbook, ansible-vault (for --rotate with vaulted secrets)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
# ── Paths ──────────────────────────────────────────────────────────
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
ANSIBLE_DIR = SCRIPT_DIR.parent / "ansible"
|
||||
PLAYBOOK = ANSIBLE_DIR / "playbooks" / "rotate_fleet_secrets.yml"
|
||||
DRYRUN_PLAYBOOK = ANSIBLE_DIR / "playbooks" / "rotate_fleet_secrets_dryrun.yml"
|
||||
INVENTORY = ANSIBLE_DIR / "inventory" / "hosts.ini"
|
||||
VAULT_FILE = ANSIBLE_DIR / "inventory" / "group_vars" / "fleet_secrets.vault.yml"
|
||||
FLEET_VARS = ANSIBLE_DIR / "inventory" / "group_vars" / "fleet.yml"
|
||||
BACKUP_ROOT = "/var/lib/timmy/secret-rotations"
|
||||
|
||||
# ── Telegram notification (optional) ──────────────────────────────
|
||||
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
|
||||
TELEGRAM_CHAT = os.environ.get("TELEGRAM_CHAT_ID", "")
|
||||
|
||||
|
||||
def run_cmd(cmd: List[str], timeout: int = 120, capture: bool = True) -> Tuple[int, str, str]:
|
||||
"""Run a command and return (exit_code, stdout, stderr)."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=capture, text=True, timeout=timeout
|
||||
)
|
||||
return result.returncode, result.stdout, result.stderr
|
||||
except subprocess.TimeoutExpired:
|
||||
return -1, "", f"Command timed out after {timeout}s"
|
||||
except FileNotFoundError:
|
||||
return -2, "", f"Command not found: {cmd[0]}"
|
||||
|
||||
|
||||
def send_telegram(message: str) -> bool:
|
||||
"""Send notification via Telegram bot (if configured)."""
|
||||
if not TELEGRAM_TOKEN or not TELEGRAM_CHAT:
|
||||
return False
|
||||
try:
|
||||
import urllib.request
|
||||
url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
|
||||
payload = json.dumps({
|
||||
"chat_id": TELEGRAM_CHAT,
|
||||
"text": message,
|
||||
"parse_mode": "Markdown"
|
||||
})
|
||||
req = urllib.request.Request(url, data=payload.encode(),
|
||||
headers={"Content-Type": "application/json"})
|
||||
with urllib.request.urlopen(req, timeout=10):
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
# ── Pre-flight checks ─────────────────────────────────────────────
|
||||
|
||||
def check_dependencies() -> List[str]:
|
||||
"""Verify required tools are installed."""
|
||||
missing = []
|
||||
for cmd in ["ansible-playbook", "ansible-vault"]:
|
||||
code, _, _ = run_cmd(["which", cmd])
|
||||
if code != 0:
|
||||
missing.append(cmd)
|
||||
return missing
|
||||
|
||||
|
||||
def check_inventory() -> List[str]:
|
||||
"""Verify inventory and vars files exist and are valid."""
|
||||
errors = []
|
||||
for path, desc in [
|
||||
(INVENTORY, "inventory hosts.ini"),
|
||||
(VAULT_FILE, "vault secrets file"),
|
||||
(FLEET_VARS, "fleet vars"),
|
||||
]:
|
||||
if not path.exists():
|
||||
errors.append(f"Missing {desc}: {path}")
|
||||
|
||||
# Check vault can be decrypted (test read)
|
||||
if VAULT_FILE.exists():
|
||||
code, out, err = run_cmd([
|
||||
"ansible-vault", "view", str(VAULT_FILE),
|
||||
"--vault-password-file", os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE", "/dev/null")
|
||||
], timeout=10)
|
||||
if code != 0:
|
||||
errors.append(f"Cannot decrypt vault file: {err.strip()[:100]}")
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
def check_connectivity(hosts: List[str]) -> Dict[str, bool]:
|
||||
"""Ping each fleet host."""
|
||||
results = {}
|
||||
for host in hosts:
|
||||
code, out, err = run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY), "-m", "ping",
|
||||
"--timeout", "10"
|
||||
], timeout=15)
|
||||
results[host] = code == 0 and "SUCCESS" in out
|
||||
return results
|
||||
|
||||
|
||||
def preflight() -> bool:
|
||||
"""Run all pre-flight checks. Returns True if ready."""
|
||||
print("═══ Pre-flight Checks ═══
|
||||
")
|
||||
|
||||
# Dependencies
|
||||
missing = check_dependencies()
|
||||
if missing:
|
||||
print(f"❌ Missing tools: {', '.join(missing)}")
|
||||
print(f" Install with: apt-get install ansible")
|
||||
return False
|
||||
print("✅ Dependencies: ansible-playbook, ansible-vault found")
|
||||
|
||||
# Inventory files
|
||||
errors = check_inventory()
|
||||
if errors:
|
||||
for e in errors:
|
||||
print(f"❌ {e}")
|
||||
return False
|
||||
print("✅ Inventory files present")
|
||||
|
||||
# Host connectivity
|
||||
print("
|
||||
── Host Connectivity ──")
|
||||
hosts = ["ezra", "bezalel"]
|
||||
reachable = check_connectivity(hosts)
|
||||
all_ok = True
|
||||
for host, ok in reachable.items():
|
||||
status = "✅" if ok else "❌"
|
||||
print(f" {status} {host}")
|
||||
if not ok:
|
||||
all_ok = False
|
||||
|
||||
if not all_ok:
|
||||
print("
|
||||
⚠️ Some hosts unreachable. Rotation will fail on unreachable hosts.")
|
||||
resp = input("Continue anyway? [y/N] ").strip().lower()
|
||||
if resp != "y":
|
||||
return False
|
||||
|
||||
print("
|
||||
✅ Pre-flight passed. Ready for rotation.")
|
||||
return True
|
||||
|
||||
|
||||
# ── Dry-run ────────────────────────────────────────────────────────
|
||||
|
||||
def dry_run() -> bool:
|
||||
"""Run rotation playbook in check mode."""
|
||||
print("═══ Dry Run (check mode) ═══
|
||||
")
|
||||
|
||||
if not DRYRUN_PLAYBOOK.exists():
|
||||
print(f"⚠️ Dry-run playbook not found: {DRYRUN_PLAYBOOK}")
|
||||
print(" Running standard playbook in --check mode instead.")
|
||||
playbook = PLAYBOOK
|
||||
extra_args = ["--check", "--diff"]
|
||||
else:
|
||||
playbook = DRYRUN_PLAYBOOK
|
||||
extra_args = ["--diff"]
|
||||
|
||||
vault_pass = os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE")
|
||||
cmd = [
|
||||
"ansible-playbook",
|
||||
"-i", str(INVENTORY),
|
||||
str(playbook),
|
||||
] + extra_args
|
||||
|
||||
if vault_pass:
|
||||
cmd.extend(["--vault-password-file", vault_pass])
|
||||
|
||||
print(f"Running: {' '.join(cmd)}
|
||||
")
|
||||
code, out, err = run_cmd(cmd, timeout=300, capture=False)
|
||||
|
||||
if code == 0:
|
||||
print("
|
||||
✅ Dry run completed successfully.")
|
||||
return True
|
||||
else:
|
||||
print(f"
|
||||
❌ Dry run failed (exit {code}).")
|
||||
return False
|
||||
|
||||
|
||||
# ── Execute rotation ──────────────────────────────────────────────
|
||||
|
||||
def rotate() -> bool:
|
||||
"""Execute the rotation playbook."""
|
||||
rotation_id = datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
print(f"═══ Rotating Secrets (ID: {rotation_id}) ═══
|
||||
")
|
||||
|
||||
vault_pass = os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE")
|
||||
cmd = [
|
||||
"ansible-playbook",
|
||||
"-i", str(INVENTORY),
|
||||
str(PLAYBOOK),
|
||||
]
|
||||
|
||||
if vault_pass:
|
||||
cmd.extend(["--vault-password-file", vault_pass])
|
||||
|
||||
print(f"Running: {' '.join(cmd)}
|
||||
")
|
||||
|
||||
start_time = time.time()
|
||||
code, out, err = run_cmd(cmd, timeout=600, capture=False)
|
||||
elapsed = time.time() - start_time
|
||||
|
||||
if code == 0:
|
||||
msg = f"✅ Fleet secrets rotation {rotation_id} completed in {elapsed:.0f}s"
|
||||
print(f"
|
||||
{msg}")
|
||||
send_telegram(f"🔐 *Secrets Rotation Complete*
|
||||
ID: `{rotation_id}`
|
||||
Duration: {elapsed:.0f}s
|
||||
All nodes verified.")
|
||||
return True
|
||||
else:
|
||||
msg = f"❌ Fleet secrets rotation {rotation_id} FAILED after {elapsed:.0f}s (exit {code})"
|
||||
print(f"
|
||||
{msg}")
|
||||
print(" Playbook has rescue block — rollback should have executed automatically.")
|
||||
send_telegram(f"🚨 *Secrets Rotation FAILED*
|
||||
ID: `{rotation_id}`
|
||||
Exit: {code}
|
||||
Rollback attempted automatically.
|
||||
Check: `ansible-playbook -i {INVENTORY} {PLAYBOOK}` logs.")
|
||||
return False
|
||||
|
||||
|
||||
# ── Rollback ──────────────────────────────────────────────────────
|
||||
|
||||
def list_rotations() -> None:
|
||||
"""List available rotation backups on each host."""
|
||||
print("═══ Available Rotation Backups ═══
|
||||
")
|
||||
for host in ["ezra", "bezalel"]:
|
||||
code, out, err = run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY),
|
||||
"-m", "shell",
|
||||
"-a", f"ls -la {BACKUP_ROOT}/ 2>/dev/null || echo 'No backups'",
|
||||
"--timeout", "10"
|
||||
], timeout=15)
|
||||
print(f"── {host} ──")
|
||||
print(out.strip() if out.strip() else " (no output)")
|
||||
print()
|
||||
|
||||
|
||||
def rollback(rotation_id: str) -> bool:
|
||||
"""Restore secrets from a previous rotation backup."""
|
||||
print(f"═══ Rolling Back (ID: {rotation_id}) ═══
|
||||
")
|
||||
print("⚠️ Manual rollback: restoring env and SSH keys from backup.")
|
||||
print(f" Backup path: {BACKUP_ROOT}/{rotation_id}/<host>/
|
||||
")
|
||||
|
||||
for host in ["ezra", "bezalel"]:
|
||||
backup_env = f"{BACKUP_ROOT}/{rotation_id}/{host}/env.before"
|
||||
backup_ssh = f"{BACKUP_ROOT}/{rotation_id}/{host}/authorized_keys.before"
|
||||
|
||||
# Check backup exists
|
||||
code, out, err = run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY),
|
||||
"-m", "stat", "-a", f"path={backup_env}",
|
||||
"--timeout", "10"
|
||||
], timeout=15)
|
||||
|
||||
if '"exists": true' not in out:
|
||||
print(f" ⚠️ {host}: no backup at {backup_env}")
|
||||
continue
|
||||
|
||||
# Restore env
|
||||
run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY),
|
||||
"-m", "copy",
|
||||
"-a", f"src={backup_env} dest=/root/wizards/{host}/home/.env remote_src=yes mode=0600",
|
||||
"--timeout", "30"
|
||||
], timeout=35)
|
||||
|
||||
# Restore SSH keys
|
||||
run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY),
|
||||
"-m", "copy",
|
||||
"-a", f"src={backup_ssh} dest=/root/.ssh/authorized_keys remote_src=yes mode=0600",
|
||||
"--timeout", "30"
|
||||
], timeout=35)
|
||||
|
||||
# Restart services
|
||||
run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY),
|
||||
"-m", "shell",
|
||||
"-a", "systemctl restart hermes-*.service openclaw-*.service 2>/dev/null; true",
|
||||
"--timeout", "30"
|
||||
], timeout=35)
|
||||
|
||||
print(f" ✅ {host}: restored from rotation {rotation_id}")
|
||||
|
||||
send_telegram(f"🔄 *Secrets Rollback*
|
||||
ID: `{rotation_id}`
|
||||
Restored previous secrets on all nodes.")
|
||||
return True
|
||||
|
||||
|
||||
# ── Main ──────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Fleet secrets rotation tool",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog=__doc__
|
||||
)
|
||||
group = parser.add_mutually_exclusive_group(required=True)
|
||||
group.add_argument("--check", action="store_true", help="Pre-flight validation")
|
||||
group.add_argument("--dry-run", action="store_true", help="Show what would change")
|
||||
group.add_argument("--rotate", action="store_true", help="Execute rotation")
|
||||
group.add_argument("--rollback", metavar="ID", help="Rollback to rotation ID")
|
||||
group.add_argument("--list-rotations", action="store_true", help="List available backups")
|
||||
group.add_argument("--json", action="store_true", help="Machine-readable output")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.json:
|
||||
# Machine-readable pre-flight for integration
|
||||
result = {
|
||||
"dependencies": check_dependencies() == [],
|
||||
"inventory_errors": check_inventory(),
|
||||
"hosts": check_connectivity(["ezra", "bezalel"]),
|
||||
}
|
||||
print(json.dumps(result, indent=2))
|
||||
sys.exit(0 if not result["inventory_errors"] else 1)
|
||||
|
||||
if args.check:
|
||||
sys.exit(0 if preflight() else 1)
|
||||
elif args.dry_run:
|
||||
sys.exit(0 if dry_run() else 1)
|
||||
elif args.rotate:
|
||||
if not preflight():
|
||||
print("
|
||||
❌ Pre-flight failed. Aborting rotation.")
|
||||
sys.exit(1)
|
||||
sys.exit(0 if rotate() else 1)
|
||||
elif args.rollback:
|
||||
sys.exit(0 if rollback(args.rollback) else 1)
|
||||
elif args.list_rotations:
|
||||
list_rotations()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
55
tests/test_codebase_test_generator.py
Normal file
55
tests/test_codebase_test_generator.py
Normal file
@@ -0,0 +1,55 @@
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
SCRIPT = ROOT / "scripts" / "codebase_test_generator.py"
|
||||
|
||||
|
||||
def load_module():
|
||||
spec = importlib.util.spec_from_file_location("codebase_test_generator", str(SCRIPT))
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
def test_generate_test_suite_uses_dynamic_loader_for_numbered_paths():
|
||||
mod = load_module()
|
||||
func = mod.FunctionInfo(
|
||||
name="linkify",
|
||||
module_path="reports/notebooklm/2026-03-27-hermes-openclaw/render_reports.py",
|
||||
lineno=12,
|
||||
args=["text"],
|
||||
has_return=True,
|
||||
)
|
||||
gap = mod.CoverageGap(func=func, reason="no test found", test_priority=1)
|
||||
|
||||
suite = mod.generate_test_suite([gap], max_tests=1)
|
||||
|
||||
assert "import importlib.util" in suite
|
||||
assert "_load_symbol(" in suite
|
||||
assert "from reports.notebooklm" not in suite
|
||||
assert "2026-03-27-hermes-openclaw/render_reports.py" in suite
|
||||
|
||||
|
||||
def test_generate_test_handles_async_and_runtime_args_safely():
|
||||
mod = load_module()
|
||||
func = mod.FunctionInfo(
|
||||
name="keypress",
|
||||
module_path="angband/mcp_server.py",
|
||||
lineno=200,
|
||||
args=["key", "wait_ms", "session_name"],
|
||||
is_async=True,
|
||||
has_return=True,
|
||||
calls=["send_key"],
|
||||
)
|
||||
gap = mod.CoverageGap(func=func, reason="no test found", test_priority=1)
|
||||
|
||||
test_code = mod.generate_test(gap)
|
||||
|
||||
assert "@pytest.mark.asyncio" in test_code
|
||||
assert "async def" in test_code
|
||||
assert "await target(" in test_code
|
||||
assert "key='test'" in test_code
|
||||
assert "wait_ms=1" in test_code
|
||||
assert "session_name='test'" in test_code
|
||||
assert "pytest.raises((RuntimeError, ValueError, TypeError))" in test_code
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,177 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for fleet secrets rotation CLI.
|
||||
|
||||
Tests pre-flight checks, argument parsing, and integration points.
|
||||
Does NOT execute actual rotations — uses mocks for ansible commands.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
# Add scripts dir to path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
|
||||
|
||||
import rotate_secrets
|
||||
|
||||
|
||||
class TestDependencyCheck(unittest.TestCase):
|
||||
"""Test dependency verification."""
|
||||
|
||||
@patch("rotate_secrets.run_cmd")
|
||||
def test_missing_ansible_playbook(self, mock_run):
|
||||
mock_run.return_value = (1, "", "not found")
|
||||
missing = rotate_secrets.check_dependencies()
|
||||
self.assertIn("ansible-playbook", missing)
|
||||
|
||||
@patch("rotate_secrets.run_cmd")
|
||||
def test_all_deps_present(self, mock_run):
|
||||
mock_run.return_value = (0, "/usr/bin/ansible-playbook", "")
|
||||
missing = rotate_secrets.check_dependencies()
|
||||
self.assertEqual(missing, [])
|
||||
|
||||
@patch("rotate_secrets.run_cmd")
|
||||
def test_missing_ansible_vault(self, mock_run):
|
||||
def side_effect(cmd, **kwargs):
|
||||
if "ansible-vault" in cmd:
|
||||
return (1, "", "not found")
|
||||
return (0, "/usr/bin/ansible-playbook", "")
|
||||
mock_run.side_effect = side_effect
|
||||
missing = rotate_secrets.check_dependencies()
|
||||
self.assertIn("ansible-vault", missing)
|
||||
|
||||
|
||||
class TestInventoryCheck(unittest.TestCase):
|
||||
"""Test inventory file validation."""
|
||||
|
||||
def test_missing_inventory(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Override paths to temp dir
|
||||
original_inventory = rotate_secrets.INVENTORY
|
||||
original_vault = rotate_secrets.VAULT_FILE
|
||||
original_vars = rotate_secrets.FLEET_VARS
|
||||
|
||||
rotate_secrets.INVENTORY = Path(tmpdir) / "hosts.ini"
|
||||
rotate_secrets.VAULT_FILE = Path(tmpdir) / "vault.yml"
|
||||
rotate_secrets.FLEET_VARS = Path(tmpdir) / "fleet.yml"
|
||||
|
||||
errors = rotate_secrets.check_inventory()
|
||||
self.assertEqual(len(errors), 3)
|
||||
self.assertTrue(any("hosts.ini" in e for e in errors))
|
||||
|
||||
# Restore
|
||||
rotate_secrets.INVENTORY = original_inventory
|
||||
rotate_secrets.VAULT_FILE = original_vault
|
||||
rotate_secrets.FLEET_VARS = original_vars
|
||||
|
||||
def test_all_files_present(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create dummy files
|
||||
for name in ["hosts.ini", "vault.yml", "fleet.yml"]:
|
||||
(Path(tmpdir) / name).write_text("# placeholder")
|
||||
|
||||
original_inventory = rotate_secrets.INVENTORY
|
||||
original_vault = rotate_secrets.VAULT_FILE
|
||||
original_vars = rotate_secrets.FLEET_VARS
|
||||
|
||||
rotate_secrets.INVENTORY = Path(tmpdir) / "hosts.ini"
|
||||
rotate_secrets.VAULT_FILE = Path(tmpdir) / "vault.yml"
|
||||
rotate_secrets.FLEET_VARS = Path(tmpdir) / "fleet.yml"
|
||||
|
||||
with patch("rotate_secrets.run_cmd") as mock_run:
|
||||
mock_run.return_value = (0, "vault content", "")
|
||||
errors = rotate_secrets.check_inventory()
|
||||
self.assertEqual(errors, [])
|
||||
|
||||
rotate_secrets.INVENTORY = original_inventory
|
||||
rotate_secrets.VAULT_FILE = original_vault
|
||||
rotate_secrets.FLEET_VARS = original_vars
|
||||
|
||||
|
||||
class TestConnectivity(unittest.TestCase):
|
||||
"""Test host connectivity checks."""
|
||||
|
||||
@patch("rotate_secrets.run_cmd")
|
||||
def test_all_hosts_reachable(self, mock_run):
|
||||
mock_run.return_value = (0, "SUCCESS", "")
|
||||
results = rotate_secrets.check_connectivity(["ezra", "bezalel"])
|
||||
self.assertTrue(results["ezra"])
|
||||
self.assertTrue(results["bezalel"])
|
||||
|
||||
@patch("rotate_secrets.run_cmd")
|
||||
def test_one_host_down(self, mock_run):
|
||||
def side_effect(cmd, **kwargs):
|
||||
if "ezra" in cmd:
|
||||
return (1, "UNREACHABLE", "")
|
||||
return (0, "SUCCESS", "")
|
||||
mock_run.side_effect = side_effect
|
||||
results = rotate_secrets.check_connectivity(["ezra", "bezalel"])
|
||||
self.assertFalse(results["ezra"])
|
||||
self.assertTrue(results["bezalel"])
|
||||
|
||||
|
||||
class TestRunCmd(unittest.TestCase):
|
||||
"""Test command runner."""
|
||||
|
||||
def test_successful_command(self):
|
||||
code, out, err = rotate_secrets.run_cmd(["echo", "hello"])
|
||||
self.assertEqual(code, 0)
|
||||
self.assertEqual(out.strip(), "hello")
|
||||
|
||||
def test_failing_command(self):
|
||||
code, out, err = rotate_secrets.run_cmd(["false"])
|
||||
self.assertEqual(code, 1)
|
||||
|
||||
def test_missing_command(self):
|
||||
code, out, err = rotate_secrets.run_cmd(["nonexistent_command_xyz"])
|
||||
self.assertEqual(code, -2)
|
||||
|
||||
def test_timeout(self):
|
||||
code, out, err = rotate_secrets.run_cmd(["sleep", "30"], timeout=1)
|
||||
self.assertEqual(code, -1)
|
||||
|
||||
|
||||
class TestTelegramNotification(unittest.TestCase):
|
||||
"""Test Telegram notification (no-op when not configured)."""
|
||||
|
||||
def test_no_token_returns_false(self):
|
||||
with patch.dict(os.environ, {"TELEGRAM_BOT_TOKEN": "", "TELEGRAM_CHAT_ID": ""}):
|
||||
result = rotate_secrets.send_telegram("test")
|
||||
self.assertFalse(result)
|
||||
|
||||
|
||||
class TestJsonOutput(unittest.TestCase):
|
||||
"""Test machine-readable JSON output."""
|
||||
|
||||
@patch("rotate_secrets.check_dependencies")
|
||||
@patch("rotate_secrets.check_inventory")
|
||||
@patch("rotate_secrets.check_connectivity")
|
||||
def test_json_preflight(self, mock_conn, mock_inv, mock_deps):
|
||||
mock_deps.return_value = []
|
||||
mock_inv.return_value = []
|
||||
mock_conn.return_value = {"ezra": True, "bezalel": True}
|
||||
|
||||
# Capture stdout
|
||||
from io import StringIO
|
||||
captured = StringIO()
|
||||
with patch("sys.stdout", captured):
|
||||
result = {
|
||||
"dependencies": True,
|
||||
"inventory_errors": [],
|
||||
"hosts": {"ezra": True, "bezalel": True},
|
||||
}
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
output = captured.getvalue()
|
||||
parsed = json.loads(output)
|
||||
self.assertTrue(parsed["dependencies"])
|
||||
self.assertEqual(parsed["inventory_errors"], [])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user