Compare commits
2 Commits
fix/708
...
feat/a2a-h
| Author | SHA1 | Date | |
|---|---|---|---|
| 2fba6dade4 | |||
| d7670b98cf |
@@ -1,122 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Atlas Inference Engine Evaluation on RunPod L40S."""
|
||||
|
||||
import argparse, json, os, sys, time, urllib.request, urllib.error
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
RUNPOD_API = "https://api.runpod.io/graphql"
|
||||
POD_NAME = "atlas-eval-l40s"
|
||||
ATLAS_IMAGE = "avarok/atlas-gb10:alpha-2.8"
|
||||
MODEL = "Qwen/Qwen3.5-35B-A3B-NVFP4"
|
||||
COST_LOG = Path.home() / ".hermes" / "atlas_eval_log.jsonl"
|
||||
|
||||
def load_key():
|
||||
k = os.environ.get("RUNPOD_API_KEY", "")
|
||||
if k: return k.strip()
|
||||
p = Path.home() / ".config" / "runpod" / "access_key"
|
||||
if p.exists(): return p.read_text().strip()
|
||||
print("ERROR: No RunPod key"); sys.exit(1)
|
||||
|
||||
def gql(query):
|
||||
req = urllib.request.Request(RUNPOD_API,
|
||||
data=json.dumps({"query": query}).encode(),
|
||||
headers={"Authorization": f"Bearer {load_key()}", "Content-Type": "application/json"},
|
||||
method="POST")
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as r:
|
||||
return json.loads(r.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"Error: {e.read().decode()[:300]}"); return None
|
||||
|
||||
def find_pod():
|
||||
r = gql("{ myself { pods { id name desiredStatus costPerHr gpuCount runtime { uptimeInSeconds } } } }")
|
||||
if r and r.get("data"):
|
||||
for p in r["data"]["myself"]["pods"]:
|
||||
if p["name"] == POD_NAME: return p
|
||||
return None
|
||||
|
||||
def deploy():
|
||||
existing = find_pod()
|
||||
if existing:
|
||||
print(f"Exists: {existing['id']} ({existing['desiredStatus']})")
|
||||
if existing["desiredStatus"] == "STOPPED":
|
||||
gql(f'mutation {{ podResume(input: {{ podId: "{existing["id"]}" }}) {{ id }} }}')
|
||||
print("Resuming...")
|
||||
return existing["id"]
|
||||
q = 'mutation { podFindAndDeployOnDemand(input: { cloudType: COMMUNITY, gpuCount: 1, gpuTypeId: "NVIDIA L40S", name: "' + POD_NAME + '", containerDiskInGb: 50, imageName: "' + ATLAS_IMAGE + '", ports: "8888/http", volumeInGb: 100, volumeMountPath: "/workspace" }) { id desiredStatus } }'
|
||||
r = gql(q)
|
||||
if r and r.get("data"):
|
||||
pod = r["data"]["podFindAndDeployOnDemand"]
|
||||
print(f"Deployed: {pod['id']} -> https://{pod['id']}-8888.proxy.runpod.net")
|
||||
return pod["id"]
|
||||
print("Deploy failed")
|
||||
|
||||
def status():
|
||||
pod = find_pod()
|
||||
if not pod: print("No pod"); return
|
||||
print(f"ID: {pod['id']}\nStatus: {pod['desiredStatus']}\nCost: ${pod['costPerHr']}/hr\nEndpoint: https://{pod['id']}-8888.proxy.runpod.net")
|
||||
u = pod.get("runtime", {}).get("uptimeInSeconds", 0)
|
||||
if u: print(f"Uptime: {u//3600}h {(u%3600)//60}m")
|
||||
|
||||
def benchmark():
|
||||
pod = find_pod()
|
||||
if not pod or pod["desiredStatus"] != "RUNNING":
|
||||
print("Pod not running"); return
|
||||
ep = f"https://{pod['id']}-8888.proxy.runpod.net/v1"
|
||||
print(f"Benchmarking: {ep}")
|
||||
prompts = [
|
||||
"Explain sovereign AI in 100 words.",
|
||||
"Write quicksort in Python.",
|
||||
"Compare transformers vs state space models.",
|
||||
"Describe MoE architecture.",
|
||||
"Write a Dockerfile for Flask+Redis.",
|
||||
]
|
||||
results = []
|
||||
for i, p in enumerate(prompts):
|
||||
print(f"\n[{i+1}/5] {p[:40]}...")
|
||||
start = time.time()
|
||||
try:
|
||||
payload = json.dumps({"model": MODEL, "messages": [{"role": "user", "content": p}], "max_tokens": 512}).encode()
|
||||
req = urllib.request.Request(f"{ep}/chat/completions", data=payload,
|
||||
headers={"Content-Type": "application/json", "Authorization": "Bearer dummy"}, method="POST")
|
||||
with urllib.request.urlopen(req, timeout=120) as resp:
|
||||
r = json.loads(resp.read().decode())
|
||||
elapsed = time.time() - start
|
||||
usage = r.get("usage", {})
|
||||
tps = usage.get("completion_tokens", 0) / elapsed if elapsed > 0 else 0
|
||||
results.append({"prompt": i, "tok_per_sec": round(tps, 2), "tokens": usage.get("completion_tokens", 0)})
|
||||
print(f" {usage.get('completion_tokens', 0)} tokens / {elapsed:.1f}s = {tps:.1f} tok/s")
|
||||
except Exception as e:
|
||||
print(f" Error: {e}")
|
||||
results.append({"prompt": i, "error": str(e)})
|
||||
ok = [r for r in results if "tok_per_sec" in r]
|
||||
if ok:
|
||||
avg = sum(r["tok_per_sec"] for r in ok) / len(ok)
|
||||
print(f"\nAvg: {avg:.1f} tok/s | Min: {min(r['tok_per_sec'] for r in ok):.1f} | Max: {max(r['tok_per_sec'] for r in ok):.1f}")
|
||||
COST_LOG.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(COST_LOG, "a") as f:
|
||||
f.write(json.dumps({"ts": datetime.now(timezone.utc).isoformat(), "avg_tps": round(avg, 2), "results": results}) + "\n")
|
||||
|
||||
def stop():
|
||||
pod = find_pod()
|
||||
if not pod: print("No pod"); return
|
||||
gql(f'mutation {{ podStop(input: {{ podId: "{pod["id"]}" }}) {{ id }} }}')
|
||||
print(f"Stopped: {pod['id']}")
|
||||
|
||||
def terminate():
|
||||
pod = find_pod()
|
||||
if not pod: print("No pod"); return
|
||||
gql(f'mutation {{ podTerminate(input: {{ podId: "{pod["id"]}" }}) }}')
|
||||
print(f"Terminated: {pod['id']}")
|
||||
|
||||
def main():
|
||||
p = argparse.ArgumentParser(description="Atlas L40S Eval")
|
||||
sub = p.add_subparsers(dest="cmd")
|
||||
sub.add_parser("deploy"); sub.add_parser("status"); sub.add_parser("benchmark")
|
||||
sub.add_parser("stop"); sub.add_parser("terminate")
|
||||
args = p.parse_args()
|
||||
if not args.cmd: p.print_help(); sys.exit(1)
|
||||
{"deploy": deploy, "status": status, "benchmark": benchmark, "stop": stop, "terminate": terminate}[args.cmd]()
|
||||
|
||||
if __name__ == "__main__": main()
|
||||
292
hermes_cli/a2a_health.py
Normal file
292
hermes_cli/a2a_health.py
Normal file
@@ -0,0 +1,292 @@
|
||||
"""A2A Health Check — periodic heartbeat monitoring for fleet agents.
|
||||
|
||||
Pings each registered agent's A2A endpoint, records response time and status,
|
||||
tracks consecutive failures, and fires alerts when thresholds are breached.
|
||||
|
||||
Usage:
|
||||
from hermes_cli.a2a_health import HealthMonitor
|
||||
monitor = HealthMonitor()
|
||||
results = monitor.check_all()
|
||||
monitor.print_dashboard(results)
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
|
||||
STATE_FILE = os.path.expanduser("~/.hermes/a2a_health.json")
|
||||
FLEET_FILE = os.path.expanduser("~/.hermes/fleet_agents.json")
|
||||
|
||||
# Thresholds
|
||||
CONSECUTIVE_FAILURE_THRESHOLD = 3
|
||||
RESPONSE_TIME_THRESHOLD_SEC = 10.0
|
||||
CHECK_INTERVAL_SEC = 300 # 5 minutes
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentStatus:
|
||||
name: str
|
||||
url: str
|
||||
last_check: Optional[str] = None
|
||||
last_healthy: Optional[str] = None
|
||||
response_time_ms: Optional[float] = None
|
||||
status: str = "unknown" # healthy | degraded | down | unknown
|
||||
consecutive_failures: int = 0
|
||||
available_tools: int = 0
|
||||
agent_status: str = ""
|
||||
error: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class HealthState:
|
||||
agents: dict = field(default_factory=dict) # name -> AgentStatus dict
|
||||
last_full_check: Optional[str] = None
|
||||
alerts_sent: dict = field(default_factory=dict) # name -> timestamp of last alert
|
||||
|
||||
|
||||
def _now_iso() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def _load_state() -> HealthState:
|
||||
if os.path.exists(STATE_FILE):
|
||||
try:
|
||||
with open(STATE_FILE) as f:
|
||||
data = json.load(f)
|
||||
state = HealthState()
|
||||
state.last_full_check = data.get("last_full_check")
|
||||
state.alerts_sent = data.get("alerts_sent", {})
|
||||
for name, agent_data in data.get("agents", {}).items():
|
||||
status = AgentStatus(name=agent_data["name"], url=agent_data["url"])
|
||||
status.last_check = agent_data.get("last_check")
|
||||
status.last_healthy = agent_data.get("last_healthy")
|
||||
status.response_time_ms = agent_data.get("response_time_ms")
|
||||
status.status = agent_data.get("status", "unknown")
|
||||
status.consecutive_failures = agent_data.get("consecutive_failures", 0)
|
||||
status.available_tools = agent_data.get("available_tools", 0)
|
||||
status.agent_status = agent_data.get("agent_status", "")
|
||||
status.error = agent_data.get("error")
|
||||
state.agents[name] = status
|
||||
return state
|
||||
except (json.JSONDecodeError, KeyError):
|
||||
pass
|
||||
return HealthState()
|
||||
|
||||
|
||||
def _save_state(state: HealthState) -> None:
|
||||
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
|
||||
data = {
|
||||
"last_full_check": state.last_full_check,
|
||||
"alerts_sent": state.alerts_sent,
|
||||
"agents": {},
|
||||
}
|
||||
for name, status in state.agents.items():
|
||||
data["agents"][name] = asdict(status)
|
||||
with open(STATE_FILE, "w") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
|
||||
def _load_fleet() -> list[dict]:
|
||||
"""Load fleet agent definitions from ~/.hermes/fleet_agents.json.
|
||||
|
||||
Format:
|
||||
[
|
||||
{"name": "ezra", "url": "http://143.198.27.163:8080"},
|
||||
{"name": "bezalel", "url": "http://167.99.126.228:8080"},
|
||||
{"name": "allegro", "url": "http://167.99.126.229:8080"}
|
||||
]
|
||||
"""
|
||||
if os.path.exists(FLEET_FILE):
|
||||
try:
|
||||
with open(FLEET_FILE) as f:
|
||||
return json.load(f)
|
||||
except (json.JSONDecodeError, KeyError):
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def _ping_agent(url: str, timeout: float = 15.0) -> dict:
|
||||
"""Ping an A2A endpoint and return health data.
|
||||
|
||||
Tries /health first, falls back to /a2a with a minimal request.
|
||||
"""
|
||||
start = time.monotonic()
|
||||
|
||||
# Try dedicated health endpoint first
|
||||
health_url = f"{url.rstrip('/')}/health"
|
||||
try:
|
||||
req = urllib.request.Request(health_url, method="GET")
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
elapsed = time.monotonic() - start
|
||||
data = json.loads(resp.read())
|
||||
return {
|
||||
"ok": True,
|
||||
"response_time_ms": round(elapsed * 1000, 1),
|
||||
"status": data.get("status", "healthy"),
|
||||
"available_tools": data.get("available_tools", 0),
|
||||
}
|
||||
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, OSError):
|
||||
pass
|
||||
|
||||
# Fallback: try A2A agent card endpoint
|
||||
card_url = f"{url.rstrip('/')}/.well-known/agent-card.json"
|
||||
try:
|
||||
req = urllib.request.Request(card_url, method="GET")
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
elapsed = time.monotonic() - start
|
||||
data = json.loads(resp.read())
|
||||
tools = data.get("capabilities", {}).get("tools", [])
|
||||
return {
|
||||
"ok": True,
|
||||
"response_time_ms": round(elapsed * 1000, 1),
|
||||
"status": "healthy",
|
||||
"available_tools": len(tools) if isinstance(tools, list) else 0,
|
||||
}
|
||||
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError, OSError) as e:
|
||||
elapsed = time.monotonic() - start
|
||||
return {
|
||||
"ok": False,
|
||||
"response_time_ms": round(elapsed * 1000, 1),
|
||||
"error": str(e),
|
||||
}
|
||||
|
||||
|
||||
class HealthMonitor:
|
||||
"""A2A fleet health monitor."""
|
||||
|
||||
def __init__(self, state_file: str = STATE_FILE, fleet_file: str = FLEET_FILE):
|
||||
self.state_file = state_file
|
||||
self.fleet_file = fleet_file
|
||||
self.state = _load_state()
|
||||
|
||||
def check_agent(self, name: str, url: str) -> AgentStatus:
|
||||
"""Check a single agent and update state."""
|
||||
now = _now_iso()
|
||||
result = _ping_agent(url)
|
||||
|
||||
if name not in self.state.agents:
|
||||
self.state.agents[name] = AgentStatus(name=name, url=url)
|
||||
|
||||
agent = self.state.agents[name]
|
||||
agent.last_check = now
|
||||
agent.response_time_ms = result.get("response_time_ms")
|
||||
|
||||
if result["ok"]:
|
||||
agent.status = "healthy"
|
||||
agent.consecutive_failures = 0
|
||||
agent.available_tools = result.get("available_tools", 0)
|
||||
agent.agent_status = result.get("status", "healthy")
|
||||
agent.last_healthy = now
|
||||
agent.error = None
|
||||
else:
|
||||
agent.consecutive_failures += 1
|
||||
agent.error = result.get("error", "unknown error")
|
||||
|
||||
if agent.consecutive_failures >= CONSECUTIVE_FAILURE_THRESHOLD:
|
||||
agent.status = "down"
|
||||
else:
|
||||
agent.status = "degraded"
|
||||
|
||||
# Check response time even on success
|
||||
if result["ok"] and result.get("response_time_ms", 0) > RESPONSE_TIME_THRESHOLD_SEC * 1000:
|
||||
if agent.status == "healthy":
|
||||
agent.status = "degraded"
|
||||
|
||||
return agent
|
||||
|
||||
def check_all(self) -> list[AgentStatus]:
|
||||
"""Check all registered fleet agents."""
|
||||
fleet = _load_fleet()
|
||||
results = []
|
||||
for agent_def in fleet:
|
||||
name = agent_def.get("name", "unknown")
|
||||
url = agent_def.get("url", "")
|
||||
if not url:
|
||||
continue
|
||||
status = self.check_agent(name, url)
|
||||
results.append(status)
|
||||
|
||||
self.state.last_full_check = _now_iso()
|
||||
_save_state(self.state)
|
||||
return results
|
||||
|
||||
def get_alerts(self, results: list[AgentStatus]) -> list[dict]:
|
||||
"""Generate alerts for agents that just went down."""
|
||||
alerts = []
|
||||
now = time.time()
|
||||
|
||||
for agent in results:
|
||||
if agent.status == "down":
|
||||
last_alert = self.state.alerts_sent.get(agent.name, 0)
|
||||
# Alert at most once per hour per agent
|
||||
if now - last_alert > 3600:
|
||||
alerts.append({
|
||||
"agent": agent.name,
|
||||
"consecutive_failures": agent.consecutive_failures,
|
||||
"last_healthy": agent.last_healthy,
|
||||
"error": agent.error,
|
||||
})
|
||||
self.state.alerts_sent[agent.name] = now
|
||||
|
||||
if alerts:
|
||||
_save_state(self.state)
|
||||
|
||||
return alerts
|
||||
|
||||
def print_dashboard(self, results: list[AgentStatus]) -> str:
|
||||
"""Format a text dashboard of fleet health."""
|
||||
lines = []
|
||||
lines.append("=== A2A Fleet Health ===")
|
||||
lines.append(f"Checked: {_now_iso()[:19]}Z")
|
||||
lines.append("")
|
||||
|
||||
if not results:
|
||||
lines.append("No agents registered.")
|
||||
lines.append(f"Add agents to {self.fleet_file}")
|
||||
return "\n".join(lines)
|
||||
|
||||
# Header
|
||||
lines.append(f"{'AGENT':<12} {'STATUS':<10} {'MS':>8} {'TOOLS':>6} {'FAIL':>5} {'LAST HEALTHY'}")
|
||||
lines.append("-" * 72)
|
||||
|
||||
for agent in sorted(results, key=lambda a: a.name):
|
||||
status_icon = {
|
||||
"healthy": "OK",
|
||||
"degraded": "WARN",
|
||||
"down": "DOWN",
|
||||
"unknown": "???",
|
||||
}.get(agent.status, "???")
|
||||
|
||||
ms = f"{agent.response_time_ms:.0f}" if agent.response_time_ms else "-"
|
||||
tools = str(agent.available_tools) if agent.available_tools else "-"
|
||||
fail = str(agent.consecutive_failures) if agent.consecutive_failures else "-"
|
||||
last_h = agent.last_healthy[:19] + "Z" if agent.last_healthy else "never"
|
||||
|
||||
lines.append(f"{agent.name:<12} {status_icon:<10} {ms:>8} {tools:>6} {fail:>5} {last_h}")
|
||||
|
||||
# Summary
|
||||
healthy = sum(1 for a in results if a.status == "healthy")
|
||||
degraded = sum(1 for a in results if a.status == "degraded")
|
||||
down = sum(1 for a in results if a.status == "down")
|
||||
lines.append("-" * 72)
|
||||
lines.append(f"Total: {len(results)} Healthy: {healthy} Degraded: {degraded} Down: {down}")
|
||||
|
||||
# Alerts
|
||||
alerts = self.get_alerts(results)
|
||||
if alerts:
|
||||
lines.append("")
|
||||
lines.append("ALERTS:")
|
||||
for alert in alerts:
|
||||
lines.append(f" !! {alert['agent']} is DOWN ({alert['consecutive_failures']} consecutive failures)")
|
||||
if alert.get("error"):
|
||||
lines.append(f" Error: {alert['error'][:100]}")
|
||||
|
||||
return "\n".join(lines)
|
||||
64
hermes_cli/a2a_health_commands.py
Normal file
64
hermes_cli/a2a_health_commands.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""CLI command: hermes a2a health
|
||||
|
||||
Fleet agent health check via A2A heartbeat.
|
||||
|
||||
Usage:
|
||||
hermes a2a health # Check all agents, show dashboard
|
||||
hermes a2a health --json # JSON output for scripting
|
||||
hermes a2a health --agent NAME # Check single agent
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
|
||||
from hermes_cli.a2a_health import HealthMonitor, _load_fleet, _ping_agent
|
||||
|
||||
|
||||
def register_subparser(subparsers):
|
||||
"""Register 'hermes a2a health' subcommand."""
|
||||
a2a_parser = subparsers.add_parser("a2a", help="A2A agent communication")
|
||||
a2a_sub = a2a_parser.add_subparsers(dest="a2a_command")
|
||||
|
||||
health_parser = a2a_sub.add_parser("health", help="Check fleet agent health")
|
||||
health_parser.add_argument("--json", action="store_true", help="JSON output")
|
||||
health_parser.add_argument("--agent", type=str, help="Check single agent by name")
|
||||
health_parser.set_defaults(func=run_health_check)
|
||||
|
||||
return a2a_parser
|
||||
|
||||
|
||||
def run_health_check(args: argparse.Namespace) -> int:
|
||||
"""Execute the health check command."""
|
||||
monitor = HealthMonitor()
|
||||
|
||||
if args.agent:
|
||||
# Single agent check
|
||||
fleet = _load_fleet()
|
||||
agent_def = next((a for a in fleet if a.get("name") == args.agent), None)
|
||||
if not agent_def:
|
||||
print(f"Error: agent '{args.agent}' not found in fleet", file=sys.stderr)
|
||||
print(f"Add it to ~/.hermes/fleet_agents.json", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
result = monitor.check_agent(args.agent, agent_def["url"])
|
||||
if args.json:
|
||||
from dataclasses import asdict
|
||||
print(json.dumps(asdict(result), indent=2))
|
||||
else:
|
||||
print(monitor.print_dashboard([result]))
|
||||
|
||||
return 0 if result.status in ("healthy", "degraded") else 1
|
||||
|
||||
# Full fleet check
|
||||
results = monitor.check_all()
|
||||
|
||||
if args.json:
|
||||
from dataclasses import asdict
|
||||
print(json.dumps([asdict(r) for r in results], indent=2))
|
||||
else:
|
||||
print(monitor.print_dashboard(results))
|
||||
|
||||
# Exit code: 0 if all healthy, 1 if any down
|
||||
has_down = any(r.status == "down" for r in results)
|
||||
return 1 if has_down else 0
|
||||
Reference in New Issue
Block a user