Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
cb46d56147 feat: add autonomous incident creation scaffold (#553)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 22s
2026-04-15 00:40:14 -04:00
8 changed files with 522 additions and 455 deletions

View File

@@ -14,7 +14,7 @@ jobs:
- name: Parse check
run: |
find . -name '*.yml' -o -name '*.yaml' | grep -v .gitea | xargs -r python3 -c "import sys,yaml; [yaml.safe_load(open(f)) for f in sys.argv[1:]]"
find . -name '*.json' -print0 | xargs -0 -r -n1 python3 -m json.tool > /dev/null
find . -name '*.json' | xargs -r python3 -m json.tool > /dev/null
find . -name '*.py' | xargs -r python3 -m py_compile
find . -name '*.sh' | xargs -r bash -n
echo "PASS: All files parse"
@@ -22,6 +22,3 @@ jobs:
run: |
if grep -rE 'sk-or-|sk-ant-|ghp_|AKIA' . --include='*.yml' --include='*.py' --include='*.sh' 2>/dev/null | grep -v '.gitea' | grep -v 'detect_secrets' | grep -v 'test_trajectory_sanitize'; then exit 1; fi
echo "PASS: No secrets"
- name: Backup pipeline regression test
run: |
python3 -m unittest discover -s tests -p 'test_backup_pipeline.py' -v

View File

@@ -1,98 +0,0 @@
# Encrypted Hermes Backup Pipeline
Issue: `timmy-home#693`
This pipeline creates a nightly encrypted archive of `~/.hermes`, stores a local encrypted copy, uploads it to remote storage, and supports restore verification.
## What gets backed up
By default the pipeline archives:
- `~/.hermes/config.yaml`
- `~/.hermes/state.db`
- `~/.hermes/sessions/`
- `~/.hermes/cron/`
- any other files under `~/.hermes`
Override the source with `BACKUP_SOURCE_DIR=/path/to/.hermes`.
## Backup command
```bash
BACKUP_PASSPHRASE_FILE=~/.config/timmy/backup.passphrase \
BACKUP_NAS_TARGET=/Volumes/timmy-nas/hermes-backups \
bash scripts/backup_pipeline.sh
```
The script writes:
- local encrypted copy: `~/.timmy-backups/hermes/<timestamp>/hermes-backup-<timestamp>.tar.gz.enc`
- local manifest: `~/.timmy-backups/hermes/<timestamp>/hermes-backup-<timestamp>.json`
- log file: `~/.timmy-backups/hermes/logs/backup_pipeline.log`
## Nightly schedule
Run every night at 03:00:
```cron
0 3 * * * cd /Users/apayne/.timmy/timmy-home && BACKUP_PASSPHRASE_FILE=/Users/apayne/.config/timmy/backup.passphrase BACKUP_NAS_TARGET=/Volumes/timmy-nas/hermes-backups bash scripts/backup_pipeline.sh >> /Users/apayne/.timmy-backups/hermes/logs/cron.log 2>&1
```
## Remote targets
At least one remote target must be configured.
### Local NAS
Use a mounted path:
```bash
BACKUP_NAS_TARGET=/Volumes/timmy-nas/hermes-backups
```
The pipeline copies the encrypted archive and manifest into `<BACKUP_NAS_TARGET>/<timestamp>/`.
### S3-compatible storage
```bash
BACKUP_PASSPHRASE_FILE=~/.config/timmy/backup.passphrase \
BACKUP_S3_URI=s3://timmy-backups/hermes \
AWS_ENDPOINT_URL=https://minio.example.com \
bash scripts/backup_pipeline.sh
```
Notes:
- `aws` CLI must be installed if `BACKUP_S3_URI` is set.
- `AWS_ENDPOINT_URL` is optional and is used for MinIO, R2, and other S3-compatible endpoints.
## Restore playbook
Restore an encrypted archive into a clean target root:
```bash
BACKUP_PASSPHRASE_FILE=~/.config/timmy/backup.passphrase \
bash scripts/restore_backup.sh \
/Volumes/timmy-nas/hermes-backups/20260415-030000/hermes-backup-20260415-030000.tar.gz.enc \
/tmp/hermes-restore
```
Result:
- restored tree lands at `/tmp/hermes-restore/.hermes`
- if a sibling manifest exists, the restore script verifies the archive SHA256 before decrypting
## End-to-end verification
Run the regression suite:
```bash
python3 -m unittest discover -s tests -p 'test_backup_pipeline.py' -v
```
This proves:
1. the backup output is encrypted
2. plaintext archives do not leak into the backup destinations
3. the restore script recreates the original `.hermes` tree end-to-end
4. the pipeline refuses to run without a remote target

View File

@@ -12,8 +12,6 @@ Quick-reference index for common operational tasks across the Timmy Foundation i
| Check fleet health | fleet-ops | `python3 scripts/fleet_readiness.py` |
| Agent scorecard | fleet-ops | `python3 scripts/agent_scorecard.py` |
| View fleet manifest | fleet-ops | `cat manifest.yaml` |
| Backup Hermes state | timmy-home | `BACKUP_PASSPHRASE_FILE=... BACKUP_NAS_TARGET=... bash scripts/backup_pipeline.sh` |
| Restore Hermes state | timmy-home | `BACKUP_PASSPHRASE_FILE=... bash scripts/restore_backup.sh <archive> <restore-root>` |
## the-nexus (Frontend + Brain)

View File

@@ -0,0 +1,329 @@
#!/usr/bin/env python3
"""Create or refresh fleet incidents on Gitea from local infrastructure signals.
Refs: timmy-home #553
"""
from __future__ import annotations
import argparse
import json
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
from urllib import request
# Gitea API endpoint and default repository coordinates for issue filing.
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
DEFAULT_OWNER = "Timmy_Foundation"
DEFAULT_REPO = "timmy-home"
# API token on disk; only required when --apply actually mutates Gitea.
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
# Local signal files: failover monitor status, per-process restart counters,
# and the fleet-health heartbeat touch-file.
DEFAULT_FAILOVER_STATUS = Path.home() / ".timmy" / "failover_status.json"
DEFAULT_RESTART_STATE_DIR = Path("/var/lib/timmy/restarts")
DEFAULT_HEARTBEAT_FILE = Path("/var/lib/timmy/heartbeats/fleet_health.last")
@dataclass(frozen=True)
class Incident:
    """Immutable description of one detected infrastructure incident."""

    fingerprint: str  # stable dedupe key, embedded in the issue body
    title: str
    body: str

    def latest_evidence(self) -> str:
        """Return the body's evidence with the leading ``Fingerprint:`` line removed."""
        kept = []
        for line in self.body.splitlines():
            if line.strip():
                kept.append(line)
        if kept and kept[0].startswith("Fingerprint: "):
            del kept[0]
        return "\n".join(kept).strip()
class GiteaClient:
    """Minimal Gitea REST client for listing, creating, and commenting on issues."""

    def __init__(self, token: str, owner: str = DEFAULT_OWNER, repo: str = DEFAULT_REPO, base_url: str = DEFAULT_BASE_URL):
        self.token = token
        self.owner = owner
        self.repo = repo
        self.base_url = base_url.rstrip("/")

    def _request(self, path: str, *, method: str = "GET", data: dict | None = None):
        """Perform one authenticated JSON request and decode the response body."""
        encoded = json.dumps(data).encode() if data is not None else None
        headers = {"Authorization": f"token {self.token}"}
        if encoded is not None:
            headers["Content-Type"] = "application/json"
        req = request.Request(f"{self.base_url}{path}", data=encoded, headers=headers, method=method)
        with request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read().decode())

    def list_open_issues(self):
        """Return open issues for the repo, excluding pull requests."""
        found = self._request(f"/repos/{self.owner}/{self.repo}/issues?state=open&limit=100")
        return [item for item in found if not item.get("pull_request")]

    def create_issue(self, title: str, body: str):
        """Open a new issue and return the created issue payload."""
        endpoint = f"/repos/{self.owner}/{self.repo}/issues"
        return self._request(endpoint, method="POST", data={"title": title, "body": body})

    def comment_issue(self, issue_number: int, body: str):
        """Append a comment to an existing issue."""
        endpoint = f"/repos/{self.owner}/{self.repo}/issues/{issue_number}/comments"
        return self._request(endpoint, method="POST", data={"body": body})
def load_json(path: Path):
    """Parse *path* as JSON, or return None when the file does not exist."""
    if path.exists():
        return json.loads(path.read_text())
    return None
def load_restart_counts(state_dir: Path) -> dict[str, int]:
    """Read ``<name>.count`` files under *state_dir* into a name -> count map.

    Files whose content is not an integer are skipped; a missing directory
    yields an empty mapping.
    """
    if not state_dir.exists():
        return {}
    counts: dict[str, int] = {}
    for count_file in sorted(state_dir.glob("*.count")):
        text = count_file.read_text().strip()
        try:
            counts[count_file.stem] = int(text)
        except ValueError:
            pass  # malformed counter file; ignore, matching watchdog tolerance
    return counts
def heartbeat_is_stale(path: Path, *, now: datetime | None = None, max_age_seconds: int = 900) -> bool:
if now is None:
now = datetime.now(timezone.utc)
if not path.exists():
return True
age = now.timestamp() - path.stat().st_mtime
return age > max_age_seconds
def _iso(dt: datetime) -> str:
return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
def _build_body(fingerprint: str, *details: str) -> str:
detail_lines = [detail for detail in details if detail]
return "\n".join([f"Fingerprint: {fingerprint}", *detail_lines])
def build_incidents(
    *,
    failover_status: dict | None,
    restart_counts: dict[str, int],
    heartbeat_stale: bool,
    now: datetime | None = None,
    restart_escalation_threshold: int = 3,
) -> list[Incident]:
    """Translate raw infrastructure signals into Incident records.

    Emits one incident per non-ONLINE fleet host, one per process whose
    restart count strictly exceeds *restart_escalation_threshold*, and one
    when the fleet-health heartbeat is stale.
    """
    current = datetime.now(timezone.utc) if now is None else now
    detected_line = f"Detected at: {_iso(current)}"
    found: list[Incident] = []

    # --- host offline signals from the failover monitor ---
    stamp = failover_status.get("timestamp") if failover_status else None
    hosts = (failover_status.get("fleet") or {}) if failover_status else {}
    # Invariant across hosts, so computed once.
    stamp_line = (
        f"Failover status timestamp: {stamp}"
        if stamp is not None
        else "Failover status timestamp: unknown"
    )
    for host, state in sorted(hosts.items()):
        if str(state).upper() == "ONLINE":
            continue
        fp = f"host-offline:{host}"
        found.append(
            Incident(
                fingerprint=fp,
                title=f"[AUTO] Fleet host offline: {host}",
                body=_build_body(
                    fp,
                    detected_line,
                    stamp_line,
                    f"Host `{host}` reported `{state}` by failover monitor.",
                ),
            )
        )

    # --- restart escalations from the watchdog counters ---
    for name, count in sorted(restart_counts.items()):
        if count <= restart_escalation_threshold:
            continue
        fp = f"restart-escalation:{name}"
        found.append(
            Incident(
                fingerprint=fp,
                title=f"[AUTO] Restart escalation: {name}",
                body=_build_body(
                    fp,
                    detected_line,
                    f"Process `{name}` has crossed the restart escalation threshold with count={count}.",
                ),
            )
        )

    # --- stale fleet-health probe ---
    if heartbeat_stale:
        fp = "probe-stale:fleet-health"
        found.append(
            Incident(
                fingerprint=fp,
                title="[AUTO] Fleet health probe stale",
                body=_build_body(
                    fp,
                    detected_line,
                    "Heartbeat missing or older than the configured fleet health maximum age.",
                ),
            )
        )
    return found
def find_matching_issue(incident: Incident, open_issues: Iterable[dict]) -> dict | None:
    """Return the first open issue matching *incident*, or None.

    A match is the incident fingerprint appearing anywhere in the issue's
    title/body, or an exact title match.
    """
    for candidate in open_issues:
        searchable = "\n".join([candidate.get("title") or "", candidate.get("body") or ""])
        if incident.fingerprint in searchable or candidate.get("title") == incident.title:
            return candidate
    return None
def build_repeat_comment(incident: Incident) -> str:
    """Compose the comment posted when a known incident fires again."""
    header = "Autonomous infrastructure detector saw the same incident again."
    return (
        f"{header}\n\n"
        f"Fingerprint: {incident.fingerprint}\n\n"
        f"Latest evidence:\n{incident.latest_evidence()}"
    )
def sync_incidents(
    incidents: Iterable[Incident],
    client: GiteaClient,
    *,
    apply: bool = False,
    comment_existing: bool = True,
):
    """Reconcile detected incidents against the repo's open Gitea issues.

    Dry-run (``apply=False``) writes nothing: incidents are reported as
    ``existing`` or ``would_create``. With ``apply=True``, new issues are
    created and matching open issues receive a repeat comment unless
    *comment_existing* is False. Returns one result dict per incident.
    """
    known = list(client.list_open_issues())
    outcome = []
    for incident in incidents:
        match = find_matching_issue(incident, known)
        if match is not None:
            action = "existing"
            if apply and comment_existing:
                client.comment_issue(match["number"], build_repeat_comment(incident))
                action = "commented"
            outcome.append(
                {
                    "action": action,
                    "fingerprint": incident.fingerprint,
                    "issue_number": match["number"],
                    "title": match.get("title"),
                }
            )
        elif apply:
            created = client.create_issue(incident.title, incident.body)
            # Track the new issue so later incidents can dedupe against it.
            known.append(created)
            outcome.append(
                {
                    "action": "created",
                    "fingerprint": incident.fingerprint,
                    "issue_number": created["number"],
                    "title": created.get("title"),
                }
            )
        else:
            outcome.append(
                {
                    "action": "would_create",
                    "fingerprint": incident.fingerprint,
                    "issue_number": None,
                    "title": incident.title,
                }
            )
    return outcome
def parse_args():
    """Parse CLI flags controlling signal sources and Gitea write behavior."""
    parser = argparse.ArgumentParser(description="Create or refresh fleet incidents on Gitea from local infrastructure signals.")
    # Target repository coordinates and credentials.
    parser.add_argument("--owner", default=DEFAULT_OWNER)
    parser.add_argument("--repo", default=DEFAULT_REPO)
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
    parser.add_argument("--token-file", type=Path, default=DEFAULT_TOKEN_FILE)
    # Local signal inputs consumed by the detectors.
    parser.add_argument("--failover-status", type=Path, default=DEFAULT_FAILOVER_STATUS)
    parser.add_argument("--restart-state-dir", type=Path, default=DEFAULT_RESTART_STATE_DIR)
    parser.add_argument("--heartbeat-file", type=Path, default=DEFAULT_HEARTBEAT_FILE)
    # Detection thresholds.
    parser.add_argument("--heartbeat-max-age-seconds", type=int, default=900)
    parser.add_argument("--restart-escalation-threshold", type=int, default=3)
    # Behavior toggles: without --apply the run is a dry run.
    parser.add_argument("--apply", action="store_true", help="Create/comment issues instead of reporting what would happen.")
    parser.add_argument("--no-comment-existing", action="store_true", help="Do not comment on existing matching issues.")
    parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON output.")
    return parser.parse_args()
def main():
    """Entry point: gather local signals, build incidents, then sync or report."""
    args = parse_args()
    now = datetime.now(timezone.utc)
    # Collect the three local infrastructure signals.
    failover_status = load_json(args.failover_status)
    restart_counts = load_restart_counts(args.restart_state_dir)
    heartbeat_stale = heartbeat_is_stale(
        args.heartbeat_file,
        now=now,
        max_age_seconds=args.heartbeat_max_age_seconds,
    )
    incidents = build_incidents(
        failover_status=failover_status,
        restart_counts=restart_counts,
        heartbeat_stale=heartbeat_stale,
        now=now,
        restart_escalation_threshold=args.restart_escalation_threshold,
    )
    # Payload is emitted either as JSON (--json) or as human-readable lines.
    payload = {
        "generated_at": _iso(now),
        "incidents": [incident.__dict__ for incident in incidents],
        "results": [],
    }
    token = None
    if args.token_file.exists():
        token = args.token_file.read_text().strip()
    # NOTE(review): this also triggers when the token file exists but is
    # empty; the "not found" wording may mislead in that case — confirm intent.
    if args.apply and not token:
        raise SystemExit(f"Token file not found: {args.token_file}")
    if token:
        client = GiteaClient(token=token, owner=args.owner, repo=args.repo, base_url=args.base_url)
        payload["results"] = sync_incidents(
            incidents,
            client,
            apply=args.apply,
            comment_existing=not args.no_comment_existing,
        )
    else:
        # No token available: report detections locally without touching Gitea.
        payload["results"] = [
            {
                "action": "local_only",
                "fingerprint": incident.fingerprint,
                "issue_number": None,
                "title": incident.title,
            }
            for incident in incidents
        ]
    if args.json:
        print(json.dumps(payload, indent=2))
    else:
        print(f"Generated at: {payload['generated_at']}")
        if not incidents:
            print("No autonomous infrastructure incidents detected.")
        for incident in incidents:
            print(f"- {incident.title} [{incident.fingerprint}]")
        for result in payload["results"]:
            print(f" -> {result['action']}: {result['title']}")


if __name__ == "__main__":
    main()

View File

@@ -1,170 +1,80 @@
#!/usr/bin/env bash
# backup_pipeline.sh — Nightly encrypted Hermes backup pipeline
# Refs: timmy-home #693, timmy-home #561
# backup_pipeline.sh — Daily fleet backup pipeline (FLEET-008)
# Refs: timmy-home #561
set -euo pipefail
DATESTAMP="${BACKUP_TIMESTAMP:-$(date +%Y%m%d-%H%M%S)}"
BACKUP_SOURCE_DIR="${BACKUP_SOURCE_DIR:-${HOME}/.hermes}"
BACKUP_ROOT="${BACKUP_ROOT:-${HOME}/.timmy-backups/hermes}"
BACKUP_LOG_DIR="${BACKUP_LOG_DIR:-${BACKUP_ROOT}/logs}"
BACKUP_RETENTION_DAYS="${BACKUP_RETENTION_DAYS:-14}"
BACKUP_S3_URI="${BACKUP_S3_URI:-}"
BACKUP_NAS_TARGET="${BACKUP_NAS_TARGET:-}"
AWS_ENDPOINT_URL="${AWS_ENDPOINT_URL:-}"
BACKUP_NAME="hermes-backup-${DATESTAMP}"
LOCAL_BACKUP_DIR="${BACKUP_ROOT}/${DATESTAMP}"
STAGE_DIR="$(mktemp -d "${TMPDIR:-/tmp}/timmy-backup.XXXXXX")"
PLAINTEXT_ARCHIVE="${STAGE_DIR}/${BACKUP_NAME}.tar.gz"
ENCRYPTED_ARCHIVE="${STAGE_DIR}/${BACKUP_NAME}.tar.gz.enc"
MANIFEST_PATH="${STAGE_DIR}/${BACKUP_NAME}.json"
ALERT_LOG="${BACKUP_LOG_DIR}/backup_pipeline.log"
PASSFILE_CLEANUP=""
BACKUP_ROOT="/backups/timmy"
DATESTAMP=$(date +%Y%m%d-%H%M%S)
BACKUP_DIR="${BACKUP_ROOT}/${DATESTAMP}"
LOG_DIR="/var/log/timmy"
ALERT_LOG="${LOG_DIR}/backup_pipeline.log"
mkdir -p "$BACKUP_DIR" "$LOG_DIR"
mkdir -p "$BACKUP_LOG_DIR"
TELEGRAM_BOT_TOKEN="${TELEGRAM_BOT_TOKEN:-}"
TELEGRAM_CHAT_ID="${TELEGRAM_CHAT_ID:-}"
OFFSITE_TARGET="${OFFSITE_TARGET:-}"
log() {
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
}
log() { echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"; }
fail() {
log "ERROR: $1"
exit 1
}
cleanup() {
rm -f "$PLAINTEXT_ARCHIVE"
rm -rf "$STAGE_DIR"
if [[ -n "$PASSFILE_CLEANUP" && -f "$PASSFILE_CLEANUP" ]]; then
rm -f "$PASSFILE_CLEANUP"
send_telegram() {
local msg="$1"
if [[ -n "$TELEGRAM_BOT_TOKEN" && -n "$TELEGRAM_CHAT_ID" ]]; then
curl -s -X POST "https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}/sendMessage" \
-d "chat_id=${TELEGRAM_CHAT_ID}" -d "text=${msg}" >/dev/null 2>&1 || true
fi
}
trap cleanup EXIT
resolve_passphrase_file() {
if [[ -n "${BACKUP_PASSPHRASE_FILE:-}" ]]; then
[[ -f "$BACKUP_PASSPHRASE_FILE" ]] || fail "BACKUP_PASSPHRASE_FILE does not exist: $BACKUP_PASSPHRASE_FILE"
echo "$BACKUP_PASSPHRASE_FILE"
return
status=0
# --- Gitea repositories ---
if [[ -d /root/gitea ]]; then
tar czf "${BACKUP_DIR}/gitea-repos.tar.gz" -C /root gitea 2>/dev/null || true
log "Backed up Gitea repos"
fi
# --- Agent configs and state ---
for wiz in bezalel allegro ezra timmy; do
if [[ -d "/root/wizards/${wiz}" ]]; then
tar czf "${BACKUP_DIR}/${wiz}-home.tar.gz" -C /root/wizards "${wiz}" 2>/dev/null || true
log "Backed up ${wiz} home"
fi
done
if [[ -n "${BACKUP_PASSPHRASE:-}" ]]; then
PASSFILE_CLEANUP="${STAGE_DIR}/backup.passphrase"
printf '%s' "$BACKUP_PASSPHRASE" > "$PASSFILE_CLEANUP"
chmod 600 "$PASSFILE_CLEANUP"
echo "$PASSFILE_CLEANUP"
return
fi
# --- System configs ---
cp /etc/crontab "${BACKUP_DIR}/crontab" 2>/dev/null || true
cp -r /etc/systemd/system "${BACKUP_DIR}/systemd" 2>/dev/null || true
log "Backed up system configs"
fail "Set BACKUP_PASSPHRASE_FILE or BACKUP_PASSPHRASE before running the backup pipeline."
}
# --- Evennia worlds (if present) ---
if [[ -d /root/evennia ]]; then
tar czf "${BACKUP_DIR}/evennia-worlds.tar.gz" -C /root evennia 2>/dev/null || true
log "Backed up Evennia worlds"
fi
sha256_file() {
local path="$1"
if command -v shasum >/dev/null 2>&1; then
shasum -a 256 "$path" | awk '{print $1}'
elif command -v sha256sum >/dev/null 2>&1; then
sha256sum "$path" | awk '{print $1}'
# --- Manifest ---
find "$BACKUP_DIR" -type f > "${BACKUP_DIR}/manifest.txt"
log "Backup manifest written"
# --- Offsite sync ---
if [[ -n "$OFFSITE_TARGET" ]]; then
if rsync -az --delete "${BACKUP_DIR}/" "${OFFSITE_TARGET}/${DATESTAMP}/" 2>/dev/null; then
log "Offsite sync completed"
else
python3 - <<'PY' "$path"
import hashlib
import pathlib
import sys
path = pathlib.Path(sys.argv[1])
h = hashlib.sha256()
with path.open('rb') as f:
for chunk in iter(lambda: f.read(1024 * 1024), b''):
h.update(chunk)
print(h.hexdigest())
PY
log "WARNING: Offsite sync failed"
status=1
fi
}
write_manifest() {
python3 - <<'PY' "$1" "$2" "$3" "$4" "$5" "$6" "$7" "$8"
import json
import sys
manifest_path, source_dir, archive_name, archive_sha256, local_dir, s3_uri, nas_target, created_at = sys.argv[1:]
manifest = {
"created_at": created_at,
"source_dir": source_dir,
"archive_name": archive_name,
"archive_sha256": archive_sha256,
"encryption": {
"type": "openssl",
"cipher": "aes-256-cbc",
"pbkdf2": True,
"iterations": 200000,
},
"destinations": {
"local_dir": local_dir,
"s3_uri": s3_uri or None,
"nas_target": nas_target or None,
},
}
with open(manifest_path, 'w', encoding='utf-8') as handle:
json.dump(manifest, handle, indent=2)
handle.write('\n')
PY
}
upload_to_nas() {
local archive_path="$1"
local manifest_path="$2"
local target_root="$3"
local target_dir="${target_root%/}/${DATESTAMP}"
mkdir -p "$target_dir"
cp "$archive_path" "$manifest_path" "$target_dir/"
log "Uploaded backup to NAS target: $target_dir"
}
upload_to_s3() {
local archive_path="$1"
local manifest_path="$2"
command -v aws >/dev/null 2>&1 || fail "BACKUP_S3_URI is set but aws CLI is not installed."
local args=()
if [[ -n "$AWS_ENDPOINT_URL" ]]; then
args+=(--endpoint-url "$AWS_ENDPOINT_URL")
fi
aws "${args[@]}" s3 cp "$archive_path" "${BACKUP_S3_URI%/}/$(basename "$archive_path")"
aws "${args[@]}" s3 cp "$manifest_path" "${BACKUP_S3_URI%/}/$(basename "$manifest_path")"
log "Uploaded backup to S3 target: $BACKUP_S3_URI"
}
[[ -d "$BACKUP_SOURCE_DIR" ]] || fail "BACKUP_SOURCE_DIR does not exist: $BACKUP_SOURCE_DIR"
[[ -n "$BACKUP_NAS_TARGET" || -n "$BACKUP_S3_URI" ]] || fail "Set BACKUP_NAS_TARGET or BACKUP_S3_URI for remote backup storage."
PASSFILE="$(resolve_passphrase_file)"
mkdir -p "$LOCAL_BACKUP_DIR"
log "Creating archive from $BACKUP_SOURCE_DIR"
tar -czf "$PLAINTEXT_ARCHIVE" -C "$(dirname "$BACKUP_SOURCE_DIR")" "$(basename "$BACKUP_SOURCE_DIR")"
log "Encrypting archive"
openssl enc -aes-256-cbc -salt -pbkdf2 -iter 200000 \
-pass "file:${PASSFILE}" \
-in "$PLAINTEXT_ARCHIVE" \
-out "$ENCRYPTED_ARCHIVE"
ARCHIVE_SHA256="$(sha256_file "$ENCRYPTED_ARCHIVE")"
CREATED_AT="$(date -u '+%Y-%m-%dT%H:%M:%SZ')"
write_manifest "$MANIFEST_PATH" "$BACKUP_SOURCE_DIR" "$(basename "$ENCRYPTED_ARCHIVE")" "$ARCHIVE_SHA256" "$LOCAL_BACKUP_DIR" "$BACKUP_S3_URI" "$BACKUP_NAS_TARGET" "$CREATED_AT"
cp "$ENCRYPTED_ARCHIVE" "$MANIFEST_PATH" "$LOCAL_BACKUP_DIR/"
rm -f "$PLAINTEXT_ARCHIVE"
log "Encrypted backup stored locally: ${LOCAL_BACKUP_DIR}/$(basename "$ENCRYPTED_ARCHIVE")"
if [[ -n "$BACKUP_NAS_TARGET" ]]; then
upload_to_nas "$ENCRYPTED_ARCHIVE" "$MANIFEST_PATH" "$BACKUP_NAS_TARGET"
fi
if [[ -n "$BACKUP_S3_URI" ]]; then
upload_to_s3 "$ENCRYPTED_ARCHIVE" "$MANIFEST_PATH"
# --- Retention: keep last 7 days ---
find "$BACKUP_ROOT" -mindepth 1 -maxdepth 1 -type d -mtime +7 -exec rm -rf {} + 2>/dev/null || true
log "Retention applied (7 days)"
if [[ "$status" -eq 0 ]]; then
log "Backup pipeline completed: ${BACKUP_DIR}"
send_telegram "✅ Daily backup completed: ${DATESTAMP}"
else
log "Backup pipeline completed with WARNINGS: ${BACKUP_DIR}"
send_telegram "⚠️ Daily backup completed with warnings: ${DATESTAMP}"
fi
find "$BACKUP_ROOT" -mindepth 1 -maxdepth 1 -type d -name '20*' -mtime "+${BACKUP_RETENTION_DAYS}" -exec rm -rf {} + 2>/dev/null || true
log "Retention applied (${BACKUP_RETENTION_DAYS} days)"
log "Backup pipeline completed successfully"
exit "$status"

View File

@@ -1,97 +0,0 @@
#!/usr/bin/env bash
# restore_backup.sh — Restore an encrypted Hermes backup archive
# Usage: restore_backup.sh /path/to/hermes-backup-YYYYmmdd-HHMMSS.tar.gz.enc /restore/root
#
# Flow: optionally verify the archive SHA256 against a sibling <name>.json
# manifest, decrypt with openssl into a private staging dir, then untar
# under the restore root. All staging artifacts are removed on exit.
set -euo pipefail

ARCHIVE_PATH="${1:-}"
RESTORE_ROOT="${2:-}"
# Private staging area; removed by the cleanup trap on any exit path.
STAGE_DIR="$(mktemp -d "${TMPDIR:-/tmp}/timmy-restore.XXXXXX")"
PLAINTEXT_ARCHIVE="${STAGE_DIR}/restore.tar.gz"
PASSFILE_CLEANUP=""

# Remove decrypted data, the staging dir, and any temp passphrase file.
cleanup() {
    rm -f "$PLAINTEXT_ARCHIVE"
    rm -rf "$STAGE_DIR"
    if [[ -n "$PASSFILE_CLEANUP" && -f "$PASSFILE_CLEANUP" ]]; then
        rm -f "$PASSFILE_CLEANUP"
    fi
}
trap cleanup EXIT

# Print an error to stderr and abort the restore.
fail() {
    echo "ERROR: $1" >&2
    exit 1
}

# Echo the path of a file holding the decryption passphrase. Prefers
# BACKUP_PASSPHRASE_FILE; otherwise materializes BACKUP_PASSPHRASE into a
# mode-600 temp file that cleanup() deletes.
resolve_passphrase_file() {
    if [[ -n "${BACKUP_PASSPHRASE_FILE:-}" ]]; then
        [[ -f "$BACKUP_PASSPHRASE_FILE" ]] || fail "BACKUP_PASSPHRASE_FILE does not exist: $BACKUP_PASSPHRASE_FILE"
        echo "$BACKUP_PASSPHRASE_FILE"
        return
    fi
    if [[ -n "${BACKUP_PASSPHRASE:-}" ]]; then
        PASSFILE_CLEANUP="${STAGE_DIR}/backup.passphrase"
        printf '%s' "$BACKUP_PASSPHRASE" > "$PASSFILE_CLEANUP"
        chmod 600 "$PASSFILE_CLEANUP"
        echo "$PASSFILE_CLEANUP"
        return
    fi
    fail "Set BACKUP_PASSPHRASE_FILE or BACKUP_PASSPHRASE before restoring a backup."
}

# SHA256 of a file using whichever tool is present: shasum (macOS),
# sha256sum (Linux), or a Python fallback.
sha256_file() {
    local path="$1"
    if command -v shasum >/dev/null 2>&1; then
        shasum -a 256 "$path" | awk '{print $1}'
    elif command -v sha256sum >/dev/null 2>&1; then
        sha256sum "$path" | awk '{print $1}'
    else
        python3 - <<'PY' "$path"
import hashlib
import pathlib
import sys
path = pathlib.Path(sys.argv[1])
h = hashlib.sha256()
with path.open('rb') as f:
    for chunk in iter(lambda: f.read(1024 * 1024), b''):
        h.update(chunk)
print(h.hexdigest())
PY
    fi
}

[[ -n "$ARCHIVE_PATH" ]] || fail "Usage: restore_backup.sh /path/to/archive.tar.gz.enc /restore/root"
[[ -n "$RESTORE_ROOT" ]] || fail "Usage: restore_backup.sh /path/to/archive.tar.gz.enc /restore/root"
[[ -f "$ARCHIVE_PATH" ]] || fail "Archive not found: $ARCHIVE_PATH"

# The manifest is expected beside the archive, with .json replacing .tar.gz.enc.
if [[ "$ARCHIVE_PATH" == *.tar.gz.enc ]]; then
    MANIFEST_PATH="${ARCHIVE_PATH%.tar.gz.enc}.json"
else
    MANIFEST_PATH=""
fi

# When a manifest exists, verify archive integrity before decrypting.
if [[ -n "$MANIFEST_PATH" && -f "$MANIFEST_PATH" ]]; then
    EXPECTED_SHA="$(python3 - <<'PY' "$MANIFEST_PATH"
import json
import sys
with open(sys.argv[1], 'r', encoding='utf-8') as handle:
    manifest = json.load(handle)
print(manifest['archive_sha256'])
PY
)"
    ACTUAL_SHA="$(sha256_file "$ARCHIVE_PATH")"
    [[ "$EXPECTED_SHA" == "$ACTUAL_SHA" ]] || fail "Archive SHA256 mismatch: expected $EXPECTED_SHA got $ACTUAL_SHA"
fi

PASSFILE="$(resolve_passphrase_file)"
mkdir -p "$RESTORE_ROOT"
# Parameters must mirror the backup side: aes-256-cbc, pbkdf2, 200k iterations.
openssl enc -d -aes-256-cbc -salt -pbkdf2 -iter 200000 \
    -pass "file:${PASSFILE}" \
    -in "$ARCHIVE_PATH" \
    -out "$PLAINTEXT_ARCHIVE"
tar -xzf "$PLAINTEXT_ARCHIVE" -C "$RESTORE_ROOT"
echo "Restored backup into $RESTORE_ROOT"

View File

@@ -0,0 +1,131 @@
import os
from datetime import datetime, timezone
from pathlib import Path

import pytest

from scripts.autonomous_issue_creator import (
    Incident,
    build_incidents,
    heartbeat_is_stale,
    load_restart_counts,
    sync_incidents,
)
class FakeGiteaClient:
    """In-memory stand-in for GiteaClient that records every write call."""

    def __init__(self, open_issues=None):
        self._open_issues = list(open_issues or [])
        self.created = []
        self.commented = []

    def list_open_issues(self):
        # Copy so callers cannot mutate the fixture's backing list.
        return list(self._open_issues)

    def create_issue(self, title, body):
        new_issue = {"number": 100 + len(self.created), "title": title, "body": body}
        self.created.append(new_issue)
        return new_issue

    def comment_issue(self, issue_number, body):
        self.commented.append({"issue_number": issue_number, "body": body})
def test_load_restart_counts_reads_only_count_files(tmp_path):
    """Only well-formed ``*.count`` files should contribute entries."""
    for name, content in [
        ("act_runner.count", "4\n"),
        ("worker.count", "2\n"),
        ("notes.txt", "ignore me"),
        ("bad.count", "not-an-int"),
    ]:
        (tmp_path / name).write_text(content)
    assert load_restart_counts(tmp_path) == {"act_runner": 4, "worker": 2}
def test_heartbeat_is_stale_handles_missing_and_old_files(tmp_path):
    """A missing or too-old heartbeat file is stale; a recent one is not."""
    now = datetime(2026, 4, 15, 4, 0, 0, tzinfo=timezone.utc)
    # Missing file: always stale.
    assert heartbeat_is_stale(tmp_path / "missing.last", now=now, max_age_seconds=900) is True
    heartbeat = tmp_path / "fleet_health.last"
    heartbeat.write_text("")
    old = now.timestamp() - 1800
    recent = now.timestamp() - 60
    # Backdate the mtime beyond the max age: stale.
    os.utime(heartbeat, (old, old))
    assert heartbeat_is_stale(heartbeat, now=now, max_age_seconds=900) is True
    # Fresh mtime within the window: not stale.
    os.utime(heartbeat, (recent, recent))
    assert heartbeat_is_stale(heartbeat, now=now, max_age_seconds=900) is False
def test_build_incidents_captures_offline_hosts_restart_escalations_and_stale_probe():
    """One incident each for the offline host, escalated process, and stale probe."""
    now = datetime(2026, 4, 15, 4, 0, 0, tzinfo=timezone.utc)
    incidents = build_incidents(
        failover_status={
            "timestamp": 1713148800.0,
            "fleet": {"ezra": "ONLINE", "bezalel": "OFFLINE"},
        },
        restart_counts={"act_runner": 4, "worker": 2},
        heartbeat_stale=True,
        now=now,
        restart_escalation_threshold=3,
    )
    assert {incident.fingerprint for incident in incidents} == {
        "host-offline:bezalel",
        "restart-escalation:act_runner",
        "probe-stale:fleet-health",
    }
    titles = {incident.title for incident in incidents}
    for expected_title in (
        "[AUTO] Fleet host offline: bezalel",
        "[AUTO] Restart escalation: act_runner",
        "[AUTO] Fleet health probe stale",
    ):
        assert expected_title in titles
def test_sync_incidents_reuses_open_issues_and_creates_missing_ones():
    """An already-open fingerprint is commented on; an unknown one becomes a new issue."""
    existing_issue = {
        "number": 71,
        "title": "[AUTO] Fleet host offline: bezalel",
        "body": "Fingerprint: host-offline:bezalel\n",
    }
    client = FakeGiteaClient(open_issues=[existing_issue])
    offline = Incident(
        fingerprint="host-offline:bezalel",
        title="[AUTO] Fleet host offline: bezalel",
        body="Fingerprint: host-offline:bezalel\nHost unreachable",
    )
    stale = Incident(
        fingerprint="probe-stale:fleet-health",
        title="[AUTO] Fleet health probe stale",
        body="Fingerprint: probe-stale:fleet-health\nHeartbeat missing",
    )
    results = sync_incidents([offline, stale], client, apply=True, comment_existing=True)
    assert [entry["action"] for entry in results] == ["commented", "created"]
    expected_comment = (
        "Autonomous infrastructure detector saw the same incident again.\n\n"
        "Fingerprint: host-offline:bezalel\n\n"
        "Latest evidence:\nHost unreachable"
    )
    assert client.commented == [{"issue_number": 71, "body": expected_comment}]
    assert client.created == [
        {
            "number": 100,
            "title": "[AUTO] Fleet health probe stale",
            "body": "Fingerprint: probe-stale:fleet-health\nHeartbeat missing",
        }
    ]

View File

@@ -1,103 +0,0 @@
#!/usr/bin/env python3
import os
import subprocess
import tempfile
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
BACKUP_SCRIPT = ROOT / "scripts" / "backup_pipeline.sh"
RESTORE_SCRIPT = ROOT / "scripts" / "restore_backup.sh"
class TestBackupPipeline(unittest.TestCase):
    """End-to-end regression tests for backup_pipeline.sh / restore_backup.sh.

    Each test runs the real shell scripts against a sandboxed HOME built
    inside a TemporaryDirectory, so no user state is touched.
    """

    def setUp(self) -> None:
        # Build a fake ~/.hermes tree with representative config/state files.
        self.tempdir = tempfile.TemporaryDirectory()
        self.base = Path(self.tempdir.name)
        self.home = self.base / "home"
        self.source_dir = self.home / ".hermes"
        self.source_dir.mkdir(parents=True)
        (self.source_dir / "sessions").mkdir()
        (self.source_dir / "cron").mkdir()
        (self.source_dir / "config.yaml").write_text("model: local-first\n")
        (self.source_dir / "sessions" / "session.jsonl").write_text('{"role":"assistant","content":"hello"}\n')
        (self.source_dir / "cron" / "jobs.json").write_text('{"jobs": 1}\n')
        (self.source_dir / "state.db").write_bytes(b"sqlite-state")
        # Destination roots the pipeline writes into, plus the passphrase file.
        self.backup_root = self.base / "backup-root"
        self.nas_target = self.base / "nas-target"
        self.restore_root = self.base / "restore-root"
        self.log_dir = self.base / "logs"
        self.passphrase_file = self.base / "backup.passphrase"
        self.passphrase_file.write_text("correct horse battery staple\n")

    def tearDown(self) -> None:
        self.tempdir.cleanup()

    def _env(self, *, include_remote: bool = True) -> dict[str, str]:
        """Script environment; *include_remote* toggles the NAS target variable."""
        env = os.environ.copy()
        env.update(
            {
                "HOME": str(self.home),
                "BACKUP_SOURCE_DIR": str(self.source_dir),
                "BACKUP_ROOT": str(self.backup_root),
                "BACKUP_LOG_DIR": str(self.log_dir),
                "BACKUP_PASSPHRASE_FILE": str(self.passphrase_file),
            }
        )
        if include_remote:
            env["BACKUP_NAS_TARGET"] = str(self.nas_target)
        return env

    def test_backup_encrypts_and_restore_round_trips(self) -> None:
        """Backup emits only encrypted artifacts and restore reproduces the tree."""
        backup = subprocess.run(
            ["bash", str(BACKUP_SCRIPT)],
            capture_output=True,
            text=True,
            env=self._env(),
            cwd=ROOT,
        )
        self.assertEqual(backup.returncode, 0, msg=backup.stdout + backup.stderr)
        # Exactly one encrypted archive and one manifest reach the NAS target.
        encrypted_archives = sorted(self.nas_target.rglob("*.tar.gz.enc"))
        self.assertEqual(len(encrypted_archives), 1, msg=f"expected one encrypted archive, found: {encrypted_archives}")
        archive_path = encrypted_archives[0]
        # Ciphertext must not contain recognizable plaintext from the source tree.
        self.assertNotIn(b"model: local-first", archive_path.read_bytes())
        manifests = sorted(self.nas_target.rglob("*.json"))
        self.assertEqual(len(manifests), 1, msg=f"expected one manifest, found: {manifests}")
        # No plaintext .tar.gz may leak into either destination.
        plaintext_archives = sorted(self.backup_root.rglob("*.tar.gz")) + sorted(self.nas_target.rglob("*.tar.gz"))
        self.assertEqual(plaintext_archives, [], msg=f"plaintext archives leaked: {plaintext_archives}")
        restore = subprocess.run(
            ["bash", str(RESTORE_SCRIPT), str(archive_path), str(self.restore_root)],
            capture_output=True,
            text=True,
            env=self._env(),
            cwd=ROOT,
        )
        self.assertEqual(restore.returncode, 0, msg=restore.stdout + restore.stderr)
        # Restored tree matches the original content byte-for-byte.
        restored_hermes = self.restore_root / ".hermes"
        self.assertTrue(restored_hermes.exists())
        self.assertEqual((restored_hermes / "config.yaml").read_text(), "model: local-first\n")
        self.assertEqual((restored_hermes / "sessions" / "session.jsonl").read_text(), '{"role":"assistant","content":"hello"}\n')
        self.assertEqual((restored_hermes / "cron" / "jobs.json").read_text(), '{"jobs": 1}\n')
        self.assertEqual((restored_hermes / "state.db").read_bytes(), b"sqlite-state")

    def test_backup_requires_remote_target(self) -> None:
        """The pipeline must refuse to run when no remote destination is configured."""
        backup = subprocess.run(
            ["bash", str(BACKUP_SCRIPT)],
            capture_output=True,
            text=True,
            env=self._env(include_remote=False),
            cwd=ROOT,
        )
        self.assertNotEqual(backup.returncode, 0)
        self.assertIn("BACKUP_NAS_TARGET or BACKUP_S3_URI", backup.stdout + backup.stderr)


if __name__ == "__main__":
    unittest.main(verbosity=2)