4 Commits

Author SHA1 Message Date
9d41f255f9 feat(#694): operations guide for secrets rotation 2026-04-17 05:25:42 +00:00
  Some checks failed:
  Agent PR Gate / gate (pull_request) Failing after 52s
  Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 21s
  Smoke Test / smoke (pull_request) Failing after 15s
  Agent PR Gate / report (pull_request) Has been cancelled
554646c9e0 feat(#694): tests for secrets rotation CLI — pre-flight, connectivity, cmd runner 2026-04-17 05:24:35 +00:00
d88578912e feat(#694): dry-run playbook for secrets rotation 2026-04-17 05:22:52 +00:00
80fcb3f53e feat(#694): fleet secrets rotation CLI — pre-flight, dry-run, rotate, rollback, Telegram alerts 2026-04-17 05:20:09 +00:00
4 changed files with 695 additions and 0 deletions


@@ -0,0 +1,54 @@
---
- name: Fleet secrets rotation — dry run (diff only)
  hosts: fleet
  gather_facts: false
  any_errors_fatal: false
  vars_files:
    - ../inventory/group_vars/fleet_secrets.vault.yml
  vars:
    env_file_path: "{{ fleet_secret_targets[inventory_hostname].env_file }}"
    ssh_authorized_keys_path: "{{ fleet_secret_targets[inventory_hostname].ssh_authorized_keys_file }}"
  tasks:
    - name: Validate target metadata exists
      ansible.builtin.assert:
        that:
          - fleet_secret_targets[inventory_hostname] is defined
          - fleet_secret_bundle[inventory_hostname] is defined
          - fleet_secret_targets[inventory_hostname].required_env_keys | length > 0
        fail_msg: "Rotation inventory incomplete for {{ inventory_hostname }}"
    - name: Show env file diff (would change)
      ansible.builtin.debug:
        msg: >-
          Would update {{ fleet_secret_bundle[inventory_hostname].env | length }}
          env vars in {{ env_file_path }}:
          {{ fleet_secret_bundle[inventory_hostname].env.keys() | list | join(', ') }}
    - name: Show SSH keys diff (would change)
      ansible.builtin.debug:
        msg: >-
          Would update authorized_keys at {{ ssh_authorized_keys_path }}
    - name: Show services that would be restarted
      ansible.builtin.debug:
        msg: >-
          Would restart: {{ fleet_secret_targets[inventory_hostname].services | join(', ') }}
    - name: Verify services exist (dry run)
      ansible.builtin.command: "systemctl cat {{ item }}"
      register: svc_check
      changed_when: false
      failed_when: false
      loop: "{{ fleet_secret_targets[inventory_hostname].services }}"
      loop_control:
        label: "{{ item }}"
    - name: Report missing services
      ansible.builtin.debug:
        msg: "⚠️ Service {{ item.item }} not found on {{ inventory_hostname }}"
      # A task may carry only one `when` key, so both conditions go in one list.
      when:
        - svc_check.results is defined
        - item.rc != 0
      loop: "{{ svc_check.results | default([]) }}"
      loop_control:
        label: "{{ item.item }}"

docs/secrets-rotation.md

@@ -0,0 +1,93 @@
# Fleet Secrets Rotation — Operations Guide
## Quick Start
```bash
# 1. Pre-flight: verify everything is ready
python3 scripts/rotate_secrets.py --check
# 2. Dry-run: see what would change
python3 scripts/rotate_secrets.py --dry-run
# 3. Execute rotation
python3 scripts/rotate_secrets.py --rotate
# 4. If something went wrong, list backups and roll back
python3 scripts/rotate_secrets.py --list-rotations
python3 scripts/rotate_secrets.py --rollback 20260414120000
```
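Rotation IDs such as `20260414120000` are timestamps in `%Y%m%d%H%M%S` form, which is how `rotate_secrets.py` generates them. The sketch below shows one way to validate an ID before passing it to `--rollback`. The helper name `parse_rotation_id` is hypothetical and is not part of the CLI.

```python
from datetime import datetime

# Same strftime format that rotate_secrets.py uses when it creates a rotation ID.
ROTATION_ID_FORMAT = "%Y%m%d%H%M%S"


def parse_rotation_id(rotation_id: str) -> datetime:
    """Return the timestamp encoded in a rotation ID, or raise ValueError."""
    if len(rotation_id) != 14 or not rotation_id.isdigit():
        raise ValueError(f"Not a rotation ID: {rotation_id!r}")
    # strptime also rejects impossible dates such as month 13.
    return datetime.strptime(rotation_id, ROTATION_ID_FORMAT)


print(parse_rotation_id("20260414120000"))  # 2026-04-14 12:00:00
```

A wrapper script could call this before invoking the rollback so that a mistyped ID fails locally instead of on every host.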
## What Gets Rotated
Per-node secrets managed via Ansible vault:
| Secret | Where | Services |
|--------|-------|----------|
| GITEA_TOKEN | `~/.env` | hermes, openclaw |
| TELEGRAM_BOT_TOKEN | `~/.env` | hermes |
| PRIMARY_MODEL_API_KEY | `~/.env` | hermes, openclaw |
| SSH authorized_keys | `~/.ssh/authorized_keys` | sshd |
## Fleet Nodes
| Host | IP | Services |
|------|-----|----------|
| ezra | 143.198.27.163 | hermes-ezra, openclaw-ezra |
| bezalel | 67.205.155.108 | hermes-bezalel |
## Rotation Process
1. **Validate** — check inventory, vault decryption, host connectivity
2. **Backup** — snapshot current env + authorized_keys on each host
3. **Stage** — write new secrets to temp files
4. **Promote** — atomically swap staged files into place
5. **Verify** — restart services, confirm they are active (5 retries, 2s delay)
6. **Rollback** — if any service fails to restart, restore from backup automatically
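The backup, stage, and promote steps can be illustrated on a single local file. This is a minimal sketch, not the playbook's implementation: the real rotation runs through `rotate_fleet_secrets.yml`, which is not shown here, and the names `rotate_file` and `restore_file` as well as the `.before` backup naming are assumptions made for the example.

```python
import os
import shutil
import tempfile
from pathlib import Path


def rotate_file(target: Path, new_content: str, backup_dir: Path) -> Path:
    """Back up target, stage the new content beside it, then swap it in."""
    # Backup: snapshot the current file before touching it.
    backup_dir.mkdir(parents=True, exist_ok=True)
    backup = backup_dir / (target.name + ".before")
    shutil.copy2(target, backup)

    # Stage: write to a temp file in the same directory, so the final
    # rename stays on one filesystem. mkstemp creates it with mode 0600.
    fd, staged = tempfile.mkstemp(dir=target.parent, prefix=target.name + ".")
    try:
        with os.fdopen(fd, "w") as fh:
            fh.write(new_content)
            fh.flush()
            os.fsync(fh.fileno())
        # Promote: os.replace swaps the file in a single rename.
        os.replace(staged, target)
    except BaseException:
        if os.path.exists(staged):
            os.unlink(staged)
        raise
    return backup


def restore_file(target: Path, backup: Path) -> None:
    """Rollback: copy the snapshot back over the target."""
    shutil.copy2(backup, target)
```

Staging next to the target matters because a rename across filesystems is not atomic; readers of the file see either the old content or the new content, never a partial write.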
## Vault Management
```bash
# Edit vaulted secrets
ansible-vault edit ansible/inventory/group_vars/fleet_secrets.vault.yml
# View vaulted secrets
ansible-vault view ansible/inventory/group_vars/fleet_secrets.vault.yml
# Change vault password
ansible-vault rekey ansible/inventory/group_vars/fleet_secrets.vault.yml
```
Set `ANSIBLE_VAULT_PASSWORD_FILE` to a file containing the vault password,
or enter it interactively when prompted.
## Notifications
The CLI sends a Telegram alert on rotation success, rotation failure, and rollback
if `TELEGRAM_BOT_TOKEN` and `TELEGRAM_CHAT_ID` are set in the environment.
Without both variables, notifications are skipped silently.
## Machine-Readable Output
```bash
python3 scripts/rotate_secrets.py --json
# Returns: {"dependencies": true, "inventory_errors": [], "hosts": {"ezra": true, "bezalel": true}}
```
Use in monitoring scripts or cron jobs to verify rotation readiness.
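A monitoring job would typically parse the JSON rather than rely on the exit status alone, since the script as written exits non-zero only when `inventory_errors` is non-empty. The sketch below checks all three fields. The function name `rotation_ready` is hypothetical, and the sample payload is the example output from this guide.

```python
import json


def rotation_ready(report: dict) -> bool:
    """Return True only if every pre-flight field in the report looks healthy."""
    hosts = report.get("hosts", {})
    return (
        report.get("dependencies") is True
        and not report.get("inventory_errors")
        and bool(hosts)
        and all(hosts.values())
    )


# Sample payload copied from the example output in this guide.
sample = json.loads(
    '{"dependencies": true, "inventory_errors": [], '
    '"hosts": {"ezra": true, "bezalel": true}}'
)
print(rotation_ready(sample))  # True
```

In a cron job, the same function could be fed the captured stdout of `python3 scripts/rotate_secrets.py --json`.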
## Files
```
ansible/
  inventory/
    hosts.ini                        # Fleet host definitions
    group_vars/
      fleet.yml                      # Target metadata (paths, services, required keys)
      fleet_secrets.vault.yml        # Vault-encrypted secret bundle
  playbooks/
    rotate_fleet_secrets.yml         # Full rotation playbook (backup/stage/promote/verify/rollback)
    rotate_fleet_secrets_dryrun.yml  # Dry-run mode (diff only, no changes)
scripts/
  rotate_secrets.py                  # CLI wrapper
tests/
  test_rotate_secrets.py             # Unit tests
```

scripts/rotate_secrets.py

@@ -0,0 +1,371 @@
#!/usr/bin/env python3
"""
Fleet secrets rotation CLI.
Usage:
python3 rotate_secrets.py --check # Pre-flight validation
python3 rotate_secrets.py --dry-run # Show what would change
python3 rotate_secrets.py --rotate # Execute rotation
python3 rotate_secrets.py --rollback ID # Rollback to a previous rotation
python3 rotate_secrets.py --list-rotations # List available rollback points
Requires: ansible-playbook, ansible-vault (for --rotate with vaulted secrets)
"""
import argparse
import json
import os
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# ── Paths ──────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
ANSIBLE_DIR = SCRIPT_DIR.parent / "ansible"
PLAYBOOK = ANSIBLE_DIR / "playbooks" / "rotate_fleet_secrets.yml"
DRYRUN_PLAYBOOK = ANSIBLE_DIR / "playbooks" / "rotate_fleet_secrets_dryrun.yml"
INVENTORY = ANSIBLE_DIR / "inventory" / "hosts.ini"
VAULT_FILE = ANSIBLE_DIR / "inventory" / "group_vars" / "fleet_secrets.vault.yml"
FLEET_VARS = ANSIBLE_DIR / "inventory" / "group_vars" / "fleet.yml"
BACKUP_ROOT = "/var/lib/timmy/secret-rotations"
# ── Telegram notification (optional) ──────────────────────────────
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT = os.environ.get("TELEGRAM_CHAT_ID", "")


def run_cmd(cmd: List[str], timeout: int = 120, capture: bool = True) -> Tuple[int, str, str]:
    """Run a command and return (exit_code, stdout, stderr)."""
    try:
        result = subprocess.run(
            cmd, capture_output=capture, text=True, timeout=timeout
        )
        # stdout/stderr are None when capture=False; normalize to empty strings
        # so the return value always matches the annotated type.
        return result.returncode, result.stdout or "", result.stderr or ""
    except subprocess.TimeoutExpired:
        return -1, "", f"Command timed out after {timeout}s"
    except FileNotFoundError:
        return -2, "", f"Command not found: {cmd[0]}"


def send_telegram(message: str) -> bool:
    """Send notification via Telegram bot (if configured)."""
    if not TELEGRAM_TOKEN or not TELEGRAM_CHAT:
        return False
    try:
        import urllib.request
        url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
        payload = json.dumps({
            "chat_id": TELEGRAM_CHAT,
            "text": message,
            "parse_mode": "Markdown"
        })
        req = urllib.request.Request(url, data=payload.encode(),
                                     headers={"Content-Type": "application/json"})
        with urllib.request.urlopen(req, timeout=10):
            return True
    except Exception:
        return False


# ── Pre-flight checks ─────────────────────────────────────────────
def check_dependencies() -> List[str]:
    """Verify required tools are installed."""
    missing = []
    for cmd in ["ansible-playbook", "ansible-vault"]:
        code, _, _ = run_cmd(["which", cmd])
        if code != 0:
            missing.append(cmd)
    return missing


def check_inventory() -> List[str]:
    """Verify inventory and vars files exist and are valid."""
    errors = []
    for path, desc in [
        (INVENTORY, "inventory hosts.ini"),
        (VAULT_FILE, "vault secrets file"),
        (FLEET_VARS, "fleet vars"),
    ]:
        if not path.exists():
            errors.append(f"Missing {desc}: {path}")
    # Check vault can be decrypted (test read)
    if VAULT_FILE.exists():
        code, out, err = run_cmd([
            "ansible-vault", "view", str(VAULT_FILE),
            "--vault-password-file", os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE", "/dev/null")
        ], timeout=10)
        if code != 0:
            errors.append(f"Cannot decrypt vault file: {err.strip()[:100]}")
    return errors


def check_connectivity(hosts: List[str]) -> Dict[str, bool]:
    """Ping each fleet host."""
    results = {}
    for host in hosts:
        code, out, err = run_cmd([
            "ansible", host, "-i", str(INVENTORY), "-m", "ping",
            "--timeout", "10"
        ], timeout=15)
        results[host] = code == 0 and "SUCCESS" in out
    return results


def preflight() -> bool:
    """Run all pre-flight checks. Returns True if ready."""
    print("═══ Pre-flight Checks ═══\n")
    # Dependencies
    missing = check_dependencies()
    if missing:
        print(f"❌ Missing tools: {', '.join(missing)}")
        print("   Install with: apt-get install ansible")
        return False
    print("✅ Dependencies: ansible-playbook, ansible-vault found")
    # Inventory files
    errors = check_inventory()
    if errors:
        for e in errors:
            print(f"{e}")
        return False
    print("✅ Inventory files present")
    # Host connectivity
    print("\nHost Connectivity")
    hosts = ["ezra", "bezalel"]
    reachable = check_connectivity(hosts)
    all_ok = True
    for host, ok in reachable.items():
        # Status markers follow the ✅/❌ convention used by the other checks.
        status = "✅" if ok else "❌"
        print(f"  {status} {host}")
        if not ok:
            all_ok = False
    if not all_ok:
        print("\nSome hosts unreachable. Rotation will fail on unreachable hosts.")
        resp = input("Continue anyway? [y/N] ").strip().lower()
        if resp != "y":
            return False
    print("\nPre-flight passed. Ready for rotation.")
    return True


# ── Dry-run ────────────────────────────────────────────────────────
def dry_run() -> bool:
    """Run rotation playbook in check mode."""
    print("═══ Dry Run (check mode) ═══\n")
    if not DRYRUN_PLAYBOOK.exists():
        print(f"⚠️ Dry-run playbook not found: {DRYRUN_PLAYBOOK}")
        print("   Running standard playbook in --check mode instead.")
        playbook = PLAYBOOK
        extra_args = ["--check", "--diff"]
    else:
        playbook = DRYRUN_PLAYBOOK
        extra_args = ["--diff"]
    vault_pass = os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE")
    cmd = [
        "ansible-playbook",
        "-i", str(INVENTORY),
        str(playbook),
    ] + extra_args
    if vault_pass:
        cmd.extend(["--vault-password-file", vault_pass])
    print(f"Running: {' '.join(cmd)}\n")
    code, out, err = run_cmd(cmd, timeout=300, capture=False)
    if code == 0:
        print("\nDry run completed successfully.")
        return True
    else:
        print(f"\nDry run failed (exit {code}).")
        return False


# ── Execute rotation ──────────────────────────────────────────────
def rotate() -> bool:
    """Execute the rotation playbook."""
    rotation_id = datetime.now().strftime("%Y%m%d%H%M%S")
    print(f"═══ Rotating Secrets (ID: {rotation_id}) ═══\n")
    vault_pass = os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE")
    cmd = [
        "ansible-playbook",
        "-i", str(INVENTORY),
        str(PLAYBOOK),
    ]
    if vault_pass:
        cmd.extend(["--vault-password-file", vault_pass])
    print(f"Running: {' '.join(cmd)}\n")
    start_time = time.time()
    code, out, err = run_cmd(cmd, timeout=600, capture=False)
    elapsed = time.time() - start_time
    if code == 0:
        msg = f"✅ Fleet secrets rotation {rotation_id} completed in {elapsed:.0f}s"
        print(f"\n{msg}")
        send_telegram(
            "🔐 *Secrets Rotation Complete*\n"
            f"ID: `{rotation_id}`\n"
            f"Duration: {elapsed:.0f}s\n"
            "All nodes verified."
        )
        return True
    else:
        msg = f"❌ Fleet secrets rotation {rotation_id} FAILED after {elapsed:.0f}s (exit {code})"
        print(f"\n{msg}")
        print("   Playbook has rescue block — rollback should have executed automatically.")
        send_telegram(
            "🚨 *Secrets Rotation FAILED*\n"
            f"ID: `{rotation_id}`\n"
            f"Exit: {code}\n"
            "Rollback attempted automatically.\n"
            f"Check: `ansible-playbook -i {INVENTORY} {PLAYBOOK}` logs."
        )
        return False


# ── Rollback ──────────────────────────────────────────────────────
def list_rotations() -> None:
    """List available rotation backups on each host."""
    print("═══ Available Rotation Backups ═══\n")
    for host in ["ezra", "bezalel"]:
        code, out, err = run_cmd([
            "ansible", host, "-i", str(INVENTORY),
            "-m", "shell",
            "-a", f"ls -la {BACKUP_ROOT}/ 2>/dev/null || echo 'No backups'",
            "--timeout", "10"
        ], timeout=15)
        print(f"── {host} ──")
        print(out.strip() if out.strip() else "  (no output)")
        print()


def rollback(rotation_id: str) -> bool:
    """Restore secrets from a previous rotation backup.

    Returns True only if every host was restored.
    """
    print(f"═══ Rolling Back (ID: {rotation_id}) ═══\n")
    print("⚠️ Manual rollback: restoring env and SSH keys from backup.")
    print(f"   Backup path: {BACKUP_ROOT}/{rotation_id}/<host>/\n")
    hosts = ["ezra", "bezalel"]
    restored = []
    for host in hosts:
        backup_env = f"{BACKUP_ROOT}/{rotation_id}/{host}/env.before"
        backup_ssh = f"{BACKUP_ROOT}/{rotation_id}/{host}/authorized_keys.before"
        # Check backup exists
        code, out, err = run_cmd([
            "ansible", host, "-i", str(INVENTORY),
            "-m", "stat", "-a", f"path={backup_env}",
            "--timeout", "10"
        ], timeout=15)
        if '"exists": true' not in out:
            print(f"  ⚠️ {host}: no backup at {backup_env}")
            continue
        # Restore env
        env_code, _, _ = run_cmd([
            "ansible", host, "-i", str(INVENTORY),
            "-m", "copy",
            "-a", f"src={backup_env} dest=/root/wizards/{host}/home/.env remote_src=yes mode=0600",
            "--timeout", "30"
        ], timeout=35)
        # Restore SSH keys
        ssh_code, _, _ = run_cmd([
            "ansible", host, "-i", str(INVENTORY),
            "-m", "copy",
            "-a", f"src={backup_ssh} dest=/root/.ssh/authorized_keys remote_src=yes mode=0600",
            "--timeout", "30"
        ], timeout=35)
        if env_code != 0 or ssh_code != 0:
            # Do not report success when a copy step failed.
            print(f"  ⚠️ {host}: restore failed (env exit {env_code}, ssh exit {ssh_code})")
            continue
        # Restart services
        run_cmd([
            "ansible", host, "-i", str(INVENTORY),
            "-m", "shell",
            "-a", "systemctl restart hermes-*.service openclaw-*.service 2>/dev/null; true",
            "--timeout", "30"
        ], timeout=35)
        restored.append(host)
        print(f"  {host}: restored from rotation {rotation_id}")
    all_restored = restored == hosts
    if all_restored:
        summary = "Restored previous secrets on all nodes."
    else:
        summary = f"Restored: {', '.join(restored) or 'none'} (of {', '.join(hosts)})."
    send_telegram(
        "🔄 *Secrets Rollback*\n"
        f"ID: `{rotation_id}`\n"
        f"{summary}"
    )
    return all_restored


# ── Main ──────────────────────────────────────────────────────────
def main():
    parser = argparse.ArgumentParser(
        description="Fleet secrets rotation tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--check", action="store_true", help="Pre-flight validation")
    group.add_argument("--dry-run", action="store_true", help="Show what would change")
    group.add_argument("--rotate", action="store_true", help="Execute rotation")
    group.add_argument("--rollback", metavar="ID", help="Rollback to rotation ID")
    group.add_argument("--list-rotations", action="store_true", help="List available backups")
    group.add_argument("--json", action="store_true", help="Machine-readable output")
    args = parser.parse_args()
    if args.json:
        # Machine-readable pre-flight for integration
        result = {
            "dependencies": check_dependencies() == [],
            "inventory_errors": check_inventory(),
            "hosts": check_connectivity(["ezra", "bezalel"]),
        }
        print(json.dumps(result, indent=2))
        sys.exit(0 if not result["inventory_errors"] else 1)
    if args.check:
        sys.exit(0 if preflight() else 1)
    elif args.dry_run:
        sys.exit(0 if dry_run() else 1)
    elif args.rotate:
        if not preflight():
            print("\nPre-flight failed. Aborting rotation.")
            sys.exit(1)
        sys.exit(0 if rotate() else 1)
    elif args.rollback:
        sys.exit(0 if rollback(args.rollback) else 1)
    elif args.list_rotations:
        list_rotations()


if __name__ == "__main__":
    main()


@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
Tests for fleet secrets rotation CLI.
Tests pre-flight checks, argument parsing, and integration points.
Does NOT execute actual rotations — uses mocks for ansible commands.
"""
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
# Add scripts dir to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
import rotate_secrets


class TestDependencyCheck(unittest.TestCase):
    """Test dependency verification."""

    @patch("rotate_secrets.run_cmd")
    def test_missing_ansible_playbook(self, mock_run):
        mock_run.return_value = (1, "", "not found")
        missing = rotate_secrets.check_dependencies()
        self.assertIn("ansible-playbook", missing)

    @patch("rotate_secrets.run_cmd")
    def test_all_deps_present(self, mock_run):
        mock_run.return_value = (0, "/usr/bin/ansible-playbook", "")
        missing = rotate_secrets.check_dependencies()
        self.assertEqual(missing, [])

    @patch("rotate_secrets.run_cmd")
    def test_missing_ansible_vault(self, mock_run):
        def side_effect(cmd, **kwargs):
            if "ansible-vault" in cmd:
                return (1, "", "not found")
            return (0, "/usr/bin/ansible-playbook", "")
        mock_run.side_effect = side_effect
        missing = rotate_secrets.check_dependencies()
        self.assertIn("ansible-vault", missing)


class TestInventoryCheck(unittest.TestCase):
    """Test inventory file validation."""

    def test_missing_inventory(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            tmp = Path(tmpdir)
            # patch.object restores the original paths even if an assertion fails
            with patch.object(rotate_secrets, "INVENTORY", tmp / "hosts.ini"), \
                    patch.object(rotate_secrets, "VAULT_FILE", tmp / "vault.yml"), \
                    patch.object(rotate_secrets, "FLEET_VARS", tmp / "fleet.yml"):
                errors = rotate_secrets.check_inventory()
            self.assertEqual(len(errors), 3)
            self.assertTrue(any("hosts.ini" in e for e in errors))

    def test_all_files_present(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            tmp = Path(tmpdir)
            # Create dummy files
            for name in ["hosts.ini", "vault.yml", "fleet.yml"]:
                (tmp / name).write_text("# placeholder")
            with patch.object(rotate_secrets, "INVENTORY", tmp / "hosts.ini"), \
                    patch.object(rotate_secrets, "VAULT_FILE", tmp / "vault.yml"), \
                    patch.object(rotate_secrets, "FLEET_VARS", tmp / "fleet.yml"), \
                    patch("rotate_secrets.run_cmd") as mock_run:
                mock_run.return_value = (0, "vault content", "")
                errors = rotate_secrets.check_inventory()
            self.assertEqual(errors, [])


class TestConnectivity(unittest.TestCase):
    """Test host connectivity checks."""

    @patch("rotate_secrets.run_cmd")
    def test_all_hosts_reachable(self, mock_run):
        mock_run.return_value = (0, "SUCCESS", "")
        results = rotate_secrets.check_connectivity(["ezra", "bezalel"])
        self.assertTrue(results["ezra"])
        self.assertTrue(results["bezalel"])

    @patch("rotate_secrets.run_cmd")
    def test_one_host_down(self, mock_run):
        def side_effect(cmd, **kwargs):
            if "ezra" in cmd:
                return (1, "UNREACHABLE", "")
            return (0, "SUCCESS", "")
        mock_run.side_effect = side_effect
        results = rotate_secrets.check_connectivity(["ezra", "bezalel"])
        self.assertFalse(results["ezra"])
        self.assertTrue(results["bezalel"])


class TestRunCmd(unittest.TestCase):
    """Test command runner."""

    def test_successful_command(self):
        code, out, err = rotate_secrets.run_cmd(["echo", "hello"])
        self.assertEqual(code, 0)
        self.assertEqual(out.strip(), "hello")

    def test_failing_command(self):
        code, out, err = rotate_secrets.run_cmd(["false"])
        self.assertEqual(code, 1)

    def test_missing_command(self):
        code, out, err = rotate_secrets.run_cmd(["nonexistent_command_xyz"])
        self.assertEqual(code, -2)

    def test_timeout(self):
        code, out, err = rotate_secrets.run_cmd(["sleep", "30"], timeout=1)
        self.assertEqual(code, -1)
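

class TestTelegramNotification(unittest.TestCase):
    """Test Telegram notification (no-op when not configured)."""

    def test_no_token_returns_false(self):
        # The module reads the token and chat ID once at import time, so patch
        # the module-level constants rather than os.environ.
        with patch.object(rotate_secrets, "TELEGRAM_TOKEN", ""), \
                patch.object(rotate_secrets, "TELEGRAM_CHAT", ""):
            result = rotate_secrets.send_telegram("test")
        self.assertFalse(result)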


class TestJsonOutput(unittest.TestCase):
    """Test machine-readable JSON output."""

    @patch("rotate_secrets.check_dependencies")
    @patch("rotate_secrets.check_inventory")
    @patch("rotate_secrets.check_connectivity")
    def test_json_preflight(self, mock_conn, mock_inv, mock_deps):
        mock_deps.return_value = []
        mock_inv.return_value = []
        mock_conn.return_value = {"ezra": True, "bezalel": True}
        # Run the real --json code path and capture what it prints
        from io import StringIO
        captured = StringIO()
        with patch("sys.argv", ["rotate_secrets.py", "--json"]), \
                patch("sys.stdout", captured):
            with self.assertRaises(SystemExit) as ctx:
                rotate_secrets.main()
        self.assertEqual(ctx.exception.code, 0)
        parsed = json.loads(captured.getvalue())
        self.assertTrue(parsed["dependencies"])
        self.assertEqual(parsed["inventory_errors"], [])
        self.assertEqual(parsed["hosts"], {"ezra": True, "bezalel": True})


if __name__ == "__main__":
    unittest.main()