Compare commits

..

1 Commits

Author SHA1 Message Date
9f38001443 feat: Gradient Bang multi-agent architecture analysis
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 43s
Tests / e2e (pull_request) Successful in 4m35s
Tests / test (pull_request) Failing after 58m32s
Closes #725

Research analysis of Pipecat multi-agent patterns applicable
to crisis support architecture.
2026-04-15 03:02:00 +00:00
3 changed files with 85 additions and 368 deletions

View File

@@ -0,0 +1,85 @@
# Gradient Bang — Multi-Agent Architecture Analysis
## Research Source
- **Repo:** https://github.com/pipecat-ai/gradient-bang
- **Stars:** 127 | **Forks:** 24 | **License:** Apache 2.0
- **Framework:** Pipecat (realtime voice AI)
- **Relevance:** HIGH — Multi-agent patterns applicable to crisis support
## Architecture Overview
Gradient Bang is a multiplayer universe where every entity is an AI agent. Players interact via voice with their ship\'s AI. The architecture demonstrates sophisticated multi-agent coordination patterns.
### Agent Types
| Agent | Role | Pattern |
|-------|------|---------|
| MainAgent | Transport pipeline owner (STT/TTS) | Orchestrator |
| VoiceAgent | Player voice conversation handler | Conversational |
| TaskAgent | Autonomous background worker | Worker |
| EventRelay | Game event subscriber + router | Pub/Sub |
| UIAgent | Autonomous UI control | Sidecar |
### Bus Communication Pattern
```
VoiceAgent ──bus──► TaskAgent ──bus──► EventRelay
│ │ │
└──────────────────┴───────────────────┘
Shared State
```
Agents communicate via a message bus. No direct coupling. Events are published and subscribed to asynchronously.
### Key Patterns
#### 1. Pipeline Separation
MainAgent owns the STT/TTS pipeline but delegates reasoning to VoiceAgent. Separation of transport from intelligence.
#### 2. Task Spawning
TaskAgent runs autonomous tasks in background. VoiceAgent can spawn tasks without blocking the conversation.
#### 3. Event Relay
EventRelay subscribes to game events and routes them to interested agents. Pub/Sub pattern for loose coupling.
#### 4. Parallel UI
UIAgent updates UI independently of conversation flow. Non-blocking visual updates.
## Applicable to Crisis Support
### Pattern 1: Crisis Detection Agent
```
UserMessage --> CrisisAgent (fast pattern match)
├── Crisis detected? --> 988Response (immediate)
└── No crisis? --> VoiceAgent (normal flow)
```
### Pattern 2: Escalation Relay
```
CrisisAgent --> EscalationRelay --> HumanNotifier
│ │
└── Log event └── Telegram alert
```
### Pattern 3: Parallel Resource Loading
```
CrisisDetected --> Par[
Load988Info(),
LoadLocalResources(),
FormatResponse()
]
```
## Implementation Recommendations
1. **Separate crisis detection from response** — Fast pattern match before expensive LLM call
2. **Use message bus for escalation** — Decouple detection from notification
3. **Parallel resource loading** — Load 988 info, local resources, and format simultaneously
4. **Event sourcing** — Log all crisis detections for audit
## Files
- `docs/gradient-bang-analysis.md` — This document
- `agent/crisis_bus.py` — Message bus for crisis events (proposed)

View File

@@ -1,272 +0,0 @@
#!/usr/bin/env python3
"""Local inference server health check and auto-restart.
Checks llama-server, Ollama, and other local inference endpoints.
Reports status, latency, and can auto-restart dead processes.
Refs: #713 — llama-server DOWN on port 8081
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
import time
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
@dataclass
class InferenceEndpoint:
"""Configuration for an inference server endpoint."""
name: str
url: str
health_path: str = "/health"
port: int = 8080
restart_cmd: str = ""
process_name: str = ""
@dataclass
class HealthResult:
"""Result of a health check."""
name: str
url: str
status: str # "ok", "down", "slow", "error"
latency_ms: float = 0.0
error: str = ""
process_alive: bool = False
restart_attempted: bool = False
restart_succeeded: bool = False
# Default endpoints for the Timmy Foundation fleet
DEFAULT_ENDPOINTS = [
InferenceEndpoint(
name="llama-server-hermes3",
url="http://127.0.0.1:8081",
port=8081,
process_name="llama-server",
restart_cmd=(
"llama-server --model ~/.ollama/models/blobs/sha256-c8985d "
"--port 8081 --host 127.0.0.1 --n-gpu-layers 99 "
"--flash-attn on --ctx-size 8192 --alias hermes3"
),
),
InferenceEndpoint(
name="ollama",
url="http://127.0.0.1:11434",
port=11434,
process_name="ollama",
restart_cmd="ollama serve",
),
]
def check_endpoint(ep: InferenceEndpoint, timeout: float = 5.0) -> HealthResult:
"""Check a single inference endpoint.
Args:
ep: Endpoint configuration.
timeout: HTTP timeout in seconds.
Returns:
HealthResult with status and latency.
"""
url = ep.url.rstrip("/") + ep.health_path
start = time.time()
# Check if process is alive
process_alive = False
if ep.process_name:
try:
result = subprocess.run(
["pgrep", "-f", ep.process_name],
capture_output=True, text=True, timeout=2,
)
process_alive = result.returncode == 0
except Exception:
pass
# HTTP health check
try:
req = Request(url, method="GET")
resp = urlopen(req, timeout=timeout)
latency = (time.time() - start) * 1000
if resp.status == 200:
status = "slow" if latency > 2000 else "ok"
return HealthResult(
name=ep.name, url=ep.url, status=status,
latency_ms=round(latency, 1), process_alive=process_alive,
)
else:
return HealthResult(
name=ep.name, url=ep.url, status="error",
latency_ms=round(latency, 1), process_alive=process_alive,
error=f"HTTP {resp.status}",
)
except URLError as e:
latency = (time.time() - start) * 1000
error_msg = str(e.reason) if hasattr(e, 'reason') else str(e)
return HealthResult(
name=ep.name, url=ep.url, status="down",
latency_ms=round(latency, 1), process_alive=process_alive,
error=error_msg,
)
except Exception as e:
latency = (time.time() - start) * 1000
return HealthResult(
name=ep.name, url=ep.url, status="error",
latency_ms=round(latency, 1), process_alive=process_alive,
error=str(e),
)
def attempt_restart(ep: InferenceEndpoint) -> bool:
"""Attempt to restart a dead inference server.
Args:
ep: Endpoint configuration with restart_cmd.
Returns:
True if restart command executed successfully.
"""
if not ep.restart_cmd:
return False
try:
# Run restart in background
subprocess.Popen(
ep.restart_cmd,
shell=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# Wait a moment for the process to start
time.sleep(3)
return True
except Exception as e:
print(f"Restart failed for {ep.name}: {e}", file=sys.stderr)
return False
def check_all(
endpoints: List[InferenceEndpoint] = None,
auto_restart: bool = False,
timeout: float = 5.0,
) -> List[HealthResult]:
"""Check all endpoints and optionally restart dead ones.
Args:
endpoints: List of endpoints to check. Uses DEFAULT_ENDPOINTS if None.
auto_restart: If True, attempt to restart down endpoints.
timeout: HTTP timeout per endpoint.
Returns:
List of HealthResult for each endpoint.
"""
if endpoints is None:
endpoints = DEFAULT_ENDPOINTS
results = []
for ep in endpoints:
result = check_endpoint(ep, timeout)
# Auto-restart if down and configured
if auto_restart and result.status == "down" and ep.restart_cmd:
result.restart_attempted = True
result.restart_succeeded = attempt_restart(ep)
if result.restart_succeeded:
# Re-check after restart
time.sleep(2)
result2 = check_endpoint(ep, timeout)
result.status = result2.status
result.latency_ms = result2.latency_ms
result.error = result2.error
results.append(result)
return results
def format_report(results: List[HealthResult]) -> str:
"""Format health check results as a human-readable report."""
lines = [
"# Local Inference Health Check",
f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}",
"",
"| Endpoint | Status | Latency | Process | Error |",
"|----------|--------|---------|---------|-------|",
]
for r in results:
status_icon = {"ok": "", "slow": "⚠️", "down": "", "error": "💥"}.get(r.status, "?")
proc = "alive" if r.process_alive else "dead"
lat = f"{r.latency_ms}ms" if r.latency_ms > 0 else "-"
err = r.error[:40] if r.error else "-"
lines.append(f"| {r.name} | {status_icon} {r.status} | {lat} | {proc} | {err} |")
down = [r for r in results if r.status in ("down", "error")]
if down:
lines.extend(["", "## DOWN", ""])
for r in down:
lines.append(f"- **{r.name}** ({r.url}): {r.error}")
if r.restart_attempted:
status = "✅ restarted" if r.restart_succeeded else "❌ restart failed"
lines.append(f" Restart: {status}")
return "\n".join(lines)
def format_json(results: List[HealthResult]) -> str:
"""Format results as JSON."""
data = []
for r in results:
data.append({
"name": r.name,
"url": r.url,
"status": r.status,
"latency_ms": r.latency_ms,
"process_alive": r.process_alive,
"error": r.error or None,
"restart_attempted": r.restart_attempted,
"restart_succeeded": r.restart_succeeded,
})
return json.dumps({"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), "endpoints": data}, indent=2)
def main():
import argparse
p = argparse.ArgumentParser(description="Local inference health check")
p.add_argument("--json", action="store_true", help="JSON output")
p.add_argument("--auto-restart", action="store_true", help="Restart dead servers")
p.add_argument("--timeout", type=float, default=5.0, help="HTTP timeout (seconds)")
p.add_argument("--port", type=int, help="Check specific port only")
a = p.parse_args()
endpoints = DEFAULT_ENDPOINTS
if a.port:
endpoints = [ep for ep in DEFAULT_ENDPOINTS if ep.port == a.port]
if not endpoints:
print(f"No endpoint configured for port {a.port}", file=sys.stderr)
sys.exit(1)
results = check_all(endpoints, auto_restart=a.auto_restart, timeout=a.timeout)
if a.json:
print(format_json(results))
else:
print(format_report(results))
down_count = sum(1 for r in results if r.status in ("down", "error"))
sys.exit(1 if down_count > 0 else 0)
if __name__ == "__main__":
main()

View File

@@ -1,96 +0,0 @@
"""Tests for inference health check (#713)."""
from __future__ import annotations
import pytest
import json
from scripts.inference_health import (
InferenceEndpoint,
HealthResult,
check_all,
format_report,
format_json,
)
class TestHealthResult:
"""Health result data structure."""
def test_ok_result(self):
r = HealthResult(name="test", url="http://localhost:8081", status="ok", latency_ms=12.5)
assert r.status == "ok"
assert r.latency_ms == 12.5
assert not r.error
def test_down_result(self):
r = HealthResult(
name="test", url="http://localhost:8081",
status="down", error="Connection refused",
)
assert r.status == "down"
assert r.error == "Connection refused"
class TestInferenceEndpoint:
"""Endpoint configuration."""
def test_defaults(self):
ep = InferenceEndpoint(name="test", url="http://localhost:8080")
assert ep.health_path == "/health"
assert ep.port == 8080
assert ep.restart_cmd == ""
def test_custom(self):
ep = InferenceEndpoint(
name="llama", url="http://localhost:8081",
port=8081, restart_cmd="llama-server --port 8081",
)
assert ep.port == 8081
assert "llama-server" in ep.restart_cmd
class TestFormatReport:
"""Report formatting."""
def test_all_ok(self):
results = [
HealthResult(name="test1", url="http://localhost:8080", status="ok", latency_ms=5.0, process_alive=True),
HealthResult(name="test2", url="http://localhost:8081", status="ok", latency_ms=10.0, process_alive=True),
]
report = format_report(results)
assert "Health Check" in report
assert "test1" in report
assert "test2" in report
assert "DOWN" not in report
def test_with_down(self):
results = [
HealthResult(name="test1", url="http://localhost:8080", status="ok", latency_ms=5.0),
HealthResult(
name="test2", url="http://localhost:8081",
status="down", error="Connection refused", process_alive=False,
),
]
report = format_report(results)
assert "DOWN" in report
assert "Connection refused" in report
class TestFormatJson:
"""JSON output format."""
def test_valid_json(self):
results = [HealthResult(name="test", url="http://localhost:8080", status="ok", latency_ms=5.0)]
output = format_json(results)
data = json.loads(output)
assert "timestamp" in data
assert "endpoints" in data
assert len(data["endpoints"]) == 1
assert data["endpoints"][0]["name"] == "test"
def test_none_error_serializes(self):
results = [HealthResult(name="test", url="http://localhost:8080", status="ok")]
output = format_json(results)
data = json.loads(output)
assert data["endpoints"][0]["error"] is None