- Moved all agent loop scripts into source control (bin/): claude-loop.sh, gemini-loop.sh, timmy-orchestrator.sh, workforce-manager.py, agent-dispatch.sh, nexus-merge-bot.sh; ops dashboard scripts (ops-panel, ops-helpers, ops-gitea); monitoring scripts (timmy-status, timmy-loopstat). deploy.sh provides a one-command overlay onto ~/.hermes/. Updated README with sidecar architecture docs. All loops now target the-nexus and autolora only.
406 lines · 14 KiB · Python · Executable File
#!/usr/bin/env python3
|
|
"""
|
|
workforce-manager.py — Autonomous agent workforce management
|
|
|
|
Three capabilities:
|
|
1. AUTO-ASSIGN: Match unassigned issues to the right agent by difficulty
|
|
2. QUALITY SCORE: Track merge rate per agent, demote poor performers
|
|
3. CREDIT MONITOR: Alert when agent quotas are likely exhausted
|
|
|
|
Runs as a periodic script called by the orchestrator or cron.
|
|
|
|
ACCEPTANCE CRITERIA:
|
|
Auto-assign:
|
|
- Scans all repos for unassigned issues
|
|
- Scores issue difficulty (0-10) based on: labels, title keywords, file count
|
|
- Maps difficulty to agent tier: hard(8-10)→perplexity, medium(4-7)→gemini/manus, easy(0-3)→kimi
|
|
- Assigns via Gitea API, adds appropriate labels
|
|
- Never assigns EPICs (those need human decision)
|
|
- Never reassigns already-assigned issues
|
|
- Respects agent capacity (max concurrent issues per agent)
|
|
|
|
Quality scoring:
|
|
- Pulls all closed PRs from last 7 days per agent
|
|
- Calculates: merge rate, avg time to merge, rejection count
|
|
- Agents below 40% merge rate get demoted one tier
|
|
- Agents above 80% merge rate get promoted one tier
|
|
- Writes scorecard to ~/.hermes/logs/agent-scorecards.json
|
|
|
|
Credit monitoring:
|
|
- Tracks daily PR count per agent
|
|
- Manus: alert if >250 credits used (300/day limit)
|
|
- Loop agents: alert if error rate spikes (likely rate limited)
|
|
- Writes alerts to ~/.hermes/logs/workforce-alerts.json
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from datetime import datetime, timedelta, timezone
|
|
from collections import defaultdict
|
|
|
|
# === CONFIG ===
# Gitea instance that hosts the managed repos (internal VPS address).
GITEA_URL = "http://143.198.27.163:3000"
# Personal access token; read fresh on every API call by api().
TOKEN_FILE = os.path.expanduser("~/.hermes/gitea_token_vps")
# All scorecards, alerts, and loop logs live under this directory.
LOG_DIR = os.path.expanduser("~/.hermes/logs")
SCORECARD_FILE = os.path.join(LOG_DIR, "agent-scorecards.json")
ALERTS_FILE = os.path.join(LOG_DIR, "workforce-alerts.json")

# Repos scanned for issues/PRs, in "owner/name" form.
REPOS = [
    "Timmy_Foundation/the-nexus",
    "Timmy_Foundation/autolora",
]

# Agent tiers: which agents handle which difficulty
AGENT_TIERS = {
    "heavy": ["perplexity"],        # 8-10 difficulty
    "medium": ["gemini", "manus"],  # 4-7 difficulty
    "grunt": ["kimi"],              # 0-3 difficulty
}

# Max concurrent issues per agent
MAX_CONCURRENT = {
    "perplexity": 2,  # one-shot, manual
    "manus": 2,       # one-shot, 300 credits/day
    "gemini": 5,      # 3-worker loop
    "kimi": 3,        # 1-worker loop
    "claude": 10,     # 10-worker loop, managed by its own loop
}

# Credit limits (daily)
CREDIT_LIMITS = {
    "manus": 300,
}

# Keywords that indicate difficulty.
# NOTE(review): MEDIUM_KEYWORDS is defined but never consulted by
# score_difficulty — "medium" is simply the default score band (5).
HARD_KEYWORDS = [
    "sovereignty", "nostr", "nip-", "economic", "architecture",
    "protocol", "edge intelligence", "memory graph", "identity",
    "cryptograph", "zero-knowledge", "consensus", "p2p",
    "distributed", "rlhf", "grpo", "training pipeline",
]
MEDIUM_KEYWORDS = [
    "feature", "integration", "api", "websocket", "three.js",
    "portal", "dashboard", "visualization", "agent", "deploy",
    "docker", "ssl", "infrastructure", "mcp", "inference",
]
EASY_KEYWORDS = [
    "refactor", "test", "docstring", "typo", "format", "lint",
    "rename", "cleanup", "dead code", "move", "extract",
    "add unit test", "fix import", "update readme",
]
|
|
|
|
|
|
def api(method, path, data=None):
    """Make a Gitea API call and return the parsed JSON response.

    Args:
        method: HTTP verb ("GET", "PATCH", ...).
        path: API path appended to {GITEA_URL}/api/v1 (e.g. "/repos/o/r/issues").
        data: Optional dict serialized as a JSON request body.

    Returns:
        Parsed JSON (dict or list) on success, or {"error": "<message>"}
        on any failure — callers check for the "error" key instead of
        catching exceptions.
    """
    with open(TOKEN_FILE) as f:
        token = f.read().strip()
    url = f"{GITEA_URL}/api/v1{path}"
    headers = {
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
    }
    # `is not None` so an explicit empty body ({}) is still sent as JSON;
    # the original truthiness check silently dropped it.
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(url, data=body, headers=headers, method=method)
    try:
        # Context manager closes the HTTP response even if reading or
        # JSON-parsing raises (the original leaked the handle).
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read())
    except Exception as e:
        # Deliberate catch-all: network/HTTP/JSON errors all collapse into
        # a uniform error dict so a periodic run never crashes the cycle.
        return {"error": str(e)}
|
|
|
|
|
|
def score_difficulty(issue):
    """Estimate an issue's difficulty on a 0-10 scale.

    Combines priority/topic labels with title keywords. EPICs always
    score 10 (auto_assign skips them separately — humans triage EPICs).
    """
    title = issue["title"].lower()
    label_names = [lbl["name"].lower() for lbl in issue.get("labels", [])]

    # EPICs are maximum difficulty by definition.
    if "[epic]" in title or "epic:" in title:
        return 10

    score = 5  # start from the medium band

    # Each matching label nudges the score up or down.
    label_weights = {
        "p0-critical": 2,
        "p1-important": 1,
        "p2-backlog": -1,
        "needs-design": 2,
        "infrastructure": 1,
    }
    for name, delta in label_weights.items():
        if name in label_names:
            score += delta
    if "sovereignty" in label_names or "nostr" in label_names:
        score += 2

    # Title keywords: at most one +2 for a hard hit and one -2 for an
    # easy hit (matches the original break-on-first-match loops).
    if any(kw in title for kw in HARD_KEYWORDS):
        score += 2
    if any(kw in title for kw in EASY_KEYWORDS):
        score -= 2

    return max(0, min(10, score))
|
|
|
|
|
|
def get_agent_for_difficulty(score, current_loads):
    """Choose an agent for a difficulty score, respecting capacity.

    Picks the agent with the most spare capacity in the matching tier;
    if every agent there is full, cascades to cheaper tiers. Returns
    None when nobody has a free slot.
    """
    if score >= 8:
        tier = "heavy"
    elif score >= 4:
        tier = "medium"
    else:
        tier = "grunt"

    def spare(agent):
        # Remaining slots = configured max (default 3) minus current open issues.
        return MAX_CONCURRENT.get(agent, 3) - current_loads.get(agent, 0)

    # max() keeps the first candidate on ties, matching the original
    # strict-greater comparison.
    best = max(AGENT_TIERS[tier], key=spare)
    if spare(best) > 0:
        return best

    # Tier saturated — fall through to easier tiers in order.
    fallback_order = ["medium", "grunt"] if tier == "heavy" else ["grunt"]
    for fb_tier in fallback_order:
        for agent in AGENT_TIERS[fb_tier]:
            if spare(agent) > 0:
                return agent
    return None
|
|
|
|
|
|
def auto_assign():
    """Scan repos for unassigned issues and assign to appropriate agents.

    Skips EPICs and meta/audit/showcase issues, scores the rest, and
    PATCHes an assignee onto each via the Gitea API. Returns the number
    of issues assigned this cycle.
    """
    print("=== AUTO-ASSIGN ===")

    # Pass 1: tally open issues per assignee and collect the unassigned
    # backlog across every managed repo.
    current_loads = defaultdict(int)
    backlog = []
    for repo in REPOS:
        issues = api("GET", f"/repos/{repo}/issues?state=open&type=issues&limit=50")
        if isinstance(issues, dict) and "error" in issues:
            print(f" ERROR fetching {repo}: {issues['error']}")
            continue
        for issue in issues:
            assignees = [a["login"] for a in (issue.get("assignees") or [])]
            if not assignees:
                issue["_repo"] = repo
                backlog.append(issue)
                continue
            for login in assignees:
                current_loads[login] += 1

    print(f" Agent loads: {dict(current_loads)}")
    print(f" Unassigned issues: {len(backlog)}")

    # Pass 2: score and assign, updating loads as we go so one cycle
    # doesn't overload a single agent.
    assigned_count = 0
    for issue in backlog:
        title = issue["title"].lower()

        # EPICs need a human decision; meta/audit/showcase issues are
        # bookkeeping, not implementation work.
        if "[epic]" in title or "epic:" in title:
            print(f" SKIP #{issue['number']} (EPIC): {issue['title'][:60]}")
            continue
        if any(tag in title for tag in ("[meta]", "[audit]", "[showcase]")):
            print(f" SKIP #{issue['number']} (meta): {issue['title'][:60]}")
            continue

        score = score_difficulty(issue)
        agent = get_agent_for_difficulty(score, current_loads)
        if agent is None:
            print(f" SKIP #{issue['number']} (all agents full): {issue['title'][:60]}")
            continue

        repo = issue["_repo"]
        result = api("PATCH", f"/repos/{repo}/issues/{issue['number']}", {
            "assignees": [agent]
        })
        if "error" in result:
            print(f" ERROR assigning #{issue['number']}: {result['error']}")
            continue

        current_loads[agent] += 1
        assigned_count += 1
        tier = "HEAVY" if score >= 8 else "MEDIUM" if score >= 4 else "GRUNT"
        print(f" ASSIGN #{issue['number']} -> {agent} (score={score} {tier}): {issue['title'][:50]}")

    print(f" Assigned {assigned_count} issues this cycle.")
    return assigned_count
|
|
|
|
|
|
def quality_score():
|
|
"""Calculate merge rate and quality metrics per agent over last 7 days."""
|
|
print("\n=== QUALITY SCORING ===")
|
|
|
|
since = (datetime.now(timezone.utc) - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
|
|
agent_stats = defaultdict(lambda: {"merged": 0, "closed_unmerged": 0, "open": 0, "total": 0})
|
|
|
|
for repo in REPOS:
|
|
# Merged PRs
|
|
merged = api("GET", f"/repos/{repo}/pulls?state=closed&sort=updated&limit=50")
|
|
if isinstance(merged, dict) and "error" in merged:
|
|
continue
|
|
for pr in merged:
|
|
if pr.get("updated_at", "") < since:
|
|
continue
|
|
agent = pr["user"]["login"]
|
|
agent_stats[agent]["total"] += 1
|
|
if pr.get("merged"):
|
|
agent_stats[agent]["merged"] += 1
|
|
else:
|
|
agent_stats[agent]["closed_unmerged"] += 1
|
|
|
|
# Open PRs
|
|
open_prs = api("GET", f"/repos/{repo}/pulls?state=open&limit=50")
|
|
if isinstance(open_prs, dict) and "error" in open_prs:
|
|
continue
|
|
for pr in open_prs:
|
|
agent = pr["user"]["login"]
|
|
agent_stats[agent]["open"] += 1
|
|
agent_stats[agent]["total"] += 1
|
|
|
|
scorecards = {}
|
|
for agent, stats in sorted(agent_stats.items()):
|
|
total = stats["total"]
|
|
if total == 0:
|
|
continue
|
|
merge_rate = stats["merged"] / max(total, 1) * 100
|
|
|
|
# Determine tier adjustment
|
|
if merge_rate >= 80:
|
|
recommendation = "PROMOTE — high merge rate"
|
|
elif merge_rate < 40 and total >= 3:
|
|
recommendation = "DEMOTE — low merge rate"
|
|
else:
|
|
recommendation = "HOLD — acceptable"
|
|
|
|
scorecards[agent] = {
|
|
"merged": stats["merged"],
|
|
"closed_unmerged": stats["closed_unmerged"],
|
|
"open": stats["open"],
|
|
"total": total,
|
|
"merge_rate": round(merge_rate, 1),
|
|
"recommendation": recommendation,
|
|
"updated": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
|
|
print(f" {agent:15s} merged={stats['merged']:3d} rejected={stats['closed_unmerged']:3d} open={stats['open']:3d} rate={merge_rate:5.1f}% {recommendation}")
|
|
|
|
# Save scorecards
|
|
with open(SCORECARD_FILE, "w") as f:
|
|
json.dump(scorecards, f, indent=2)
|
|
print(f" Scorecards saved to {SCORECARD_FILE}")
|
|
|
|
return scorecards
|
|
|
|
|
|
def credit_monitor():
    """Track daily usage per agent and alert on approaching limits.

    Heuristics:
      - PRs created today as a proxy for credit burn. NOTE(review): for
        Manus this compares a PR *count* against a 300 *credit* limit —
        confirm the PR-to-credit ratio against real billing data.
      - Error density in each loop agent's recent log lines as a proxy
        for upstream rate limiting.

    Appends new alerts to ALERTS_FILE (last 100 kept) and returns the
    list of alerts raised this run.
    """
    print("\n=== CREDIT MONITORING ===")

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    daily_counts = defaultdict(int)

    for repo in REPOS:
        # Count PRs created today per agent.
        prs = api("GET", f"/repos/{repo}/pulls?state=all&sort=created&limit=50")
        if isinstance(prs, dict) and "error" in prs:
            continue
        for pr in prs:
            created = pr.get("created_at", "")[:10]
            if created == today:
                agent = pr["user"]["login"]
                daily_counts[agent] += 1

    alerts = []
    for agent, count in sorted(daily_counts.items()):
        limit = CREDIT_LIMITS.get(agent)
        if limit:
            pct = count / limit * 100
            status = f"{count}/{limit} ({pct:.0f}%)"
            if pct >= 80:
                alert = f"WARNING: {agent} at {status} daily credits"
                alerts.append({"agent": agent, "type": "credit_limit", "message": alert, "time": datetime.now(timezone.utc).isoformat()})
                print(f" ⚠️ {alert}")
            else:
                print(f" {agent:15s} {status}")
        else:
            print(f" {agent:15s} {count} PRs today (no credit limit)")

    # Check loop health via log files: an error spike usually means the
    # upstream API is rate-limiting the loop.
    loop_logs = {
        "claude": "claude-loop.log",
        "gemini": "gemini-loop.log",
        "kimi": "kimi-loop.log",
    }
    for agent, logfile in loop_logs.items():
        logpath = os.path.join(LOG_DIR, logfile)
        if not os.path.exists(logpath):
            continue
        # Count errors in the last 50 lines. Narrowed from a bare except:
        # only skip on real I/O problems (permissions, rotation race).
        try:
            with open(logpath) as f:
                lines = f.readlines()[-50:]
        except OSError:
            continue
        errors = sum(1 for l in lines if "FAIL" in l or "ERROR" in l or "rate" in l.lower())
        if errors >= 10:
            alert = f"WARNING: {agent} loop has {errors} errors in last 50 log lines (possible rate limit)"
            alerts.append({"agent": agent, "type": "error_spike", "message": alert, "time": datetime.now(timezone.utc).isoformat()})
            print(f" ⚠️ {alert}")

    # Persist alerts, keeping only the most recent 100.
    existing = []
    if os.path.exists(ALERTS_FILE):
        try:
            with open(ALERTS_FILE) as f:
                existing = json.load(f)
        except (OSError, json.JSONDecodeError):
            # Corrupt or unreadable alert history — start fresh rather than
            # crash the monitoring cycle (bare except narrowed here too).
            existing = []
    existing.extend(alerts)
    existing = existing[-100:]
    with open(ALERTS_FILE, "w") as f:
        json.dump(existing, f, indent=2)

    if not alerts:
        print(" No alerts. All systems nominal.")

    return alerts
|
|
|
|
|
|
def main():
    """Entry point: run the selected capability (default: all three).

    Usage: workforce-manager.py [all|assign|score|credits]
    """
    os.makedirs(LOG_DIR, exist_ok=True)

    mode = sys.argv[1] if len(sys.argv) > 1 else "all"

    # Dispatch table preserves the original run order:
    # assign -> score -> credits.
    tasks = (
        ("assign", auto_assign),
        ("score", quality_score),
        ("credits", credit_monitor),
    )
    for name, task in tasks:
        if mode in ("all", name):
            task()


if __name__ == "__main__":
    main()
|