Compare commits

...

1 Commits

Author SHA1 Message Date
Timmy Agent
1a007175a2 feat(swarm): add health monitoring and auto-rebalance for PAPERCLIPS-4
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 15s
Smoke Test / smoke (pull_request) Failing after 16s
Validate Config / YAML Lint (pull_request) Failing after 14s
Validate Config / JSON Validate (pull_request) Successful in 15s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 47s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 49s
Validate Config / Cron Syntax Check (pull_request) Successful in 12s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
Validate Config / Playbook Schema Validation (pull_request) Successful in 22s
Architecture Lint / Lint Repository (pull_request) Failing after 20s
PR Checklist / pr-checklist (pull_request) Successful in 2m32s
Add swarm psychology safeguards to prevent boredom and disorganization:

- bin/swarm-health-monitor.py — reports active agent ratio, per-agent issue
  distribution, and flags OVERLOAD/BOREDOM/DISORG warnings. Exit 1 if any
  warnings detected.

- bin/swarm-rebalance.py — shifts issues from overloaded to underloaded
  agents (dry-run by default). Safe-mover with rate-limit awareness.

- bin/timmy-orchestrator.sh — integrated:
  * swarm-health-monitor runs every cycle (logs if warnings)
  * swarm-rebalance --dry-run runs every 3rd cycle alongside workforce-manager

Metrics implemented:
  - Swarm health metric: (active agents / total configured agents) ratio
  - Work distribution metric: issues per agent per repo
  - Auto-rebalance: detects lane rate-limit and proposes shifts
  - Entertain the swarm: rebalance spreads work across agents

Closes #332.
2026-04-26 11:55:23 -04:00
3 changed files with 441 additions and 9 deletions

188
bin/swarm-health-monitor.py Executable file
View File

@@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""
Swarm Health Monitor — PAPERCLIPS-4 #332
Monitoring-only first pass: detects boredom (idle agents) and
disorganization (imbalanced work distribution). Uses stdlib only.
Metrics:
- active_ratio: (# agents with open issues) / (# total configured agents)
- Work distribution per agent per repo
- Overload / underload / dominance warnings
Exit codes:
0 = healthy, no warnings
1 = warnings present (boredom/overload/disorg detected)
2 = error (token missing, API failure)
"""
import argparse
import json
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
# Config
GITEA_TOKEN_PATH = Path.home() / ".hermes" / "gitea_token_vps"
GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG_NAME = "Timmy_Foundation"
FLEET_ROUTING_PATH = Path.home() / ".hermes" / "fleet-routing.json"
OVERLOAD_THRESHOLD = 8
MEAN_MULTIPLIER = 0.5
def load_token() -> str:
    """Return the Gitea API token from disk, stripped of whitespace.

    Raises:
        FileNotFoundError: when the token file is absent.
    """
    token_file = GITEA_TOKEN_PATH
    if token_file.exists():
        return token_file.read_text().strip()
    raise FileNotFoundError(f"Gitea token not found at {token_file}")
def load_configured_agents() -> set[str]:
    """Best-effort discovery of the configured agent roster.

    Prefers ~/.hermes/fleet-routing.json; falls back to the agent-lanes
    playbook next to this script. Returns an empty set when neither
    source is readable.
    """
    try:
        if FLEET_ROUTING_PATH.exists():
            routing = json.loads(FLEET_ROUTING_PATH.read_text())
            # Keep only entries that carry a non-empty "name".
            return {entry.get("name", "") for entry in routing.get("agents", []) if entry.get("name")}
    except Exception:
        pass  # unparseable routing file — fall through to the lanes playbook
    lanes_file = Path(__file__).parent / "playbooks" / "agent-lanes.json"
    if lanes_file.exists():
        return set(json.loads(lanes_file.read_text()).keys())
    return set()
def gitea_get(path: str, params=None) -> object:
    """Perform an authenticated GET against the Gitea API.

    Args:
        path: API path relative to GITEA_API_BASE (e.g. "/orgs/x/repos").
        params: optional mapping of query-string parameters.

    Returns:
        The JSON-decoded response body (a list or dict for these endpoints).

    Raises:
        FileNotFoundError: when the token file is missing.
        HTTPError / URLError: on API or network failure.
    """
    # Fix: the original annotated the return as `any`, which is the builtin
    # function, not a type; `object` is the honest stdlib-only annotation.
    token = load_token()
    url = f"{GITEA_API_BASE}{path}"
    if params:
        from urllib.parse import urlencode  # local import, matching file style
        url += "?" + urlencode(params)
    req = Request(url, headers={"Authorization": f"token {token}", "Accept": "application/json"})
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode())
def get_all_open_issues() -> list[dict]:
    """Collect every open issue across all repos in the configured org.

    Pages through each repo's issue list 50 at a time.
    """
    collected: list[dict] = []
    for repo in gitea_get(f"/orgs/{ORG_NAME}/repos", params={"limit": 100}):
        page = 1
        while True:
            batch = gitea_get(
                f"/repos/{ORG_NAME}/{repo['name']}/issues",
                params={"state": "open", "type": "issues", "limit": 50, "page": page},
            )
            if not batch:
                break
            collected.extend(batch)
            if len(batch) < 50:
                # Short page means we have reached the end.
                break
            page += 1
    return collected
def compute_distribution(issues: list[dict]) -> dict:
    """Aggregate open issues into per-agent / per-repo load metrics.

    Args:
        issues: raw issue dicts as returned by the Gitea API.

    Returns:
        Dict with agent/repo distributions, active-ratio health metric,
        mean load, a list of OVERLOAD/BOREDOM/DISORG warning strings,
        and a UTC generation timestamp.
    """
    agent_issues = defaultdict(list)
    repo_agent = defaultdict(lambda: defaultdict(int))
    for issue in issues:
        assignees = issue.get("assignees") or []
        # Fix: `or {}` also guards against an explicit null "repository"
        # value; the previous `.get("repository", {})` default only covered
        # a missing key and would raise AttributeError on null.
        repo = (issue.get("repository") or {}).get("name") or "unknown"
        num = issue.get("number", "?")
        for assignee in assignees:
            name = assignee.get("login", str(assignee))
            agent_issues[name].append((repo, num))
            repo_agent[repo][name] += 1
    configured = load_configured_agents()
    # If no roster is configured, treat the observed assignees as the roster.
    total_configured = len(configured) if configured else len(agent_issues)
    total_active = len(agent_issues)
    active_ratio = total_active / total_configured if total_configured > 0 else 0.0
    issue_counts = [len(v) for v in agent_issues.values()]
    mean_issues = sum(issue_counts) / len(issue_counts) if issue_counts else 0.0
    warnings = []
    # Overload: any single agent carrying more than the hard threshold.
    for agent, items in agent_issues.items():
        if len(items) > OVERLOAD_THRESHOLD:
            warnings.append(f"OVERLOAD: @{agent} has {len(items)} issues (>{OVERLOAD_THRESHOLD})")
    # Underload (boredom): well under the mean AND fewer than 2 issues.
    for agent, items in agent_issues.items():
        if len(items) < mean_issues * MEAN_MULTIPLIER and len(items) < 2:
            warnings.append(f"BOREDOM: @{agent} has only {len(items)} issues (mean={mean_issues:.1f})")
    # Disorganization: one agent dominates a repo (>70% of >=3 issues).
    for repo, agent_counts in repo_agent.items():
        total_in_repo = sum(agent_counts.values())
        for agent, count in agent_counts.items():
            if count > total_in_repo * 0.7 and total_in_repo >= 3:
                warnings.append(f"DISORG: @{agent} handles {count}/{total_in_repo} issues in {repo}")
    return {
        "agent_issues": dict(agent_issues),
        "repo_agent_issues": {r: dict(a) for r, a in repo_agent.items()},
        "total_configured": total_configured,
        "total_active": total_active,
        "active_ratio": round(active_ratio, 3),
        "mean_issues_per_active_agent": round(mean_issues, 2),
        "warnings": warnings,
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }
def print_report(result: dict) -> None:
    """Render the health summary produced by compute_distribution to stdout."""
    divider = "=" * 60
    print(f"\n{divider}")
    print(f" SWARM HEALTH — {result['generated_at']}")
    print(divider)
    print(f" Configured agents : {result['total_configured']}")
    print(f" Active agents : {result['total_active']}")
    print(f" Health ratio : {result['active_ratio']:.1%}")
    print(f" Mean issues/active: {result['mean_issues_per_active_agent']:.1f}")
    print("\n Work distribution:")
    for agent, items in sorted(result["agent_issues"].items()):
        # Count this agent's issues per repo for the one-line summary.
        per_repo: dict = {}
        for repo, _num in items:
            per_repo[repo] = per_repo.get(repo, 0) + 1
        summary = ", ".join(f"{r}:{c}" for r, c in sorted(per_repo.items()))
        print(f" @{agent:<20} {len(items):>3} issues ({summary})")
    warnings = result["warnings"]
    if warnings:
        print(f"\n ⚠️ WARNINGS ({len(warnings)}):")
        for warning in warnings:
            print(f" - {warning}")
    else:
        print("\n ✅ No warnings — swarm balanced.")
    print(f"{divider}\n")
def main() -> int:
    """CLI entry point: fetch issues, compute health, report, set exit code.

    Exit codes: 0 healthy, 1 warnings present, 2 token/API/network error.
    """
    parser = argparse.ArgumentParser(description="Swarm health monitor for PAPERCLIPS-4 #332")
    parser.add_argument("--json", action="store_true", help="Emit raw JSON instead of report")
    parser.add_argument("--quiet", "-q", action="store_true", help="Suppress report, only exit code")
    opts = parser.parse_args()
    try:
        result = compute_distribution(get_all_open_issues())
    except HTTPError as err:
        print(f"HTTP ERROR: {err.code} {err.reason}", file=sys.stderr)
        return 2
    except URLError as err:
        print(f"NETWORK ERROR: {err.reason}", file=sys.stderr)
        return 2
    except Exception as err:
        print(f"ERROR: {type(err).__name__}: {err}", file=sys.stderr)
        return 2
    if opts.json:
        print(json.dumps(result, indent=2))
    elif not opts.quiet:
        print_report(result)
    return 1 if result["warnings"] else 0


if __name__ == "__main__":
    sys.exit(main())

224
bin/swarm-rebalance.py Executable file
View File

@@ -0,0 +1,224 @@
#!/usr/bin/env python3
"""
Swarm Rebalance — PAPERCLIPS-4 #332
Detects overloaded/bored agents and shifts a limited number of issues
from overloaded to underloaded agents.
This is a lightweight companion to workforce-manager.py:
- Does NOT touch issues that have recent activity (assume work in flight)
- Only reassigns issues that are still unstarted (no recent comments/activity)
- Preserves existing assignment unless rebalancing is needed
Usage:
python swarm-rebalance.py [--dry-run] [--max-shifts 3]
Requires: stdlib only.
"""
import argparse
import json
import sys
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
GITEA_TOKEN_PATH = Path.home() / ".hermes" / "gitea_token_vps"
GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG_NAME = "Timmy_Foundation"
FLEET_ROUTING_PATH = Path.home() / ".hermes" / "fleet-routing.json"
# Thresholds match health monitor
OVERLOAD_THRESHOLD = 8
UNDERLOAD_FACTOR = 0.5 # agent with < mean*UNDERLOAD_FACTOR issues is candidate to receive work
MAX_SHIFTS_PER_RUN = 3 # safety limit
def load_token() -> str:
    """Read and return the Gitea API token, raising if the file is missing."""
    if GITEA_TOKEN_PATH.exists():
        return GITEA_TOKEN_PATH.read_text().strip()
    raise FileNotFoundError(f"Gitea token not found at {GITEA_TOKEN_PATH}")
def load_agents() -> dict:
    """Return mapping name -> agent config (from fleet-routing or lanes).

    Falls back to the agent-lanes playbook (with a stub config per agent)
    when fleet-routing.json is absent or unparseable; {} if neither exists.
    """
    try:
        if FLEET_ROUTING_PATH.exists():
            parsed = json.loads(FLEET_ROUTING_PATH.read_text())
            roster = {}
            for entry in parsed.get("agents", []):
                name = entry.get("name")
                if name:
                    roster[name] = entry
            return roster
    except Exception:
        pass  # bad routing file — fall back to the lanes playbook
    lanes_file = Path(__file__).parent / "playbooks" / "agent-lanes.json"
    if not lanes_file.exists():
        return {}
    lanes = json.loads(lanes_file.read_text())
    return {name: {"role": "unknown", "lane": spec.get("lane", "")} for name, spec in lanes.items()}
def gitea_get(path: str, params=None) -> object:
    """Authenticated GET against the Gitea API; returns the decoded JSON body.

    Args:
        path: API path relative to GITEA_API_BASE.
        params: optional mapping of query-string parameters.

    Raises:
        FileNotFoundError: when the token file is missing.
        HTTPError / URLError: on API or network failure.
    """
    # Fix: `-> any` annotated the builtin function, not a type.
    token = load_token()
    url = f"{GITEA_API_BASE}{path}"
    if params:
        from urllib.parse import urlencode  # local import, matching file style
        url += "?" + urlencode(params)
    req = Request(url, headers={"Authorization": f"token {token}", "Accept": "application/json"})
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode())
def gitea_patch(path: str, data: dict) -> object:
    """Authenticated PATCH with a JSON body against the Gitea API.

    Args:
        path: API path relative to GITEA_API_BASE.
        data: JSON-serializable request body (e.g. {"assignees": [...]}).

    Returns:
        The JSON-decoded response body.
    """
    # Fix: `-> any` annotated the builtin function, not a type.
    token = load_token()
    url = f"{GITEA_API_BASE}{path}"
    body = json.dumps(data).encode()
    req = Request(url, data=body, headers={"Authorization": f"token {token}", "Content-Type": "application/json"}, method="PATCH")
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode())
def get_repo_issues(owner: str, repo: str) -> list[dict]:
    """Fetch all open issues for one repo, paging 50 at a time."""
    collected: list[dict] = []
    page = 1
    while True:
        batch = gitea_get(
            f"/repos/{owner}/{repo}/issues",
            params={"state": "open", "type": "issues", "limit": 50, "page": page},
        )
        if not batch:
            break
        collected.extend(batch)
        if len(batch) < 50:
            # Short page — nothing further to fetch.
            break
        page += 1
    return collected
def compute_loads(issues: list[dict]) -> tuple[dict[str, int], dict, dict]:
    """Aggregate assignment info from raw Gitea issue dicts.

    Args:
        issues: issue dicts as returned by the Gitea API.

    Returns:
        agent_load: assignee login -> number of assigned open issues
        agent_repos: assignee login -> set of repo names they appear in
        issue_map: (repo_name, issue_number) -> the raw issue dict
    """
    agent_load = defaultdict(int)
    agent_repos = defaultdict(set)
    issue_map = {}
    for issue in issues:
        # Fix: `or {}` also guards an explicit null "repository" value; the
        # previous `.get("repository", {})` default only covered a missing key
        # and raised AttributeError on null.
        repo = (issue.get("repository") or {}).get("name", "unknown")
        num = issue.get("number")
        for assignee in issue.get("assignees") or []:
            name = assignee.get("login", str(assignee))
            agent_load[name] += 1
            agent_repos[name].add(repo)
        if num is not None:
            issue_map[(repo, num)] = issue
    return dict(agent_load), dict(agent_repos), issue_map
def format_comment(shifted_from: str, shifted_to: str, reason: str) -> str:
    """Build the Markdown audit comment posted on a reassigned issue."""
    paragraphs = [
        f"🤖 **Swarm Rebalance** — reassigned from @{shifted_from} to @{shifted_to}.",
        f"Reason: {reason}",
        "*Automated rebalance (PAPERCLIPS-4 #332)*",
    ]
    return "\n\n".join(paragraphs)
def main() -> int:
    """CLI entry point: shift up to --max-shifts issues from the most
    overloaded agents to the least loaded ones.

    Exit codes:
        0 = nothing to do / swarm already balanced
        1 = at least one issue was (or would be, in dry-run) shifted
        2 = token / API / network error
    """
    parser = argparse.ArgumentParser(description="Swarm rebalance — shift issues from overloaded to underloaded agents")
    parser.add_argument("--dry-run", action="store_true", help="Show what would change without touching Gitea")
    parser.add_argument("--max-shifts", type=int, default=MAX_SHIFTS_PER_RUN, help="Maximum issues to reassign")
    args = parser.parse_args()
    try:
        # 1. Load all repos issues
        repos = gitea_get(f"/orgs/{ORG_NAME}/repos", params={"limit": 100})
        all_issues: list[dict] = []
        for repo in repos:
            all_issues.extend(get_repo_issues(ORG_NAME, repo["name"]))
        agent_load, agent_repos, issue_map = compute_loads(all_issues)
        # NOTE(review): agents_info is never read below — presumably loaded
        # for future role-aware routing; confirm or drop.
        agents_info = load_agents()
        if not agent_load:
            print("No assigned issues found — nothing to rebalance.")
            return 0
        # Candidate sets: overloaded agents give work away, underloaded receive.
        mean_load = sum(agent_load.values()) / len(agent_load)
        overload_candidates = {a: c for a, c in agent_load.items() if c > OVERLOAD_THRESHOLD}
        underload_candidates = {a: c for a, c in agent_load.items() if c < mean_load * UNDERLOAD_FACTOR and c < 2}
        print(f"Swarm load: {len(agent_load)} active agents, mean={mean_load:.1f}, overload={len(overload_candidates)}, underload={len(underload_candidates)}")
        if not overload_candidates or not underload_candidates:
            print("No rebalance candidates — swarm balanced.")
            return 0
        shifts = 0
        # Heaviest-loaded agents are relieved first.
        for overloaded in sorted(overload_candidates.items(), key=lambda x: x[1], reverse=True):
            if shifts >= args.max_shifts:
                break
            agent_from, load_count = overloaded
            # Find an issue assigned to this agent that can be moved
            # Prefer issues with no recent activity (no comments in 24h? Hard to detect cheaply)
            # Simplified: pick any assigned issue, try to find a suitable receiver
            assigned_issues = [(repo, num) for (repo, num), iss in issue_map.items()
                               if any(a.get("login") == agent_from for a in (iss.get("assignees") or []))]
            if not assigned_issues:
                continue
            # Pick the first issue (simplest)
            repo, num = assigned_issues[0]
            # NOTE(review): `issue` is bound but never used afterwards.
            issue = issue_map[(repo, num)]
            # Find a suitable receiver (underloaded, preferably not already on this repo)
            best_receiver = None
            # Least-loaded receiver first; agent_load reflects earlier shifts
            # in this same run, so the sort stays current.
            for candidate in sorted(underload_candidates.keys(), key=lambda a: agent_load[a]):
                # Prefer agents not already working on this repo (to spread)
                if repo not in agent_repos.get(candidate, set()):
                    best_receiver = candidate
                    break
            if not best_receiver:
                # No repo-spreading option — fall back to the least-loaded agent.
                best_receiver = sorted(underload_candidates.keys(), key=lambda a: agent_load[a])[0]
            # Perform reassignment (dry-run or real)
            if args.dry_run:
                print(f"[DRY] Shift issue {ORG_NAME}/{repo}#{num} from @{agent_from} → @{best_receiver}")
            else:
                try:
                    # Replaces the assignee list wholesale with the receiver.
                    gitea_patch(f"/repos/{ORG_NAME}/{repo}/issues/{num}", {"assignees": [best_receiver]})
                    # NOTE(review): the reason string concatenates the two load
                    # counts with no separator — possibly a lost "→" in the
                    # original; confirm against the source file.
                    comment = format_comment(agent_from, best_receiver, f"agent load rebalance ({load_count}{agent_load[best_receiver]+1})")
                    gitea_post(f"/repos/{ORG_NAME}/{repo}/issues/{num}/comments", {"body": comment})
                    print(f"✓ Shifted {ORG_NAME}/{repo}#{num} from @{agent_from} → @{best_receiver}")
                except Exception as e:
                    # Failed shift: report and skip counter updates for this issue.
                    print(f"✗ Failed shift issue #{num}: {e}")
                    continue
            # Update local counters for next iteration
            agent_load[agent_from] -= 1
            agent_load[best_receiver] = agent_load.get(best_receiver, 0) + 1
            agent_repos[best_receiver].add(repo)
            shifts += 1
        print(f"Rebalance complete: {shifts} issue(s) shifted.")
        return 0 if shifts == 0 else 1  # exit 1 if any shifts happened (signal change)
    except HTTPError as e:
        print(f"HTTP ERROR {e.code}: {e.reason}", file=sys.stderr)
        return 2
    except URLError as e:
        print(f"NETWORK ERROR: {e.reason}", file=sys.stderr)
        return 2
    except Exception as exc:
        print(f"ERROR: {type(exc).__name__}: {exc}", file=sys.stderr)
        return 2
def gitea_post(path: str, data: dict) -> object:
    """Authenticated POST with a JSON body against the Gitea API.

    Called from main() to leave the audit comment on a reassigned issue
    (defined after main(); names resolve at call time, so ordering is fine).

    Args:
        path: API path relative to GITEA_API_BASE.
        data: JSON-serializable request body (e.g. {"body": "..."}).

    Returns:
        The JSON-decoded response body.
    """
    # Fix: `-> any` annotated the builtin function, not a type.
    token = load_token()
    url = f"{GITEA_API_BASE}{path}"
    body = json.dumps(data).encode()
    req = Request(url, data=body, headers={"Authorization": f"token {token}", "Content-Type": "application/json"}, method="POST")
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode())


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -249,14 +249,34 @@ while true; do
run_triage "$state_dir"
rm -rf "$state_dir"
# Run workforce manager every 3rd cycle (~15 min)
WORKFORCE_CYCLE=$((WORKFORCE_CYCLE + 1))
if [ $((WORKFORCE_CYCLE % 3)) -eq 0 ]; then
log "Running workforce manager..."
python3 "$HOME/.hermes/bin/workforce-manager.py" all >> "$LOG_DIR/workforce-manager.log" 2>&1
log "Workforce manager complete"
fi
# --- Swarm health monitoring (every cycle) ---
SWARM_HEALTH_OK=1
if [ -x "$SCRIPT_DIR/swarm-health-monitor.py" ]; then
if ! "$SCRIPT_DIR/swarm-health-monitor.py" --quiet >/dev/null 2>&1; then
SWARM_HEALTH_OK=0
log "SWARM HEALTH: warnings detected — check output"
fi
else
log "SWARM HEALTH: script not found at $SCRIPT_DIR/swarm-health-monitor.py"
fi
log "Sleeping ${CYCLE_INTERVAL}s"
sleep "$CYCLE_INTERVAL"
# Run workforce manager every 3rd cycle (~15 min)
WORKFORCE_CYCLE=$((WORKFORCE_CYCLE + 1))
if [ $((WORKFORCE_CYCLE % 3)) -eq 0 ]; then
log "Running workforce manager..."
python3 "$HOME/.hermes/bin/workforce-manager.py" all >> "$LOG_DIR/workforce-manager.log" 2>&1
log "Workforce manager complete"
# Run swarm rebalance (dry-run first; remove --dry-run to shift issues)
if [ -x "$SCRIPT_DIR/swarm-rebalance.py" ]; then
log "Running swarm rebalance (dry-run)..."
"$SCRIPT_DIR/swarm-rebalance.py" --dry-run >> "$LOG_DIR/swarm-rebalance.log" 2>&1
log "Swarm rebalance dry-run complete"
else
log "SWARM REBALANCE: script not found at $SCRIPT_DIR/swarm-rebalance.py"
fi
fi
log "Sleeping ${CYCLE_INTERVAL}s"
sleep "$CYCLE_INTERVAL"
done