Compare commits

...

1 Commits

Author SHA1 Message Date
4093c74c19 feat(big-brain): auto-stop pod when idle (#577)
Some checks failed
Smoke Test / smoke (pull_request) Failing after 8s
Cost control for L40S pod ($0.79/hr).

Idle watchdog (cron every 15 min):
- Tracks last inference request timestamp
- If idle > 30 min, stops pod via RunPod GraphQL API
- Logs stop/start events with timestamps to cost log

Auto-resume manager:
- Import before inference to ensure pod is RUNNING
- If stopped, resumes and polls until Ollama responds
- Updates timestamp on each request

Components:
- big_brain_idle_watchdog.py: idle check + pod stop
- big_brain_manager.py: auto-resume + status
- 11 tests covering all states and edge cases

Closes #577
2026-04-13 18:14:01 -04:00
3 changed files with 834 additions and 0 deletions

View File

@@ -0,0 +1,295 @@
#!/usr/bin/env python3
"""
Big Brain Idle Watchdog — Auto-stop RunPod pod when idle.
Checks the last request timestamp to the Big Brain (RunPod L40S Ollama endpoint).
If idle > threshold (default 30 min), stops the pod via RunPod GraphQL API.
Cost: $0.79/hr for L40S. Forgetting the pod for 24h = $19.
Usage:
python3 scripts/big_brain_idle_watchdog.py # check + act
python3 scripts/big_brain_idle_watchdog.py --dry-run # check only
python3 scripts/big_brain_idle_watchdog.py --threshold 60 # custom idle min
python3 scripts/big_brain_idle_watchdog.py --status # show pod status
python3 scripts/big_brain_idle_watchdog.py --json # machine-readable
Environment:
RUNPOD_API_KEY — RunPod API key (default: reads from ~/.config/runpod/access_key)
Timestamp file:
~/.hermes/big_brain_last_request — updated by hermes when inference hits big_brain
If missing, watchdog creates it with current time (grace period).
"""
import argparse
import json
import os
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta
from pathlib import Path
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Fallback location of the RunPod API key when RUNPOD_API_KEY is unset.
RUNPOD_KEY_FILE = os.path.expanduser("~/.config/runpod/access_key")
# RunPod GraphQL endpoint (pod query, podStop/podResume mutations).
RUNPOD_API = "https://api.runpod.io/graphql"
# ISO-8601 timestamp of the last inference request; written by touch_last_request().
TIMESTAMP_FILE = os.path.expanduser("~/.hermes/big_brain_last_request")
# JSONL audit log of stop/resume events appended by log_event().
COST_LOG_FILE = os.path.expanduser("~/.hermes/big_brain_cost_log.jsonl")
# Pod targeted by default; override with --pod-id.
DEFAULT_POD_ID = "8lfr3j47a5r3gn"
# Minutes of inactivity before the watchdog stops the pod.
DEFAULT_THRESHOLD_MIN = 30
# Approximate cost per hour for L40S
COST_PER_HOUR = 0.79
def get_runpod_key() -> str:
    """Return the RunPod API key.

    Prefers the RUNPOD_API_KEY environment variable, then falls back to
    RUNPOD_KEY_FILE. Exits the process with status 1 when neither is
    available.
    """
    env_key = os.environ.get("RUNPOD_API_KEY")
    if env_key:
        return env_key.strip()
    try:
        with open(RUNPOD_KEY_FILE) as fh:
            return fh.read().strip()
    except FileNotFoundError:
        print(f"Error: RunPod API key not found at {RUNPOD_KEY_FILE} or RUNPOD_API_KEY env", file=sys.stderr)
        sys.exit(1)
def runpod_graphql(query: str, key: str | None = None) -> dict:
    """Execute a RunPod GraphQL query/mutation and return the parsed JSON.

    Args:
        query: GraphQL document (query or mutation) to POST.
        key: API key; when None it is resolved via get_runpod_key().

    Returns:
        The decoded response dict. Transport failures are reported in-band
        as {"errors": [{"message": ...}]} so callers can treat API errors
        and network errors uniformly instead of catching exceptions.
    """
    if key is None:
        key = get_runpod_key()
    req = urllib.request.Request(
        RUNPOD_API,
        data=json.dumps({"query": query}).encode(),
        headers={
            "Authorization": f"Bearer {key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        # GraphQL servers put error details in the HTTP error body.
        body = e.read().decode()
        return {"errors": [{"message": body, "code": e.code}]}
    except Exception as e:
        # Network-level failure (DNS, timeout, connection refused).
        return {"errors": [{"message": str(e)}]}
# ---------------------------------------------------------------------------
# Pod status
# ---------------------------------------------------------------------------
def get_pod_status(pod_id: str) -> dict:
    """Fetch a pod's current state from the RunPod API.

    Returns the pod's field dict on success, or {"error": message}
    when the API call failed.
    """
    query = f"""
    query {{
        pod(id: "{pod_id}") {{
            id
            name
            desiredStatus
            machineId
            costPerHr
            gpuCount
            uptimeSeconds
        }}
    }}
    """
    response = runpod_graphql(query)
    if "errors" in response:
        return {"error": response["errors"][0].get("message", "unknown")}
    return response.get("data", {}).get("pod", {})
def stop_pod(pod_id: str) -> dict:
    """Issue a podStop mutation; returns the raw GraphQL response."""
    mutation = f"""
    mutation {{
        podStop(input: {{ podId: "{pod_id}" }}) {{
            id
            desiredStatus
        }}
    }}
    """
    return runpod_graphql(mutation)
def resume_pod(pod_id: str) -> dict:
    """Issue a podResume mutation; returns the raw GraphQL response."""
    mutation = f"""
    mutation {{
        podResume(input: {{ podId: "{pod_id}" }}) {{
            id
            desiredStatus
        }}
    }}
    """
    return runpod_graphql(mutation)
# ---------------------------------------------------------------------------
# Idle tracking
# ---------------------------------------------------------------------------
def get_last_request_time() -> datetime | None:
    """Return the last Big Brain request time, or None if unknown.

    Reads an ISO-8601 timestamp from TIMESTAMP_FILE. A naive timestamp
    (no tzinfo) is interpreted as UTC so callers can safely subtract it
    from datetime.now(timezone.utc) — a naive value would otherwise
    raise TypeError downstream. Returns None when the file is missing
    or unparseable.
    """
    try:
        with open(TIMESTAMP_FILE) as fh:
            ts_str = fh.read().strip()
        ts = datetime.fromisoformat(ts_str)
    except (FileNotFoundError, ValueError):
        return None
    # touch_last_request() always writes an aware UTC stamp, but guard
    # against a hand-edited naive timestamp.
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)
    return ts
def touch_last_request() -> None:
    """Record 'now' (UTC, ISO-8601) as the last Big Brain request time."""
    os.makedirs(os.path.dirname(TIMESTAMP_FILE), exist_ok=True)
    stamp = datetime.now(timezone.utc).isoformat()
    with open(TIMESTAMP_FILE, "w") as fh:
        fh.write(stamp)
def get_idle_minutes() -> float:
    """Return minutes elapsed since the last recorded request.

    A missing/unreadable timestamp counts as 0.0 (grace period) so a
    fresh install never stops the pod immediately. A naive timestamp
    (no tzinfo) is interpreted as UTC to avoid a TypeError when
    subtracting from the aware 'now'.
    """
    last = get_last_request_time()
    if last is None:
        return 0.0  # No timestamp = treat as just-active (grace)
    if last.tzinfo is None:
        last = last.replace(tzinfo=timezone.utc)
    delta = datetime.now(timezone.utc) - last
    return delta.total_seconds() / 60.0
# ---------------------------------------------------------------------------
# Cost logging
# ---------------------------------------------------------------------------
def log_event(event_type: str, pod_id: str, reason: str = "", idle_minutes: float = 0):
    """Append one stop/resume event to the JSONL cost log and return it."""
    os.makedirs(os.path.dirname(COST_LOG_FILE), exist_ok=True)
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "event": event_type,
        "pod_id": pod_id,
        "reason": reason,
        "idle_minutes": round(idle_minutes, 1),
        "cost_per_hour": COST_PER_HOUR,
    }
    with open(COST_LOG_FILE, "a") as log_fh:
        log_fh.write(json.dumps(record) + "\n")
    return record
# ---------------------------------------------------------------------------
# Main logic
# ---------------------------------------------------------------------------
def run_watchdog(pod_id: str, threshold_min: int, dry_run: bool = False) -> dict:
    """Run one watchdog pass: stop the pod if idle past the threshold.

    Only acts on a RUNNING pod whose idle time exceeds threshold_min.
    Returns a summary dict (pod status, idle minutes, action taken).
    """
    idle_min = get_idle_minutes()
    pod = get_pod_status(pod_id)
    status = pod.get("desiredStatus", "UNKNOWN")
    summary = {
        "pod_id": pod_id,
        "pod_status": status,
        "idle_minutes": round(idle_min, 1),
        "threshold_minutes": threshold_min,
        "action": "none",
        "error": pod.get("error"),
    }
    if summary["error"]:
        summary["action"] = "error"
        return summary
    idle_exceeded = idle_min > threshold_min
    if status == "RUNNING":
        if not idle_exceeded:
            summary["action"] = "active"
            return summary
        reason = f"idle {idle_min:.0f}min > {threshold_min}min threshold"
        if dry_run:
            summary["action"] = "would_stop"
            summary["reason"] = reason
            return summary
        stop_result = stop_pod(pod_id)
        if "errors" in stop_result:
            summary["action"] = "stop_failed"
            summary["error"] = stop_result["errors"][0].get("message", "unknown")
        else:
            summary["action"] = "stopped"
            summary["reason"] = reason
            log_event("stop", pod_id, reason, idle_min)
        return summary
    if status == "EXITED" and idle_exceeded:
        summary["action"] = "already_stopped"
    else:
        summary["action"] = "no_action"
    return summary
def main():
    """CLI entry point: parse flags and run one watchdog pass."""
    parser = argparse.ArgumentParser(description="Big Brain idle watchdog — auto-stop RunPod pod")
    parser.add_argument("--pod-id", default=DEFAULT_POD_ID, help="RunPod pod ID")
    parser.add_argument("--threshold", type=int, default=DEFAULT_THRESHOLD_MIN, help="Idle minutes before stop")
    parser.add_argument("--dry-run", action="store_true", help="Check only, don't stop")
    parser.add_argument("--status", action="store_true", help="Show pod status only")
    parser.add_argument("--json", dest="as_json", action="store_true", help="JSON output")
    parser.add_argument("--touch", action="store_true", help="Update last request timestamp to now")
    args = parser.parse_args()
    # --touch: record activity and exit (called by inference hooks).
    if args.touch:
        touch_last_request()
        print(f"Last request timestamp updated to now ({TIMESTAMP_FILE})")
        return
    # --status: report pod + idle state without taking any action.
    if args.status:
        pod = get_pod_status(args.pod_id)
        idle_min = get_idle_minutes()
        last_req = get_last_request_time()
        if args.as_json:
            print(json.dumps({"pod": pod, "idle_minutes": round(idle_min, 1), "last_request": last_req.isoformat() if last_req else None}, indent=2))
        else:
            print(f"Pod: {args.pod_id}")
            print(f"Status: {pod.get('desiredStatus', 'UNKNOWN')}")
            print(f"Idle: {idle_min:.1f} min")
            print(f"Last req: {last_req.isoformat() if last_req else 'never (grace period)'}")
            print(f"Cost/hr: ${COST_PER_HOUR}")
            if pod.get("uptimeSeconds"):
                hrs = pod["uptimeSeconds"] / 3600
                print(f"Uptime: {hrs:.1f} hrs (${hrs * COST_PER_HOUR:.2f})")
        return
    # Default mode: evaluate idle time and stop the pod if warranted.
    result = run_watchdog(args.pod_id, args.threshold, args.dry_run)
    if args.as_json:
        print(json.dumps(result, indent=2))
    else:
        # Human-readable label for each watchdog action.
        status_emoji = {
            "none": "---",
            "active": "ACTIVE",
            "already_stopped": "STOPPED",
            "stopped": "STOPPED",
            "would_stop": "WOULD STOP",
            "stop_failed": "STOP FAILED",
            "error": "ERROR",
            "no_action": "OK",
        }
        label = status_emoji.get(result["action"], result["action"])
        print(f"[Big Brain] {label} | pod={result['pod_status']} | idle={result['idle_minutes']}m | threshold={result['threshold_minutes']}m")
        if result.get("reason"):
            print(f"  Reason: {result['reason']}")
        if result.get("error"):
            print(f"  Error: {result['error']}")
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,194 @@
#!/usr/bin/env python3
"""
Big Brain Manager — Auto-resume pod before inference.
Import this module or run standalone to ensure the Big Brain pod is
RUNNING before sending inference requests. If the pod is stopped,
resumes it and polls until ready.
Usage:
python3 scripts/big_brain_manager.py --ensure-running # resume if stopped
python3 scripts/big_brain_manager.py --status # just check status
python3 scripts/big_brain_manager.py --stop # manually stop
python3 scripts/big_brain_manager.py --resume # manually resume
python3 scripts/big_brain_manager.py --json # machine-readable
As a library:
from big_brain_manager import ensure_running, touch_request
ensure_running() # blocks until pod is RUNNING
touch_request() # update idle timestamp
"""
import json
import os
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone
# Reuse watchdog internals
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from big_brain_idle_watchdog import (
get_pod_status, stop_pod, resume_pod, get_runpod_key,
touch_last_request, get_last_request_time, log_event,
DEFAULT_POD_ID, COST_PER_HOUR, TIMESTAMP_FILE,
)
# Ollama proxy URL template; RunPod exposes pod port 11434 at this host.
OLLAMA_ENDPOINT = "https://{pod_id}-11434.proxy.runpod.net"
# Seconds between readiness polls while waiting for Ollama to come up.
POLL_INTERVAL_SEC = 10
# NOTE(review): unused — ensure_running() derives the attempt count from
# its timeout_sec parameter (default 300 s = 30 × 10 s); keep in sync.
MAX_POLL_ATTEMPTS = 30  # 5 minutes max wait
def is_ollama_ready(pod_id: str) -> bool:
    """Return True if the pod's Ollama endpoint answers GET /api/tags."""
    tags_url = OLLAMA_ENDPOINT.format(pod_id=pod_id) + "/api/tags"
    probe = urllib.request.Request(tags_url, method="GET")
    try:
        with urllib.request.urlopen(probe, timeout=10) as resp:
            return resp.status == 200
    except Exception:
        # Any failure (DNS, 5xx, timeout) means "not ready".
        return False
def ensure_running(pod_id: str = DEFAULT_POD_ID, timeout_sec: int = 300) -> dict:
    """Make sure the Big Brain pod is RUNNING and Ollama is serving.

    Resumes a stopped pod and polls until Ollama answers, up to roughly
    timeout_sec seconds. Returns a dict describing the action taken and
    the final pod state.
    """
    pod = get_pod_status(pod_id)
    status = pod.get("desiredStatus", "UNKNOWN")
    outcome = {"pod_id": pod_id, "initial_status": status, "action": "none"}
    if pod.get("error"):
        outcome["error"] = pod["error"]
        outcome["action"] = "error"
        return outcome
    if status == "RUNNING":
        if is_ollama_ready(pod_id):
            # Pod up and model server answering — nothing to do.
            outcome["action"] = "already_running"
            outcome["final_status"] = "RUNNING"
            touch_last_request()
            return outcome
        # Container is up but Ollama hasn't bound yet; fall through to poll.
        outcome["action"] = "waiting_for_ollama"
    elif status in ("EXITED", "STOPPED"):
        resume_result = resume_pod(pod_id)
        if "errors" in resume_result:
            outcome["action"] = "resume_failed"
            outcome["error"] = resume_result["errors"][0].get("message", "unknown")
            return outcome
        outcome["action"] = "resumed"
        log_event("resume", pod_id, "auto-resume before inference")
    else:
        outcome["action"] = "unexpected_status"
        outcome["final_status"] = status
        return outcome
    # Poll until Ollama responds or the time budget is exhausted.
    attempts = 0
    while attempts < timeout_sec / POLL_INTERVAL_SEC:
        time.sleep(POLL_INTERVAL_SEC)
        attempts += 1
        if is_ollama_ready(pod_id):
            outcome["final_status"] = "RUNNING"
            outcome["wait_seconds"] = attempts * POLL_INTERVAL_SEC
            touch_last_request()
            return outcome
    outcome["action"] = "timeout"
    outcome["final_status"] = get_pod_status(pod_id).get("desiredStatus", "UNKNOWN")
    return outcome
def touch_request():
    """Library-facing alias: mark 'now' as the last inference request."""
    touch_last_request()
def main():
    """CLI entry point: dispatch on the first matching flag."""
    import argparse
    parser = argparse.ArgumentParser(description="Big Brain pod manager — auto-resume for inference")
    parser.add_argument("--pod-id", default=DEFAULT_POD_ID, help="RunPod pod ID")
    parser.add_argument("--ensure-running", action="store_true", help="Resume if stopped, wait for ready")
    parser.add_argument("--status", action="store_true", help="Show pod status")
    parser.add_argument("--stop", action="store_true", help="Stop the pod")
    parser.add_argument("--resume", action="store_true", help="Resume the pod")
    parser.add_argument("--touch", action="store_true", help="Update last request timestamp")
    parser.add_argument("--json", dest="as_json", action="store_true", help="JSON output")
    args = parser.parse_args()
    # --touch: record activity and exit.
    if args.touch:
        touch_request()
        print(f"Timestamp updated: {TIMESTAMP_FILE}")
        return
    # --ensure-running: resume if needed, block until Ollama answers.
    if args.ensure_running:
        result = ensure_running(args.pod_id)
        if args.as_json:
            print(json.dumps(result, indent=2))
        else:
            action = result["action"]
            if action == "already_running":
                print(f"[Big Brain] Already RUNNING and Ollama ready")
            elif action == "resumed":
                wait = result.get("wait_seconds", 0)
                print(f"[Big Brain] Resumed pod, Ollama ready in {wait}s")
            elif action == "resume_failed":
                print(f"[Big Brain] RESUME FAILED: {result.get('error', 'unknown')}", file=sys.stderr)
                sys.exit(1)
            elif action == "timeout":
                print(f"[Big Brain] Timed out waiting for Ollama", file=sys.stderr)
                sys.exit(1)
        return
    # --stop: manual stop, logged to the cost log.
    if args.stop:
        result = stop_pod(args.pod_id)
        if "errors" in result:
            print(f"Stop failed: {result['errors'][0].get('message')}", file=sys.stderr)
            sys.exit(1)
        log_event("stop", args.pod_id, "manual stop")
        print(f"Pod {args.pod_id} stopped")
        return
    # --resume: manual resume, logged to the cost log.
    if args.resume:
        result = resume_pod(args.pod_id)
        if "errors" in result:
            print(f"Resume failed: {result['errors'][0].get('message')}", file=sys.stderr)
            sys.exit(1)
        log_event("resume", args.pod_id, "manual resume")
        print(f"Pod {args.pod_id} resuming...")
        return
    # Default: status
    pod = get_pod_status(args.pod_id)
    last_req = get_last_request_time()
    # Only probe Ollama when the pod claims to be RUNNING.
    ollama_ok = is_ollama_ready(args.pod_id) if pod.get("desiredStatus") == "RUNNING" else False
    if args.as_json:
        print(json.dumps({
            "pod": pod,
            "ollama_ready": ollama_ok,
            "last_request": last_req.isoformat() if last_req else None,
        }, indent=2))
    else:
        print(f"Pod: {args.pod_id}")
        print(f"Status: {pod.get('desiredStatus', 'UNKNOWN')}")
        print(f"Ollama: {'ready' if ollama_ok else 'not responding'}")
        print(f"Last request: {last_req.isoformat() if last_req else 'never'}")
        print(f"Cost/hr: ${COST_PER_HOUR}")
        if pod.get("uptimeSeconds"):
            hrs = pod["uptimeSeconds"] / 3600
            print(f"Uptime: {hrs:.1f} hrs (${hrs * COST_PER_HOUR:.2f})")
if __name__ == "__main__":
    main()

345
tests/test_big_brain.py Normal file
View File

@@ -0,0 +1,345 @@
#!/usr/bin/env python3
"""
Big Brain Idle Watchdog + Manager Tests
Run:
python3 tests/test_big_brain.py
pytest tests/test_big_brain.py -v
"""
import json
import os
import sys
import tempfile
import shutil
from datetime import datetime, timezone, timedelta
from unittest.mock import patch, MagicMock
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import time
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "scripts"))
# ---------------------------------------------------------------------------
# Test timestamp tracking
# ---------------------------------------------------------------------------
def test_touch_and_read_timestamp():
    """touch_last_request() round-trips through get_last_request_time()."""
    import big_brain_idle_watchdog as wd
    work_dir = tempfile.mkdtemp()
    saved_path = wd.TIMESTAMP_FILE
    wd.TIMESTAMP_FILE = os.path.join(work_dir, "last_request")
    try:
        # Fresh dir: no timestamp yet.
        assert wd.get_last_request_time() is None
        wd.touch_last_request()
        stamp = wd.get_last_request_time()
        assert stamp is not None
        age = (datetime.now(timezone.utc) - stamp).total_seconds()
        assert age < 5  # Written "just now"
        print("PASS: test_touch_and_read_timestamp")
    finally:
        wd.TIMESTAMP_FILE = saved_path
        shutil.rmtree(work_dir)
def test_idle_minutes_no_file():
    """A missing timestamp file counts as zero idle (grace period)."""
    import big_brain_idle_watchdog as wd
    work_dir = tempfile.mkdtemp()
    saved_path = wd.TIMESTAMP_FILE
    wd.TIMESTAMP_FILE = os.path.join(work_dir, "nonexistent")
    try:
        assert wd.get_idle_minutes() == 0.0
        print("PASS: test_idle_minutes_no_file")
    finally:
        wd.TIMESTAMP_FILE = saved_path
        shutil.rmtree(work_dir)
def test_idle_minutes_old_timestamp():
    """A 45-minute-old timestamp yields roughly 45 idle minutes."""
    import big_brain_idle_watchdog as wd
    work_dir = tempfile.mkdtemp()
    saved_path = wd.TIMESTAMP_FILE
    wd.TIMESTAMP_FILE = os.path.join(work_dir, "last_request")
    try:
        past = datetime.now(timezone.utc) - timedelta(minutes=45)
        with open(wd.TIMESTAMP_FILE, "w") as fh:
            fh.write(past.isoformat())
        idle = wd.get_idle_minutes()
        assert 44 < idle < 46, f"Expected ~45 min idle, got {idle}"
        print("PASS: test_idle_minutes_old_timestamp")
    finally:
        wd.TIMESTAMP_FILE = saved_path
        shutil.rmtree(work_dir)
# ---------------------------------------------------------------------------
# Test cost logging
# ---------------------------------------------------------------------------
def test_cost_log_write():
    """log_event() returns the entry and appends one JSONL line."""
    import big_brain_idle_watchdog as wd
    work_dir = tempfile.mkdtemp()
    saved_log = wd.COST_LOG_FILE
    wd.COST_LOG_FILE = os.path.join(work_dir, "cost_log.jsonl")
    try:
        entry = wd.log_event("stop", "test-pod-123", "idle 45min", 45.2)
        assert entry["event"] == "stop"
        assert entry["pod_id"] == "test-pod-123"
        assert entry["idle_minutes"] == 45.2
        # The entry must also land on disk as a single JSON line.
        with open(wd.COST_LOG_FILE) as fh:
            lines = fh.readlines()
        assert len(lines) == 1
        assert json.loads(lines[0])["event"] == "stop"
        print("PASS: test_cost_log_write")
    finally:
        wd.COST_LOG_FILE = saved_log
        shutil.rmtree(work_dir)
# ---------------------------------------------------------------------------
# Test watchdog logic (mocked API)
# ---------------------------------------------------------------------------
def test_watchdog_stops_when_idle():
    """A RUNNING pod idle past the threshold gets stopped."""
    import big_brain_idle_watchdog as wd
    work_dir = tempfile.mkdtemp()
    saved_ts, saved_log = wd.TIMESTAMP_FILE, wd.COST_LOG_FILE
    wd.TIMESTAMP_FILE = os.path.join(work_dir, "last_request")
    wd.COST_LOG_FILE = os.path.join(work_dir, "cost_log.jsonl")
    try:
        # 45 minutes of idle vs a 30-minute threshold.
        past = datetime.now(timezone.utc) - timedelta(minutes=45)
        with open(wd.TIMESTAMP_FILE, "w") as fh:
            fh.write(past.isoformat())
        fake_pod = {"id": "test-pod", "desiredStatus": "RUNNING"}
        fake_stop = {"data": {"podStop": {"id": "test-pod", "desiredStatus": "STOPPED"}}}
        with patch("big_brain_idle_watchdog.get_pod_status", return_value=fake_pod), \
             patch("big_brain_idle_watchdog.stop_pod", return_value=fake_stop):
            outcome = wd.run_watchdog("test-pod", 30, dry_run=False)
        assert outcome["action"] == "stopped"
        assert outcome["idle_minutes"] > 30
        print("PASS: test_watchdog_stops_when_idle")
    finally:
        wd.TIMESTAMP_FILE, wd.COST_LOG_FILE = saved_ts, saved_log
        shutil.rmtree(work_dir)
def test_watchdog_dry_run():
    """Dry run reports would_stop without ever calling stop_pod()."""
    import big_brain_idle_watchdog as wd
    work_dir = tempfile.mkdtemp()
    saved_ts = wd.TIMESTAMP_FILE
    wd.TIMESTAMP_FILE = os.path.join(work_dir, "last_request")
    try:
        past = datetime.now(timezone.utc) - timedelta(minutes=45)
        with open(wd.TIMESTAMP_FILE, "w") as fh:
            fh.write(past.isoformat())
        fake_pod = {"id": "test-pod", "desiredStatus": "RUNNING"}
        with patch("big_brain_idle_watchdog.get_pod_status", return_value=fake_pod), \
             patch("big_brain_idle_watchdog.stop_pod") as stop_mock:
            outcome = wd.run_watchdog("test-pod", 30, dry_run=True)
        assert outcome["action"] == "would_stop"
        stop_mock.assert_not_called()
        print("PASS: test_watchdog_dry_run")
    finally:
        wd.TIMESTAMP_FILE = saved_ts
        shutil.rmtree(work_dir)
def test_watchdog_no_stop_when_active():
    """Recent activity keeps a RUNNING pod untouched."""
    import big_brain_idle_watchdog as wd
    work_dir = tempfile.mkdtemp()
    saved_ts = wd.TIMESTAMP_FILE
    wd.TIMESTAMP_FILE = os.path.join(work_dir, "last_request")
    try:
        wd.touch_last_request()  # Idle time ~0 minutes
        fake_pod = {"id": "test-pod", "desiredStatus": "RUNNING"}
        with patch("big_brain_idle_watchdog.get_pod_status", return_value=fake_pod):
            outcome = wd.run_watchdog("test-pod", 30, dry_run=False)
        assert outcome["action"] == "active"
        print("PASS: test_watchdog_no_stop_when_active")
    finally:
        wd.TIMESTAMP_FILE = saved_ts
        shutil.rmtree(work_dir)
def test_watchdog_already_stopped():
    """An EXITED pod past the threshold reports already_stopped."""
    import big_brain_idle_watchdog as wd
    work_dir = tempfile.mkdtemp()
    saved_ts = wd.TIMESTAMP_FILE
    wd.TIMESTAMP_FILE = os.path.join(work_dir, "last_request")
    try:
        past = datetime.now(timezone.utc) - timedelta(minutes=45)
        with open(wd.TIMESTAMP_FILE, "w") as fh:
            fh.write(past.isoformat())
        fake_pod = {"id": "test-pod", "desiredStatus": "EXITED"}
        with patch("big_brain_idle_watchdog.get_pod_status", return_value=fake_pod):
            outcome = wd.run_watchdog("test-pod", 30, dry_run=False)
        assert outcome["action"] == "already_stopped"
        print("PASS: test_watchdog_already_stopped")
    finally:
        wd.TIMESTAMP_FILE = saved_ts
        shutil.rmtree(work_dir)
def test_watchdog_api_error():
    """An API error is reported in the result rather than raised."""
    import big_brain_idle_watchdog as wd
    fake_pod = {"error": "pod not found"}
    with patch("big_brain_idle_watchdog.get_pod_status", return_value=fake_pod):
        outcome = wd.run_watchdog("bad-pod", 30, dry_run=False)
    assert outcome["action"] == "error"
    assert "error" in outcome
    print("PASS: test_watchdog_api_error")
# ---------------------------------------------------------------------------
# Test manager auto-resume
# ---------------------------------------------------------------------------
def test_manager_already_running():
    """A RUNNING pod with Ollama answering requires no action."""
    import big_brain_manager as mgr
    import big_brain_idle_watchdog as wd
    work_dir = tempfile.mkdtemp()
    saved_ts = wd.TIMESTAMP_FILE
    wd.TIMESTAMP_FILE = os.path.join(work_dir, "last_request")
    try:
        fake_pod = {"id": "test-pod", "desiredStatus": "RUNNING"}
        with patch("big_brain_manager.get_pod_status", return_value=fake_pod), \
             patch("big_brain_manager.is_ollama_ready", return_value=True):
            outcome = mgr.ensure_running("test-pod")
        assert outcome["action"] == "already_running"
        assert outcome["final_status"] == "RUNNING"
        print("PASS: test_manager_already_running")
    finally:
        wd.TIMESTAMP_FILE = saved_ts
        shutil.rmtree(work_dir)
def test_manager_resumes_stopped_pod():
    """An EXITED pod is resumed and waited on until Ollama answers."""
    import big_brain_manager as mgr
    import big_brain_idle_watchdog as wd
    work_dir = tempfile.mkdtemp()
    saved_ts, saved_log = wd.TIMESTAMP_FILE, wd.COST_LOG_FILE
    wd.TIMESTAMP_FILE = os.path.join(work_dir, "last_request")
    wd.COST_LOG_FILE = os.path.join(work_dir, "cost_log.jsonl")
    try:
        fake_pod = {"id": "test-pod", "desiredStatus": "EXITED"}
        fake_resume = {"data": {"podResume": {"id": "test-pod", "desiredStatus": "RUNNING"}}}
        with patch("big_brain_manager.get_pod_status", return_value=fake_pod), \
             patch("big_brain_manager.resume_pod", return_value=fake_resume), \
             patch("big_brain_manager.is_ollama_ready", return_value=True):
            outcome = mgr.ensure_running("test-pod", timeout_sec=20)
        assert outcome["action"] == "resumed"
        print("PASS: test_manager_resumes_stopped_pod")
    finally:
        wd.TIMESTAMP_FILE, wd.COST_LOG_FILE = saved_ts, saved_log
        shutil.rmtree(work_dir)
# ---------------------------------------------------------------------------
# Runner
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    tests = [
        test_touch_and_read_timestamp,
        test_idle_minutes_no_file,
        test_idle_minutes_old_timestamp,
        test_cost_log_write,
        test_watchdog_stops_when_idle,
        test_watchdog_dry_run,
        test_watchdog_no_stop_when_active,
        test_watchdog_already_stopped,
        test_watchdog_api_error,
        test_manager_already_running,
        test_manager_resumes_stopped_pod,
    ]
    passed = 0
    failed = 0
    for case in tests:
        try:
            case()
        except Exception as exc:
            print(f"FAIL: {case.__name__}: {exc}")
            import traceback
            traceback.print_exc()
            failed += 1
        else:
            passed += 1
    print(f"\n{'='*40}")
    print(f"Results: {passed} passed, {failed} failed, {len(tests)} total")
    if failed > 0:
        sys.exit(1)