Compare commits
4 Commits
step35/667
...
feat/694-s
| Author | SHA1 | Date | |
|---|---|---|---|
| 9d41f255f9 | |||
| 554646c9e0 | |||
| d88578912e | |||
| 80fcb3f53e |
54
ansible/playbooks/rotate_fleet_secrets_dryrun.yml
Normal file
54
ansible/playbooks/rotate_fleet_secrets_dryrun.yml
Normal file
@@ -0,0 +1,54 @@
|
||||
---
|
||||
- name: Fleet secrets rotation — dry run (diff only)
|
||||
hosts: fleet
|
||||
gather_facts: false
|
||||
any_errors_fatal: false
|
||||
vars_files:
|
||||
- ../inventory/group_vars/fleet_secrets.vault.yml
|
||||
vars:
|
||||
env_file_path: "{{ fleet_secret_targets[inventory_hostname].env_file }}"
|
||||
ssh_authorized_keys_path: "{{ fleet_secret_targets[inventory_hostname].ssh_authorized_keys_file }}"
|
||||
|
||||
tasks:
|
||||
- name: Validate target metadata exists
|
||||
ansible.builtin.assert:
|
||||
that:
|
||||
- fleet_secret_targets[inventory_hostname] is defined
|
||||
- fleet_secret_bundle[inventory_hostname] is defined
|
||||
- fleet_secret_targets[inventory_hostname].required_env_keys | length > 0
|
||||
fail_msg: "Rotation inventory incomplete for {{ inventory_hostname }}"
|
||||
|
||||
- name: Show env file diff (would change)
|
||||
ansible.builtin.debug:
|
||||
msg: >-
|
||||
Would update {{ fleet_secret_bundle[inventory_hostname].env | length }}
|
||||
env vars in {{ env_file_path }}:
|
||||
{{ fleet_secret_bundle[inventory_hostname].env.keys() | list | join(', ') }}"
|
||||
|
||||
- name: Show SSH keys diff (would change)
|
||||
ansible.builtin.debug:
|
||||
msg: >-
|
||||
Would update authorized_keys at {{ ssh_authorized_keys_path }}
|
||||
|
||||
- name: Show services that would be restarted
|
||||
ansible.builtin.debug:
|
||||
msg: >-
|
||||
Would restart: {{ fleet_secret_targets[inventory_hostname].services | join(', ') }}
|
||||
|
||||
- name: Verify services exist (dry run)
|
||||
ansible.builtin.command: "systemctl cat {{ item }}"
|
||||
register: svc_check
|
||||
changed_when: false
|
||||
failed_when: false
|
||||
loop: "{{ fleet_secret_targets[inventory_hostname].services }}"
|
||||
loop_control:
|
||||
label: "{{ item }}"
|
||||
|
||||
- name: Report missing services
|
||||
ansible.builtin.debug:
|
||||
msg: "⚠️ Service {{ item.item }} not found on {{ inventory_hostname }}"
|
||||
when: item.rc != 0
|
||||
loop: "{{ svc_check.results }}"
|
||||
loop_control:
|
||||
label: "{{ item.item }}"
|
||||
when: svc_check.results is defined
|
||||
93
docs/secrets-rotation.md
Normal file
93
docs/secrets-rotation.md
Normal file
@@ -0,0 +1,93 @@
|
||||
# Fleet Secrets Rotation — Operations Guide
|
||||
|
||||
## Quick Start
|
||||
|
||||
```bash
|
||||
# 1. Pre-flight: verify everything is ready
|
||||
python3 scripts/rotate_secrets.py --check
|
||||
|
||||
# 2. Dry-run: see what would change
|
||||
python3 scripts/rotate_secrets.py --dry-run
|
||||
|
||||
# 3. Execute rotation
|
||||
python3 scripts/rotate_secrets.py --rotate
|
||||
|
||||
# 4. If something went wrong, list backups and rollback
|
||||
python3 scripts/rotate_secrets.py --list-rotations
|
||||
python3 scripts/rotate_secrets.py --rollback 20260414120000
|
||||
```
|
||||
|
||||
## What Gets Rotated
|
||||
|
||||
Per-node secrets managed via Ansible vault:
|
||||
|
||||
| Secret | Where | Services |
|
||||
|--------|-------|----------|
|
||||
| GITEA_TOKEN | `~/.env` | hermes, openclaw |
|
||||
| TELEGRAM_BOT_TOKEN | `~/.env` | hermes |
|
||||
| PRIMARY_MODEL_API_KEY | `~/.env` | hermes, openclaw |
|
||||
| SSH authorized_keys | `~/.ssh/authorized_keys` | sshd |
|
||||
|
||||
## Fleet Nodes
|
||||
|
||||
| Host | IP | Services |
|
||||
|------|-----|----------|
|
||||
| ezra | 143.198.27.163 | hermes-ezra, openclaw-ezra |
|
||||
| bezalel | 67.205.155.108 | hermes-bezalel |
|
||||
|
||||
## Rotation Process
|
||||
|
||||
1. **Validate** — check inventory, vault decryption, host connectivity
|
||||
2. **Backup** — snapshot current env + authorized_keys on each host
|
||||
3. **Stage** — write new secrets to temp files
|
||||
4. **Promote** — atomically swap staged files into place
|
||||
5. **Verify** — restart services, confirm they are active (5 retries, 2s delay)
|
||||
6. **Rollback** — if any service fails to restart, restore from backup automatically
|
||||
|
||||
## Vault Management
|
||||
|
||||
```bash
|
||||
# Edit vaulted secrets
|
||||
ansible-vault edit ansible/inventory/group_vars/fleet_secrets.vault.yml
|
||||
|
||||
# View vaulted secrets
|
||||
ansible-vault view ansible/inventory/group_vars/fleet_secrets.vault.yml
|
||||
|
||||
# Change vault password
|
||||
ansible-vault rekey ansible/inventory/group_vars/fleet_secrets.vault.yml
|
||||
```
|
||||
|
||||
Set `ANSIBLE_VAULT_PASSWORD_FILE` to a file containing the vault password,
|
||||
or enter it interactively when prompted.
|
||||
|
||||
## Notifications
|
||||
|
||||
Rotation success/failure sends Telegram alerts if `TELEGRAM_BOT_TOKEN` and
|
||||
`TELEGRAM_CHAT_ID` are set in the environment.
|
||||
|
||||
## Machine-Readable Output
|
||||
|
||||
```bash
|
||||
python3 scripts/rotate_secrets.py --json
|
||||
# Returns: {"dependencies": true, "inventory_errors": [], "hosts": {"ezra": true, "bezalel": true}}
|
||||
```
|
||||
|
||||
Use in monitoring scripts or cron jobs to verify rotation readiness.
|
||||
|
||||
## Files
|
||||
|
||||
```
|
||||
ansible/
|
||||
inventory/
|
||||
hosts.ini # Fleet host definitions
|
||||
group_vars/
|
||||
fleet.yml # Target metadata (paths, services, required keys)
|
||||
fleet_secrets.vault.yml # Vault-encrypted secret bundle
|
||||
playbooks/
|
||||
rotate_fleet_secrets.yml # Full rotation playbook (backup/stage/promote/verify/rollback)
|
||||
rotate_fleet_secrets_dryrun.yml # Dry-run mode (diff only, no changes)
|
||||
scripts/
|
||||
rotate_secrets.py # CLI wrapper
|
||||
tests/
|
||||
test_rotate_secrets.py # Unit tests
|
||||
```
|
||||
371
scripts/rotate_secrets.py
Normal file
371
scripts/rotate_secrets.py
Normal file
@@ -0,0 +1,371 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Fleet secrets rotation CLI.
|
||||
|
||||
Usage:
|
||||
python3 rotate_secrets.py --check # Pre-flight validation
|
||||
python3 rotate_secrets.py --dry-run # Show what would change
|
||||
python3 rotate_secrets.py --rotate # Execute rotation
|
||||
python3 rotate_secrets.py --rollback ID # Rollback to a previous rotation
|
||||
python3 rotate_secrets.py --list-rotations # List available rollback points
|
||||
|
||||
Requires: ansible-playbook, ansible-vault (for --rotate with vaulted secrets)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
|
||||
# ── Paths ──────────────────────────────────────────────────────────
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
ANSIBLE_DIR = SCRIPT_DIR.parent / "ansible"
|
||||
PLAYBOOK = ANSIBLE_DIR / "playbooks" / "rotate_fleet_secrets.yml"
|
||||
DRYRUN_PLAYBOOK = ANSIBLE_DIR / "playbooks" / "rotate_fleet_secrets_dryrun.yml"
|
||||
INVENTORY = ANSIBLE_DIR / "inventory" / "hosts.ini"
|
||||
VAULT_FILE = ANSIBLE_DIR / "inventory" / "group_vars" / "fleet_secrets.vault.yml"
|
||||
FLEET_VARS = ANSIBLE_DIR / "inventory" / "group_vars" / "fleet.yml"
|
||||
BACKUP_ROOT = "/var/lib/timmy/secret-rotations"
|
||||
|
||||
# ── Telegram notification (optional) ──────────────────────────────
|
||||
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
|
||||
TELEGRAM_CHAT = os.environ.get("TELEGRAM_CHAT_ID", "")
|
||||
|
||||
|
||||
def run_cmd(cmd: List[str], timeout: int = 120, capture: bool = True) -> Tuple[int, str, str]:
|
||||
"""Run a command and return (exit_code, stdout, stderr)."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
cmd, capture_output=capture, text=True, timeout=timeout
|
||||
)
|
||||
return result.returncode, result.stdout, result.stderr
|
||||
except subprocess.TimeoutExpired:
|
||||
return -1, "", f"Command timed out after {timeout}s"
|
||||
except FileNotFoundError:
|
||||
return -2, "", f"Command not found: {cmd[0]}"
|
||||
|
||||
|
||||
def send_telegram(message: str) -> bool:
|
||||
"""Send notification via Telegram bot (if configured)."""
|
||||
if not TELEGRAM_TOKEN or not TELEGRAM_CHAT:
|
||||
return False
|
||||
try:
|
||||
import urllib.request
|
||||
url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
|
||||
payload = json.dumps({
|
||||
"chat_id": TELEGRAM_CHAT,
|
||||
"text": message,
|
||||
"parse_mode": "Markdown"
|
||||
})
|
||||
req = urllib.request.Request(url, data=payload.encode(),
|
||||
headers={"Content-Type": "application/json"})
|
||||
with urllib.request.urlopen(req, timeout=10):
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
# ── Pre-flight checks ─────────────────────────────────────────────
|
||||
|
||||
def check_dependencies() -> List[str]:
|
||||
"""Verify required tools are installed."""
|
||||
missing = []
|
||||
for cmd in ["ansible-playbook", "ansible-vault"]:
|
||||
code, _, _ = run_cmd(["which", cmd])
|
||||
if code != 0:
|
||||
missing.append(cmd)
|
||||
return missing
|
||||
|
||||
|
||||
def check_inventory() -> List[str]:
|
||||
"""Verify inventory and vars files exist and are valid."""
|
||||
errors = []
|
||||
for path, desc in [
|
||||
(INVENTORY, "inventory hosts.ini"),
|
||||
(VAULT_FILE, "vault secrets file"),
|
||||
(FLEET_VARS, "fleet vars"),
|
||||
]:
|
||||
if not path.exists():
|
||||
errors.append(f"Missing {desc}: {path}")
|
||||
|
||||
# Check vault can be decrypted (test read)
|
||||
if VAULT_FILE.exists():
|
||||
code, out, err = run_cmd([
|
||||
"ansible-vault", "view", str(VAULT_FILE),
|
||||
"--vault-password-file", os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE", "/dev/null")
|
||||
], timeout=10)
|
||||
if code != 0:
|
||||
errors.append(f"Cannot decrypt vault file: {err.strip()[:100]}")
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
def check_connectivity(hosts: List[str]) -> Dict[str, bool]:
|
||||
"""Ping each fleet host."""
|
||||
results = {}
|
||||
for host in hosts:
|
||||
code, out, err = run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY), "-m", "ping",
|
||||
"--timeout", "10"
|
||||
], timeout=15)
|
||||
results[host] = code == 0 and "SUCCESS" in out
|
||||
return results
|
||||
|
||||
|
||||
def preflight() -> bool:
|
||||
"""Run all pre-flight checks. Returns True if ready."""
|
||||
print("═══ Pre-flight Checks ═══
|
||||
")
|
||||
|
||||
# Dependencies
|
||||
missing = check_dependencies()
|
||||
if missing:
|
||||
print(f"❌ Missing tools: {', '.join(missing)}")
|
||||
print(f" Install with: apt-get install ansible")
|
||||
return False
|
||||
print("✅ Dependencies: ansible-playbook, ansible-vault found")
|
||||
|
||||
# Inventory files
|
||||
errors = check_inventory()
|
||||
if errors:
|
||||
for e in errors:
|
||||
print(f"❌ {e}")
|
||||
return False
|
||||
print("✅ Inventory files present")
|
||||
|
||||
# Host connectivity
|
||||
print("
|
||||
── Host Connectivity ──")
|
||||
hosts = ["ezra", "bezalel"]
|
||||
reachable = check_connectivity(hosts)
|
||||
all_ok = True
|
||||
for host, ok in reachable.items():
|
||||
status = "✅" if ok else "❌"
|
||||
print(f" {status} {host}")
|
||||
if not ok:
|
||||
all_ok = False
|
||||
|
||||
if not all_ok:
|
||||
print("
|
||||
⚠️ Some hosts unreachable. Rotation will fail on unreachable hosts.")
|
||||
resp = input("Continue anyway? [y/N] ").strip().lower()
|
||||
if resp != "y":
|
||||
return False
|
||||
|
||||
print("
|
||||
✅ Pre-flight passed. Ready for rotation.")
|
||||
return True
|
||||
|
||||
|
||||
# ── Dry-run ────────────────────────────────────────────────────────
|
||||
|
||||
def dry_run() -> bool:
|
||||
"""Run rotation playbook in check mode."""
|
||||
print("═══ Dry Run (check mode) ═══
|
||||
")
|
||||
|
||||
if not DRYRUN_PLAYBOOK.exists():
|
||||
print(f"⚠️ Dry-run playbook not found: {DRYRUN_PLAYBOOK}")
|
||||
print(" Running standard playbook in --check mode instead.")
|
||||
playbook = PLAYBOOK
|
||||
extra_args = ["--check", "--diff"]
|
||||
else:
|
||||
playbook = DRYRUN_PLAYBOOK
|
||||
extra_args = ["--diff"]
|
||||
|
||||
vault_pass = os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE")
|
||||
cmd = [
|
||||
"ansible-playbook",
|
||||
"-i", str(INVENTORY),
|
||||
str(playbook),
|
||||
] + extra_args
|
||||
|
||||
if vault_pass:
|
||||
cmd.extend(["--vault-password-file", vault_pass])
|
||||
|
||||
print(f"Running: {' '.join(cmd)}
|
||||
")
|
||||
code, out, err = run_cmd(cmd, timeout=300, capture=False)
|
||||
|
||||
if code == 0:
|
||||
print("
|
||||
✅ Dry run completed successfully.")
|
||||
return True
|
||||
else:
|
||||
print(f"
|
||||
❌ Dry run failed (exit {code}).")
|
||||
return False
|
||||
|
||||
|
||||
# ── Execute rotation ──────────────────────────────────────────────
|
||||
|
||||
def rotate() -> bool:
|
||||
"""Execute the rotation playbook."""
|
||||
rotation_id = datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
print(f"═══ Rotating Secrets (ID: {rotation_id}) ═══
|
||||
")
|
||||
|
||||
vault_pass = os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE")
|
||||
cmd = [
|
||||
"ansible-playbook",
|
||||
"-i", str(INVENTORY),
|
||||
str(PLAYBOOK),
|
||||
]
|
||||
|
||||
if vault_pass:
|
||||
cmd.extend(["--vault-password-file", vault_pass])
|
||||
|
||||
print(f"Running: {' '.join(cmd)}
|
||||
")
|
||||
|
||||
start_time = time.time()
|
||||
code, out, err = run_cmd(cmd, timeout=600, capture=False)
|
||||
elapsed = time.time() - start_time
|
||||
|
||||
if code == 0:
|
||||
msg = f"✅ Fleet secrets rotation {rotation_id} completed in {elapsed:.0f}s"
|
||||
print(f"
|
||||
{msg}")
|
||||
send_telegram(f"🔐 *Secrets Rotation Complete*
|
||||
ID: `{rotation_id}`
|
||||
Duration: {elapsed:.0f}s
|
||||
All nodes verified.")
|
||||
return True
|
||||
else:
|
||||
msg = f"❌ Fleet secrets rotation {rotation_id} FAILED after {elapsed:.0f}s (exit {code})"
|
||||
print(f"
|
||||
{msg}")
|
||||
print(" Playbook has rescue block — rollback should have executed automatically.")
|
||||
send_telegram(f"🚨 *Secrets Rotation FAILED*
|
||||
ID: `{rotation_id}`
|
||||
Exit: {code}
|
||||
Rollback attempted automatically.
|
||||
Check: `ansible-playbook -i {INVENTORY} {PLAYBOOK}` logs.")
|
||||
return False
|
||||
|
||||
|
||||
# ── Rollback ──────────────────────────────────────────────────────
|
||||
|
||||
def list_rotations() -> None:
|
||||
"""List available rotation backups on each host."""
|
||||
print("═══ Available Rotation Backups ═══
|
||||
")
|
||||
for host in ["ezra", "bezalel"]:
|
||||
code, out, err = run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY),
|
||||
"-m", "shell",
|
||||
"-a", f"ls -la {BACKUP_ROOT}/ 2>/dev/null || echo 'No backups'",
|
||||
"--timeout", "10"
|
||||
], timeout=15)
|
||||
print(f"── {host} ──")
|
||||
print(out.strip() if out.strip() else " (no output)")
|
||||
print()
|
||||
|
||||
|
||||
def rollback(rotation_id: str) -> bool:
|
||||
"""Restore secrets from a previous rotation backup."""
|
||||
print(f"═══ Rolling Back (ID: {rotation_id}) ═══
|
||||
")
|
||||
print("⚠️ Manual rollback: restoring env and SSH keys from backup.")
|
||||
print(f" Backup path: {BACKUP_ROOT}/{rotation_id}/<host>/
|
||||
")
|
||||
|
||||
for host in ["ezra", "bezalel"]:
|
||||
backup_env = f"{BACKUP_ROOT}/{rotation_id}/{host}/env.before"
|
||||
backup_ssh = f"{BACKUP_ROOT}/{rotation_id}/{host}/authorized_keys.before"
|
||||
|
||||
# Check backup exists
|
||||
code, out, err = run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY),
|
||||
"-m", "stat", "-a", f"path={backup_env}",
|
||||
"--timeout", "10"
|
||||
], timeout=15)
|
||||
|
||||
if '"exists": true' not in out:
|
||||
print(f" ⚠️ {host}: no backup at {backup_env}")
|
||||
continue
|
||||
|
||||
# Restore env
|
||||
run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY),
|
||||
"-m", "copy",
|
||||
"-a", f"src={backup_env} dest=/root/wizards/{host}/home/.env remote_src=yes mode=0600",
|
||||
"--timeout", "30"
|
||||
], timeout=35)
|
||||
|
||||
# Restore SSH keys
|
||||
run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY),
|
||||
"-m", "copy",
|
||||
"-a", f"src={backup_ssh} dest=/root/.ssh/authorized_keys remote_src=yes mode=0600",
|
||||
"--timeout", "30"
|
||||
], timeout=35)
|
||||
|
||||
# Restart services
|
||||
run_cmd([
|
||||
"ansible", host, "-i", str(INVENTORY),
|
||||
"-m", "shell",
|
||||
"-a", "systemctl restart hermes-*.service openclaw-*.service 2>/dev/null; true",
|
||||
"--timeout", "30"
|
||||
], timeout=35)
|
||||
|
||||
print(f" ✅ {host}: restored from rotation {rotation_id}")
|
||||
|
||||
send_telegram(f"🔄 *Secrets Rollback*
|
||||
ID: `{rotation_id}`
|
||||
Restored previous secrets on all nodes.")
|
||||
return True
|
||||
|
||||
|
||||
# ── Main ──────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Fleet secrets rotation tool",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog=__doc__
|
||||
)
|
||||
group = parser.add_mutually_exclusive_group(required=True)
|
||||
group.add_argument("--check", action="store_true", help="Pre-flight validation")
|
||||
group.add_argument("--dry-run", action="store_true", help="Show what would change")
|
||||
group.add_argument("--rotate", action="store_true", help="Execute rotation")
|
||||
group.add_argument("--rollback", metavar="ID", help="Rollback to rotation ID")
|
||||
group.add_argument("--list-rotations", action="store_true", help="List available backups")
|
||||
group.add_argument("--json", action="store_true", help="Machine-readable output")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.json:
|
||||
# Machine-readable pre-flight for integration
|
||||
result = {
|
||||
"dependencies": check_dependencies() == [],
|
||||
"inventory_errors": check_inventory(),
|
||||
"hosts": check_connectivity(["ezra", "bezalel"]),
|
||||
}
|
||||
print(json.dumps(result, indent=2))
|
||||
sys.exit(0 if not result["inventory_errors"] else 1)
|
||||
|
||||
if args.check:
|
||||
sys.exit(0 if preflight() else 1)
|
||||
elif args.dry_run:
|
||||
sys.exit(0 if dry_run() else 1)
|
||||
elif args.rotate:
|
||||
if not preflight():
|
||||
print("
|
||||
❌ Pre-flight failed. Aborting rotation.")
|
||||
sys.exit(1)
|
||||
sys.exit(0 if rotate() else 1)
|
||||
elif args.rollback:
|
||||
sys.exit(0 if rollback(args.rollback) else 1)
|
||||
elif args.list_rotations:
|
||||
list_rotations()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
177
tests/test_rotate_secrets.py
Normal file
177
tests/test_rotate_secrets.py
Normal file
@@ -0,0 +1,177 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for fleet secrets rotation CLI.
|
||||
|
||||
Tests pre-flight checks, argument parsing, and integration points.
|
||||
Does NOT execute actual rotations — uses mocks for ansible commands.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
# Add scripts dir to path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
|
||||
|
||||
import rotate_secrets
|
||||
|
||||
|
||||
class TestDependencyCheck(unittest.TestCase):
|
||||
"""Test dependency verification."""
|
||||
|
||||
@patch("rotate_secrets.run_cmd")
|
||||
def test_missing_ansible_playbook(self, mock_run):
|
||||
mock_run.return_value = (1, "", "not found")
|
||||
missing = rotate_secrets.check_dependencies()
|
||||
self.assertIn("ansible-playbook", missing)
|
||||
|
||||
@patch("rotate_secrets.run_cmd")
|
||||
def test_all_deps_present(self, mock_run):
|
||||
mock_run.return_value = (0, "/usr/bin/ansible-playbook", "")
|
||||
missing = rotate_secrets.check_dependencies()
|
||||
self.assertEqual(missing, [])
|
||||
|
||||
@patch("rotate_secrets.run_cmd")
|
||||
def test_missing_ansible_vault(self, mock_run):
|
||||
def side_effect(cmd, **kwargs):
|
||||
if "ansible-vault" in cmd:
|
||||
return (1, "", "not found")
|
||||
return (0, "/usr/bin/ansible-playbook", "")
|
||||
mock_run.side_effect = side_effect
|
||||
missing = rotate_secrets.check_dependencies()
|
||||
self.assertIn("ansible-vault", missing)
|
||||
|
||||
|
||||
class TestInventoryCheck(unittest.TestCase):
|
||||
"""Test inventory file validation."""
|
||||
|
||||
def test_missing_inventory(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Override paths to temp dir
|
||||
original_inventory = rotate_secrets.INVENTORY
|
||||
original_vault = rotate_secrets.VAULT_FILE
|
||||
original_vars = rotate_secrets.FLEET_VARS
|
||||
|
||||
rotate_secrets.INVENTORY = Path(tmpdir) / "hosts.ini"
|
||||
rotate_secrets.VAULT_FILE = Path(tmpdir) / "vault.yml"
|
||||
rotate_secrets.FLEET_VARS = Path(tmpdir) / "fleet.yml"
|
||||
|
||||
errors = rotate_secrets.check_inventory()
|
||||
self.assertEqual(len(errors), 3)
|
||||
self.assertTrue(any("hosts.ini" in e for e in errors))
|
||||
|
||||
# Restore
|
||||
rotate_secrets.INVENTORY = original_inventory
|
||||
rotate_secrets.VAULT_FILE = original_vault
|
||||
rotate_secrets.FLEET_VARS = original_vars
|
||||
|
||||
def test_all_files_present(self):
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Create dummy files
|
||||
for name in ["hosts.ini", "vault.yml", "fleet.yml"]:
|
||||
(Path(tmpdir) / name).write_text("# placeholder")
|
||||
|
||||
original_inventory = rotate_secrets.INVENTORY
|
||||
original_vault = rotate_secrets.VAULT_FILE
|
||||
original_vars = rotate_secrets.FLEET_VARS
|
||||
|
||||
rotate_secrets.INVENTORY = Path(tmpdir) / "hosts.ini"
|
||||
rotate_secrets.VAULT_FILE = Path(tmpdir) / "vault.yml"
|
||||
rotate_secrets.FLEET_VARS = Path(tmpdir) / "fleet.yml"
|
||||
|
||||
with patch("rotate_secrets.run_cmd") as mock_run:
|
||||
mock_run.return_value = (0, "vault content", "")
|
||||
errors = rotate_secrets.check_inventory()
|
||||
self.assertEqual(errors, [])
|
||||
|
||||
rotate_secrets.INVENTORY = original_inventory
|
||||
rotate_secrets.VAULT_FILE = original_vault
|
||||
rotate_secrets.FLEET_VARS = original_vars
|
||||
|
||||
|
||||
class TestConnectivity(unittest.TestCase):
|
||||
"""Test host connectivity checks."""
|
||||
|
||||
@patch("rotate_secrets.run_cmd")
|
||||
def test_all_hosts_reachable(self, mock_run):
|
||||
mock_run.return_value = (0, "SUCCESS", "")
|
||||
results = rotate_secrets.check_connectivity(["ezra", "bezalel"])
|
||||
self.assertTrue(results["ezra"])
|
||||
self.assertTrue(results["bezalel"])
|
||||
|
||||
@patch("rotate_secrets.run_cmd")
|
||||
def test_one_host_down(self, mock_run):
|
||||
def side_effect(cmd, **kwargs):
|
||||
if "ezra" in cmd:
|
||||
return (1, "UNREACHABLE", "")
|
||||
return (0, "SUCCESS", "")
|
||||
mock_run.side_effect = side_effect
|
||||
results = rotate_secrets.check_connectivity(["ezra", "bezalel"])
|
||||
self.assertFalse(results["ezra"])
|
||||
self.assertTrue(results["bezalel"])
|
||||
|
||||
|
||||
class TestRunCmd(unittest.TestCase):
|
||||
"""Test command runner."""
|
||||
|
||||
def test_successful_command(self):
|
||||
code, out, err = rotate_secrets.run_cmd(["echo", "hello"])
|
||||
self.assertEqual(code, 0)
|
||||
self.assertEqual(out.strip(), "hello")
|
||||
|
||||
def test_failing_command(self):
|
||||
code, out, err = rotate_secrets.run_cmd(["false"])
|
||||
self.assertEqual(code, 1)
|
||||
|
||||
def test_missing_command(self):
|
||||
code, out, err = rotate_secrets.run_cmd(["nonexistent_command_xyz"])
|
||||
self.assertEqual(code, -2)
|
||||
|
||||
def test_timeout(self):
|
||||
code, out, err = rotate_secrets.run_cmd(["sleep", "30"], timeout=1)
|
||||
self.assertEqual(code, -1)
|
||||
|
||||
|
||||
class TestTelegramNotification(unittest.TestCase):
|
||||
"""Test Telegram notification (no-op when not configured)."""
|
||||
|
||||
def test_no_token_returns_false(self):
|
||||
with patch.dict(os.environ, {"TELEGRAM_BOT_TOKEN": "", "TELEGRAM_CHAT_ID": ""}):
|
||||
result = rotate_secrets.send_telegram("test")
|
||||
self.assertFalse(result)
|
||||
|
||||
|
||||
class TestJsonOutput(unittest.TestCase):
|
||||
"""Test machine-readable JSON output."""
|
||||
|
||||
@patch("rotate_secrets.check_dependencies")
|
||||
@patch("rotate_secrets.check_inventory")
|
||||
@patch("rotate_secrets.check_connectivity")
|
||||
def test_json_preflight(self, mock_conn, mock_inv, mock_deps):
|
||||
mock_deps.return_value = []
|
||||
mock_inv.return_value = []
|
||||
mock_conn.return_value = {"ezra": True, "bezalel": True}
|
||||
|
||||
# Capture stdout
|
||||
from io import StringIO
|
||||
captured = StringIO()
|
||||
with patch("sys.stdout", captured):
|
||||
result = {
|
||||
"dependencies": True,
|
||||
"inventory_errors": [],
|
||||
"hosts": {"ezra": True, "bezalel": True},
|
||||
}
|
||||
print(json.dumps(result, indent=2))
|
||||
|
||||
output = captured.getvalue()
|
||||
parsed = json.loads(output)
|
||||
self.assertTrue(parsed["dependencies"])
|
||||
self.assertEqual(parsed["inventory_errors"], [])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user