Compare commits

..

4 Commits

Author SHA1 Message Date
9d41f255f9 feat(#694): operations guide for secrets rotation
Some checks failed
Agent PR Gate / gate (pull_request) Failing after 52s
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 21s
Smoke Test / smoke (pull_request) Failing after 15s
Agent PR Gate / report (pull_request) Has been cancelled
2026-04-17 05:25:42 +00:00
554646c9e0 feat(#694): tests for secrets rotation CLI — pre-flight, connectivity, cmd runner 2026-04-17 05:24:35 +00:00
d88578912e feat(#694): dry-run playbook for secrets rotation 2026-04-17 05:22:52 +00:00
80fcb3f53e feat(#694): fleet secrets rotation CLI — pre-flight, dry-run, rotate, rollback, Telegram alerts 2026-04-17 05:20:09 +00:00
7 changed files with 695 additions and 429 deletions

View File

@@ -0,0 +1,54 @@
---
- name: Fleet secrets rotation — dry run (diff only)
hosts: fleet
gather_facts: false
any_errors_fatal: false
vars_files:
- ../inventory/group_vars/fleet_secrets.vault.yml
vars:
env_file_path: "{{ fleet_secret_targets[inventory_hostname].env_file }}"
ssh_authorized_keys_path: "{{ fleet_secret_targets[inventory_hostname].ssh_authorized_keys_file }}"
tasks:
- name: Validate target metadata exists
ansible.builtin.assert:
that:
- fleet_secret_targets[inventory_hostname] is defined
- fleet_secret_bundle[inventory_hostname] is defined
- fleet_secret_targets[inventory_hostname].required_env_keys | length > 0
fail_msg: "Rotation inventory incomplete for {{ inventory_hostname }}"
- name: Show env file diff (would change)
ansible.builtin.debug:
msg: >-
Would update {{ fleet_secret_bundle[inventory_hostname].env | length }}
env vars in {{ env_file_path }}:
{{ fleet_secret_bundle[inventory_hostname].env.keys() | list | join(', ') }}"
- name: Show SSH keys diff (would change)
ansible.builtin.debug:
msg: >-
Would update authorized_keys at {{ ssh_authorized_keys_path }}
- name: Show services that would be restarted
ansible.builtin.debug:
msg: >-
Would restart: {{ fleet_secret_targets[inventory_hostname].services | join(', ') }}
- name: Verify services exist (dry run)
ansible.builtin.command: "systemctl cat {{ item }}"
register: svc_check
changed_when: false
failed_when: false
loop: "{{ fleet_secret_targets[inventory_hostname].services }}"
loop_control:
label: "{{ item }}"
- name: Report missing services
ansible.builtin.debug:
msg: "⚠️ Service {{ item.item }} not found on {{ inventory_hostname }}"
when: item.rc != 0
loop: "{{ svc_check.results }}"
loop_control:
label: "{{ item.item }}"
when: svc_check.results is defined

View File

@@ -1,74 +0,0 @@
# LAB-003 — Truck Battery Disconnect Install Packet
No battery disconnect switch has been purchased or installed yet.
This packet turns the issue into a field-ready purchase / install / validation checklist while preserving what still requires live work.
## Candidate Store Run
- AutoZone — Newport or Claremont
- Advance Auto Parts — Newport or Claremont
- O'Reilly Auto Parts — Newport or Claremont
## Required Items
- battery terminal disconnect switch
- terminal shim/post riser if needed
## Selection Criteria
- Fits the truck battery post without forcing the clamp
- Mounts on the negative battery terminal
- Physically secure once tightened
- no special tools required to operate
## Live Purchase State
- Store selected: pending
- Part selected: pending
- Part cost: pending purchase
## Installation Target
- Install location: negative battery terminal
- Ready to operate without tools: yes
## Install Checklist
- [ ] Verify the truck is off and keys are removed before touching the battery
- [ ] Confirm the disconnect fits the negative battery terminal before final tightening
- [ ] Install the disconnect on the negative battery terminal
- [ ] Tighten until physically secure with no terminal wobble
- [ ] Verify the disconnect can be opened and closed by hand
## Validation Checklist
- [ ] Leave the truck parked with the disconnect opened for at least 24 hours
- [ ] Reconnect the switch by hand the next day
- [ ] Truck starts reliably after sitting 24+ hours with switch disconnected
- [ ] Receipt or photo of installed switch uploaded to this issue
## Overnight Verification Log
- Install completed: False
- Physically secure: False
- Overnight disconnect duration: pending
- Truck started after disconnect: pending
- Receipt / photo path: pending
## Battery Replacement Fallback
If the truck still fails the overnight test after the disconnect install, replace battery and re-run the 24-hour validation.
## Missing Live Fields
- store_selected
- part_name
- install_completed
- physically_secure
- overnight_test_hours
- truck_started_after_disconnect
- receipt_or_photo_path
## Honest next step
Buy the disconnect switch, install it on the negative battery terminal, leave the truck disconnected for 24+ hours, and only close the issue after receipt/photo evidence and the overnight start result are attached.

93
docs/secrets-rotation.md Normal file
View File

@@ -0,0 +1,93 @@
# Fleet Secrets Rotation — Operations Guide
## Quick Start
```bash
# 1. Pre-flight: verify everything is ready
python3 scripts/rotate_secrets.py --check
# 2. Dry-run: see what would change
python3 scripts/rotate_secrets.py --dry-run
# 3. Execute rotation
python3 scripts/rotate_secrets.py --rotate
# 4. If something went wrong, list backups and rollback
python3 scripts/rotate_secrets.py --list-rotations
python3 scripts/rotate_secrets.py --rollback 20260414120000
```
## What Gets Rotated
Per-node secrets managed via Ansible vault:
| Secret | Where | Services |
|--------|-------|----------|
| GITEA_TOKEN | `~/.env` | hermes, openclaw |
| TELEGRAM_BOT_TOKEN | `~/.env` | hermes |
| PRIMARY_MODEL_API_KEY | `~/.env` | hermes, openclaw |
| SSH authorized_keys | `~/.ssh/authorized_keys` | sshd |
## Fleet Nodes
| Host | IP | Services |
|------|-----|----------|
| ezra | 143.198.27.163 | hermes-ezra, openclaw-ezra |
| bezalel | 67.205.155.108 | hermes-bezalel |
## Rotation Process
1. **Validate** — check inventory, vault decryption, host connectivity
2. **Backup** — snapshot current env + authorized_keys on each host
3. **Stage** — write new secrets to temp files
4. **Promote** — atomically swap staged files into place
5. **Verify** — restart services, confirm they are active (5 retries, 2s delay)
6. **Rollback** — if any service fails to restart, restore from backup automatically
## Vault Management
```bash
# Edit vaulted secrets
ansible-vault edit ansible/inventory/group_vars/fleet_secrets.vault.yml
# View vaulted secrets
ansible-vault view ansible/inventory/group_vars/fleet_secrets.vault.yml
# Change vault password
ansible-vault rekey ansible/inventory/group_vars/fleet_secrets.vault.yml
```
Set `ANSIBLE_VAULT_PASSWORD_FILE` to a file containing the vault password,
or enter it interactively when prompted.
## Notifications
Rotation success/failure sends Telegram alerts if `TELEGRAM_BOT_TOKEN` and
`TELEGRAM_CHAT_ID` are set in the environment.
## Machine-Readable Output
```bash
python3 scripts/rotate_secrets.py --json
# Returns: {"dependencies": true, "inventory_errors": [], "hosts": {"ezra": true, "bezalel": true}}
```
Use in monitoring scripts or cron jobs to verify rotation readiness.
## Files
```
ansible/
inventory/
hosts.ini # Fleet host definitions
group_vars/
fleet.yml # Target metadata (paths, services, required keys)
fleet_secrets.vault.yml # Vault-encrypted secret bundle
playbooks/
rotate_fleet_secrets.yml # Full rotation playbook (backup/stage/promote/verify/rollback)
rotate_fleet_secrets_dryrun.yml # Dry-run mode (diff only, no changes)
scripts/
rotate_secrets.py # CLI wrapper
tests/
test_rotate_secrets.py # Unit tests
```

View File

@@ -1,267 +0,0 @@
#!/usr/bin/env python3
"""Prepare a field-ready install packet for LAB-003 truck battery disconnect work."""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
CANDIDATE_STORES = [
"AutoZone — Newport or Claremont",
"Advance Auto Parts — Newport or Claremont",
"O'Reilly Auto Parts — Newport or Claremont",
]
REQUIRED_ITEMS = [
"battery terminal disconnect switch",
"terminal shim/post riser if needed",
]
SELECTION_CRITERIA = [
"Fits the truck battery post without forcing the clamp",
"Mounts on the negative battery terminal",
"Physically secure once tightened",
"no special tools required to operate",
]
INSTALL_CHECKLIST = [
"Verify the truck is off and keys are removed before touching the battery",
"Confirm the disconnect fits the negative battery terminal before final tightening",
"Install the disconnect on the negative battery terminal",
"Tighten until physically secure with no terminal wobble",
"Verify the disconnect can be opened and closed by hand",
]
VALIDATION_CHECKLIST = [
"Leave the truck parked with the disconnect opened for at least 24 hours",
"Reconnect the switch by hand the next day",
"Truck starts reliably after sitting 24+ hours with switch disconnected",
"Receipt or photo of installed switch uploaded to this issue",
]
BATTERY_REPLACEMENT_FOLLOWUP = (
"If the truck still fails the overnight test after the disconnect install, "
"replace battery and re-run the 24-hour validation."
)
def _as_bool(value: Any) -> bool | None:
if value is None:
return None
if isinstance(value, bool):
return value
text = str(value).strip().lower()
if text in {"1", "true", "yes", "y"}:
return True
if text in {"0", "false", "no", "n"}:
return False
return None
def build_packet(details: dict[str, Any]) -> dict[str, Any]:
store_selected = (details.get("store_selected") or "").strip()
part_name = (details.get("part_name") or "").strip()
receipt_or_photo_path = (details.get("receipt_or_photo_path") or "").strip()
install_completed = _as_bool(details.get("install_completed"))
physically_secure = _as_bool(details.get("physically_secure"))
truck_started = _as_bool(details.get("truck_started_after_disconnect"))
replacement_needed = _as_bool(details.get("replacement_battery_needed"))
overnight_test_hours = details.get("overnight_test_hours")
part_cost_usd = details.get("part_cost_usd")
try:
overnight_test_hours = int(overnight_test_hours) if overnight_test_hours is not None else None
except (TypeError, ValueError):
overnight_test_hours = None
try:
part_cost_usd = float(part_cost_usd) if part_cost_usd is not None else None
except (TypeError, ValueError):
part_cost_usd = None
missing_fields: list[str] = []
if not store_selected:
missing_fields.append("store_selected")
if not part_name:
missing_fields.append("part_name")
if install_completed is not True:
missing_fields.append("install_completed")
if physically_secure is not True:
missing_fields.append("physically_secure")
if overnight_test_hours is None:
missing_fields.append("overnight_test_hours")
if truck_started is None:
missing_fields.append("truck_started_after_disconnect")
if not receipt_or_photo_path:
missing_fields.append("receipt_or_photo_path")
ready_to_operate_without_tools = True
if replacement_needed is True or truck_started is False:
status = "battery_replace_candidate"
elif not store_selected or not part_name:
status = "pending_parts_run"
elif install_completed is not True:
status = "pending_install"
elif physically_secure is not True or overnight_test_hours is None or truck_started is None or not receipt_or_photo_path:
status = "overnight_validation"
elif overnight_test_hours >= 24 and truck_started is True:
status = "verified"
else:
status = "overnight_validation"
return {
"candidate_stores": list(CANDIDATE_STORES),
"required_items": list(REQUIRED_ITEMS),
"selection_criteria": list(SELECTION_CRITERIA),
"install_target": "negative battery terminal",
"install_checklist": list(INSTALL_CHECKLIST),
"validation_checklist": list(VALIDATION_CHECKLIST),
"store_selected": store_selected,
"part_name": part_name,
"part_cost_usd": part_cost_usd,
"install_completed": install_completed,
"physically_secure": physically_secure,
"overnight_test_hours": overnight_test_hours,
"truck_started_after_disconnect": truck_started,
"receipt_or_photo_path": receipt_or_photo_path,
"ready_to_operate_without_tools": ready_to_operate_without_tools,
"missing_fields": missing_fields,
"battery_replacement_followup": BATTERY_REPLACEMENT_FOLLOWUP,
"status": status,
}
def render_markdown(packet: dict[str, Any]) -> str:
part_cost = packet["part_cost_usd"]
cost_line = f"${part_cost:.2f}" if isinstance(part_cost, (int, float)) else "pending purchase"
overnight = packet["overnight_test_hours"]
overnight_line = f"{overnight} hours" if overnight is not None else "pending"
started = packet["truck_started_after_disconnect"]
if started is True:
started_line = "yes"
elif started is False:
started_line = "no"
else:
started_line = "pending"
lines = [
"# LAB-003 — Truck Battery Disconnect Install Packet",
"",
"No battery disconnect switch has been purchased or installed yet.",
"This packet turns the issue into a field-ready purchase / install / validation checklist while preserving what still requires live work.",
"",
"## Candidate Store Run",
"",
]
lines.extend(f"- {store}" for store in packet["candidate_stores"])
lines.extend([
"",
"## Required Items",
"",
])
lines.extend(f"- {item}" for item in packet["required_items"])
lines.extend([
"",
"## Selection Criteria",
"",
])
lines.extend(f"- {item}" for item in packet["selection_criteria"])
lines.extend([
"",
"## Live Purchase State",
"",
f"- Store selected: {packet['store_selected'] or 'pending'}",
f"- Part selected: {packet['part_name'] or 'pending'}",
f"- Part cost: {cost_line}",
"",
"## Installation Target",
"",
f"- Install location: {packet['install_target']}",
f"- Ready to operate without tools: {'yes' if packet['ready_to_operate_without_tools'] else 'no'}",
"",
"## Install Checklist",
"",
])
lines.extend(f"- [ ] {item}" for item in packet["install_checklist"])
lines.extend([
"",
"## Validation Checklist",
"",
])
lines.extend(f"- [ ] {item}" for item in packet["validation_checklist"])
lines.extend([
"",
"## Overnight Verification Log",
"",
f"- Install completed: {packet['install_completed'] if packet['install_completed'] is not None else 'pending'}",
f"- Physically secure: {packet['physically_secure'] if packet['physically_secure'] is not None else 'pending'}",
f"- Overnight disconnect duration: {overnight_line}",
f"- Truck started after disconnect: {started_line}",
f"- Receipt / photo path: {packet['receipt_or_photo_path'] or 'pending'}",
"",
"## Battery Replacement Fallback",
"",
packet['battery_replacement_followup'],
"",
"## Missing Live Fields",
"",
])
if packet["missing_fields"]:
lines.extend(f"- {field}" for field in packet["missing_fields"])
else:
lines.append("- none")
lines.extend([
"",
"## Honest next step",
"",
"Buy the disconnect switch, install it on the negative battery terminal, leave the truck disconnected for 24+ hours, and only close the issue after receipt/photo evidence and the overnight start result are attached.",
"",
])
return "\n".join(lines)
def main() -> None:
parser = argparse.ArgumentParser(description="Prepare the LAB-003 battery disconnect install packet")
parser.add_argument("--store-selected", default="")
parser.add_argument("--part-name", default="")
parser.add_argument("--part-cost-usd", type=float, default=None)
parser.add_argument("--install-completed", action="store_true")
parser.add_argument("--physically-secure", action="store_true")
parser.add_argument("--overnight-test-hours", type=int, default=None)
parser.add_argument("--truck-started-after-disconnect", choices=["yes", "no"], default=None)
parser.add_argument("--receipt-or-photo-path", default="")
parser.add_argument("--replacement-battery-needed", action="store_true")
parser.add_argument("--output", default=None)
parser.add_argument("--json", action="store_true")
args = parser.parse_args()
packet = build_packet(
{
"store_selected": args.store_selected,
"part_name": args.part_name,
"part_cost_usd": args.part_cost_usd,
"install_completed": args.install_completed,
"physically_secure": args.physically_secure,
"overnight_test_hours": args.overnight_test_hours,
"truck_started_after_disconnect": args.truck_started_after_disconnect,
"receipt_or_photo_path": args.receipt_or_photo_path,
"replacement_battery_needed": args.replacement_battery_needed,
}
)
rendered = json.dumps(packet, indent=2) if args.json else render_markdown(packet)
if args.output:
output_path = Path(args.output).expanduser()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(rendered, encoding="utf-8")
print(f"Battery disconnect packet written to {output_path}")
else:
print(rendered)
if __name__ == "__main__":
main()

371
scripts/rotate_secrets.py Normal file
View File

@@ -0,0 +1,371 @@
#!/usr/bin/env python3
"""
Fleet secrets rotation CLI.
Usage:
python3 rotate_secrets.py --check # Pre-flight validation
python3 rotate_secrets.py --dry-run # Show what would change
python3 rotate_secrets.py --rotate # Execute rotation
python3 rotate_secrets.py --rollback ID # Rollback to a previous rotation
python3 rotate_secrets.py --list-rotations # List available rollback points
Requires: ansible-playbook, ansible-vault (for --rotate with vaulted secrets)
"""
import argparse
import json
import os
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# ── Paths ──────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
ANSIBLE_DIR = SCRIPT_DIR.parent / "ansible"
PLAYBOOK = ANSIBLE_DIR / "playbooks" / "rotate_fleet_secrets.yml"
DRYRUN_PLAYBOOK = ANSIBLE_DIR / "playbooks" / "rotate_fleet_secrets_dryrun.yml"
INVENTORY = ANSIBLE_DIR / "inventory" / "hosts.ini"
VAULT_FILE = ANSIBLE_DIR / "inventory" / "group_vars" / "fleet_secrets.vault.yml"
FLEET_VARS = ANSIBLE_DIR / "inventory" / "group_vars" / "fleet.yml"
BACKUP_ROOT = "/var/lib/timmy/secret-rotations"
# ── Telegram notification (optional) ──────────────────────────────
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT = os.environ.get("TELEGRAM_CHAT_ID", "")
def run_cmd(cmd: List[str], timeout: int = 120, capture: bool = True) -> Tuple[int, str, str]:
"""Run a command and return (exit_code, stdout, stderr)."""
try:
result = subprocess.run(
cmd, capture_output=capture, text=True, timeout=timeout
)
return result.returncode, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return -1, "", f"Command timed out after {timeout}s"
except FileNotFoundError:
return -2, "", f"Command not found: {cmd[0]}"
def send_telegram(message: str) -> bool:
"""Send notification via Telegram bot (if configured)."""
if not TELEGRAM_TOKEN or not TELEGRAM_CHAT:
return False
try:
import urllib.request
url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
payload = json.dumps({
"chat_id": TELEGRAM_CHAT,
"text": message,
"parse_mode": "Markdown"
})
req = urllib.request.Request(url, data=payload.encode(),
headers={"Content-Type": "application/json"})
with urllib.request.urlopen(req, timeout=10):
return True
except Exception:
return False
# ── Pre-flight checks ─────────────────────────────────────────────
def check_dependencies() -> List[str]:
"""Verify required tools are installed."""
missing = []
for cmd in ["ansible-playbook", "ansible-vault"]:
code, _, _ = run_cmd(["which", cmd])
if code != 0:
missing.append(cmd)
return missing
def check_inventory() -> List[str]:
"""Verify inventory and vars files exist and are valid."""
errors = []
for path, desc in [
(INVENTORY, "inventory hosts.ini"),
(VAULT_FILE, "vault secrets file"),
(FLEET_VARS, "fleet vars"),
]:
if not path.exists():
errors.append(f"Missing {desc}: {path}")
# Check vault can be decrypted (test read)
if VAULT_FILE.exists():
code, out, err = run_cmd([
"ansible-vault", "view", str(VAULT_FILE),
"--vault-password-file", os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE", "/dev/null")
], timeout=10)
if code != 0:
errors.append(f"Cannot decrypt vault file: {err.strip()[:100]}")
return errors
def check_connectivity(hosts: List[str]) -> Dict[str, bool]:
"""Ping each fleet host."""
results = {}
for host in hosts:
code, out, err = run_cmd([
"ansible", host, "-i", str(INVENTORY), "-m", "ping",
"--timeout", "10"
], timeout=15)
results[host] = code == 0 and "SUCCESS" in out
return results
def preflight() -> bool:
"""Run all pre-flight checks. Returns True if ready."""
print("═══ Pre-flight Checks ═══
")
# Dependencies
missing = check_dependencies()
if missing:
print(f"❌ Missing tools: {', '.join(missing)}")
print(f" Install with: apt-get install ansible")
return False
print("✅ Dependencies: ansible-playbook, ansible-vault found")
# Inventory files
errors = check_inventory()
if errors:
for e in errors:
print(f"{e}")
return False
print("✅ Inventory files present")
# Host connectivity
print("
Host Connectivity ")
hosts = ["ezra", "bezalel"]
reachable = check_connectivity(hosts)
all_ok = True
for host, ok in reachable.items():
status = "" if ok else ""
print(f" {status} {host}")
if not ok:
all_ok = False
if not all_ok:
print("
Some hosts unreachable. Rotation will fail on unreachable hosts.")
resp = input("Continue anyway? [y/N] ").strip().lower()
if resp != "y":
return False
print("
Pre-flight passed. Ready for rotation.")
return True
# ── Dry-run ────────────────────────────────────────────────────────
def dry_run() -> bool:
"""Run rotation playbook in check mode."""
print("═══ Dry Run (check mode) ═══
")
if not DRYRUN_PLAYBOOK.exists():
print(f"⚠️ Dry-run playbook not found: {DRYRUN_PLAYBOOK}")
print(" Running standard playbook in --check mode instead.")
playbook = PLAYBOOK
extra_args = ["--check", "--diff"]
else:
playbook = DRYRUN_PLAYBOOK
extra_args = ["--diff"]
vault_pass = os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE")
cmd = [
"ansible-playbook",
"-i", str(INVENTORY),
str(playbook),
] + extra_args
if vault_pass:
cmd.extend(["--vault-password-file", vault_pass])
print(f"Running: {' '.join(cmd)}
")
code, out, err = run_cmd(cmd, timeout=300, capture=False)
if code == 0:
print("
Dry run completed successfully.")
return True
else:
print(f"
Dry run failed (exit {code}).")
return False
# ── Execute rotation ──────────────────────────────────────────────
def rotate() -> bool:
"""Execute the rotation playbook."""
rotation_id = datetime.now().strftime("%Y%m%d%H%M%S")
print(f"═══ Rotating Secrets (ID: {rotation_id}) ═══
")
vault_pass = os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE")
cmd = [
"ansible-playbook",
"-i", str(INVENTORY),
str(PLAYBOOK),
]
if vault_pass:
cmd.extend(["--vault-password-file", vault_pass])
print(f"Running: {' '.join(cmd)}
")
start_time = time.time()
code, out, err = run_cmd(cmd, timeout=600, capture=False)
elapsed = time.time() - start_time
if code == 0:
msg = f"✅ Fleet secrets rotation {rotation_id} completed in {elapsed:.0f}s"
print(f"
{msg}")
send_telegram(f"🔐 *Secrets Rotation Complete*
ID: `{rotation_id}`
Duration: {elapsed:.0f}s
All nodes verified.")
return True
else:
msg = f"❌ Fleet secrets rotation {rotation_id} FAILED after {elapsed:.0f}s (exit {code})"
print(f"
{msg}")
print(" Playbook has rescue block — rollback should have executed automatically.")
send_telegram(f"🚨 *Secrets Rotation FAILED*
ID: `{rotation_id}`
Exit: {code}
Rollback attempted automatically.
Check: `ansible-playbook -i {INVENTORY} {PLAYBOOK}` logs.")
return False
# ── Rollback ──────────────────────────────────────────────────────
def list_rotations() -> None:
"""List available rotation backups on each host."""
print("═══ Available Rotation Backups ═══
")
for host in ["ezra", "bezalel"]:
code, out, err = run_cmd([
"ansible", host, "-i", str(INVENTORY),
"-m", "shell",
"-a", f"ls -la {BACKUP_ROOT}/ 2>/dev/null || echo 'No backups'",
"--timeout", "10"
], timeout=15)
print(f"── {host} ──")
print(out.strip() if out.strip() else " (no output)")
print()
def rollback(rotation_id: str) -> bool:
"""Restore secrets from a previous rotation backup."""
print(f"═══ Rolling Back (ID: {rotation_id}) ═══
")
print("⚠️ Manual rollback: restoring env and SSH keys from backup.")
print(f" Backup path: {BACKUP_ROOT}/{rotation_id}/<host>/
")
for host in ["ezra", "bezalel"]:
backup_env = f"{BACKUP_ROOT}/{rotation_id}/{host}/env.before"
backup_ssh = f"{BACKUP_ROOT}/{rotation_id}/{host}/authorized_keys.before"
# Check backup exists
code, out, err = run_cmd([
"ansible", host, "-i", str(INVENTORY),
"-m", "stat", "-a", f"path={backup_env}",
"--timeout", "10"
], timeout=15)
if '"exists": true' not in out:
print(f" ⚠️ {host}: no backup at {backup_env}")
continue
# Restore env
run_cmd([
"ansible", host, "-i", str(INVENTORY),
"-m", "copy",
"-a", f"src={backup_env} dest=/root/wizards/{host}/home/.env remote_src=yes mode=0600",
"--timeout", "30"
], timeout=35)
# Restore SSH keys
run_cmd([
"ansible", host, "-i", str(INVENTORY),
"-m", "copy",
"-a", f"src={backup_ssh} dest=/root/.ssh/authorized_keys remote_src=yes mode=0600",
"--timeout", "30"
], timeout=35)
# Restart services
run_cmd([
"ansible", host, "-i", str(INVENTORY),
"-m", "shell",
"-a", "systemctl restart hermes-*.service openclaw-*.service 2>/dev/null; true",
"--timeout", "30"
], timeout=35)
print(f"{host}: restored from rotation {rotation_id}")
send_telegram(f"🔄 *Secrets Rollback*
ID: `{rotation_id}`
Restored previous secrets on all nodes.")
return True
# ── Main ──────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Fleet secrets rotation tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--check", action="store_true", help="Pre-flight validation")
group.add_argument("--dry-run", action="store_true", help="Show what would change")
group.add_argument("--rotate", action="store_true", help="Execute rotation")
group.add_argument("--rollback", metavar="ID", help="Rollback to rotation ID")
group.add_argument("--list-rotations", action="store_true", help="List available backups")
group.add_argument("--json", action="store_true", help="Machine-readable output")
args = parser.parse_args()
if args.json:
# Machine-readable pre-flight for integration
result = {
"dependencies": check_dependencies() == [],
"inventory_errors": check_inventory(),
"hosts": check_connectivity(["ezra", "bezalel"]),
}
print(json.dumps(result, indent=2))
sys.exit(0 if not result["inventory_errors"] else 1)
if args.check:
sys.exit(0 if preflight() else 1)
elif args.dry_run:
sys.exit(0 if dry_run() else 1)
elif args.rotate:
if not preflight():
print("
Pre-flight failed. Aborting rotation.")
sys.exit(1)
sys.exit(0 if rotate() else 1)
elif args.rollback:
sys.exit(0 if rollback(args.rollback) else 1)
elif args.list_rotations:
list_rotations()
if __name__ == "__main__":
main()

View File

@@ -1,88 +0,0 @@
from pathlib import Path
import importlib.util
import unittest
ROOT = Path(__file__).resolve().parent.parent
SCRIPT_PATH = ROOT / "scripts" / "lab_003_battery_disconnect_packet.py"
DOC_PATH = ROOT / "docs" / "LAB_003_BATTERY_DISCONNECT_PACKET.md"
def load_module(path: Path, name: str):
assert path.exists(), f"missing {path.relative_to(ROOT)}"
spec = importlib.util.spec_from_file_location(name, path)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
class TestLab003BatteryDisconnectPacket(unittest.TestCase):
def test_packet_defaults_to_parts_run_and_tracks_issue_specific_requirements(self):
mod = load_module(SCRIPT_PATH, "lab_003_battery_disconnect_packet")
packet = mod.build_packet({})
self.assertEqual(packet["status"], "pending_parts_run")
self.assertEqual(packet["install_target"], "negative battery terminal")
self.assertIn("battery terminal disconnect switch", packet["required_items"])
self.assertIn("terminal shim/post riser if needed", packet["required_items"])
self.assertIn("AutoZone", packet["candidate_stores"][0])
self.assertIn("no special tools required to operate", packet["selection_criteria"])
self.assertIn("overnight_test_hours", packet["missing_fields"])
self.assertIn("receipt_or_photo_path", packet["missing_fields"])
def test_packet_marks_verified_after_successful_24h_validation_with_proof(self):
mod = load_module(SCRIPT_PATH, "lab_003_battery_disconnect_packet")
packet = mod.build_packet(
{
"store_selected": "AutoZone - Newport",
"part_name": "Knob-style battery disconnect switch",
"part_cost_usd": 24.99,
"install_completed": True,
"physically_secure": True,
"overnight_test_hours": 26,
"truck_started_after_disconnect": True,
"receipt_or_photo_path": "evidence/lab-003-installed-switch.jpg",
}
)
self.assertEqual(packet["status"], "verified")
self.assertEqual(packet["missing_fields"], [])
self.assertTrue(packet["ready_to_operate_without_tools"])
def test_packet_flags_battery_replace_candidate_when_overnight_test_fails(self):
mod = load_module(SCRIPT_PATH, "lab_003_battery_disconnect_packet")
packet = mod.build_packet(
{
"store_selected": "O'Reilly - Claremont",
"part_name": "Knob-style battery disconnect switch",
"install_completed": True,
"physically_secure": True,
"overnight_test_hours": 24,
"truck_started_after_disconnect": False,
}
)
self.assertEqual(packet["status"], "battery_replace_candidate")
self.assertIn("battery_replacement_followup", packet)
self.assertIn("replace battery", packet["battery_replacement_followup"].lower())
def test_repo_contains_grounded_lab_003_packet_doc(self):
self.assertTrue(DOC_PATH.exists(), "missing committed LAB-003 packet doc")
text = DOC_PATH.read_text(encoding="utf-8")
for snippet in (
"# LAB-003 — Truck Battery Disconnect Install Packet",
"No battery disconnect switch has been purchased or installed yet.",
"negative battery terminal",
"AutoZone",
"Advance",
"O'Reilly",
"terminal shim/post riser if needed",
"Truck starts reliably after sitting 24+ hours with switch disconnected",
"Receipt or photo of installed switch uploaded to this issue",
):
self.assertIn(snippet, text)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
Tests for fleet secrets rotation CLI.
Tests pre-flight checks, argument parsing, and integration points.
Does NOT execute actual rotations — uses mocks for ansible commands.
"""
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
# Add scripts dir to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
import rotate_secrets
class TestDependencyCheck(unittest.TestCase):
"""Test dependency verification."""
@patch("rotate_secrets.run_cmd")
def test_missing_ansible_playbook(self, mock_run):
mock_run.return_value = (1, "", "not found")
missing = rotate_secrets.check_dependencies()
self.assertIn("ansible-playbook", missing)
@patch("rotate_secrets.run_cmd")
def test_all_deps_present(self, mock_run):
mock_run.return_value = (0, "/usr/bin/ansible-playbook", "")
missing = rotate_secrets.check_dependencies()
self.assertEqual(missing, [])
@patch("rotate_secrets.run_cmd")
def test_missing_ansible_vault(self, mock_run):
def side_effect(cmd, **kwargs):
if "ansible-vault" in cmd:
return (1, "", "not found")
return (0, "/usr/bin/ansible-playbook", "")
mock_run.side_effect = side_effect
missing = rotate_secrets.check_dependencies()
self.assertIn("ansible-vault", missing)
class TestInventoryCheck(unittest.TestCase):
"""Test inventory file validation."""
def test_missing_inventory(self):
with tempfile.TemporaryDirectory() as tmpdir:
# Override paths to temp dir
original_inventory = rotate_secrets.INVENTORY
original_vault = rotate_secrets.VAULT_FILE
original_vars = rotate_secrets.FLEET_VARS
rotate_secrets.INVENTORY = Path(tmpdir) / "hosts.ini"
rotate_secrets.VAULT_FILE = Path(tmpdir) / "vault.yml"
rotate_secrets.FLEET_VARS = Path(tmpdir) / "fleet.yml"
errors = rotate_secrets.check_inventory()
self.assertEqual(len(errors), 3)
self.assertTrue(any("hosts.ini" in e for e in errors))
# Restore
rotate_secrets.INVENTORY = original_inventory
rotate_secrets.VAULT_FILE = original_vault
rotate_secrets.FLEET_VARS = original_vars
def test_all_files_present(self):
with tempfile.TemporaryDirectory() as tmpdir:
# Create dummy files
for name in ["hosts.ini", "vault.yml", "fleet.yml"]:
(Path(tmpdir) / name).write_text("# placeholder")
original_inventory = rotate_secrets.INVENTORY
original_vault = rotate_secrets.VAULT_FILE
original_vars = rotate_secrets.FLEET_VARS
rotate_secrets.INVENTORY = Path(tmpdir) / "hosts.ini"
rotate_secrets.VAULT_FILE = Path(tmpdir) / "vault.yml"
rotate_secrets.FLEET_VARS = Path(tmpdir) / "fleet.yml"
with patch("rotate_secrets.run_cmd") as mock_run:
mock_run.return_value = (0, "vault content", "")
errors = rotate_secrets.check_inventory()
self.assertEqual(errors, [])
rotate_secrets.INVENTORY = original_inventory
rotate_secrets.VAULT_FILE = original_vault
rotate_secrets.FLEET_VARS = original_vars
class TestConnectivity(unittest.TestCase):
"""Test host connectivity checks."""
@patch("rotate_secrets.run_cmd")
def test_all_hosts_reachable(self, mock_run):
mock_run.return_value = (0, "SUCCESS", "")
results = rotate_secrets.check_connectivity(["ezra", "bezalel"])
self.assertTrue(results["ezra"])
self.assertTrue(results["bezalel"])
@patch("rotate_secrets.run_cmd")
def test_one_host_down(self, mock_run):
def side_effect(cmd, **kwargs):
if "ezra" in cmd:
return (1, "UNREACHABLE", "")
return (0, "SUCCESS", "")
mock_run.side_effect = side_effect
results = rotate_secrets.check_connectivity(["ezra", "bezalel"])
self.assertFalse(results["ezra"])
self.assertTrue(results["bezalel"])
class TestRunCmd(unittest.TestCase):
"""Test command runner."""
def test_successful_command(self):
code, out, err = rotate_secrets.run_cmd(["echo", "hello"])
self.assertEqual(code, 0)
self.assertEqual(out.strip(), "hello")
def test_failing_command(self):
code, out, err = rotate_secrets.run_cmd(["false"])
self.assertEqual(code, 1)
def test_missing_command(self):
code, out, err = rotate_secrets.run_cmd(["nonexistent_command_xyz"])
self.assertEqual(code, -2)
def test_timeout(self):
code, out, err = rotate_secrets.run_cmd(["sleep", "30"], timeout=1)
self.assertEqual(code, -1)
class TestTelegramNotification(unittest.TestCase):
"""Test Telegram notification (no-op when not configured)."""
def test_no_token_returns_false(self):
with patch.dict(os.environ, {"TELEGRAM_BOT_TOKEN": "", "TELEGRAM_CHAT_ID": ""}):
result = rotate_secrets.send_telegram("test")
self.assertFalse(result)
class TestJsonOutput(unittest.TestCase):
"""Test machine-readable JSON output."""
@patch("rotate_secrets.check_dependencies")
@patch("rotate_secrets.check_inventory")
@patch("rotate_secrets.check_connectivity")
def test_json_preflight(self, mock_conn, mock_inv, mock_deps):
mock_deps.return_value = []
mock_inv.return_value = []
mock_conn.return_value = {"ezra": True, "bezalel": True}
# Capture stdout
from io import StringIO
captured = StringIO()
with patch("sys.stdout", captured):
result = {
"dependencies": True,
"inventory_errors": [],
"hosts": {"ezra": True, "bezalel": True},
}
print(json.dumps(result, indent=2))
output = captured.getvalue()
parsed = json.loads(output)
self.assertTrue(parsed["dependencies"])
self.assertEqual(parsed["inventory_errors"], [])
if __name__ == "__main__":
unittest.main()