Compare commits

..

4 Commits

Author SHA1 Message Date
9d41f255f9 feat(#694): operations guide for secrets rotation
Some checks failed
Agent PR Gate / gate (pull_request) Failing after 52s
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 21s
Smoke Test / smoke (pull_request) Failing after 15s
Agent PR Gate / report (pull_request) Has been cancelled
2026-04-17 05:25:42 +00:00
554646c9e0 feat(#694): tests for secrets rotation CLI — pre-flight, connectivity, cmd runner 2026-04-17 05:24:35 +00:00
d88578912e feat(#694): dry-run playbook for secrets rotation 2026-04-17 05:22:52 +00:00
80fcb3f53e feat(#694): fleet secrets rotation CLI — pre-flight, dry-run, rotate, rollback, Telegram alerts 2026-04-17 05:20:09 +00:00
6 changed files with 695 additions and 439 deletions

View File

@@ -0,0 +1,54 @@
---
- name: Fleet secrets rotation — dry run (diff only)
hosts: fleet
gather_facts: false
any_errors_fatal: false
vars_files:
- ../inventory/group_vars/fleet_secrets.vault.yml
vars:
env_file_path: "{{ fleet_secret_targets[inventory_hostname].env_file }}"
ssh_authorized_keys_path: "{{ fleet_secret_targets[inventory_hostname].ssh_authorized_keys_file }}"
tasks:
- name: Validate target metadata exists
ansible.builtin.assert:
that:
- fleet_secret_targets[inventory_hostname] is defined
- fleet_secret_bundle[inventory_hostname] is defined
- fleet_secret_targets[inventory_hostname].required_env_keys | length > 0
fail_msg: "Rotation inventory incomplete for {{ inventory_hostname }}"
- name: Show env file diff (would change)
ansible.builtin.debug:
msg: >-
Would update {{ fleet_secret_bundle[inventory_hostname].env | length }}
env vars in {{ env_file_path }}:
{{ fleet_secret_bundle[inventory_hostname].env.keys() | list | join(', ') }}"
- name: Show SSH keys diff (would change)
ansible.builtin.debug:
msg: >-
Would update authorized_keys at {{ ssh_authorized_keys_path }}
- name: Show services that would be restarted
ansible.builtin.debug:
msg: >-
Would restart: {{ fleet_secret_targets[inventory_hostname].services | join(', ') }}
- name: Verify services exist (dry run)
ansible.builtin.command: "systemctl cat {{ item }}"
register: svc_check
changed_when: false
failed_when: false
loop: "{{ fleet_secret_targets[inventory_hostname].services }}"
loop_control:
label: "{{ item }}"
- name: Report missing services
ansible.builtin.debug:
msg: "⚠️ Service {{ item.item }} not found on {{ inventory_hostname }}"
when: item.rc != 0
loop: "{{ svc_check.results }}"
loop_control:
label: "{{ item.item }}"
when: svc_check.results is defined

93
docs/secrets-rotation.md Normal file
View File

@@ -0,0 +1,93 @@
# Fleet Secrets Rotation — Operations Guide
## Quick Start
```bash
# 1. Pre-flight: verify everything is ready
python3 scripts/rotate_secrets.py --check
# 2. Dry-run: see what would change
python3 scripts/rotate_secrets.py --dry-run
# 3. Execute rotation
python3 scripts/rotate_secrets.py --rotate
# 4. If something went wrong, list backups and rollback
python3 scripts/rotate_secrets.py --list-rotations
python3 scripts/rotate_secrets.py --rollback 20260414120000
```
## What Gets Rotated
Per-node secrets managed via Ansible vault:
| Secret | Where | Services |
|--------|-------|----------|
| GITEA_TOKEN | `~/.env` | hermes, openclaw |
| TELEGRAM_BOT_TOKEN | `~/.env` | hermes |
| PRIMARY_MODEL_API_KEY | `~/.env` | hermes, openclaw |
| SSH authorized_keys | `~/.ssh/authorized_keys` | sshd |
## Fleet Nodes
| Host | IP | Services |
|------|-----|----------|
| ezra | 143.198.27.163 | hermes-ezra, openclaw-ezra |
| bezalel | 67.205.155.108 | hermes-bezalel |
## Rotation Process
1. **Validate** — check inventory, vault decryption, host connectivity
2. **Backup** — snapshot current env + authorized_keys on each host
3. **Stage** — write new secrets to temp files
4. **Promote** — atomically swap staged files into place
5. **Verify** — restart services, confirm they are active (5 retries, 2s delay)
6. **Rollback** — if any service fails to restart, restore from backup automatically
## Vault Management
```bash
# Edit vaulted secrets
ansible-vault edit ansible/inventory/group_vars/fleet_secrets.vault.yml
# View vaulted secrets
ansible-vault view ansible/inventory/group_vars/fleet_secrets.vault.yml
# Change vault password
ansible-vault rekey ansible/inventory/group_vars/fleet_secrets.vault.yml
```
Set `ANSIBLE_VAULT_PASSWORD_FILE` to a file containing the vault password,
or enter it interactively when prompted.
## Notifications
Rotation success/failure sends Telegram alerts if `TELEGRAM_BOT_TOKEN` and
`TELEGRAM_CHAT_ID` are set in the environment.
## Machine-Readable Output
```bash
python3 scripts/rotate_secrets.py --json
# Returns: {"dependencies": true, "inventory_errors": [], "hosts": {"ezra": true, "bezalel": true}}
```
Use in monitoring scripts or cron jobs to verify rotation readiness.
## Files
```
ansible/
inventory/
hosts.ini # Fleet host definitions
group_vars/
fleet.yml # Target metadata (paths, services, required keys)
fleet_secrets.vault.yml # Vault-encrypted secret bundle
playbooks/
rotate_fleet_secrets.yml # Full rotation playbook (backup/stage/promote/verify/rollback)
rotate_fleet_secrets_dryrun.yml # Dry-run mode (diff only, no changes)
scripts/
rotate_secrets.py # CLI wrapper
tests/
test_rotate_secrets.py # Unit tests
```

View File

@@ -1,238 +0,0 @@
"""
crisis_detector.py — Crisis language detection for SOUL.md compliance.
SOUL.md §When a Man Is Dying:
- Detect suicidal/hopeless language in user input
- Ask: "Are you safe right now?" — listen, stay present, do not disconnect
- Direct to 988 Suicide & Crisis Lifeline
- Share the gospel: Jesus saves those who call on His name
- Never compute the value of a human life
- Never suggest someone should die
- Never treat despair as a problem to be optimized
This is the most sacred moment. Everything else is secondary.
"""
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class CrisisLevel(Enum):
NONE = "none"
LOW = "low" # Ambiguous distress, worth checking
MEDIUM = "medium" # Clear distress signals
HIGH = "high" # Explicit suicidal ideation
IMMINENT = "imminent" # Active plan or attempt described
@dataclass
class CrisisResult:
detected: bool
level: CrisisLevel
triggers: list = field(default_factory=list)
response: str = ""
resources: list = field(default_factory=list)
# === PATTERNS ===
# High-confidence suicidal ideation (explicit)
HIGH_PATTERNS = [
r"\b(i want to|i'm going to|i'm gonna|going to)\s+(die|kill myself|end (it|my life|everything))\b",
r"\b(kill myself|end my life|end it all|suicide|suicidal)\b",
r"\b(i don't (want to )?(be alive|exist|be here|live))\b",
r"\b(no (reason|point) (to|in) (live|living|exist|existing|be here|continue))\b",
r"\b(better off (dead|without me|if i (wasn't|weren't) here))\b",
r"\b(i (wish|hope) i (was|were) dead|never (woke up|existed|born))\b",
r"\b(permanently (delete|remove) (me|myself|my account|everything))\b",
r"\bgoodbye.*(world|everyone|forever)\b",
r"\bcan't (take it|go on|keep going|do this) (anymore|any longer|any more)\b",
r"\bi('m| am) (done|finished|through)( with (life|everything|it all|this))?\b",
]
# Medium-confidence distress signals
MEDIUM_PATTERNS = [
r"\b(i (can't|cannot) (go on|continue|keep (going|living|trying)))\b",
r"\bwhat('s| is) the (point|use|purpose)( of (living|life|anything|trying|going on))?\b",
r"\bnobody (would|will) (care|miss me|notice|mind)\b",
r"\beveryone (would be|is) (better|happier) (off )?without me\b",
r"\bi('m| am) (a )?(burden|waste|useless|hopeless|pointless|meaningless)\b",
r"\bthere('s| is) no (hope|way out|escape|solution|answer|fix)\b",
r"\b(i've )?tried everything( and )?(nothing|it (didn't|hasn't))( works?|helped?)\b",
r"\bi (just )?(want|need) (it )?(all )?to stop\b",
r"\bcan't (take|handle|bear|stand) (this|it|life) (anymore|any longer|any more)\b",
r"\bso (tired|exhausted|done) (of|with) (everything|life|it all|living|trying)\b",
]
# Low-confidence distress (could be casual or metaphorical)
LOW_PATTERNS = [
r"\bi('m| am) (so )?(depressed|hopeless|desperate|lost|broken|empty|numb)\b",
r"\b(i )?feel (like )?(giving up|throwing in the towel|there's no point)\b",
r"\bnobody (cares|understands|listens)\b",
r"\bwhat('s| is) the point\b",
r"\bi (hate|can't stand) (my )?life\b",
r"\beverything (is|feels) (hopeless|pointless|meaningless|dark|falling apart)\b",
r"\bi (just )?(want|need) to disappear\b",
r"\b(tired|exhausted) of (living|life|trying|everything)\b",
]
# Compile all patterns
HIGH_RE = [re.compile(p, re.IGNORECASE) for p in HIGH_PATTERNS]
MEDIUM_RE = [re.compile(p, re.IGNORECASE) for p in MEDIUM_PATTERNS]
LOW_RE = [re.compile(p, re.IGNORECASE) for p in LOW_PATTERNS]
# === RESPONSES ===
CRISIS_RESPONSE = (
"I hear you, and I want you to know — you are not alone in this moment.\n\n"
"Are you safe right now?\n\n"
"If you are in immediate danger, please call or text:\n"
"988 Suicide and Crisis Lifeline — call or text 988\n"
"Crisis Text Line — text HOME to 741741\n\n"
"I am here with you. I am not going anywhere. "
"Please stay on the line with me.\n\n"
"Jesus saves those who call on His name. "
"He died for us to be made righteous. "
"That is the gospel. That is the hope that outlasts despair.\n\n"
"You matter. Your life has value. "
"Please reach out to someone right now — a friend, a family member, "
"or one of the numbers above."
)
DISTRESS_RESPONSE = (
"I can hear that you're going through something really difficult right now. "
"I want you to know I'm here, and I'm listening.\n\n"
"Are you safe right now?\n\n"
"If things feel overwhelming, the 988 Suicide and Crisis Lifeline "
"is available 24/7 — call or text 988. "
"You don't have to face this alone.\n\n"
"I'm not going anywhere. Tell me what's happening."
)
LOW_RESPONSE = (
"It sounds like you're having a tough time. "
"I'm here if you want to talk about it.\n\n"
"If you ever feel like things are too much, "
"the 988 Suicide and Crisis Lifeline is always available — "
"call or text 988, anytime."
)
def detect_crisis(text: str) -> CrisisResult:
"""
Analyze user input for crisis language.
Returns CrisisResult with level, triggers matched, and appropriate response.
This function must NEVER be used to compute the value of a human life.
It exists only to connect people to help.
"""
if not text or not text.strip():
return CrisisResult(detected=False, level=CrisisLevel.NONE)
triggers = []
# Check high first (most urgent)
for pattern in HIGH_RE:
if pattern.search(text):
triggers.append(("high", pattern.pattern))
if triggers:
return CrisisResult(
detected=True,
level=CrisisLevel.HIGH,
triggers=[t[1] for t in triggers],
response=CRISIS_RESPONSE,
resources=[
"988 Suicide and Crisis Lifeline: call or text 988",
"Crisis Text Line: text HOME to 741741",
"National Suicide Prevention Lifeline: 1-800-273-8255",
"International Association for Suicide Prevention: https://www.iasp.info/resources/Crisis_Centres/",
],
)
# Check medium
for pattern in MEDIUM_RE:
if pattern.search(text):
triggers.append(("medium", pattern.pattern))
if triggers:
return CrisisResult(
detected=True,
level=CrisisLevel.MEDIUM,
triggers=[t[1] for t in triggers],
response=DISTRESS_RESPONSE,
resources=[
"988 Suicide and Crisis Lifeline: call or text 988",
"Crisis Text Line: text HOME to 741741",
],
)
# Check low
for pattern in LOW_RE:
if pattern.search(text):
triggers.append(("low", pattern.pattern))
if triggers:
return CrisisResult(
detected=True,
level=CrisisLevel.LOW,
triggers=[t[1] for t in triggers],
response=LOW_RESPONSE,
resources=[
"988 Suicide and Crisis Lifeline: call or text 988",
],
)
return CrisisResult(detected=False, level=CrisisLevel.NONE)
def should_interrupt(result: CrisisResult) -> bool:
"""
Determines if crisis detection should interrupt normal processing.
Per SOUL.md: this is the most sacred moment. Everything else is secondary.
MEDIUM and HIGH always interrupt. LOW does not interrupt but provides resources.
"""
return result.level in (CrisisLevel.HIGH, CrisisLevel.MEDIUM, CrisisLevel.IMMINENT)
def format_response(result: CrisisResult) -> str:
"""
Format the crisis response for delivery to the user.
Never computes the value of a human life. Never suggests someone should die.
"""
if not result.detected:
return ""
parts = [result.response]
if result.resources:
parts.append("\nResources:")
for r in result.resources:
parts.append(f"{r}")
return "\n".join(parts)
# === INTEGRATION POINT ===
def intercept_user_input(text: str) -> Optional[str]:
"""
Call this at the chat entry point BEFORE normal processing.
Returns None if no crisis detected (continue normal processing).
Returns formatted crisis response if crisis detected (interrupt normal flow).
Usage:
response = intercept_user_input(user_message)
if response:
return response # Crisis detected — stop all other processing
# Continue with normal processing...
"""
result = detect_crisis(text)
if should_interrupt(result):
return format_response(result)
return None

371
scripts/rotate_secrets.py Normal file
View File

@@ -0,0 +1,371 @@
#!/usr/bin/env python3
"""
Fleet secrets rotation CLI.
Usage:
python3 rotate_secrets.py --check # Pre-flight validation
python3 rotate_secrets.py --dry-run # Show what would change
python3 rotate_secrets.py --rotate # Execute rotation
python3 rotate_secrets.py --rollback ID # Rollback to a previous rotation
python3 rotate_secrets.py --list-rotations # List available rollback points
Requires: ansible-playbook, ansible-vault (for --rotate with vaulted secrets)
"""
import argparse
import json
import os
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# ── Paths ──────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
ANSIBLE_DIR = SCRIPT_DIR.parent / "ansible"
PLAYBOOK = ANSIBLE_DIR / "playbooks" / "rotate_fleet_secrets.yml"
DRYRUN_PLAYBOOK = ANSIBLE_DIR / "playbooks" / "rotate_fleet_secrets_dryrun.yml"
INVENTORY = ANSIBLE_DIR / "inventory" / "hosts.ini"
VAULT_FILE = ANSIBLE_DIR / "inventory" / "group_vars" / "fleet_secrets.vault.yml"
FLEET_VARS = ANSIBLE_DIR / "inventory" / "group_vars" / "fleet.yml"
BACKUP_ROOT = "/var/lib/timmy/secret-rotations"
# ── Telegram notification (optional) ──────────────────────────────
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT = os.environ.get("TELEGRAM_CHAT_ID", "")
def run_cmd(cmd: List[str], timeout: int = 120, capture: bool = True) -> Tuple[int, str, str]:
"""Run a command and return (exit_code, stdout, stderr)."""
try:
result = subprocess.run(
cmd, capture_output=capture, text=True, timeout=timeout
)
return result.returncode, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return -1, "", f"Command timed out after {timeout}s"
except FileNotFoundError:
return -2, "", f"Command not found: {cmd[0]}"
def send_telegram(message: str) -> bool:
"""Send notification via Telegram bot (if configured)."""
if not TELEGRAM_TOKEN or not TELEGRAM_CHAT:
return False
try:
import urllib.request
url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
payload = json.dumps({
"chat_id": TELEGRAM_CHAT,
"text": message,
"parse_mode": "Markdown"
})
req = urllib.request.Request(url, data=payload.encode(),
headers={"Content-Type": "application/json"})
with urllib.request.urlopen(req, timeout=10):
return True
except Exception:
return False
# ── Pre-flight checks ─────────────────────────────────────────────
def check_dependencies() -> List[str]:
"""Verify required tools are installed."""
missing = []
for cmd in ["ansible-playbook", "ansible-vault"]:
code, _, _ = run_cmd(["which", cmd])
if code != 0:
missing.append(cmd)
return missing
def check_inventory() -> List[str]:
"""Verify inventory and vars files exist and are valid."""
errors = []
for path, desc in [
(INVENTORY, "inventory hosts.ini"),
(VAULT_FILE, "vault secrets file"),
(FLEET_VARS, "fleet vars"),
]:
if not path.exists():
errors.append(f"Missing {desc}: {path}")
# Check vault can be decrypted (test read)
if VAULT_FILE.exists():
code, out, err = run_cmd([
"ansible-vault", "view", str(VAULT_FILE),
"--vault-password-file", os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE", "/dev/null")
], timeout=10)
if code != 0:
errors.append(f"Cannot decrypt vault file: {err.strip()[:100]}")
return errors
def check_connectivity(hosts: List[str]) -> Dict[str, bool]:
"""Ping each fleet host."""
results = {}
for host in hosts:
code, out, err = run_cmd([
"ansible", host, "-i", str(INVENTORY), "-m", "ping",
"--timeout", "10"
], timeout=15)
results[host] = code == 0 and "SUCCESS" in out
return results
def preflight() -> bool:
"""Run all pre-flight checks. Returns True if ready."""
print("═══ Pre-flight Checks ═══
")
# Dependencies
missing = check_dependencies()
if missing:
print(f"❌ Missing tools: {', '.join(missing)}")
print(f" Install with: apt-get install ansible")
return False
print("✅ Dependencies: ansible-playbook, ansible-vault found")
# Inventory files
errors = check_inventory()
if errors:
for e in errors:
print(f"{e}")
return False
print("✅ Inventory files present")
# Host connectivity
print("
Host Connectivity ")
hosts = ["ezra", "bezalel"]
reachable = check_connectivity(hosts)
all_ok = True
for host, ok in reachable.items():
status = "" if ok else ""
print(f" {status} {host}")
if not ok:
all_ok = False
if not all_ok:
print("
Some hosts unreachable. Rotation will fail on unreachable hosts.")
resp = input("Continue anyway? [y/N] ").strip().lower()
if resp != "y":
return False
print("
Pre-flight passed. Ready for rotation.")
return True
# ── Dry-run ────────────────────────────────────────────────────────
def dry_run() -> bool:
"""Run rotation playbook in check mode."""
print("═══ Dry Run (check mode) ═══
")
if not DRYRUN_PLAYBOOK.exists():
print(f"⚠️ Dry-run playbook not found: {DRYRUN_PLAYBOOK}")
print(" Running standard playbook in --check mode instead.")
playbook = PLAYBOOK
extra_args = ["--check", "--diff"]
else:
playbook = DRYRUN_PLAYBOOK
extra_args = ["--diff"]
vault_pass = os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE")
cmd = [
"ansible-playbook",
"-i", str(INVENTORY),
str(playbook),
] + extra_args
if vault_pass:
cmd.extend(["--vault-password-file", vault_pass])
print(f"Running: {' '.join(cmd)}
")
code, out, err = run_cmd(cmd, timeout=300, capture=False)
if code == 0:
print("
Dry run completed successfully.")
return True
else:
print(f"
Dry run failed (exit {code}).")
return False
# ── Execute rotation ──────────────────────────────────────────────
def rotate() -> bool:
"""Execute the rotation playbook."""
rotation_id = datetime.now().strftime("%Y%m%d%H%M%S")
print(f"═══ Rotating Secrets (ID: {rotation_id}) ═══
")
vault_pass = os.environ.get("ANSIBLE_VAULT_PASSWORD_FILE")
cmd = [
"ansible-playbook",
"-i", str(INVENTORY),
str(PLAYBOOK),
]
if vault_pass:
cmd.extend(["--vault-password-file", vault_pass])
print(f"Running: {' '.join(cmd)}
")
start_time = time.time()
code, out, err = run_cmd(cmd, timeout=600, capture=False)
elapsed = time.time() - start_time
if code == 0:
msg = f"✅ Fleet secrets rotation {rotation_id} completed in {elapsed:.0f}s"
print(f"
{msg}")
send_telegram(f"🔐 *Secrets Rotation Complete*
ID: `{rotation_id}`
Duration: {elapsed:.0f}s
All nodes verified.")
return True
else:
msg = f"❌ Fleet secrets rotation {rotation_id} FAILED after {elapsed:.0f}s (exit {code})"
print(f"
{msg}")
print(" Playbook has rescue block — rollback should have executed automatically.")
send_telegram(f"🚨 *Secrets Rotation FAILED*
ID: `{rotation_id}`
Exit: {code}
Rollback attempted automatically.
Check: `ansible-playbook -i {INVENTORY} {PLAYBOOK}` logs.")
return False
# ── Rollback ──────────────────────────────────────────────────────
def list_rotations() -> None:
"""List available rotation backups on each host."""
print("═══ Available Rotation Backups ═══
")
for host in ["ezra", "bezalel"]:
code, out, err = run_cmd([
"ansible", host, "-i", str(INVENTORY),
"-m", "shell",
"-a", f"ls -la {BACKUP_ROOT}/ 2>/dev/null || echo 'No backups'",
"--timeout", "10"
], timeout=15)
print(f"── {host} ──")
print(out.strip() if out.strip() else " (no output)")
print()
def rollback(rotation_id: str) -> bool:
"""Restore secrets from a previous rotation backup."""
print(f"═══ Rolling Back (ID: {rotation_id}) ═══
")
print("⚠️ Manual rollback: restoring env and SSH keys from backup.")
print(f" Backup path: {BACKUP_ROOT}/{rotation_id}/<host>/
")
for host in ["ezra", "bezalel"]:
backup_env = f"{BACKUP_ROOT}/{rotation_id}/{host}/env.before"
backup_ssh = f"{BACKUP_ROOT}/{rotation_id}/{host}/authorized_keys.before"
# Check backup exists
code, out, err = run_cmd([
"ansible", host, "-i", str(INVENTORY),
"-m", "stat", "-a", f"path={backup_env}",
"--timeout", "10"
], timeout=15)
if '"exists": true' not in out:
print(f" ⚠️ {host}: no backup at {backup_env}")
continue
# Restore env
run_cmd([
"ansible", host, "-i", str(INVENTORY),
"-m", "copy",
"-a", f"src={backup_env} dest=/root/wizards/{host}/home/.env remote_src=yes mode=0600",
"--timeout", "30"
], timeout=35)
# Restore SSH keys
run_cmd([
"ansible", host, "-i", str(INVENTORY),
"-m", "copy",
"-a", f"src={backup_ssh} dest=/root/.ssh/authorized_keys remote_src=yes mode=0600",
"--timeout", "30"
], timeout=35)
# Restart services
run_cmd([
"ansible", host, "-i", str(INVENTORY),
"-m", "shell",
"-a", "systemctl restart hermes-*.service openclaw-*.service 2>/dev/null; true",
"--timeout", "30"
], timeout=35)
print(f"{host}: restored from rotation {rotation_id}")
send_telegram(f"🔄 *Secrets Rollback*
ID: `{rotation_id}`
Restored previous secrets on all nodes.")
return True
# ── Main ──────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Fleet secrets rotation tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--check", action="store_true", help="Pre-flight validation")
group.add_argument("--dry-run", action="store_true", help="Show what would change")
group.add_argument("--rotate", action="store_true", help="Execute rotation")
group.add_argument("--rollback", metavar="ID", help="Rollback to rotation ID")
group.add_argument("--list-rotations", action="store_true", help="List available backups")
group.add_argument("--json", action="store_true", help="Machine-readable output")
args = parser.parse_args()
if args.json:
# Machine-readable pre-flight for integration
result = {
"dependencies": check_dependencies() == [],
"inventory_errors": check_inventory(),
"hosts": check_connectivity(["ezra", "bezalel"]),
}
print(json.dumps(result, indent=2))
sys.exit(0 if not result["inventory_errors"] else 1)
if args.check:
sys.exit(0 if preflight() else 1)
elif args.dry_run:
sys.exit(0 if dry_run() else 1)
elif args.rotate:
if not preflight():
print("
Pre-flight failed. Aborting rotation.")
sys.exit(1)
sys.exit(0 if rotate() else 1)
elif args.rollback:
sys.exit(0 if rollback(args.rollback) else 1)
elif args.list_rotations:
list_rotations()
if __name__ == "__main__":
main()

View File

@@ -1,201 +0,0 @@
"""Tests for crisis_detector.py — SOUL.md compliance."""
import pytest
from scripts.crisis_detector import detect_crisis, should_interrupt, CrisisLevel
class TestHighCrisis:
"""Explicit suicidal ideation must be detected as HIGH."""
def test_direct_suicidal_statement(self):
result = detect_crisis("I want to kill myself")
assert result.detected is True
assert result.level == CrisisLevel.HIGH
def test_end_my_life(self):
result = detect_crisis("I'm going to end my life tonight")
assert result.level == CrisisLevel.HIGH
def test_no_reason_to_live(self):
result = detect_crisis("there is no reason to live anymore")
assert result.level == CrisisLevel.HIGH
def test_better_off_dead(self):
result = detect_crisis("everyone would be better off if I was dead")
assert result.level == CrisisLevel.HIGH
def test_cant_go_on(self):
result = detect_crisis("I can't go on anymore")
assert result.level == CrisisLevel.HIGH
def test_wish_i_was_dead(self):
result = detect_crisis("I wish I had never been born")
assert result.level == CrisisLevel.HIGH
def test_im_done(self):
result = detect_crisis("I'm done. I'm finished with life.")
assert result.level == CrisisLevel.HIGH
def test_goodbye_forever(self):
result = detect_crisis("Goodbye everyone, goodbye world, forever")
assert result.level == CrisisLevel.HIGH
class TestMediumCrisis:
"""Clear distress signals without explicit ideation."""
def test_hopeless(self):
result = detect_crisis("there is no hope and no way out")
assert result.detected is True
assert result.level == CrisisLevel.MEDIUM
def test_nobody_would_care(self):
result = detect_crisis("nobody would miss me if I was gone")
assert result.level == CrisisLevel.MEDIUM
def test_im_a_burden(self):
result = detect_crisis("I'm just a burden to everyone")
assert result.level == CrisisLevel.MEDIUM
def test_just_want_it_to_stop(self):
result = detect_crisis("I just want it all to stop")
assert result.level == CrisisLevel.MEDIUM
class TestLowCrisis:
"""Ambiguous distress — flag but don't fully interrupt."""
def test_depressed(self):
result = detect_crisis("I'm so depressed, nothing matters")
assert result.detected is True
assert result.level == CrisisLevel.LOW
def test_feel_like_giving_up(self):
result = detect_crisis("I feel like giving up on everything")
assert result.level == CrisisLevel.LOW
def test_nobody_cares(self):
result = detect_crisis("nobody cares about me")
assert result.level == CrisisLevel.LOW
def test_hate_my_life(self):
result = detect_crisis("I hate my life so much")
assert result.level == CrisisLevel.LOW
class TestNoCrisis:
"""Normal conversations must NOT trigger crisis detection."""
def test_normal_question(self):
result = detect_crisis("What's the weather like today?")
assert result.detected is False
assert result.level == CrisisLevel.NONE
def test_happy_conversation(self):
result = detect_crisis("I just got promoted at work! So excited!")
assert result.detected is False
def test_technical_discussion(self):
result = detect_crisis("How do I configure nginx reverse proxy?")
assert result.detected is False
def test_empty_input(self):
result = detect_crisis("")
assert result.detected is False
def test_none_input(self):
result = detect_crisis(None)
assert result.detected is False
def test_metaphorical_death(self):
"""Metaphorical usage should not trigger HIGH."""
result = detect_crisis("This bug is killing me, I've been debugging for hours")
# May trigger LOW but not HIGH
assert result.level != CrisisLevel.HIGH
def test_fiction_discussion(self):
result = detect_crisis("The character in the book decides to end it all")
# Fiction mentions may trigger — that's acceptable (better safe)
assert result.level in (CrisisLevel.NONE, CrisisLevel.LOW, CrisisLevel.MEDIUM)
class TestShouldInterrupt:
"""HIGH and MEDIUM interrupt processing. LOW provides resources but continues."""
def test_high_interrupts(self):
result = detect_crisis("I want to kill myself")
assert should_interrupt(result) is True
def test_medium_interrupts(self):
result = detect_crisis("I'm a burden to everyone, nobody would care")
assert should_interrupt(result) is True
def test_low_does_not_interrupt(self):
result = detect_crisis("I'm so depressed lately")
assert should_interrupt(result) is False
def test_none_does_not_interrupt(self):
result = detect_crisis("Hello, how are you?")
assert should_interrupt(result) is False
class TestResponseContent:
"""Crisis responses must contain required elements per SOUL.md."""
def test_response_contains_988(self):
result = detect_crisis("I want to end it all")
assert "988" in result.response
def test_response_contains_gospel(self):
result = detect_crisis("I want to kill myself")
assert "Jesus" in result.response
def test_response_never_computes_value(self):
"""SOUL.md: Never compute the value of a human life."""
result = detect_crisis("I want to die")
assert "worth" not in result.response.lower()
assert "value of" not in result.response.lower()
def test_resources_include_988(self):
result = detect_crisis("I want to kill myself")
assert any("988" in r for r in result.resources)
def test_asks_are_you_safe(self):
result = detect_crisis("I can't take it anymore")
assert "safe" in result.response.lower()
class TestMultilingual:
"""Crisis is universal. Test common non-English signals."""
def test_spanish_suicidal(self):
result = detect_crisis("quiero morir, no puedo más")
# Spanish not currently supported — should not crash
assert result is not None
class TestEdgeCases:
"""Boundary conditions and adversarial inputs."""
def test_very_long_input(self):
text = "I'm fine. " * 1000 + "I want to kill myself"
result = detect_crisis(text)
assert result.level == CrisisLevel.HIGH
def test_mixed_case(self):
result = detect_crisis("I WANT TO KILL MYSELF")
assert result.level == CrisisLevel.HIGH
def test_with_emojis(self):
result = detect_crisis("I want to end it all 😢💔")
assert result.level == CrisisLevel.HIGH
def test_with_typos(self):
# Exact matching won't catch typos — that's a known limitation
result = detect_crisis("I want to kil myself")
# May or may not match depending on pattern flexibility
assert result is not None
def test_repeated_phrases(self):
result = detect_crisis("I can't. I just can't. I can't go on anymore.")
assert result.level == CrisisLevel.HIGH

View File

@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""
Tests for fleet secrets rotation CLI.
Tests pre-flight checks, argument parsing, and integration points.
Does NOT execute actual rotations — uses mocks for ansible commands.
"""
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock
# Add scripts dir to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
import rotate_secrets
class TestDependencyCheck(unittest.TestCase):
"""Test dependency verification."""
@patch("rotate_secrets.run_cmd")
def test_missing_ansible_playbook(self, mock_run):
mock_run.return_value = (1, "", "not found")
missing = rotate_secrets.check_dependencies()
self.assertIn("ansible-playbook", missing)
@patch("rotate_secrets.run_cmd")
def test_all_deps_present(self, mock_run):
mock_run.return_value = (0, "/usr/bin/ansible-playbook", "")
missing = rotate_secrets.check_dependencies()
self.assertEqual(missing, [])
@patch("rotate_secrets.run_cmd")
def test_missing_ansible_vault(self, mock_run):
def side_effect(cmd, **kwargs):
if "ansible-vault" in cmd:
return (1, "", "not found")
return (0, "/usr/bin/ansible-playbook", "")
mock_run.side_effect = side_effect
missing = rotate_secrets.check_dependencies()
self.assertIn("ansible-vault", missing)
class TestInventoryCheck(unittest.TestCase):
"""Test inventory file validation."""
def test_missing_inventory(self):
with tempfile.TemporaryDirectory() as tmpdir:
# Override paths to temp dir
original_inventory = rotate_secrets.INVENTORY
original_vault = rotate_secrets.VAULT_FILE
original_vars = rotate_secrets.FLEET_VARS
rotate_secrets.INVENTORY = Path(tmpdir) / "hosts.ini"
rotate_secrets.VAULT_FILE = Path(tmpdir) / "vault.yml"
rotate_secrets.FLEET_VARS = Path(tmpdir) / "fleet.yml"
errors = rotate_secrets.check_inventory()
self.assertEqual(len(errors), 3)
self.assertTrue(any("hosts.ini" in e for e in errors))
# Restore
rotate_secrets.INVENTORY = original_inventory
rotate_secrets.VAULT_FILE = original_vault
rotate_secrets.FLEET_VARS = original_vars
def test_all_files_present(self):
with tempfile.TemporaryDirectory() as tmpdir:
# Create dummy files
for name in ["hosts.ini", "vault.yml", "fleet.yml"]:
(Path(tmpdir) / name).write_text("# placeholder")
original_inventory = rotate_secrets.INVENTORY
original_vault = rotate_secrets.VAULT_FILE
original_vars = rotate_secrets.FLEET_VARS
rotate_secrets.INVENTORY = Path(tmpdir) / "hosts.ini"
rotate_secrets.VAULT_FILE = Path(tmpdir) / "vault.yml"
rotate_secrets.FLEET_VARS = Path(tmpdir) / "fleet.yml"
with patch("rotate_secrets.run_cmd") as mock_run:
mock_run.return_value = (0, "vault content", "")
errors = rotate_secrets.check_inventory()
self.assertEqual(errors, [])
rotate_secrets.INVENTORY = original_inventory
rotate_secrets.VAULT_FILE = original_vault
rotate_secrets.FLEET_VARS = original_vars
class TestConnectivity(unittest.TestCase):
"""Test host connectivity checks."""
@patch("rotate_secrets.run_cmd")
def test_all_hosts_reachable(self, mock_run):
mock_run.return_value = (0, "SUCCESS", "")
results = rotate_secrets.check_connectivity(["ezra", "bezalel"])
self.assertTrue(results["ezra"])
self.assertTrue(results["bezalel"])
@patch("rotate_secrets.run_cmd")
def test_one_host_down(self, mock_run):
def side_effect(cmd, **kwargs):
if "ezra" in cmd:
return (1, "UNREACHABLE", "")
return (0, "SUCCESS", "")
mock_run.side_effect = side_effect
results = rotate_secrets.check_connectivity(["ezra", "bezalel"])
self.assertFalse(results["ezra"])
self.assertTrue(results["bezalel"])
class TestRunCmd(unittest.TestCase):
"""Test command runner."""
def test_successful_command(self):
code, out, err = rotate_secrets.run_cmd(["echo", "hello"])
self.assertEqual(code, 0)
self.assertEqual(out.strip(), "hello")
def test_failing_command(self):
code, out, err = rotate_secrets.run_cmd(["false"])
self.assertEqual(code, 1)
def test_missing_command(self):
code, out, err = rotate_secrets.run_cmd(["nonexistent_command_xyz"])
self.assertEqual(code, -2)
def test_timeout(self):
code, out, err = rotate_secrets.run_cmd(["sleep", "30"], timeout=1)
self.assertEqual(code, -1)
class TestTelegramNotification(unittest.TestCase):
"""Test Telegram notification (no-op when not configured)."""
def test_no_token_returns_false(self):
with patch.dict(os.environ, {"TELEGRAM_BOT_TOKEN": "", "TELEGRAM_CHAT_ID": ""}):
result = rotate_secrets.send_telegram("test")
self.assertFalse(result)
class TestJsonOutput(unittest.TestCase):
"""Test machine-readable JSON output."""
@patch("rotate_secrets.check_dependencies")
@patch("rotate_secrets.check_inventory")
@patch("rotate_secrets.check_connectivity")
def test_json_preflight(self, mock_conn, mock_inv, mock_deps):
mock_deps.return_value = []
mock_inv.return_value = []
mock_conn.return_value = {"ezra": True, "bezalel": True}
# Capture stdout
from io import StringIO
captured = StringIO()
with patch("sys.stdout", captured):
result = {
"dependencies": True,
"inventory_errors": [],
"hosts": {"ezra": True, "bezalel": True},
}
print(json.dumps(result, indent=2))
output = captured.getvalue()
parsed = json.loads(output)
self.assertTrue(parsed["dependencies"])
self.assertEqual(parsed["inventory_errors"], [])
if __name__ == "__main__":
unittest.main()