Compare commits
1 Commits
fix/551
...
burn/577-1
| Author | SHA1 | Date | |
|---|---|---|---|
| 4093c74c19 |
295
scripts/big_brain_idle_watchdog.py
Normal file
295
scripts/big_brain_idle_watchdog.py
Normal file
@@ -0,0 +1,295 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Big Brain Idle Watchdog — Auto-stop RunPod pod when idle.
|
||||
|
||||
Checks the last request timestamp to the Big Brain (RunPod L40S Ollama endpoint).
|
||||
If idle > threshold (default 30 min), stops the pod via RunPod GraphQL API.
|
||||
|
||||
Cost: $0.79/hr for L40S. Forgetting the pod for 24h = $19.
|
||||
|
||||
Usage:
|
||||
python3 scripts/big_brain_idle_watchdog.py # check + act
|
||||
python3 scripts/big_brain_idle_watchdog.py --dry-run # check only
|
||||
python3 scripts/big_brain_idle_watchdog.py --threshold 60 # custom idle min
|
||||
python3 scripts/big_brain_idle_watchdog.py --status # show pod status
|
||||
python3 scripts/big_brain_idle_watchdog.py --json # machine-readable
|
||||
|
||||
Environment:
|
||||
RUNPOD_API_KEY — RunPod API key (default: reads from ~/.config/runpod/access_key)
|
||||
|
||||
Timestamp file:
|
||||
~/.hermes/big_brain_last_request — updated by hermes when inference hits big_brain
|
||||
If missing, watchdog creates it with current time (grace period).
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
RUNPOD_KEY_FILE = os.path.expanduser("~/.config/runpod/access_key")
|
||||
RUNPOD_API = "https://api.runpod.io/graphql"
|
||||
TIMESTAMP_FILE = os.path.expanduser("~/.hermes/big_brain_last_request")
|
||||
COST_LOG_FILE = os.path.expanduser("~/.hermes/big_brain_cost_log.jsonl")
|
||||
DEFAULT_POD_ID = "8lfr3j47a5r3gn"
|
||||
DEFAULT_THRESHOLD_MIN = 30
|
||||
|
||||
# Approximate cost per hour for L40S
|
||||
COST_PER_HOUR = 0.79
|
||||
|
||||
|
||||
def get_runpod_key() -> str:
|
||||
"""Read RunPod API key from file or env."""
|
||||
key = os.environ.get("RUNPOD_API_KEY")
|
||||
if key:
|
||||
return key.strip()
|
||||
try:
|
||||
return open(RUNPOD_KEY_FILE).read().strip()
|
||||
except FileNotFoundError:
|
||||
print(f"Error: RunPod API key not found at {RUNPOD_KEY_FILE} or RUNPOD_API_KEY env", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def runpod_graphql(query: str, key: str = None) -> dict:
|
||||
"""Execute a RunPod GraphQL query/mutation."""
|
||||
if key is None:
|
||||
key = get_runpod_key()
|
||||
req = urllib.request.Request(
|
||||
RUNPOD_API,
|
||||
data=json.dumps({"query": query}).encode(),
|
||||
headers={
|
||||
"Authorization": f"Bearer {key}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
method="POST",
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read())
|
||||
except urllib.error.HTTPError as e:
|
||||
body = e.read().decode()
|
||||
return {"errors": [{"message": body, "code": e.code}]}
|
||||
except Exception as e:
|
||||
return {"errors": [{"message": str(e)}]}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Pod status
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_pod_status(pod_id: str) -> dict:
|
||||
"""Get pod status via GraphQL."""
|
||||
query = f"""
|
||||
query {{
|
||||
pod(id: "{pod_id}") {{
|
||||
id
|
||||
name
|
||||
desiredStatus
|
||||
machineId
|
||||
costPerHr
|
||||
gpuCount
|
||||
uptimeSeconds
|
||||
}}
|
||||
}}
|
||||
"""
|
||||
result = runpod_graphql(query)
|
||||
if "errors" in result:
|
||||
return {"error": result["errors"][0].get("message", "unknown")}
|
||||
return result.get("data", {}).get("pod", {})
|
||||
|
||||
|
||||
def stop_pod(pod_id: str) -> dict:
|
||||
"""Stop a pod via podStop mutation."""
|
||||
query = f"""
|
||||
mutation {{
|
||||
podStop(input: {{ podId: "{pod_id}" }}) {{
|
||||
id
|
||||
desiredStatus
|
||||
}}
|
||||
}}
|
||||
"""
|
||||
return runpod_graphql(query)
|
||||
|
||||
|
||||
def resume_pod(pod_id: str) -> dict:
|
||||
"""Resume a stopped pod via podResume mutation."""
|
||||
query = f"""
|
||||
mutation {{
|
||||
podResume(input: {{ podId: "{pod_id}" }}) {{
|
||||
id
|
||||
desiredStatus
|
||||
}}
|
||||
}}
|
||||
"""
|
||||
return runpod_graphql(query)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Idle tracking
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def get_last_request_time() -> datetime | None:
|
||||
"""Read last request timestamp from file."""
|
||||
try:
|
||||
ts_str = open(TIMESTAMP_FILE).read().strip()
|
||||
return datetime.fromisoformat(ts_str)
|
||||
except (FileNotFoundError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def touch_last_request():
|
||||
"""Update last request timestamp to now."""
|
||||
os.makedirs(os.path.dirname(TIMESTAMP_FILE), exist_ok=True)
|
||||
with open(TIMESTAMP_FILE, "w") as f:
|
||||
f.write(datetime.now(timezone.utc).isoformat())
|
||||
|
||||
|
||||
def get_idle_minutes() -> float:
|
||||
"""Get minutes since last request."""
|
||||
last = get_last_request_time()
|
||||
if last is None:
|
||||
return 0.0 # No timestamp = treat as just-active (grace)
|
||||
delta = datetime.now(timezone.utc) - last
|
||||
return delta.total_seconds() / 60.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cost logging
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def log_event(event_type: str, pod_id: str, reason: str = "", idle_minutes: float = 0):
|
||||
"""Append a cost log entry."""
|
||||
os.makedirs(os.path.dirname(COST_LOG_FILE), exist_ok=True)
|
||||
entry = {
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"event": event_type,
|
||||
"pod_id": pod_id,
|
||||
"reason": reason,
|
||||
"idle_minutes": round(idle_minutes, 1),
|
||||
"cost_per_hour": COST_PER_HOUR,
|
||||
}
|
||||
with open(COST_LOG_FILE, "a") as f:
|
||||
f.write(json.dumps(entry) + "\n")
|
||||
return entry
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def run_watchdog(pod_id: str, threshold_min: int, dry_run: bool = False) -> dict:
|
||||
"""
|
||||
Main watchdog logic.
|
||||
|
||||
Returns a summary dict for reporting.
|
||||
"""
|
||||
idle_min = get_idle_minutes()
|
||||
pod = get_pod_status(pod_id)
|
||||
current_status = pod.get("desiredStatus", "UNKNOWN")
|
||||
error = pod.get("error")
|
||||
|
||||
result = {
|
||||
"pod_id": pod_id,
|
||||
"pod_status": current_status,
|
||||
"idle_minutes": round(idle_min, 1),
|
||||
"threshold_minutes": threshold_min,
|
||||
"action": "none",
|
||||
"error": error,
|
||||
}
|
||||
|
||||
if error:
|
||||
result["action"] = "error"
|
||||
return result
|
||||
|
||||
# Only act if pod is RUNNING and idle exceeds threshold
|
||||
if current_status == "RUNNING" and idle_min > threshold_min:
|
||||
reason = f"idle {idle_min:.0f}min > {threshold_min}min threshold"
|
||||
if dry_run:
|
||||
result["action"] = "would_stop"
|
||||
result["reason"] = reason
|
||||
else:
|
||||
stop_result = stop_pod(pod_id)
|
||||
if "errors" in stop_result:
|
||||
result["action"] = "stop_failed"
|
||||
result["error"] = stop_result["errors"][0].get("message", "unknown")
|
||||
else:
|
||||
result["action"] = "stopped"
|
||||
result["reason"] = reason
|
||||
log_event("stop", pod_id, reason, idle_min)
|
||||
elif current_status == "EXITED" and idle_min > threshold_min:
|
||||
result["action"] = "already_stopped"
|
||||
elif current_status == "RUNNING" and idle_min <= threshold_min:
|
||||
result["action"] = "active"
|
||||
else:
|
||||
result["action"] = "no_action"
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Big Brain idle watchdog — auto-stop RunPod pod")
|
||||
parser.add_argument("--pod-id", default=DEFAULT_POD_ID, help="RunPod pod ID")
|
||||
parser.add_argument("--threshold", type=int, default=DEFAULT_THRESHOLD_MIN, help="Idle minutes before stop")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Check only, don't stop")
|
||||
parser.add_argument("--status", action="store_true", help="Show pod status only")
|
||||
parser.add_argument("--json", dest="as_json", action="store_true", help="JSON output")
|
||||
parser.add_argument("--touch", action="store_true", help="Update last request timestamp to now")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.touch:
|
||||
touch_last_request()
|
||||
print(f"Last request timestamp updated to now ({TIMESTAMP_FILE})")
|
||||
return
|
||||
|
||||
if args.status:
|
||||
pod = get_pod_status(args.pod_id)
|
||||
idle_min = get_idle_minutes()
|
||||
last_req = get_last_request_time()
|
||||
if args.as_json:
|
||||
print(json.dumps({"pod": pod, "idle_minutes": round(idle_min, 1), "last_request": last_req.isoformat() if last_req else None}, indent=2))
|
||||
else:
|
||||
print(f"Pod: {args.pod_id}")
|
||||
print(f"Status: {pod.get('desiredStatus', 'UNKNOWN')}")
|
||||
print(f"Idle: {idle_min:.1f} min")
|
||||
print(f"Last req: {last_req.isoformat() if last_req else 'never (grace period)'}")
|
||||
print(f"Cost/hr: ${COST_PER_HOUR}")
|
||||
if pod.get("uptimeSeconds"):
|
||||
hrs = pod["uptimeSeconds"] / 3600
|
||||
print(f"Uptime: {hrs:.1f} hrs (${hrs * COST_PER_HOUR:.2f})")
|
||||
return
|
||||
|
||||
result = run_watchdog(args.pod_id, args.threshold, args.dry_run)
|
||||
|
||||
if args.as_json:
|
||||
print(json.dumps(result, indent=2))
|
||||
else:
|
||||
status_emoji = {
|
||||
"none": "---",
|
||||
"active": "ACTIVE",
|
||||
"already_stopped": "STOPPED",
|
||||
"stopped": "STOPPED",
|
||||
"would_stop": "WOULD STOP",
|
||||
"stop_failed": "STOP FAILED",
|
||||
"error": "ERROR",
|
||||
"no_action": "OK",
|
||||
}
|
||||
label = status_emoji.get(result["action"], result["action"])
|
||||
print(f"[Big Brain] {label} | pod={result['pod_status']} | idle={result['idle_minutes']}m | threshold={result['threshold_minutes']}m")
|
||||
if result.get("reason"):
|
||||
print(f" Reason: {result['reason']}")
|
||||
if result.get("error"):
|
||||
print(f" Error: {result['error']}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
194
scripts/big_brain_manager.py
Normal file
194
scripts/big_brain_manager.py
Normal file
@@ -0,0 +1,194 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Big Brain Manager — Auto-resume pod before inference.
|
||||
|
||||
Import this module or run standalone to ensure the Big Brain pod is
|
||||
RUNNING before sending inference requests. If the pod is stopped,
|
||||
resumes it and polls until ready.
|
||||
|
||||
Usage:
|
||||
python3 scripts/big_brain_manager.py --ensure-running # resume if stopped
|
||||
python3 scripts/big_brain_manager.py --status # just check status
|
||||
python3 scripts/big_brain_manager.py --stop # manually stop
|
||||
python3 scripts/big_brain_manager.py --resume # manually resume
|
||||
python3 scripts/big_brain_manager.py --json # machine-readable
|
||||
|
||||
As a library:
|
||||
from big_brain_manager import ensure_running, touch_request
|
||||
ensure_running() # blocks until pod is RUNNING
|
||||
touch_request() # update idle timestamp
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from datetime import datetime, timezone
|
||||
|
||||
# Reuse watchdog internals
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
from big_brain_idle_watchdog import (
|
||||
get_pod_status, stop_pod, resume_pod, get_runpod_key,
|
||||
touch_last_request, get_last_request_time, log_event,
|
||||
DEFAULT_POD_ID, COST_PER_HOUR, TIMESTAMP_FILE,
|
||||
)
|
||||
|
||||
OLLAMA_ENDPOINT = "https://{pod_id}-11434.proxy.runpod.net"
|
||||
POLL_INTERVAL_SEC = 10
|
||||
MAX_POLL_ATTEMPTS = 30 # 5 minutes max wait
|
||||
|
||||
|
||||
def is_ollama_ready(pod_id: str) -> bool:
|
||||
"""Check if Ollama is responding on the pod endpoint."""
|
||||
url = OLLAMA_ENDPOINT.format(pod_id=pod_id) + "/api/tags"
|
||||
try:
|
||||
req = urllib.request.Request(url, method="GET")
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
return resp.status == 200
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def ensure_running(pod_id: str = DEFAULT_POD_ID, timeout_sec: int = 300) -> dict:
|
||||
"""
|
||||
Ensure the Big Brain pod is running. Resume if stopped, wait until ready.
|
||||
|
||||
Returns status dict with action taken and final state.
|
||||
"""
|
||||
pod = get_pod_status(pod_id)
|
||||
status = pod.get("desiredStatus", "UNKNOWN")
|
||||
result = {"pod_id": pod_id, "initial_status": status, "action": "none"}
|
||||
|
||||
if pod.get("error"):
|
||||
result["error"] = pod["error"]
|
||||
result["action"] = "error"
|
||||
return result
|
||||
|
||||
if status == "RUNNING":
|
||||
# Verify Ollama is actually serving
|
||||
if is_ollama_ready(pod_id):
|
||||
result["action"] = "already_running"
|
||||
result["final_status"] = "RUNNING"
|
||||
touch_last_request()
|
||||
return result
|
||||
else:
|
||||
# Pod is running but Ollama not ready yet — wait
|
||||
result["action"] = "waiting_for_ollama"
|
||||
|
||||
elif status in ("EXITED", "STOPPED"):
|
||||
# Resume the pod
|
||||
resume_result = resume_pod(pod_id)
|
||||
if "errors" in resume_result:
|
||||
result["action"] = "resume_failed"
|
||||
result["error"] = resume_result["errors"][0].get("message", "unknown")
|
||||
return result
|
||||
result["action"] = "resumed"
|
||||
log_event("resume", pod_id, "auto-resume before inference")
|
||||
|
||||
else:
|
||||
result["action"] = "unexpected_status"
|
||||
result["final_status"] = status
|
||||
return result
|
||||
|
||||
# Poll until Ollama is ready
|
||||
attempts = 0
|
||||
while attempts < (timeout_sec / POLL_INTERVAL_SEC):
|
||||
time.sleep(POLL_INTERVAL_SEC)
|
||||
attempts += 1
|
||||
if is_ollama_ready(pod_id):
|
||||
result["final_status"] = "RUNNING"
|
||||
result["wait_seconds"] = attempts * POLL_INTERVAL_SEC
|
||||
touch_last_request()
|
||||
return result
|
||||
|
||||
result["action"] = "timeout"
|
||||
result["final_status"] = get_pod_status(pod_id).get("desiredStatus", "UNKNOWN")
|
||||
return result
|
||||
|
||||
|
||||
def touch_request():
|
||||
"""Public wrapper for updating the last request timestamp."""
|
||||
touch_last_request()
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Big Brain pod manager — auto-resume for inference")
|
||||
parser.add_argument("--pod-id", default=DEFAULT_POD_ID, help="RunPod pod ID")
|
||||
parser.add_argument("--ensure-running", action="store_true", help="Resume if stopped, wait for ready")
|
||||
parser.add_argument("--status", action="store_true", help="Show pod status")
|
||||
parser.add_argument("--stop", action="store_true", help="Stop the pod")
|
||||
parser.add_argument("--resume", action="store_true", help="Resume the pod")
|
||||
parser.add_argument("--touch", action="store_true", help="Update last request timestamp")
|
||||
parser.add_argument("--json", dest="as_json", action="store_true", help="JSON output")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.touch:
|
||||
touch_request()
|
||||
print(f"Timestamp updated: {TIMESTAMP_FILE}")
|
||||
return
|
||||
|
||||
if args.ensure_running:
|
||||
result = ensure_running(args.pod_id)
|
||||
if args.as_json:
|
||||
print(json.dumps(result, indent=2))
|
||||
else:
|
||||
action = result["action"]
|
||||
if action == "already_running":
|
||||
print(f"[Big Brain] Already RUNNING and Ollama ready")
|
||||
elif action == "resumed":
|
||||
wait = result.get("wait_seconds", 0)
|
||||
print(f"[Big Brain] Resumed pod, Ollama ready in {wait}s")
|
||||
elif action == "resume_failed":
|
||||
print(f"[Big Brain] RESUME FAILED: {result.get('error', 'unknown')}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
elif action == "timeout":
|
||||
print(f"[Big Brain] Timed out waiting for Ollama", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return
|
||||
|
||||
if args.stop:
|
||||
result = stop_pod(args.pod_id)
|
||||
if "errors" in result:
|
||||
print(f"Stop failed: {result['errors'][0].get('message')}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
log_event("stop", args.pod_id, "manual stop")
|
||||
print(f"Pod {args.pod_id} stopped")
|
||||
return
|
||||
|
||||
if args.resume:
|
||||
result = resume_pod(args.pod_id)
|
||||
if "errors" in result:
|
||||
print(f"Resume failed: {result['errors'][0].get('message')}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
log_event("resume", args.pod_id, "manual resume")
|
||||
print(f"Pod {args.pod_id} resuming...")
|
||||
return
|
||||
|
||||
# Default: status
|
||||
pod = get_pod_status(args.pod_id)
|
||||
last_req = get_last_request_time()
|
||||
ollama_ok = is_ollama_ready(args.pod_id) if pod.get("desiredStatus") == "RUNNING" else False
|
||||
|
||||
if args.as_json:
|
||||
print(json.dumps({
|
||||
"pod": pod,
|
||||
"ollama_ready": ollama_ok,
|
||||
"last_request": last_req.isoformat() if last_req else None,
|
||||
}, indent=2))
|
||||
else:
|
||||
print(f"Pod: {args.pod_id}")
|
||||
print(f"Status: {pod.get('desiredStatus', 'UNKNOWN')}")
|
||||
print(f"Ollama: {'ready' if ollama_ok else 'not responding'}")
|
||||
print(f"Last request: {last_req.isoformat() if last_req else 'never'}")
|
||||
print(f"Cost/hr: ${COST_PER_HOUR}")
|
||||
if pod.get("uptimeSeconds"):
|
||||
hrs = pod["uptimeSeconds"] / 3600
|
||||
print(f"Uptime: {hrs:.1f} hrs (${hrs * COST_PER_HOUR:.2f})")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
345
tests/test_big_brain.py
Normal file
345
tests/test_big_brain.py
Normal file
@@ -0,0 +1,345 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Big Brain Idle Watchdog + Manager Tests
|
||||
|
||||
Run:
|
||||
python3 tests/test_big_brain.py
|
||||
pytest tests/test_big_brain.py -v
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import shutil
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from unittest.mock import patch, MagicMock
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
import threading
|
||||
import time
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "scripts"))
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test timestamp tracking
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_touch_and_read_timestamp():
|
||||
"""Write and read back last request timestamp."""
|
||||
from big_brain_idle_watchdog import touch_last_request, get_last_request_time, TIMESTAMP_FILE
|
||||
|
||||
# Override timestamp file to temp location
|
||||
import big_brain_idle_watchdog as wd
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
orig_ts = wd.TIMESTAMP_FILE
|
||||
wd.TIMESTAMP_FILE = os.path.join(tmpdir, "last_request")
|
||||
|
||||
try:
|
||||
assert wd.get_last_request_time() is None # No file yet
|
||||
wd.touch_last_request()
|
||||
ts = wd.get_last_request_time()
|
||||
assert ts is not None
|
||||
delta = datetime.now(timezone.utc) - ts
|
||||
assert delta.total_seconds() < 5 # Within 5 seconds
|
||||
print("PASS: test_touch_and_read_timestamp")
|
||||
finally:
|
||||
wd.TIMESTAMP_FILE = orig_ts
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_idle_minutes_no_file():
|
||||
"""No timestamp file = 0 idle (grace period)."""
|
||||
from big_brain_idle_watchdog import get_idle_minutes, TIMESTAMP_FILE
|
||||
import big_brain_idle_watchdog as wd
|
||||
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
orig_ts = wd.TIMESTAMP_FILE
|
||||
wd.TIMESTAMP_FILE = os.path.join(tmpdir, "nonexistent")
|
||||
|
||||
try:
|
||||
assert wd.get_idle_minutes() == 0.0
|
||||
print("PASS: test_idle_minutes_no_file")
|
||||
finally:
|
||||
wd.TIMESTAMP_FILE = orig_ts
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_idle_minutes_old_timestamp():
|
||||
"""Old timestamp = high idle minutes."""
|
||||
from big_brain_idle_watchdog import get_idle_minutes
|
||||
import big_brain_idle_watchdog as wd
|
||||
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
orig_ts = wd.TIMESTAMP_FILE
|
||||
wd.TIMESTAMP_FILE = os.path.join(tmpdir, "last_request")
|
||||
|
||||
try:
|
||||
# Write a timestamp 45 minutes ago
|
||||
old_time = datetime.now(timezone.utc) - timedelta(minutes=45)
|
||||
with open(wd.TIMESTAMP_FILE, "w") as f:
|
||||
f.write(old_time.isoformat())
|
||||
|
||||
idle = wd.get_idle_minutes()
|
||||
assert 44 < idle < 46, f"Expected ~45 min idle, got {idle}"
|
||||
print("PASS: test_idle_minutes_old_timestamp")
|
||||
finally:
|
||||
wd.TIMESTAMP_FILE = orig_ts
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test cost logging
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_cost_log_write():
|
||||
"""Write and verify a cost log entry."""
|
||||
from big_brain_idle_watchdog import log_event, COST_LOG_FILE
|
||||
import big_brain_idle_watchdog as wd
|
||||
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
orig_log = wd.COST_LOG_FILE
|
||||
wd.COST_LOG_FILE = os.path.join(tmpdir, "cost_log.jsonl")
|
||||
|
||||
try:
|
||||
entry = wd.log_event("stop", "test-pod-123", "idle 45min", 45.2)
|
||||
assert entry["event"] == "stop"
|
||||
assert entry["pod_id"] == "test-pod-123"
|
||||
assert entry["idle_minutes"] == 45.2
|
||||
|
||||
# Verify file was written
|
||||
with open(wd.COST_LOG_FILE) as f:
|
||||
lines = f.readlines()
|
||||
assert len(lines) == 1
|
||||
loaded = json.loads(lines[0])
|
||||
assert loaded["event"] == "stop"
|
||||
print("PASS: test_cost_log_write")
|
||||
finally:
|
||||
wd.COST_LOG_FILE = orig_log
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test watchdog logic (mocked API)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_watchdog_stops_when_idle():
|
||||
"""Pod RUNNING + idle > threshold → should stop."""
|
||||
from big_brain_idle_watchdog import run_watchdog
|
||||
import big_brain_idle_watchdog as wd
|
||||
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
orig_ts = wd.TIMESTAMP_FILE
|
||||
orig_log = wd.COST_LOG_FILE
|
||||
wd.TIMESTAMP_FILE = os.path.join(tmpdir, "last_request")
|
||||
wd.COST_LOG_FILE = os.path.join(tmpdir, "cost_log.jsonl")
|
||||
|
||||
try:
|
||||
# Set old timestamp
|
||||
old_time = datetime.now(timezone.utc) - timedelta(minutes=45)
|
||||
with open(wd.TIMESTAMP_FILE, "w") as f:
|
||||
f.write(old_time.isoformat())
|
||||
|
||||
# Mock get_pod_status to return RUNNING
|
||||
mock_pod = {"id": "test-pod", "desiredStatus": "RUNNING"}
|
||||
mock_stop = {"data": {"podStop": {"id": "test-pod", "desiredStatus": "STOPPED"}}}
|
||||
|
||||
with patch("big_brain_idle_watchdog.get_pod_status", return_value=mock_pod), \
|
||||
patch("big_brain_idle_watchdog.stop_pod", return_value=mock_stop):
|
||||
result = wd.run_watchdog("test-pod", 30, dry_run=False)
|
||||
|
||||
assert result["action"] == "stopped"
|
||||
assert result["idle_minutes"] > 30
|
||||
print("PASS: test_watchdog_stops_when_idle")
|
||||
finally:
|
||||
wd.TIMESTAMP_FILE = orig_ts
|
||||
wd.COST_LOG_FILE = orig_log
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_watchdog_dry_run():
|
||||
"""Dry run should NOT stop the pod."""
|
||||
from big_brain_idle_watchdog import run_watchdog
|
||||
import big_brain_idle_watchdog as wd
|
||||
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
orig_ts = wd.TIMESTAMP_FILE
|
||||
wd.TIMESTAMP_FILE = os.path.join(tmpdir, "last_request")
|
||||
|
||||
try:
|
||||
old_time = datetime.now(timezone.utc) - timedelta(minutes=45)
|
||||
with open(wd.TIMESTAMP_FILE, "w") as f:
|
||||
f.write(old_time.isoformat())
|
||||
|
||||
mock_pod = {"id": "test-pod", "desiredStatus": "RUNNING"}
|
||||
|
||||
with patch("big_brain_idle_watchdog.get_pod_status", return_value=mock_pod), \
|
||||
patch("big_brain_idle_watchdog.stop_pod") as mock_stop:
|
||||
result = wd.run_watchdog("test-pod", 30, dry_run=True)
|
||||
|
||||
assert result["action"] == "would_stop"
|
||||
mock_stop.assert_not_called()
|
||||
print("PASS: test_watchdog_dry_run")
|
||||
finally:
|
||||
wd.TIMESTAMP_FILE = orig_ts
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_watchdog_no_stop_when_active():
|
||||
"""Pod RUNNING + idle < threshold → no action."""
|
||||
from big_brain_idle_watchdog import run_watchdog
|
||||
import big_brain_idle_watchdog as wd
|
||||
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
orig_ts = wd.TIMESTAMP_FILE
|
||||
wd.TIMESTAMP_FILE = os.path.join(tmpdir, "last_request")
|
||||
|
||||
try:
|
||||
# Recent timestamp
|
||||
wd.touch_last_request()
|
||||
mock_pod = {"id": "test-pod", "desiredStatus": "RUNNING"}
|
||||
|
||||
with patch("big_brain_idle_watchdog.get_pod_status", return_value=mock_pod):
|
||||
result = wd.run_watchdog("test-pod", 30, dry_run=False)
|
||||
|
||||
assert result["action"] == "active"
|
||||
print("PASS: test_watchdog_no_stop_when_active")
|
||||
finally:
|
||||
wd.TIMESTAMP_FILE = orig_ts
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_watchdog_already_stopped():
|
||||
"""Pod already EXITED + idle > threshold → already_stopped."""
|
||||
from big_brain_idle_watchdog import run_watchdog
|
||||
import big_brain_idle_watchdog as wd
|
||||
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
orig_ts = wd.TIMESTAMP_FILE
|
||||
wd.TIMESTAMP_FILE = os.path.join(tmpdir, "last_request")
|
||||
|
||||
try:
|
||||
old_time = datetime.now(timezone.utc) - timedelta(minutes=45)
|
||||
with open(wd.TIMESTAMP_FILE, "w") as f:
|
||||
f.write(old_time.isoformat())
|
||||
|
||||
mock_pod = {"id": "test-pod", "desiredStatus": "EXITED"}
|
||||
|
||||
with patch("big_brain_idle_watchdog.get_pod_status", return_value=mock_pod):
|
||||
result = wd.run_watchdog("test-pod", 30, dry_run=False)
|
||||
|
||||
assert result["action"] == "already_stopped"
|
||||
print("PASS: test_watchdog_already_stopped")
|
||||
finally:
|
||||
wd.TIMESTAMP_FILE = orig_ts
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_watchdog_api_error():
|
||||
"""API error should be reported, not crash."""
|
||||
from big_brain_idle_watchdog import run_watchdog
|
||||
import big_brain_idle_watchdog as wd
|
||||
|
||||
mock_pod = {"error": "pod not found"}
|
||||
|
||||
with patch("big_brain_idle_watchdog.get_pod_status", return_value=mock_pod):
|
||||
result = wd.run_watchdog("bad-pod", 30, dry_run=False)
|
||||
|
||||
assert result["action"] == "error"
|
||||
assert "error" in result
|
||||
print("PASS: test_watchdog_api_error")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test manager auto-resume
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_manager_already_running():
|
||||
"""Pod already RUNNING + Ollama ready → no action needed."""
|
||||
from big_brain_manager import ensure_running
|
||||
import big_brain_manager as mgr
|
||||
import big_brain_idle_watchdog as wd
|
||||
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
orig_ts = wd.TIMESTAMP_FILE
|
||||
wd.TIMESTAMP_FILE = os.path.join(tmpdir, "last_request")
|
||||
|
||||
try:
|
||||
mock_pod = {"id": "test-pod", "desiredStatus": "RUNNING"}
|
||||
|
||||
with patch("big_brain_manager.get_pod_status", return_value=mock_pod), \
|
||||
patch("big_brain_manager.is_ollama_ready", return_value=True):
|
||||
result = mgr.ensure_running("test-pod")
|
||||
|
||||
assert result["action"] == "already_running"
|
||||
assert result["final_status"] == "RUNNING"
|
||||
print("PASS: test_manager_already_running")
|
||||
finally:
|
||||
wd.TIMESTAMP_FILE = orig_ts
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def test_manager_resumes_stopped_pod():
|
||||
"""Pod EXITED → should resume and wait."""
|
||||
from big_brain_manager import ensure_running
|
||||
import big_brain_manager as mgr
|
||||
import big_brain_idle_watchdog as wd
|
||||
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
orig_ts = wd.TIMESTAMP_FILE
|
||||
orig_log = wd.COST_LOG_FILE
|
||||
wd.TIMESTAMP_FILE = os.path.join(tmpdir, "last_request")
|
||||
wd.COST_LOG_FILE = os.path.join(tmpdir, "cost_log.jsonl")
|
||||
|
||||
try:
|
||||
mock_pod = {"id": "test-pod", "desiredStatus": "EXITED"}
|
||||
mock_resume = {"data": {"podResume": {"id": "test-pod", "desiredStatus": "RUNNING"}}}
|
||||
|
||||
with patch("big_brain_manager.get_pod_status", return_value=mock_pod), \
|
||||
patch("big_brain_manager.resume_pod", return_value=mock_resume), \
|
||||
patch("big_brain_manager.is_ollama_ready", return_value=True):
|
||||
result = mgr.ensure_running("test-pod", timeout_sec=20)
|
||||
|
||||
assert result["action"] == "resumed"
|
||||
print("PASS: test_manager_resumes_stopped_pod")
|
||||
finally:
|
||||
wd.TIMESTAMP_FILE = orig_ts
|
||||
wd.COST_LOG_FILE = orig_log
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Runner
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
tests = [
|
||||
test_touch_and_read_timestamp,
|
||||
test_idle_minutes_no_file,
|
||||
test_idle_minutes_old_timestamp,
|
||||
test_cost_log_write,
|
||||
test_watchdog_stops_when_idle,
|
||||
test_watchdog_dry_run,
|
||||
test_watchdog_no_stop_when_active,
|
||||
test_watchdog_already_stopped,
|
||||
test_watchdog_api_error,
|
||||
test_manager_already_running,
|
||||
test_manager_resumes_stopped_pod,
|
||||
]
|
||||
|
||||
passed = 0
|
||||
failed = 0
|
||||
for test in tests:
|
||||
try:
|
||||
test()
|
||||
passed += 1
|
||||
except Exception as e:
|
||||
print(f"FAIL: {test.__name__}: {e}")
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
failed += 1
|
||||
|
||||
print(f"\n{'='*40}")
|
||||
print(f"Results: {passed} passed, {failed} failed, {len(tests)} total")
|
||||
if failed > 0:
|
||||
sys.exit(1)
|
||||
Reference in New Issue
Block a user