Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb46d56147 |
@@ -14,7 +14,7 @@ jobs:
|
||||
- name: Parse check
|
||||
run: |
|
||||
find . -name '*.yml' -o -name '*.yaml' | grep -v .gitea | xargs -r python3 -c "import sys,yaml; [yaml.safe_load(open(f)) for f in sys.argv[1:]]"
|
||||
find . -name '*.json' -print0 | xargs -0 -r -n1 python3 -m json.tool > /dev/null
|
||||
find . -name '*.json' | xargs -r python3 -m json.tool > /dev/null
|
||||
find . -name '*.py' | xargs -r python3 -m py_compile
|
||||
find . -name '*.sh' | xargs -r bash -n
|
||||
echo "PASS: All files parse"
|
||||
@@ -22,6 +22,3 @@ jobs:
|
||||
run: |
|
||||
if grep -rE 'sk-or-|sk-ant-|ghp_|AKIA' . --include='*.yml' --include='*.py' --include='*.sh' 2>/dev/null | grep -v '.gitea' | grep -v 'detect_secrets' | grep -v 'test_trajectory_sanitize'; then exit 1; fi
|
||||
echo "PASS: No secrets"
|
||||
- name: Backup pipeline regression test
|
||||
run: |
|
||||
python3 -m unittest discover -s tests -p 'test_backup_pipeline.py' -v
|
||||
|
||||
@@ -1,98 +0,0 @@
|
||||
# Encrypted Hermes Backup Pipeline
|
||||
|
||||
Issue: `timmy-home#693`
|
||||
|
||||
This pipeline creates a nightly encrypted archive of `~/.hermes`, stores a local encrypted copy, uploads it to remote storage, and supports restore verification.
|
||||
|
||||
## What gets backed up
|
||||
|
||||
By default the pipeline archives:
|
||||
|
||||
- `~/.hermes/config.yaml`
|
||||
- `~/.hermes/state.db`
|
||||
- `~/.hermes/sessions/`
|
||||
- `~/.hermes/cron/`
|
||||
- any other files under `~/.hermes`
|
||||
|
||||
Override the source with `BACKUP_SOURCE_DIR=/path/to/.hermes`.
|
||||
|
||||
## Backup command
|
||||
|
||||
```bash
|
||||
BACKUP_PASSPHRASE_FILE=~/.config/timmy/backup.passphrase \
|
||||
BACKUP_NAS_TARGET=/Volumes/timmy-nas/hermes-backups \
|
||||
bash scripts/backup_pipeline.sh
|
||||
```
|
||||
|
||||
The script writes:
|
||||
|
||||
- local encrypted copy: `~/.timmy-backups/hermes/<timestamp>/hermes-backup-<timestamp>.tar.gz.enc`
|
||||
- local manifest: `~/.timmy-backups/hermes/<timestamp>/hermes-backup-<timestamp>.json`
|
||||
- log file: `~/.timmy-backups/hermes/logs/backup_pipeline.log`
|
||||
|
||||
## Nightly schedule
|
||||
|
||||
Run every night at 03:00:
|
||||
|
||||
```cron
|
||||
0 3 * * * cd /Users/apayne/.timmy/timmy-home && BACKUP_PASSPHRASE_FILE=/Users/apayne/.config/timmy/backup.passphrase BACKUP_NAS_TARGET=/Volumes/timmy-nas/hermes-backups bash scripts/backup_pipeline.sh >> /Users/apayne/.timmy-backups/hermes/logs/cron.log 2>&1
|
||||
```
|
||||
|
||||
## Remote targets
|
||||
|
||||
At least one remote target must be configured.
|
||||
|
||||
### Local NAS
|
||||
|
||||
Use a mounted path:
|
||||
|
||||
```bash
|
||||
BACKUP_NAS_TARGET=/Volumes/timmy-nas/hermes-backups
|
||||
```
|
||||
|
||||
The pipeline copies the encrypted archive and manifest into `<BACKUP_NAS_TARGET>/<timestamp>/`.
|
||||
|
||||
### S3-compatible storage
|
||||
|
||||
```bash
|
||||
BACKUP_PASSPHRASE_FILE=~/.config/timmy/backup.passphrase \
|
||||
BACKUP_S3_URI=s3://timmy-backups/hermes \
|
||||
AWS_ENDPOINT_URL=https://minio.example.com \
|
||||
bash scripts/backup_pipeline.sh
|
||||
```
|
||||
|
||||
Notes:
|
||||
|
||||
- `aws` CLI must be installed if `BACKUP_S3_URI` is set.
|
||||
- `AWS_ENDPOINT_URL` is optional and is used for MinIO, R2, and other S3-compatible endpoints.
|
||||
|
||||
## Restore playbook
|
||||
|
||||
Restore an encrypted archive into a clean target root:
|
||||
|
||||
```bash
|
||||
BACKUP_PASSPHRASE_FILE=~/.config/timmy/backup.passphrase \
|
||||
bash scripts/restore_backup.sh \
|
||||
/Volumes/timmy-nas/hermes-backups/20260415-030000/hermes-backup-20260415-030000.tar.gz.enc \
|
||||
/tmp/hermes-restore
|
||||
```
|
||||
|
||||
Result:
|
||||
|
||||
- restored tree lands at `/tmp/hermes-restore/.hermes`
|
||||
- if a sibling manifest exists, the restore script verifies the archive SHA256 before decrypting
|
||||
|
||||
## End-to-end verification
|
||||
|
||||
Run the regression suite:
|
||||
|
||||
```bash
|
||||
python3 -m unittest discover -s tests -p 'test_backup_pipeline.py' -v
|
||||
```
|
||||
|
||||
This proves:
|
||||
|
||||
1. the backup output is encrypted
|
||||
2. plaintext archives do not leak into the backup destinations
|
||||
3. the restore script recreates the original `.hermes` tree end-to-end
|
||||
4. the pipeline refuses to run without a remote target
|
||||
@@ -12,8 +12,6 @@ Quick-reference index for common operational tasks across the Timmy Foundation i
|
||||
| Check fleet health | fleet-ops | `python3 scripts/fleet_readiness.py` |
|
||||
| Agent scorecard | fleet-ops | `python3 scripts/agent_scorecard.py` |
|
||||
| View fleet manifest | fleet-ops | `cat manifest.yaml` |
|
||||
| Backup Hermes state | timmy-home | `BACKUP_PASSPHRASE_FILE=... BACKUP_NAS_TARGET=... bash scripts/backup_pipeline.sh` |
|
||||
| Restore Hermes state | timmy-home | `BACKUP_PASSPHRASE_FILE=... bash scripts/restore_backup.sh <archive> <restore-root>` |
|
||||
|
||||
## the-nexus (Frontend + Brain)
|
||||
|
||||
|
||||
329
scripts/autonomous_issue_creator.py
Normal file
329
scripts/autonomous_issue_creator.py
Normal file
@@ -0,0 +1,329 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Create or refresh fleet incidents on Gitea from local infrastructure signals.
|
||||
|
||||
Refs: timmy-home #553
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
from urllib import request
|
||||
|
||||
DEFAULT_BASE_URL = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
DEFAULT_OWNER = "Timmy_Foundation"
|
||||
DEFAULT_REPO = "timmy-home"
|
||||
DEFAULT_TOKEN_FILE = Path.home() / ".config" / "gitea" / "token"
|
||||
DEFAULT_FAILOVER_STATUS = Path.home() / ".timmy" / "failover_status.json"
|
||||
DEFAULT_RESTART_STATE_DIR = Path("/var/lib/timmy/restarts")
|
||||
DEFAULT_HEARTBEAT_FILE = Path("/var/lib/timmy/heartbeats/fleet_health.last")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Incident:
|
||||
fingerprint: str
|
||||
title: str
|
||||
body: str
|
||||
|
||||
def latest_evidence(self) -> str:
|
||||
lines = [line for line in self.body.splitlines() if line.strip()]
|
||||
if lines and lines[0].startswith("Fingerprint: "):
|
||||
lines = lines[1:]
|
||||
return "\n".join(lines).strip()
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str, owner: str = DEFAULT_OWNER, repo: str = DEFAULT_REPO, base_url: str = DEFAULT_BASE_URL):
|
||||
self.token = token
|
||||
self.owner = owner
|
||||
self.repo = repo
|
||||
self.base_url = base_url.rstrip("/")
|
||||
|
||||
def _request(self, path: str, *, method: str = "GET", data: dict | None = None):
|
||||
payload = None if data is None else json.dumps(data).encode()
|
||||
headers = {"Authorization": f"token {self.token}"}
|
||||
if payload is not None:
|
||||
headers["Content-Type"] = "application/json"
|
||||
req = request.Request(f"{self.base_url}{path}", data=payload, headers=headers, method=method)
|
||||
with request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
def list_open_issues(self):
|
||||
issues = self._request(f"/repos/{self.owner}/{self.repo}/issues?state=open&limit=100")
|
||||
return [issue for issue in issues if not issue.get("pull_request")]
|
||||
|
||||
def create_issue(self, title: str, body: str):
|
||||
return self._request(
|
||||
f"/repos/{self.owner}/{self.repo}/issues",
|
||||
method="POST",
|
||||
data={"title": title, "body": body},
|
||||
)
|
||||
|
||||
def comment_issue(self, issue_number: int, body: str):
|
||||
return self._request(
|
||||
f"/repos/{self.owner}/{self.repo}/issues/{issue_number}/comments",
|
||||
method="POST",
|
||||
data={"body": body},
|
||||
)
|
||||
|
||||
|
||||
def load_json(path: Path):
|
||||
if not path.exists():
|
||||
return None
|
||||
return json.loads(path.read_text())
|
||||
|
||||
|
||||
def load_restart_counts(state_dir: Path) -> dict[str, int]:
|
||||
if not state_dir.exists():
|
||||
return {}
|
||||
|
||||
counts: dict[str, int] = {}
|
||||
for path in sorted(state_dir.glob("*.count")):
|
||||
try:
|
||||
counts[path.stem] = int(path.read_text().strip())
|
||||
except ValueError:
|
||||
continue
|
||||
return counts
|
||||
|
||||
|
||||
def heartbeat_is_stale(path: Path, *, now: datetime | None = None, max_age_seconds: int = 900) -> bool:
|
||||
if now is None:
|
||||
now = datetime.now(timezone.utc)
|
||||
if not path.exists():
|
||||
return True
|
||||
age = now.timestamp() - path.stat().st_mtime
|
||||
return age > max_age_seconds
|
||||
|
||||
|
||||
def _iso(dt: datetime) -> str:
|
||||
return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
|
||||
|
||||
|
||||
def _build_body(fingerprint: str, *details: str) -> str:
|
||||
detail_lines = [detail for detail in details if detail]
|
||||
return "\n".join([f"Fingerprint: {fingerprint}", *detail_lines])
|
||||
|
||||
|
||||
def build_incidents(
|
||||
*,
|
||||
failover_status: dict | None,
|
||||
restart_counts: dict[str, int],
|
||||
heartbeat_stale: bool,
|
||||
now: datetime | None = None,
|
||||
restart_escalation_threshold: int = 3,
|
||||
) -> list[Incident]:
|
||||
if now is None:
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
incidents: list[Incident] = []
|
||||
failover_timestamp = None
|
||||
fleet = {}
|
||||
if failover_status:
|
||||
failover_timestamp = failover_status.get("timestamp")
|
||||
fleet = failover_status.get("fleet") or {}
|
||||
|
||||
for host, status in sorted(fleet.items()):
|
||||
if str(status).upper() == "ONLINE":
|
||||
continue
|
||||
fingerprint = f"host-offline:{host}"
|
||||
failover_detail = f"Failover status timestamp: {failover_timestamp}" if failover_timestamp is not None else "Failover status timestamp: unknown"
|
||||
incidents.append(
|
||||
Incident(
|
||||
fingerprint=fingerprint,
|
||||
title=f"[AUTO] Fleet host offline: {host}",
|
||||
body=_build_body(
|
||||
fingerprint,
|
||||
f"Detected at: {_iso(now)}",
|
||||
failover_detail,
|
||||
f"Host `{host}` reported `{status}` by failover monitor.",
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
for process_name, count in sorted(restart_counts.items()):
|
||||
if count <= restart_escalation_threshold:
|
||||
continue
|
||||
fingerprint = f"restart-escalation:{process_name}"
|
||||
incidents.append(
|
||||
Incident(
|
||||
fingerprint=fingerprint,
|
||||
title=f"[AUTO] Restart escalation: {process_name}",
|
||||
body=_build_body(
|
||||
fingerprint,
|
||||
f"Detected at: {_iso(now)}",
|
||||
f"Process `{process_name}` has crossed the restart escalation threshold with count={count}.",
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
if heartbeat_stale:
|
||||
fingerprint = "probe-stale:fleet-health"
|
||||
incidents.append(
|
||||
Incident(
|
||||
fingerprint=fingerprint,
|
||||
title="[AUTO] Fleet health probe stale",
|
||||
body=_build_body(
|
||||
fingerprint,
|
||||
f"Detected at: {_iso(now)}",
|
||||
"Heartbeat missing or older than the configured fleet health maximum age.",
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
return incidents
|
||||
|
||||
|
||||
def find_matching_issue(incident: Incident, open_issues: Iterable[dict]) -> dict | None:
|
||||
for issue in open_issues:
|
||||
haystack = "\n".join([issue.get("title") or "", issue.get("body") or ""])
|
||||
if incident.fingerprint in haystack or incident.title == issue.get("title"):
|
||||
return issue
|
||||
return None
|
||||
|
||||
|
||||
def build_repeat_comment(incident: Incident) -> str:
|
||||
return (
|
||||
"Autonomous infrastructure detector saw the same incident again.\n\n"
|
||||
f"Fingerprint: {incident.fingerprint}\n\n"
|
||||
f"Latest evidence:\n{incident.latest_evidence()}"
|
||||
)
|
||||
|
||||
|
||||
def sync_incidents(
|
||||
incidents: Iterable[Incident],
|
||||
client: GiteaClient,
|
||||
*,
|
||||
apply: bool = False,
|
||||
comment_existing: bool = True,
|
||||
):
|
||||
open_issues = list(client.list_open_issues())
|
||||
results = []
|
||||
|
||||
for incident in incidents:
|
||||
existing = find_matching_issue(incident, open_issues)
|
||||
if existing:
|
||||
action = "existing"
|
||||
if apply and comment_existing:
|
||||
client.comment_issue(existing["number"], build_repeat_comment(incident))
|
||||
action = "commented"
|
||||
results.append(
|
||||
{
|
||||
"action": action,
|
||||
"fingerprint": incident.fingerprint,
|
||||
"issue_number": existing["number"],
|
||||
"title": existing.get("title"),
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
if apply:
|
||||
created = client.create_issue(incident.title, incident.body)
|
||||
open_issues.append(created)
|
||||
results.append(
|
||||
{
|
||||
"action": "created",
|
||||
"fingerprint": incident.fingerprint,
|
||||
"issue_number": created["number"],
|
||||
"title": created.get("title"),
|
||||
}
|
||||
)
|
||||
else:
|
||||
results.append(
|
||||
{
|
||||
"action": "would_create",
|
||||
"fingerprint": incident.fingerprint,
|
||||
"issue_number": None,
|
||||
"title": incident.title,
|
||||
}
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(description="Create or refresh fleet incidents on Gitea from local infrastructure signals.")
|
||||
parser.add_argument("--owner", default=DEFAULT_OWNER)
|
||||
parser.add_argument("--repo", default=DEFAULT_REPO)
|
||||
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
|
||||
parser.add_argument("--token-file", type=Path, default=DEFAULT_TOKEN_FILE)
|
||||
parser.add_argument("--failover-status", type=Path, default=DEFAULT_FAILOVER_STATUS)
|
||||
parser.add_argument("--restart-state-dir", type=Path, default=DEFAULT_RESTART_STATE_DIR)
|
||||
parser.add_argument("--heartbeat-file", type=Path, default=DEFAULT_HEARTBEAT_FILE)
|
||||
parser.add_argument("--heartbeat-max-age-seconds", type=int, default=900)
|
||||
parser.add_argument("--restart-escalation-threshold", type=int, default=3)
|
||||
parser.add_argument("--apply", action="store_true", help="Create/comment issues instead of reporting what would happen.")
|
||||
parser.add_argument("--no-comment-existing", action="store_true", help="Do not comment on existing matching issues.")
|
||||
parser.add_argument("--json", action="store_true", help="Emit machine-readable JSON output.")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
failover_status = load_json(args.failover_status)
|
||||
restart_counts = load_restart_counts(args.restart_state_dir)
|
||||
heartbeat_stale = heartbeat_is_stale(
|
||||
args.heartbeat_file,
|
||||
now=now,
|
||||
max_age_seconds=args.heartbeat_max_age_seconds,
|
||||
)
|
||||
incidents = build_incidents(
|
||||
failover_status=failover_status,
|
||||
restart_counts=restart_counts,
|
||||
heartbeat_stale=heartbeat_stale,
|
||||
now=now,
|
||||
restart_escalation_threshold=args.restart_escalation_threshold,
|
||||
)
|
||||
|
||||
payload = {
|
||||
"generated_at": _iso(now),
|
||||
"incidents": [incident.__dict__ for incident in incidents],
|
||||
"results": [],
|
||||
}
|
||||
|
||||
token = None
|
||||
if args.token_file.exists():
|
||||
token = args.token_file.read_text().strip()
|
||||
|
||||
if args.apply and not token:
|
||||
raise SystemExit(f"Token file not found: {args.token_file}")
|
||||
|
||||
if token:
|
||||
client = GiteaClient(token=token, owner=args.owner, repo=args.repo, base_url=args.base_url)
|
||||
payload["results"] = sync_incidents(
|
||||
incidents,
|
||||
client,
|
||||
apply=args.apply,
|
||||
comment_existing=not args.no_comment_existing,
|
||||
)
|
||||
else:
|
||||
payload["results"] = [
|
||||
{
|
||||
"action": "local_only",
|
||||
"fingerprint": incident.fingerprint,
|
||||
"issue_number": None,
|
||||
"title": incident.title,
|
||||
}
|
||||
for incident in incidents
|
||||
]
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(payload, indent=2))
|
||||
else:
|
||||
print(f"Generated at: {payload['generated_at']}")
|
||||
if not incidents:
|
||||
print("No autonomous infrastructure incidents detected.")
|
||||
for incident in incidents:
|
||||
print(f"- {incident.title} [{incident.fingerprint}]")
|
||||
for result in payload["results"]:
|
||||
print(f" -> {result['action']}: {result['title']}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,170 +1,80 @@
|
||||
#!/usr/bin/env bash
|
||||
# backup_pipeline.sh — Nightly encrypted Hermes backup pipeline
|
||||
# Refs: timmy-home #693, timmy-home #561
|
||||
# backup_pipeline.sh — Daily fleet backup pipeline (FLEET-008)
|
||||
# Refs: timmy-home #561
|
||||
set -euo pipefail
|
||||
|
||||
DATESTAMP="${BACKUP_TIMESTAMP:-$(date +%Y%m%d-%H%M%S)}"
|
||||
BACKUP_SOURCE_DIR="${BACKUP_SOURCE_DIR:-${HOME}/.hermes}"
|
||||
BACKUP_ROOT="${BACKUP_ROOT:-${HOME}/.timmy-backups/hermes}"
|
||||
BACKUP_LOG_DIR="${BACKUP_LOG_DIR:-${BACKUP_ROOT}/logs}"
|
||||
BACKUP_RETENTION_DAYS="${BACKUP_RETENTION_DAYS:-14}"
|
||||
BACKUP_S3_URI="${BACKUP_S3_URI:-}"
|
||||
BACKUP_NAS_TARGET="${BACKUP_NAS_TARGET:-}"
|
||||
AWS_ENDPOINT_URL="${AWS_ENDPOINT_URL:-}"
|
||||
BACKUP_NAME="hermes-backup-${DATESTAMP}"
|
||||
LOCAL_BACKUP_DIR="${BACKUP_ROOT}/${DATESTAMP}"
|
||||
STAGE_DIR="$(mktemp -d "${TMPDIR:-/tmp}/timmy-backup.XXXXXX")"
|
||||
PLAINTEXT_ARCHIVE="${STAGE_DIR}/${BACKUP_NAME}.tar.gz"
|
||||
ENCRYPTED_ARCHIVE="${STAGE_DIR}/${BACKUP_NAME}.tar.gz.enc"
|
||||
MANIFEST_PATH="${STAGE_DIR}/${BACKUP_NAME}.json"
|
||||
ALERT_LOG="${BACKUP_LOG_DIR}/backup_pipeline.log"
|
||||
PASSFILE_CLEANUP=""
|
||||
BACKUP_ROOT="/backups/timmy"
|
||||
DATESTAMP=$(date +%Y%m%d-%H%M%S)
|
||||
BACKUP_DIR="${BACKUP_ROOT}/${DATESTAMP}"
|
||||
LOG_DIR="/var/log/timmy"
|
||||
ALERT_LOG="${LOG_DIR}/backup_pipeline.log"
|
||||
mkdir -p "$BACKUP_DIR" "$LOG_DIR"
|
||||
|
||||
mkdir -p "$BACKUP_LOG_DIR"
|
||||
TELEGRAM_BOT_TOKEN="${TELEGRAM_BOT_TOKEN:-}"
|
||||
TELEGRAM_CHAT_ID="${TELEGRAM_CHAT_ID:-}"
|
||||
OFFSITE_TARGET="${OFFSITE_TARGET:-}"
|
||||
|
||||
log() {
|
||||
echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"
|
||||
}
|
||||
log() { echo "[$(date -Iseconds)] $1" | tee -a "$ALERT_LOG"; }
|
||||
|
||||
fail() {
|
||||
log "ERROR: $1"
|
||||
exit 1
|
||||
}
|
||||
|
||||
cleanup() {
|
||||
rm -f "$PLAINTEXT_ARCHIVE"
|
||||
rm -rf "$STAGE_DIR"
|
||||
if [[ -n "$PASSFILE_CLEANUP" && -f "$PASSFILE_CLEANUP" ]]; then
|
||||
rm -f "$PASSFILE_CLEANUP"
|
||||
send_telegram() {
|
||||
local msg="$1"
|
||||
if [[ -n "$TELEGRAM_BOT_TOKEN" && -n "$TELEGRAM_CHAT_ID" ]]; then
|
||||
curl -s -X POST "https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}/sendMessage" \
|
||||
-d "chat_id=${TELEGRAM_CHAT_ID}" -d "text=${msg}" >/dev/null 2>&1 || true
|
||||
fi
|
||||
}
|
||||
trap cleanup EXIT
|
||||
|
||||
resolve_passphrase_file() {
|
||||
if [[ -n "${BACKUP_PASSPHRASE_FILE:-}" ]]; then
|
||||
[[ -f "$BACKUP_PASSPHRASE_FILE" ]] || fail "BACKUP_PASSPHRASE_FILE does not exist: $BACKUP_PASSPHRASE_FILE"
|
||||
echo "$BACKUP_PASSPHRASE_FILE"
|
||||
return
|
||||
status=0
|
||||
|
||||
# --- Gitea repositories ---
|
||||
if [[ -d /root/gitea ]]; then
|
||||
tar czf "${BACKUP_DIR}/gitea-repos.tar.gz" -C /root gitea 2>/dev/null || true
|
||||
log "Backed up Gitea repos"
|
||||
fi
|
||||
|
||||
# --- Agent configs and state ---
|
||||
for wiz in bezalel allegro ezra timmy; do
|
||||
if [[ -d "/root/wizards/${wiz}" ]]; then
|
||||
tar czf "${BACKUP_DIR}/${wiz}-home.tar.gz" -C /root/wizards "${wiz}" 2>/dev/null || true
|
||||
log "Backed up ${wiz} home"
|
||||
fi
|
||||
done
|
||||
|
||||
if [[ -n "${BACKUP_PASSPHRASE:-}" ]]; then
|
||||
PASSFILE_CLEANUP="${STAGE_DIR}/backup.passphrase"
|
||||
printf '%s' "$BACKUP_PASSPHRASE" > "$PASSFILE_CLEANUP"
|
||||
chmod 600 "$PASSFILE_CLEANUP"
|
||||
echo "$PASSFILE_CLEANUP"
|
||||
return
|
||||
fi
|
||||
# --- System configs ---
|
||||
cp /etc/crontab "${BACKUP_DIR}/crontab" 2>/dev/null || true
|
||||
cp -r /etc/systemd/system "${BACKUP_DIR}/systemd" 2>/dev/null || true
|
||||
log "Backed up system configs"
|
||||
|
||||
fail "Set BACKUP_PASSPHRASE_FILE or BACKUP_PASSPHRASE before running the backup pipeline."
|
||||
}
|
||||
# --- Evennia worlds (if present) ---
|
||||
if [[ -d /root/evennia ]]; then
|
||||
tar czf "${BACKUP_DIR}/evennia-worlds.tar.gz" -C /root evennia 2>/dev/null || true
|
||||
log "Backed up Evennia worlds"
|
||||
fi
|
||||
|
||||
sha256_file() {
|
||||
local path="$1"
|
||||
if command -v shasum >/dev/null 2>&1; then
|
||||
shasum -a 256 "$path" | awk '{print $1}'
|
||||
elif command -v sha256sum >/dev/null 2>&1; then
|
||||
sha256sum "$path" | awk '{print $1}'
|
||||
# --- Manifest ---
|
||||
find "$BACKUP_DIR" -type f > "${BACKUP_DIR}/manifest.txt"
|
||||
log "Backup manifest written"
|
||||
|
||||
# --- Offsite sync ---
|
||||
if [[ -n "$OFFSITE_TARGET" ]]; then
|
||||
if rsync -az --delete "${BACKUP_DIR}/" "${OFFSITE_TARGET}/${DATESTAMP}/" 2>/dev/null; then
|
||||
log "Offsite sync completed"
|
||||
else
|
||||
python3 - <<'PY' "$path"
|
||||
import hashlib
|
||||
import pathlib
|
||||
import sys
|
||||
path = pathlib.Path(sys.argv[1])
|
||||
h = hashlib.sha256()
|
||||
with path.open('rb') as f:
|
||||
for chunk in iter(lambda: f.read(1024 * 1024), b''):
|
||||
h.update(chunk)
|
||||
print(h.hexdigest())
|
||||
PY
|
||||
log "WARNING: Offsite sync failed"
|
||||
status=1
|
||||
fi
|
||||
}
|
||||
|
||||
write_manifest() {
|
||||
python3 - <<'PY' "$1" "$2" "$3" "$4" "$5" "$6" "$7" "$8"
|
||||
import json
|
||||
import sys
|
||||
manifest_path, source_dir, archive_name, archive_sha256, local_dir, s3_uri, nas_target, created_at = sys.argv[1:]
|
||||
manifest = {
|
||||
"created_at": created_at,
|
||||
"source_dir": source_dir,
|
||||
"archive_name": archive_name,
|
||||
"archive_sha256": archive_sha256,
|
||||
"encryption": {
|
||||
"type": "openssl",
|
||||
"cipher": "aes-256-cbc",
|
||||
"pbkdf2": True,
|
||||
"iterations": 200000,
|
||||
},
|
||||
"destinations": {
|
||||
"local_dir": local_dir,
|
||||
"s3_uri": s3_uri or None,
|
||||
"nas_target": nas_target or None,
|
||||
},
|
||||
}
|
||||
with open(manifest_path, 'w', encoding='utf-8') as handle:
|
||||
json.dump(manifest, handle, indent=2)
|
||||
handle.write('\n')
|
||||
PY
|
||||
}
|
||||
|
||||
upload_to_nas() {
|
||||
local archive_path="$1"
|
||||
local manifest_path="$2"
|
||||
local target_root="$3"
|
||||
|
||||
local target_dir="${target_root%/}/${DATESTAMP}"
|
||||
mkdir -p "$target_dir"
|
||||
cp "$archive_path" "$manifest_path" "$target_dir/"
|
||||
log "Uploaded backup to NAS target: $target_dir"
|
||||
}
|
||||
|
||||
upload_to_s3() {
|
||||
local archive_path="$1"
|
||||
local manifest_path="$2"
|
||||
|
||||
command -v aws >/dev/null 2>&1 || fail "BACKUP_S3_URI is set but aws CLI is not installed."
|
||||
|
||||
local args=()
|
||||
if [[ -n "$AWS_ENDPOINT_URL" ]]; then
|
||||
args+=(--endpoint-url "$AWS_ENDPOINT_URL")
|
||||
fi
|
||||
|
||||
aws "${args[@]}" s3 cp "$archive_path" "${BACKUP_S3_URI%/}/$(basename "$archive_path")"
|
||||
aws "${args[@]}" s3 cp "$manifest_path" "${BACKUP_S3_URI%/}/$(basename "$manifest_path")"
|
||||
log "Uploaded backup to S3 target: $BACKUP_S3_URI"
|
||||
}
|
||||
|
||||
[[ -d "$BACKUP_SOURCE_DIR" ]] || fail "BACKUP_SOURCE_DIR does not exist: $BACKUP_SOURCE_DIR"
|
||||
[[ -n "$BACKUP_NAS_TARGET" || -n "$BACKUP_S3_URI" ]] || fail "Set BACKUP_NAS_TARGET or BACKUP_S3_URI for remote backup storage."
|
||||
|
||||
PASSFILE="$(resolve_passphrase_file)"
|
||||
mkdir -p "$LOCAL_BACKUP_DIR"
|
||||
|
||||
log "Creating archive from $BACKUP_SOURCE_DIR"
|
||||
tar -czf "$PLAINTEXT_ARCHIVE" -C "$(dirname "$BACKUP_SOURCE_DIR")" "$(basename "$BACKUP_SOURCE_DIR")"
|
||||
|
||||
log "Encrypting archive"
|
||||
openssl enc -aes-256-cbc -salt -pbkdf2 -iter 200000 \
|
||||
-pass "file:${PASSFILE}" \
|
||||
-in "$PLAINTEXT_ARCHIVE" \
|
||||
-out "$ENCRYPTED_ARCHIVE"
|
||||
|
||||
ARCHIVE_SHA256="$(sha256_file "$ENCRYPTED_ARCHIVE")"
|
||||
CREATED_AT="$(date -u '+%Y-%m-%dT%H:%M:%SZ')"
|
||||
write_manifest "$MANIFEST_PATH" "$BACKUP_SOURCE_DIR" "$(basename "$ENCRYPTED_ARCHIVE")" "$ARCHIVE_SHA256" "$LOCAL_BACKUP_DIR" "$BACKUP_S3_URI" "$BACKUP_NAS_TARGET" "$CREATED_AT"
|
||||
|
||||
cp "$ENCRYPTED_ARCHIVE" "$MANIFEST_PATH" "$LOCAL_BACKUP_DIR/"
|
||||
rm -f "$PLAINTEXT_ARCHIVE"
|
||||
log "Encrypted backup stored locally: ${LOCAL_BACKUP_DIR}/$(basename "$ENCRYPTED_ARCHIVE")"
|
||||
|
||||
if [[ -n "$BACKUP_NAS_TARGET" ]]; then
|
||||
upload_to_nas "$ENCRYPTED_ARCHIVE" "$MANIFEST_PATH" "$BACKUP_NAS_TARGET"
|
||||
fi
|
||||
|
||||
if [[ -n "$BACKUP_S3_URI" ]]; then
|
||||
upload_to_s3 "$ENCRYPTED_ARCHIVE" "$MANIFEST_PATH"
|
||||
# --- Retention: keep last 7 days ---
|
||||
find "$BACKUP_ROOT" -mindepth 1 -maxdepth 1 -type d -mtime +7 -exec rm -rf {} + 2>/dev/null || true
|
||||
log "Retention applied (7 days)"
|
||||
|
||||
if [[ "$status" -eq 0 ]]; then
|
||||
log "Backup pipeline completed: ${BACKUP_DIR}"
|
||||
send_telegram "✅ Daily backup completed: ${DATESTAMP}"
|
||||
else
|
||||
log "Backup pipeline completed with WARNINGS: ${BACKUP_DIR}"
|
||||
send_telegram "⚠️ Daily backup completed with warnings: ${DATESTAMP}"
|
||||
fi
|
||||
|
||||
find "$BACKUP_ROOT" -mindepth 1 -maxdepth 1 -type d -name '20*' -mtime "+${BACKUP_RETENTION_DAYS}" -exec rm -rf {} + 2>/dev/null || true
|
||||
log "Retention applied (${BACKUP_RETENTION_DAYS} days)"
|
||||
log "Backup pipeline completed successfully"
|
||||
exit "$status"
|
||||
|
||||
@@ -1,97 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# restore_backup.sh — Restore an encrypted Hermes backup archive
|
||||
# Usage: restore_backup.sh /path/to/hermes-backup-YYYYmmdd-HHMMSS.tar.gz.enc /restore/root
|
||||
set -euo pipefail
|
||||
|
||||
ARCHIVE_PATH="${1:-}"
|
||||
RESTORE_ROOT="${2:-}"
|
||||
STAGE_DIR="$(mktemp -d "${TMPDIR:-/tmp}/timmy-restore.XXXXXX")"
|
||||
PLAINTEXT_ARCHIVE="${STAGE_DIR}/restore.tar.gz"
|
||||
PASSFILE_CLEANUP=""
|
||||
|
||||
cleanup() {
|
||||
rm -f "$PLAINTEXT_ARCHIVE"
|
||||
rm -rf "$STAGE_DIR"
|
||||
if [[ -n "$PASSFILE_CLEANUP" && -f "$PASSFILE_CLEANUP" ]]; then
|
||||
rm -f "$PASSFILE_CLEANUP"
|
||||
fi
|
||||
}
|
||||
trap cleanup EXIT
|
||||
|
||||
fail() {
|
||||
echo "ERROR: $1" >&2
|
||||
exit 1
|
||||
}
|
||||
|
||||
resolve_passphrase_file() {
|
||||
if [[ -n "${BACKUP_PASSPHRASE_FILE:-}" ]]; then
|
||||
[[ -f "$BACKUP_PASSPHRASE_FILE" ]] || fail "BACKUP_PASSPHRASE_FILE does not exist: $BACKUP_PASSPHRASE_FILE"
|
||||
echo "$BACKUP_PASSPHRASE_FILE"
|
||||
return
|
||||
fi
|
||||
|
||||
if [[ -n "${BACKUP_PASSPHRASE:-}" ]]; then
|
||||
PASSFILE_CLEANUP="${STAGE_DIR}/backup.passphrase"
|
||||
printf '%s' "$BACKUP_PASSPHRASE" > "$PASSFILE_CLEANUP"
|
||||
chmod 600 "$PASSFILE_CLEANUP"
|
||||
echo "$PASSFILE_CLEANUP"
|
||||
return
|
||||
fi
|
||||
|
||||
fail "Set BACKUP_PASSPHRASE_FILE or BACKUP_PASSPHRASE before restoring a backup."
|
||||
}
|
||||
|
||||
sha256_file() {
|
||||
local path="$1"
|
||||
if command -v shasum >/dev/null 2>&1; then
|
||||
shasum -a 256 "$path" | awk '{print $1}'
|
||||
elif command -v sha256sum >/dev/null 2>&1; then
|
||||
sha256sum "$path" | awk '{print $1}'
|
||||
else
|
||||
python3 - <<'PY' "$path"
|
||||
import hashlib
|
||||
import pathlib
|
||||
import sys
|
||||
path = pathlib.Path(sys.argv[1])
|
||||
h = hashlib.sha256()
|
||||
with path.open('rb') as f:
|
||||
for chunk in iter(lambda: f.read(1024 * 1024), b''):
|
||||
h.update(chunk)
|
||||
print(h.hexdigest())
|
||||
PY
|
||||
fi
|
||||
}
|
||||
|
||||
[[ -n "$ARCHIVE_PATH" ]] || fail "Usage: restore_backup.sh /path/to/archive.tar.gz.enc /restore/root"
|
||||
[[ -n "$RESTORE_ROOT" ]] || fail "Usage: restore_backup.sh /path/to/archive.tar.gz.enc /restore/root"
|
||||
[[ -f "$ARCHIVE_PATH" ]] || fail "Archive not found: $ARCHIVE_PATH"
|
||||
|
||||
if [[ "$ARCHIVE_PATH" == *.tar.gz.enc ]]; then
|
||||
MANIFEST_PATH="${ARCHIVE_PATH%.tar.gz.enc}.json"
|
||||
else
|
||||
MANIFEST_PATH=""
|
||||
fi
|
||||
|
||||
if [[ -n "$MANIFEST_PATH" && -f "$MANIFEST_PATH" ]]; then
|
||||
EXPECTED_SHA="$(python3 - <<'PY' "$MANIFEST_PATH"
|
||||
import json
|
||||
import sys
|
||||
with open(sys.argv[1], 'r', encoding='utf-8') as handle:
|
||||
manifest = json.load(handle)
|
||||
print(manifest['archive_sha256'])
|
||||
PY
|
||||
)"
|
||||
ACTUAL_SHA="$(sha256_file "$ARCHIVE_PATH")"
|
||||
[[ "$EXPECTED_SHA" == "$ACTUAL_SHA" ]] || fail "Archive SHA256 mismatch: expected $EXPECTED_SHA got $ACTUAL_SHA"
|
||||
fi
|
||||
|
||||
PASSFILE="$(resolve_passphrase_file)"
|
||||
mkdir -p "$RESTORE_ROOT"
|
||||
|
||||
openssl enc -d -aes-256-cbc -salt -pbkdf2 -iter 200000 \
|
||||
-pass "file:${PASSFILE}" \
|
||||
-in "$ARCHIVE_PATH" \
|
||||
-out "$PLAINTEXT_ARCHIVE"
|
||||
|
||||
tar -xzf "$PLAINTEXT_ARCHIVE" -C "$RESTORE_ROOT"
|
||||
echo "Restored backup into $RESTORE_ROOT"
|
||||
131
tests/test_autonomous_issue_creator.py
Normal file
131
tests/test_autonomous_issue_creator.py
Normal file
@@ -0,0 +1,131 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from scripts.autonomous_issue_creator import (
|
||||
Incident,
|
||||
build_incidents,
|
||||
heartbeat_is_stale,
|
||||
load_restart_counts,
|
||||
sync_incidents,
|
||||
)
|
||||
|
||||
|
||||
class FakeGiteaClient:
|
||||
def __init__(self, open_issues=None):
|
||||
self._open_issues = list(open_issues or [])
|
||||
self.created = []
|
||||
self.commented = []
|
||||
|
||||
def list_open_issues(self):
|
||||
return list(self._open_issues)
|
||||
|
||||
def create_issue(self, title, body):
|
||||
issue = {"number": 100 + len(self.created), "title": title, "body": body}
|
||||
self.created.append(issue)
|
||||
return issue
|
||||
|
||||
def comment_issue(self, issue_number, body):
|
||||
self.commented.append({"issue_number": issue_number, "body": body})
|
||||
|
||||
|
||||
def test_load_restart_counts_reads_only_count_files(tmp_path):
|
||||
(tmp_path / "act_runner.count").write_text("4\n")
|
||||
(tmp_path / "worker.count").write_text("2\n")
|
||||
(tmp_path / "notes.txt").write_text("ignore me")
|
||||
(tmp_path / "bad.count").write_text("not-an-int")
|
||||
|
||||
counts = load_restart_counts(tmp_path)
|
||||
|
||||
assert counts == {"act_runner": 4, "worker": 2}
|
||||
|
||||
|
||||
def test_heartbeat_is_stale_handles_missing_and_old_files(tmp_path):
|
||||
now = datetime(2026, 4, 15, 4, 0, 0, tzinfo=timezone.utc)
|
||||
|
||||
missing = heartbeat_is_stale(tmp_path / "missing.last", now=now, max_age_seconds=900)
|
||||
assert missing is True
|
||||
|
||||
heartbeat = tmp_path / "fleet_health.last"
|
||||
heartbeat.write_text("")
|
||||
old = now.timestamp() - 1800
|
||||
recent = now.timestamp() - 60
|
||||
|
||||
heartbeat.touch()
|
||||
os = __import__("os")
|
||||
os.utime(heartbeat, (old, old))
|
||||
assert heartbeat_is_stale(heartbeat, now=now, max_age_seconds=900) is True
|
||||
|
||||
os.utime(heartbeat, (recent, recent))
|
||||
assert heartbeat_is_stale(heartbeat, now=now, max_age_seconds=900) is False
|
||||
|
||||
|
||||
def test_build_incidents_captures_offline_hosts_restart_escalations_and_stale_probe():
|
||||
now = datetime(2026, 4, 15, 4, 0, 0, tzinfo=timezone.utc)
|
||||
failover_status = {
|
||||
"timestamp": 1713148800.0,
|
||||
"fleet": {"ezra": "ONLINE", "bezalel": "OFFLINE"},
|
||||
}
|
||||
|
||||
incidents = build_incidents(
|
||||
failover_status=failover_status,
|
||||
restart_counts={"act_runner": 4, "worker": 2},
|
||||
heartbeat_stale=True,
|
||||
now=now,
|
||||
restart_escalation_threshold=3,
|
||||
)
|
||||
|
||||
fingerprints = {incident.fingerprint for incident in incidents}
|
||||
assert fingerprints == {
|
||||
"host-offline:bezalel",
|
||||
"restart-escalation:act_runner",
|
||||
"probe-stale:fleet-health",
|
||||
}
|
||||
|
||||
titles = {incident.title for incident in incidents}
|
||||
assert "[AUTO] Fleet host offline: bezalel" in titles
|
||||
assert "[AUTO] Restart escalation: act_runner" in titles
|
||||
assert "[AUTO] Fleet health probe stale" in titles
|
||||
|
||||
|
||||
def test_sync_incidents_reuses_open_issues_and_creates_missing_ones():
|
||||
client = FakeGiteaClient(
|
||||
open_issues=[
|
||||
{
|
||||
"number": 71,
|
||||
"title": "[AUTO] Fleet host offline: bezalel",
|
||||
"body": "Fingerprint: host-offline:bezalel\n",
|
||||
}
|
||||
]
|
||||
)
|
||||
incidents = [
|
||||
Incident(
|
||||
fingerprint="host-offline:bezalel",
|
||||
title="[AUTO] Fleet host offline: bezalel",
|
||||
body="Fingerprint: host-offline:bezalel\nHost unreachable",
|
||||
),
|
||||
Incident(
|
||||
fingerprint="probe-stale:fleet-health",
|
||||
title="[AUTO] Fleet health probe stale",
|
||||
body="Fingerprint: probe-stale:fleet-health\nHeartbeat missing",
|
||||
),
|
||||
]
|
||||
|
||||
results = sync_incidents(incidents, client, apply=True, comment_existing=True)
|
||||
|
||||
assert [result["action"] for result in results] == ["commented", "created"]
|
||||
assert client.commented == [
|
||||
{
|
||||
"issue_number": 71,
|
||||
"body": "Autonomous infrastructure detector saw the same incident again.\n\nFingerprint: host-offline:bezalel\n\nLatest evidence:\nHost unreachable",
|
||||
}
|
||||
]
|
||||
assert client.created == [
|
||||
{
|
||||
"number": 100,
|
||||
"title": "[AUTO] Fleet health probe stale",
|
||||
"body": "Fingerprint: probe-stale:fleet-health\nHeartbeat missing",
|
||||
}
|
||||
]
|
||||
@@ -1,103 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
BACKUP_SCRIPT = ROOT / "scripts" / "backup_pipeline.sh"
|
||||
RESTORE_SCRIPT = ROOT / "scripts" / "restore_backup.sh"
|
||||
|
||||
|
||||
class TestBackupPipeline(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.tempdir = tempfile.TemporaryDirectory()
|
||||
self.base = Path(self.tempdir.name)
|
||||
self.home = self.base / "home"
|
||||
self.source_dir = self.home / ".hermes"
|
||||
self.source_dir.mkdir(parents=True)
|
||||
(self.source_dir / "sessions").mkdir()
|
||||
(self.source_dir / "cron").mkdir()
|
||||
(self.source_dir / "config.yaml").write_text("model: local-first\n")
|
||||
(self.source_dir / "sessions" / "session.jsonl").write_text('{"role":"assistant","content":"hello"}\n')
|
||||
(self.source_dir / "cron" / "jobs.json").write_text('{"jobs": 1}\n')
|
||||
(self.source_dir / "state.db").write_bytes(b"sqlite-state")
|
||||
|
||||
self.backup_root = self.base / "backup-root"
|
||||
self.nas_target = self.base / "nas-target"
|
||||
self.restore_root = self.base / "restore-root"
|
||||
self.log_dir = self.base / "logs"
|
||||
self.passphrase_file = self.base / "backup.passphrase"
|
||||
self.passphrase_file.write_text("correct horse battery staple\n")
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.tempdir.cleanup()
|
||||
|
||||
def _env(self, *, include_remote: bool = True) -> dict[str, str]:
|
||||
env = os.environ.copy()
|
||||
env.update(
|
||||
{
|
||||
"HOME": str(self.home),
|
||||
"BACKUP_SOURCE_DIR": str(self.source_dir),
|
||||
"BACKUP_ROOT": str(self.backup_root),
|
||||
"BACKUP_LOG_DIR": str(self.log_dir),
|
||||
"BACKUP_PASSPHRASE_FILE": str(self.passphrase_file),
|
||||
}
|
||||
)
|
||||
if include_remote:
|
||||
env["BACKUP_NAS_TARGET"] = str(self.nas_target)
|
||||
return env
|
||||
|
||||
def test_backup_encrypts_and_restore_round_trips(self) -> None:
|
||||
backup = subprocess.run(
|
||||
["bash", str(BACKUP_SCRIPT)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
env=self._env(),
|
||||
cwd=ROOT,
|
||||
)
|
||||
self.assertEqual(backup.returncode, 0, msg=backup.stdout + backup.stderr)
|
||||
|
||||
encrypted_archives = sorted(self.nas_target.rglob("*.tar.gz.enc"))
|
||||
self.assertEqual(len(encrypted_archives), 1, msg=f"expected one encrypted archive, found: {encrypted_archives}")
|
||||
archive_path = encrypted_archives[0]
|
||||
self.assertNotIn(b"model: local-first", archive_path.read_bytes())
|
||||
|
||||
manifests = sorted(self.nas_target.rglob("*.json"))
|
||||
self.assertEqual(len(manifests), 1, msg=f"expected one manifest, found: {manifests}")
|
||||
|
||||
plaintext_archives = sorted(self.backup_root.rglob("*.tar.gz")) + sorted(self.nas_target.rglob("*.tar.gz"))
|
||||
self.assertEqual(plaintext_archives, [], msg=f"plaintext archives leaked: {plaintext_archives}")
|
||||
|
||||
restore = subprocess.run(
|
||||
["bash", str(RESTORE_SCRIPT), str(archive_path), str(self.restore_root)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
env=self._env(),
|
||||
cwd=ROOT,
|
||||
)
|
||||
self.assertEqual(restore.returncode, 0, msg=restore.stdout + restore.stderr)
|
||||
|
||||
restored_hermes = self.restore_root / ".hermes"
|
||||
self.assertTrue(restored_hermes.exists())
|
||||
self.assertEqual((restored_hermes / "config.yaml").read_text(), "model: local-first\n")
|
||||
self.assertEqual((restored_hermes / "sessions" / "session.jsonl").read_text(), '{"role":"assistant","content":"hello"}\n')
|
||||
self.assertEqual((restored_hermes / "cron" / "jobs.json").read_text(), '{"jobs": 1}\n')
|
||||
self.assertEqual((restored_hermes / "state.db").read_bytes(), b"sqlite-state")
|
||||
|
||||
def test_backup_requires_remote_target(self) -> None:
|
||||
backup = subprocess.run(
|
||||
["bash", str(BACKUP_SCRIPT)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
env=self._env(include_remote=False),
|
||||
cwd=ROOT,
|
||||
)
|
||||
self.assertNotEqual(backup.returncode, 0)
|
||||
self.assertIn("BACKUP_NAS_TARGET or BACKUP_S3_URI", backup.stdout + backup.stderr)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main(verbosity=2)
|
||||
Reference in New Issue
Block a user