Compare commits
1 Commits
step35/595
...
step35/332
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1a007175a2 |
188
bin/swarm-health-monitor.py
Executable file
188
bin/swarm-health-monitor.py
Executable file
@@ -0,0 +1,188 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Swarm Health Monitor — PAPERCLIPS-4 #332
|
||||
Monitoring-only first pass: detects boredom (idle agents) and
|
||||
disorganization (imbalanced work distribution). Uses stdlib only.
|
||||
|
||||
Metrics:
|
||||
- active_ratio: (# agents with open issues) / (# total configured agents)
|
||||
- Work distribution per agent per repo
|
||||
- Overload / underload / dominance warnings
|
||||
|
||||
Exit codes:
|
||||
0 = healthy, no warnings
|
||||
1 = warnings present (boredom/overload/disorg detected)
|
||||
2 = error (token missing, API failure)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import HTTPError, URLError
|
||||
|
||||
# Config
|
||||
GITEA_TOKEN_PATH = Path.home() / ".hermes" / "gitea_token_vps"
|
||||
GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
ORG_NAME = "Timmy_Foundation"
|
||||
FLEET_ROUTING_PATH = Path.home() / ".hermes" / "fleet-routing.json"
|
||||
OVERLOAD_THRESHOLD = 8
|
||||
MEAN_MULTIPLIER = 0.5
|
||||
|
||||
|
||||
def load_token() -> str:
    """Return the Gitea API token stored on disk, stripped of whitespace.

    Raises:
        FileNotFoundError: if the token file does not exist.
    """
    if GITEA_TOKEN_PATH.exists():
        return GITEA_TOKEN_PATH.read_text().strip()
    raise FileNotFoundError(f"Gitea token not found at {GITEA_TOKEN_PATH}")
|
||||
|
||||
|
||||
def load_configured_agents() -> set[str]:
    """Return the set of configured agent names.

    Sources, in order of preference:
      1. fleet-routing.json ("agents" entries that carry a "name")
      2. playbooks/agent-lanes.json next to this script (its keys)
      3. empty set
    A malformed fleet-routing file is skipped silently (best-effort).
    """
    if FLEET_ROUTING_PATH.exists():
        try:
            entries = json.loads(FLEET_ROUTING_PATH.read_text()).get("agents", [])
            return {entry.get("name", "") for entry in entries if entry.get("name")}
        except Exception:
            pass  # fall through to the lanes file
    lanes_file = Path(__file__).parent / "playbooks" / "agent-lanes.json"
    if not lanes_file.exists():
        return set()
    # Iterating the parsed JSON object yields its keys (agent names).
    return set(json.loads(lanes_file.read_text()))
|
||||
|
||||
|
||||
def gitea_get(path: str, params=None) -> object:
    """GET a Gitea API endpoint and return the decoded JSON payload.

    Args:
        path: API path beginning with "/", appended to GITEA_API_BASE.
        params: optional mapping of query-string parameters.

    Returns:
        The parsed JSON response (a dict or list for these endpoints).

    Raises:
        HTTPError / URLError on API or network failure.
    """
    # Fix: the original annotated the return as ``any`` — that is the
    # builtin function, not a type; ``object`` is the honest annotation.
    token = load_token()
    url = f"{GITEA_API_BASE}{path}"
    if params:
        from urllib.parse import urlencode
        url += "?" + urlencode(params)
    req = Request(url, headers={"Authorization": f"token {token}", "Accept": "application/json"})
    # Bounded timeout so the monitoring loop cannot hang on a dead forge.
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def get_all_open_issues() -> list[dict]:
    """Return every open issue across all repos in the org.

    Walks each org repo and pages through its open issues 50 at a time.
    """
    page_size = 50
    collected: list[dict] = []
    org_repos = gitea_get(f"/orgs/{ORG_NAME}/repos", params={"limit": 100})
    for entry in org_repos:
        current = 1
        while True:
            batch = gitea_get(
                f"/repos/{ORG_NAME}/{entry['name']}/issues",
                params={"state": "open", "type": "issues", "limit": page_size, "page": current},
            )
            if not batch:
                break
            collected.extend(batch)
            if len(batch) < page_size:
                break  # short page == last page
            current += 1
    return collected
|
||||
|
||||
|
||||
def compute_distribution(issues: list[dict]) -> dict:
    """Analyze open issues and compute swarm-health metrics.

    Args:
        issues: open-issue dicts as returned by the Gitea API.

    Returns:
        A report dict with per-agent and per-repo distribution, the
        active ratio, mean load, warning strings, and a UTC timestamp.
    """
    agent_issues = defaultdict(list)  # agent -> [(repo, issue number), ...]
    repo_agent = defaultdict(lambda: defaultdict(int))  # repo -> agent -> count

    for issue in issues:
        assignees = issue.get("assignees") or []
        repo = issue.get("repository", {}).get("name") or "unknown"
        num = issue.get("number", "?")
        for assignee in assignees:
            name = assignee.get("login", str(assignee))
            agent_issues[name].append((repo, num))
            repo_agent[repo][name] += 1

    configured = load_configured_agents()
    # Fall back to the observed agent set when no config is available.
    total_configured = len(configured) if configured else len(agent_issues)
    total_active = len(agent_issues)
    active_ratio = total_active / total_configured if total_configured > 0 else 0.0

    issue_counts = [len(v) for v in agent_issues.values()]
    mean_issues = sum(issue_counts) / len(issue_counts) if issue_counts else 0.0

    warnings = []
    # Overload: agent carries more than the absolute threshold.
    for agent, items in agent_issues.items():
        if len(items) > OVERLOAD_THRESHOLD:
            warnings.append(f"OVERLOAD: @{agent} has {len(items)} issues (>{OVERLOAD_THRESHOLD})")
    # Underload (boredom): active agent well below mean with < 2 issues.
    for agent, items in agent_issues.items():
        if len(items) < mean_issues * MEAN_MULTIPLIER and len(items) < 2:
            warnings.append(f"BOREDOM: @{agent} has only {len(items)} issues (mean={mean_issues:.1f})")
    # Fix: configured agents with ZERO open issues never appear in
    # agent_issues, so the loop above could not flag truly idle agents
    # even though idle detection is this monitor's stated purpose.
    for agent in sorted(configured - set(agent_issues)):
        warnings.append(f"BOREDOM: @{agent} has 0 issues (idle)")
    # Disorganization: one agent dominates a repo (>70% of >=3 issues).
    for repo, agent_counts in repo_agent.items():
        total_in_repo = sum(agent_counts.values())
        for agent, count in agent_counts.items():
            if count > total_in_repo * 0.7 and total_in_repo >= 3:
                warnings.append(f"DISORG: @{agent} handles {count}/{total_in_repo} issues in {repo}")

    return {
        "agent_issues": dict(agent_issues),
        "repo_agent_issues": {r: dict(a) for r, a in repo_agent.items()},
        "total_configured": total_configured,
        "total_active": total_active,
        "active_ratio": round(active_ratio, 3),
        "mean_issues_per_active_agent": round(mean_issues, 2),
        "warnings": warnings,
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
|
||||
|
||||
|
||||
def print_report(result: dict) -> None:
    """Pretty-print a swarm-health report produced by compute_distribution."""
    bar = "=" * 60
    print(f"\n{bar}")
    print(f" SWARM HEALTH — {result['generated_at']}")
    print(bar)
    print(f" Configured agents : {result['total_configured']}")
    print(f" Active agents : {result['total_active']}")
    print(f" Health ratio : {result['active_ratio']:.1%}")
    print(f" Mean issues/active: {result['mean_issues_per_active_agent']:.1f}")
    print("\n Work distribution:")
    for agent, items in sorted(result["agent_issues"].items()):
        per_repo = defaultdict(int)
        for repo_name, _num in items:
            per_repo[repo_name] += 1
        breakdown = ", ".join(f"{r}:{c}" for r, c in sorted(per_repo.items()))
        print(f" @{agent:<20} {len(items):>3} issues ({breakdown})")
    found = result["warnings"]
    if found:
        print(f"\n ⚠️ WARNINGS ({len(found)}):")
        for warning in found:
            print(f" - {warning}")
    else:
        print("\n ✅ No warnings — swarm balanced.")
    print(f"{bar}\n")
|
||||
|
||||
|
||||
def main() -> int:
    """CLI entry point.

    Exit codes: 0 = healthy, 1 = warnings present, 2 = token/API error.
    """
    parser = argparse.ArgumentParser(description="Swarm health monitor for PAPERCLIPS-4 #332")
    parser.add_argument("--json", action="store_true", help="Emit raw JSON instead of report")
    parser.add_argument("--quiet", "-q", action="store_true", help="Suppress report, only exit code")
    args = parser.parse_args()

    try:
        result = compute_distribution(get_all_open_issues())
    except HTTPError as err:
        print(f"HTTP ERROR: {err.code} {err.reason}", file=sys.stderr)
        return 2
    except URLError as err:
        print(f"NETWORK ERROR: {err.reason}", file=sys.stderr)
        return 2
    except Exception as err:
        print(f"ERROR: {type(err).__name__}: {err}", file=sys.stderr)
        return 2

    if args.json:
        print(json.dumps(result, indent=2))
    elif not args.quiet:
        print_report(result)

    # Warnings drive the exit status so cron/loops can react.
    return 1 if result["warnings"] else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
224
bin/swarm-rebalance.py
Executable file
224
bin/swarm-rebalance.py
Executable file
@@ -0,0 +1,224 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Swarm Rebalance — PAPERCLIPS-4 #332
|
||||
|
||||
Detects overloaded/bored agents and shifts a limited number of issues
|
||||
from overloaded to underloaded agents.
|
||||
|
||||
This is a lightweight companion to workforce-manager.py:
|
||||
- Does NOT touch issues that have recent activity (assume work in flight)
|
||||
- Only reassigns issues that are still unstarted (no recent comments/activity)
|
||||
- Preserves existing assignment unless rebalancing is needed
|
||||
|
||||
Usage:
|
||||
python swarm-rebalance.py [--dry-run] [--max-shifts 3]
|
||||
|
||||
Requires: stdlib only.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from pathlib import Path
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import HTTPError, URLError
|
||||
|
||||
GITEA_TOKEN_PATH = Path.home() / ".hermes" / "gitea_token_vps"
|
||||
GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
ORG_NAME = "Timmy_Foundation"
|
||||
FLEET_ROUTING_PATH = Path.home() / ".hermes" / "fleet-routing.json"
|
||||
# Thresholds match health monitor
|
||||
OVERLOAD_THRESHOLD = 8
|
||||
UNDERLOAD_FACTOR = 0.5 # agent with < mean*UNDERLOAD_FACTOR issues is candidate to receive work
|
||||
MAX_SHIFTS_PER_RUN = 3 # safety limit
|
||||
|
||||
|
||||
def load_token() -> str:
    """Read and return the Gitea API token, stripped of whitespace.

    Raises:
        FileNotFoundError: when the token file is missing.
    """
    token_file = GITEA_TOKEN_PATH
    if not token_file.exists():
        raise FileNotFoundError(f"Gitea token not found at {token_file}")
    return token_file.read_text().strip()
|
||||
|
||||
|
||||
def load_agents() -> dict:
    """Return a mapping of agent name -> agent config.

    Prefers fleet-routing.json; falls back to the agent-lanes playbook
    (synthesizing a minimal config per lane entry), then to an empty dict.
    A malformed fleet-routing file is skipped silently (best-effort).
    """
    if FLEET_ROUTING_PATH.exists():
        try:
            entries = json.loads(FLEET_ROUTING_PATH.read_text()).get("agents", [])
            return {entry.get("name", ""): entry for entry in entries if entry.get("name")}
        except Exception:
            pass  # malformed routing file: fall back to lanes
    lanes_file = Path(__file__).parent / "playbooks" / "agent-lanes.json"
    if not lanes_file.exists():
        return {}
    lanes = json.loads(lanes_file.read_text())
    return {
        name: {"role": "unknown", "lane": info.get("lane", "")}
        for name, info in lanes.items()
    }
|
||||
|
||||
|
||||
def gitea_get(path: str, params=None) -> object:
    """GET a Gitea API endpoint and return the decoded JSON payload.

    Args:
        path: API path beginning with "/", appended to GITEA_API_BASE.
        params: optional mapping of query-string parameters.

    Returns:
        The parsed JSON response (a dict or list for these endpoints).

    Raises:
        HTTPError / URLError on API or network failure.
    """
    # Fix: ``-> any`` annotated the return with the builtin function
    # ``any``, not a type; ``object`` is the honest annotation.
    token = load_token()
    url = f"{GITEA_API_BASE}{path}"
    if params:
        from urllib.parse import urlencode
        url += "?" + urlencode(params)
    req = Request(url, headers={"Authorization": f"token {token}", "Accept": "application/json"})
    # Bounded timeout so the rebalance run cannot hang on a dead forge.
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def gitea_patch(path: str, data: dict) -> object:
    """PATCH a Gitea API endpoint with a JSON body; return decoded JSON.

    Args:
        path: API path beginning with "/", appended to GITEA_API_BASE.
        data: JSON-serializable request body.

    Raises:
        HTTPError / URLError on API or network failure.
    """
    # Fix: ``-> any`` annotated the return with the builtin function
    # ``any``, not a type; ``object`` is the honest annotation.
    token = load_token()
    url = f"{GITEA_API_BASE}{path}"
    body = json.dumps(data).encode()
    req = Request(
        url,
        data=body,
        headers={"Authorization": f"token {token}", "Content-Type": "application/json"},
        method="PATCH",
    )
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def get_repo_issues(owner: str, repo: str) -> list[dict]:
    """Fetch all open issues for one repo, following pagination (50/page)."""
    page_size = 50
    collected: list[dict] = []
    current = 1
    while True:
        batch = gitea_get(
            f"/repos/{owner}/{repo}/issues",
            params={"state": "open", "type": "issues", "limit": page_size, "page": current},
        )
        if not batch:
            break
        collected.extend(batch)
        if len(batch) < page_size:
            break  # short page == last page
        current += 1
    return collected
|
||||
|
||||
|
||||
def compute_loads(issues: list[dict]) -> tuple[dict[str, int], dict, dict]:
    """Summarize assignment load from a list of open issues.

    Returns a 3-tuple:
      agent_load:  agent name -> number of assigned open issues
      agent_repos: agent name -> set of repos the agent is assigned in
      issue_map:   (repo name, issue number) -> issue dict
    """
    load = defaultdict(int)
    repos_by_agent = defaultdict(set)
    by_key: dict = {}
    for issue in issues:
        repo_name = issue.get("repository", {}).get("name", "unknown")
        number = issue.get("number")
        for person in issue.get("assignees") or []:
            # Fall back to the repr when the assignee has no login field.
            login = person.get("login", str(person))
            load[login] += 1
            repos_by_agent[login].add(repo_name)
        if number is not None:
            by_key[(repo_name, number)] = issue
    return dict(load), dict(repos_by_agent), by_key
|
||||
|
||||
|
||||
def format_comment(shifted_from: str, shifted_to: str, reason: str) -> str:
    """Build the Markdown comment posted on a reassigned issue."""
    header = f"🤖 **Swarm Rebalance** — reassigned from @{shifted_from} to @{shifted_to}."
    footer = "*Automated rebalance (PAPERCLIPS-4 #332)*"
    return f"{header}\n\nReason: {reason}\n\n{footer}"
|
||||
|
||||
|
||||
def main() -> int:
    """CLI entry point: detect load imbalance and shift issues between agents.

    Exit codes:
        0 = nothing to do (balanced, or no assigned issues)
        1 = one or more shifts performed (or planned, in --dry-run)
        2 = token/API/network error
    """
    parser = argparse.ArgumentParser(description="Swarm rebalance — shift issues from overloaded to underloaded agents")
    parser.add_argument("--dry-run", action="store_true", help="Show what would change without touching Gitea")
    parser.add_argument("--max-shifts", type=int, default=MAX_SHIFTS_PER_RUN, help="Maximum issues to reassign")
    args = parser.parse_args()

    try:
        # 1. Load all repos issues
        repos = gitea_get(f"/orgs/{ORG_NAME}/repos", params={"limit": 100})
        all_issues: list[dict] = []
        for repo in repos:
            all_issues.extend(get_repo_issues(ORG_NAME, repo["name"]))

        agent_load, agent_repos, issue_map = compute_loads(all_issues)
        # NOTE(review): agents_info is loaded but never used below —
        # presumably reserved for role-aware routing; confirm intent.
        agents_info = load_agents()

        if not agent_load:
            print("No assigned issues found — nothing to rebalance.")
            return 0

        # Candidates: overload is an absolute threshold; underload is
        # relative to the mean AND capped at fewer than 2 issues.
        mean_load = sum(agent_load.values()) / len(agent_load)
        overload_candidates = {a: c for a, c in agent_load.items() if c > OVERLOAD_THRESHOLD}
        underload_candidates = {a: c for a, c in agent_load.items() if c < mean_load * UNDERLOAD_FACTOR and c < 2}

        print(f"Swarm load: {len(agent_load)} active agents, mean={mean_load:.1f}, overload={len(overload_candidates)}, underload={len(underload_candidates)}")

        if not overload_candidates or not underload_candidates:
            print("No rebalance candidates — swarm balanced.")
            return 0

        shifts = 0
        # Process the most overloaded agents first; at most one issue is
        # moved per overloaded agent per run (loop visits each agent once).
        for overloaded in sorted(overload_candidates.items(), key=lambda x: x[1], reverse=True):
            if shifts >= args.max_shifts:
                break
            agent_from, load_count = overloaded
            # Find an issue assigned to this agent that can be moved
            # Prefer issues with no recent activity (no comments in 24h? Hard to detect cheaply)
            # Simplified: pick any assigned issue, try to find a suitable receiver
            assigned_issues = [(repo, num) for (repo, num), iss in issue_map.items()
                               if any(a.get("login") == agent_from for a in (iss.get("assignees") or []))]
            if not assigned_issues:
                continue
            # Pick the first issue (simplest)
            repo, num = assigned_issues[0]
            issue = issue_map[(repo, num)]

            # Find a suitable receiver (underloaded, preferably not already on this repo)
            best_receiver = None
            for candidate in sorted(underload_candidates.keys(), key=lambda a: agent_load[a]):
                # Prefer agents not already working on this repo (to spread)
                if repo not in agent_repos.get(candidate, set()):
                    best_receiver = candidate
                    break
            if not best_receiver:
                # Every underloaded agent already works this repo:
                # take the least-loaded one anyway.
                best_receiver = sorted(underload_candidates.keys(), key=lambda a: agent_load[a])[0]

            # Perform reassignment (dry-run or real)
            if args.dry_run:
                print(f"[DRY] Shift issue {ORG_NAME}/{repo}#{num} from @{agent_from} → @{best_receiver}")
            else:
                try:
                    # Replaces the full assignee list with just the receiver.
                    gitea_patch(f"/repos/{ORG_NAME}/{repo}/issues/{num}", {"assignees": [best_receiver]})
                    comment = format_comment(agent_from, best_receiver, f"agent load rebalance ({load_count}→{agent_load[best_receiver]+1})")
                    gitea_post(f"/repos/{ORG_NAME}/{repo}/issues/{num}/comments", {"body": comment})
                    print(f"✓ Shifted {ORG_NAME}/{repo}#{num} from @{agent_from} → @{best_receiver}")
                except Exception as e:
                    # A failed API call skips the counter update below so
                    # bookkeeping stays consistent with the forge.
                    print(f"✗ Failed shift issue #{num}: {e}")
                    continue

            # Update local counters for next iteration
            # (also runs in dry-run mode so planned shifts stack realistically)
            agent_load[agent_from] -= 1
            agent_load[best_receiver] = agent_load.get(best_receiver, 0) + 1
            agent_repos[best_receiver].add(repo)
            shifts += 1

        print(f"Rebalance complete: {shifts} issue(s) shifted.")
        return 0 if shifts == 0 else 1  # exit 1 if any shifts happened (signal change)

    except HTTPError as e:
        print(f"HTTP ERROR {e.code}: {e.reason}", file=sys.stderr)
        return 2
    except URLError as e:
        print(f"NETWORK ERROR: {e.reason}", file=sys.stderr)
        return 2
    except Exception as exc:
        print(f"ERROR: {type(exc).__name__}: {exc}", file=sys.stderr)
        return 2
|
||||
|
||||
|
||||
def gitea_post(path: str, data: dict) -> object:
    """POST a JSON body to a Gitea API endpoint; return decoded JSON.

    Args:
        path: API path beginning with "/", appended to GITEA_API_BASE.
        data: JSON-serializable request body.

    Raises:
        HTTPError / URLError on API or network failure.
    """
    # Fix: ``-> any`` annotated the return with the builtin function
    # ``any``, not a type; ``object`` is the honest annotation.
    token = load_token()
    url = f"{GITEA_API_BASE}{path}"
    body = json.dumps(data).encode()
    req = Request(
        url,
        data=body,
        headers={"Authorization": f"token {token}", "Content-Type": "application/json"},
        method="POST",
    )
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -249,14 +249,34 @@ while true; do
|
||||
run_triage "$state_dir"
|
||||
rm -rf "$state_dir"
|
||||
|
||||
# Run workforce manager every 3rd cycle (~15 min)
|
||||
WORKFORCE_CYCLE=$((WORKFORCE_CYCLE + 1))
|
||||
if [ $((WORKFORCE_CYCLE % 3)) -eq 0 ]; then
|
||||
log "Running workforce manager..."
|
||||
python3 "$HOME/.hermes/bin/workforce-manager.py" all >> "$LOG_DIR/workforce-manager.log" 2>&1
|
||||
log "Workforce manager complete"
|
||||
fi
|
||||
# --- Swarm health monitoring (every cycle) ---
|
||||
SWARM_HEALTH_OK=1
|
||||
if [ -x "$SCRIPT_DIR/swarm-health-monitor.py" ]; then
|
||||
if ! "$SCRIPT_DIR/swarm-health-monitor.py" --quiet >/dev/null 2>&1; then
|
||||
SWARM_HEALTH_OK=0
|
||||
log "SWARM HEALTH: warnings detected — check output"
|
||||
fi
|
||||
else
|
||||
log "SWARM HEALTH: script not found at $SCRIPT_DIR/swarm-health-monitor.py"
|
||||
fi
|
||||
|
||||
log "Sleeping ${CYCLE_INTERVAL}s"
|
||||
sleep "$CYCLE_INTERVAL"
|
||||
# Run workforce manager every 3rd cycle (~15 min)
|
||||
WORKFORCE_CYCLE=$((WORKFORCE_CYCLE + 1))
|
||||
if [ $((WORKFORCE_CYCLE % 3)) -eq 0 ]; then
|
||||
log "Running workforce manager..."
|
||||
python3 "$HOME/.hermes/bin/workforce-manager.py" all >> "$LOG_DIR/workforce-manager.log" 2>&1
|
||||
log "Workforce manager complete"
|
||||
|
||||
# Run swarm rebalance (dry-run first; remove --dry-run to shift issues)
|
||||
if [ -x "$SCRIPT_DIR/swarm-rebalance.py" ]; then
|
||||
log "Running swarm rebalance (dry-run)..."
|
||||
"$SCRIPT_DIR/swarm-rebalance.py" --dry-run >> "$LOG_DIR/swarm-rebalance.log" 2>&1
|
||||
log "Swarm rebalance dry-run complete"
|
||||
else
|
||||
log "SWARM REBALANCE: script not found at $SCRIPT_DIR/swarm-rebalance.py"
|
||||
fi
|
||||
fi
|
||||
|
||||
log "Sleeping ${CYCLE_INTERVAL}s"
|
||||
sleep "$CYCLE_INTERVAL"
|
||||
done
|
||||
|
||||
Reference in New Issue
Block a user