#!/usr/bin/env python3 """ workforce-manager.py — Autonomous agent workforce management Three capabilities: 1. AUTO-ASSIGN: Match unassigned issues to the right agent by difficulty 2. QUALITY SCORE: Track merge rate per agent, demote poor performers 3. CREDIT MONITOR: Alert when agent quotas are likely exhausted Runs as a periodic script called by the orchestrator or cron. ACCEPTANCE CRITERIA: Auto-assign: - Scans all repos for unassigned issues - Scores issue difficulty (0-10) based on: labels, title keywords, file count - Maps difficulty to agent tier: hard(8-10)→perplexity, medium(4-7)→gemini/manus, easy(0-3)→kimi - Assigns via Gitea API, adds appropriate labels - Never assigns EPICs (those need human decision) - Never reassigns already-assigned issues - Respects agent capacity (max concurrent issues per agent) Quality scoring: - Pulls all closed PRs from last 7 days per agent - Calculates: merge rate, avg time to merge, rejection count - Agents below 40% merge rate get demoted one tier - Agents above 80% merge rate get promoted one tier - Writes scorecard to ~/.hermes/logs/agent-scorecards.json Credit monitoring: - Tracks daily PR count per agent - Manus: alert if >250 credits used (300/day limit) - Loop agents: alert if error rate spikes (likely rate limited) - Writes alerts to ~/.hermes/logs/workforce-alerts.json """ import json import os import sys import time import urllib.request from datetime import datetime, timedelta, timezone from collections import defaultdict # === CONFIG === GITEA_URL = "http://143.198.27.163:3000" TOKEN_FILE = os.path.expanduser("~/.hermes/gitea_token_vps") LOG_DIR = os.path.expanduser("~/.hermes/logs") SCORECARD_FILE = os.path.join(LOG_DIR, "agent-scorecards.json") ALERTS_FILE = os.path.join(LOG_DIR, "workforce-alerts.json") REPOS = [ "Timmy_Foundation/the-nexus", "Timmy_Foundation/autolora", ] # Agent tiers: which agents handle which difficulty AGENT_TIERS = { "heavy": ["perplexity"], # 8-10 difficulty "medium": ["gemini", "manus"], # 4-7 difficulty "grunt": ["kimi"], # 0-3 difficulty } # Max concurrent issues per agent MAX_CONCURRENT = { "perplexity": 2, # one-shot, manual "manus": 2, # one-shot, 300 credits/day "gemini": 5, # 3-worker loop "kimi": 3, # 1-worker loop "claude": 10, # 10-worker loop, managed by its own loop } # Credit limits (daily) CREDIT_LIMITS = { "manus": 300, } # Keywords that indicate difficulty HARD_KEYWORDS = [ "sovereignty", "nostr", "nip-", "economic", "architecture", "protocol", "edge intelligence", "memory graph", "identity", "cryptograph", "zero-knowledge", "consensus", "p2p", "distributed", "rlhf", "grpo", "training pipeline", ] MEDIUM_KEYWORDS = [ "feature", "integration", "api", "websocket", "three.js", "portal", "dashboard", "visualization", "agent", "deploy", "docker", "ssl", "infrastructure", "mcp", "inference", ] EASY_KEYWORDS = [ "refactor", "test", "docstring", "typo", "format", "lint", "rename", "cleanup", "dead code", "move", "extract", "add unit test", "fix import", "update readme", ] def api(method, path, data=None): """Make a Gitea API call.""" with open(TOKEN_FILE) as f: token = f.read().strip() url = f"{GITEA_URL}/api/v1{path}" headers = { "Authorization": f"token {token}", "Content-Type": "application/json", } if data: req = urllib.request.Request(url, json.dumps(data).encode(), headers, method=method) else: req = urllib.request.Request(url, headers=headers, method=method) try: resp = urllib.request.urlopen(req, timeout=15) return json.loads(resp.read()) except Exception as e: return {"error": str(e)} def score_difficulty(issue): """Score an issue 0-10 based on title, labels, and signals.""" title = issue["title"].lower() labels = [l["name"].lower() for l in issue.get("labels", [])] score = 5 # default medium # EPICs are always 10 (but we skip them for auto-assign) if "[epic]" in title or "epic:" in title: return 10 # Label-based scoring if "p0-critical" in labels: score += 2 if "p1-important" in labels: score += 1 if "p2-backlog" in labels: score -= 1 if "needs-design" in labels: score += 2 if "sovereignty" in labels or "nostr" in labels: score += 2 if "infrastructure" in labels: score += 1 # Keyword-based scoring for kw in HARD_KEYWORDS: if kw in title: score += 2 break for kw in EASY_KEYWORDS: if kw in title: score -= 2 break return max(0, min(10, score)) def get_agent_for_difficulty(score, current_loads): """Pick the best agent for a given difficulty score.""" if score >= 8: tier = "heavy" elif score >= 4: tier = "medium" else: tier = "grunt" candidates = AGENT_TIERS[tier] # Pick the agent with the most capacity best = None best_capacity = -1 for agent in candidates: max_c = MAX_CONCURRENT.get(agent, 3) current = current_loads.get(agent, 0) capacity = max_c - current if capacity > best_capacity: best_capacity = capacity best = agent if best_capacity <= 0: # All agents in tier are full, try next tier down fallback_order = ["medium", "grunt"] if tier == "heavy" else ["grunt"] for fb_tier in fallback_order: for agent in AGENT_TIERS[fb_tier]: max_c = MAX_CONCURRENT.get(agent, 3) current = current_loads.get(agent, 0) if max_c - current > 0: return agent return None return best def auto_assign(): """Scan repos for unassigned issues and assign to appropriate agents.""" print("=== AUTO-ASSIGN ===") # Get current agent loads (open issues per agent) current_loads = defaultdict(int) all_unassigned = [] for repo in REPOS: issues = api("GET", f"/repos/{repo}/issues?state=open&type=issues&limit=50") if isinstance(issues, dict) and "error" in issues: print(f" ERROR fetching {repo}: {issues['error']}") continue for issue in issues: assignees = [a["login"] for a in (issue.get("assignees") or [])] if assignees: for a in assignees: current_loads[a] += 1 else: issue["_repo"] = repo all_unassigned.append(issue) print(f" Agent loads: {dict(current_loads)}") print(f" Unassigned issues: {len(all_unassigned)}") assigned_count = 0 for issue in all_unassigned: title = issue["title"].lower() # Skip EPICs — those need human decision if "[epic]" in title or "epic:" in title: print(f" SKIP #{issue['number']} (EPIC): {issue['title'][:60]}") continue # Skip META/audit/showcase if "[meta]" in title or "[audit]" in title or "[showcase]" in title: print(f" SKIP #{issue['number']} (meta): {issue['title'][:60]}") continue score = score_difficulty(issue) agent = get_agent_for_difficulty(score, current_loads) if agent is None: print(f" SKIP #{issue['number']} (all agents full): {issue['title'][:60]}") continue # Assign repo = issue["_repo"] result = api("PATCH", f"/repos/{repo}/issues/{issue['number']}", { "assignees": [agent] }) if "error" not in result: current_loads[agent] += 1 assigned_count += 1 tier = "HEAVY" if score >= 8 else "MEDIUM" if score >= 4 else "GRUNT" print(f" ASSIGN #{issue['number']} -> {agent} (score={score} {tier}): {issue['title'][:50]}") else: print(f" ERROR assigning #{issue['number']}: {result['error']}") print(f" Assigned {assigned_count} issues this cycle.") return assigned_count def quality_score(): """Calculate merge rate and quality metrics per agent over last 7 days.""" print("\n=== QUALITY SCORING ===") since = (datetime.now(timezone.utc) - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ") agent_stats = defaultdict(lambda: {"merged": 0, "closed_unmerged": 0, "open": 0, "total": 0}) for repo in REPOS: # Merged PRs merged = api("GET", f"/repos/{repo}/pulls?state=closed&sort=updated&limit=50") if isinstance(merged, dict) and "error" in merged: continue for pr in merged: if pr.get("updated_at", "") < since: continue agent = pr["user"]["login"] agent_stats[agent]["total"] += 1 if pr.get("merged"): agent_stats[agent]["merged"] += 1 else: agent_stats[agent]["closed_unmerged"] += 1 # Open PRs open_prs = api("GET", f"/repos/{repo}/pulls?state=open&limit=50") if isinstance(open_prs, dict) and "error" in open_prs: continue for pr in open_prs: agent = pr["user"]["login"] agent_stats[agent]["open"] += 1 agent_stats[agent]["total"] += 1 scorecards = {} for agent, stats in sorted(agent_stats.items()): total = stats["total"] if total == 0: continue merge_rate = stats["merged"] / max(total, 1) * 100 # Determine tier adjustment if merge_rate >= 80: recommendation = "PROMOTE — high merge rate" elif merge_rate < 40 and total >= 3: recommendation = "DEMOTE — low merge rate" else: recommendation = "HOLD — acceptable" scorecards[agent] = { "merged": stats["merged"], "closed_unmerged": stats["closed_unmerged"], "open": stats["open"], "total": total, "merge_rate": round(merge_rate, 1), "recommendation": recommendation, "updated": datetime.now(timezone.utc).isoformat(), } print(f" {agent:15s} merged={stats['merged']:3d} rejected={stats['closed_unmerged']:3d} open={stats['open']:3d} rate={merge_rate:5.1f}% {recommendation}") # Save scorecards with open(SCORECARD_FILE, "w") as f: json.dump(scorecards, f, indent=2) print(f" Scorecards saved to {SCORECARD_FILE}") return scorecards def credit_monitor(): """Track daily usage per agent and alert on approaching limits.""" print("\n=== CREDIT MONITORING ===") today = datetime.now(timezone.utc).strftime("%Y-%m-%d") daily_counts = defaultdict(int) for repo in REPOS: # Count PRs created today per agent prs = api("GET", f"/repos/{repo}/pulls?state=all&sort=created&limit=50") if isinstance(prs, dict) and "error" in prs: continue for pr in prs: created = pr.get("created_at", "")[:10] if created == today: agent = pr["user"]["login"] daily_counts[agent] += 1 alerts = [] for agent, count in sorted(daily_counts.items()): limit = CREDIT_LIMITS.get(agent) if limit: pct = count / limit * 100 status = f"{count}/{limit} ({pct:.0f}%)" if pct >= 80: alert = f"WARNING: {agent} at {status} daily credits" alerts.append({"agent": agent, "type": "credit_limit", "message": alert, "time": datetime.now(timezone.utc).isoformat()}) print(f" ⚠️ {alert}") else: print(f" {agent:15s} {status}") else: print(f" {agent:15s} {count} PRs today (no credit limit)") # Check loop health via log files loop_logs = { "claude": "claude-loop.log", "gemini": "gemini-loop.log", "kimi": "kimi-loop.log", } for agent, logfile in loop_logs.items(): logpath = os.path.join(LOG_DIR, logfile) if not os.path.exists(logpath): continue # Count errors in last 50 lines try: with open(logpath) as f: lines = f.readlines()[-50:] errors = sum(1 for l in lines if "FAIL" in l or "ERROR" in l or "rate" in l.lower()) if errors >= 10: alert = f"WARNING: {agent} loop has {errors} errors in last 50 log lines (possible rate limit)" alerts.append({"agent": agent, "type": "error_spike", "message": alert, "time": datetime.now(timezone.utc).isoformat()}) print(f" ⚠️ {alert}") except: pass # Save alerts existing = [] if os.path.exists(ALERTS_FILE): try: with open(ALERTS_FILE) as f: existing = json.load(f) except: pass existing.extend(alerts) # Keep last 100 alerts existing = existing[-100:] with open(ALERTS_FILE, "w") as f: json.dump(existing, f, indent=2) if not alerts: print(" No alerts. All systems nominal.") return alerts def main(): os.makedirs(LOG_DIR, exist_ok=True) mode = sys.argv[1] if len(sys.argv) > 1 else "all" if mode in ("all", "assign"): auto_assign() if mode in ("all", "score"): quality_score() if mode in ("all", "credits"): credit_monitor() if __name__ == "__main__": main()