remove deprecated workforce-manager.py — replaced by sovereign-orchestration

This commit is contained in:
2026-03-25 14:41:40 +00:00
parent bafd97da5a
commit 4b69918135

View File

@@ -1,405 +0,0 @@
#!/usr/bin/env python3
"""
workforce-manager.py — Autonomous agent workforce management
Three capabilities:
1. AUTO-ASSIGN: Match unassigned issues to the right agent by difficulty
2. QUALITY SCORE: Track merge rate per agent, demote poor performers
3. CREDIT MONITOR: Alert when agent quotas are likely exhausted
Runs as a periodic script called by the orchestrator or cron.
ACCEPTANCE CRITERIA:
Auto-assign:
- Scans all repos for unassigned issues
- Scores issue difficulty (0-10) based on: labels, title keywords, file count
- Maps difficulty to agent tier: hard(8-10)→perplexity, medium(4-7)→gemini/manus, easy(0-3)→kimi
- Assigns via Gitea API, adds appropriate labels
- Never assigns EPICs (those need human decision)
- Never reassigns already-assigned issues
- Respects agent capacity (max concurrent issues per agent)
Quality scoring:
- Pulls all closed PRs from last 7 days per agent
- Calculates: merge rate, avg time to merge, rejection count
- Agents below 40% merge rate get demoted one tier
- Agents above 80% merge rate get promoted one tier
- Writes scorecard to ~/.hermes/logs/agent-scorecards.json
Credit monitoring:
- Tracks daily PR count per agent
- Manus: alert if >250 credits used (300/day limit)
- Loop agents: alert if error rate spikes (likely rate limited)
- Writes alerts to ~/.hermes/logs/workforce-alerts.json
"""
import json
import os
import sys
import time
import urllib.request
from datetime import datetime, timedelta, timezone
from collections import defaultdict
# === CONFIG ===
GITEA_URL = "http://143.198.27.163:3000"
TOKEN_FILE = os.path.expanduser("~/.hermes/gitea_token_vps")
LOG_DIR = os.path.expanduser("~/.hermes/logs")
SCORECARD_FILE = os.path.join(LOG_DIR, "agent-scorecards.json")
ALERTS_FILE = os.path.join(LOG_DIR, "workforce-alerts.json")
REPOS = [
"Timmy_Foundation/the-nexus",
"Timmy_Foundation/autolora",
]
# Agent tiers: which agents handle which difficulty
AGENT_TIERS = {
"heavy": ["perplexity"], # 8-10 difficulty
"medium": ["gemini", "manus"], # 4-7 difficulty
"grunt": ["kimi"], # 0-3 difficulty
}
# Max concurrent issues per agent
MAX_CONCURRENT = {
"perplexity": 2, # one-shot, manual
"manus": 2, # one-shot, 300 credits/day
"gemini": 5, # 3-worker loop
"kimi": 3, # 1-worker loop
"claude": 10, # 10-worker loop, managed by its own loop
}
# Credit limits (daily)
CREDIT_LIMITS = {
"manus": 300,
}
# Keywords that indicate difficulty
HARD_KEYWORDS = [
"sovereignty", "nostr", "nip-", "economic", "architecture",
"protocol", "edge intelligence", "memory graph", "identity",
"cryptograph", "zero-knowledge", "consensus", "p2p",
"distributed", "rlhf", "grpo", "training pipeline",
]
MEDIUM_KEYWORDS = [
"feature", "integration", "api", "websocket", "three.js",
"portal", "dashboard", "visualization", "agent", "deploy",
"docker", "ssl", "infrastructure", "mcp", "inference",
]
EASY_KEYWORDS = [
"refactor", "test", "docstring", "typo", "format", "lint",
"rename", "cleanup", "dead code", "move", "extract",
"add unit test", "fix import", "update readme",
]
def api(method, path, data=None):
"""Make a Gitea API call."""
with open(TOKEN_FILE) as f:
token = f.read().strip()
url = f"{GITEA_URL}/api/v1{path}"
headers = {
"Authorization": f"token {token}",
"Content-Type": "application/json",
}
if data:
req = urllib.request.Request(url, json.dumps(data).encode(), headers, method=method)
else:
req = urllib.request.Request(url, headers=headers, method=method)
try:
resp = urllib.request.urlopen(req, timeout=15)
return json.loads(resp.read())
except Exception as e:
return {"error": str(e)}
def score_difficulty(issue):
"""Score an issue 0-10 based on title, labels, and signals."""
title = issue["title"].lower()
labels = [l["name"].lower() for l in issue.get("labels", [])]
score = 5 # default medium
# EPICs are always 10 (but we skip them for auto-assign)
if "[epic]" in title or "epic:" in title:
return 10
# Label-based scoring
if "p0-critical" in labels:
score += 2
if "p1-important" in labels:
score += 1
if "p2-backlog" in labels:
score -= 1
if "needs-design" in labels:
score += 2
if "sovereignty" in labels or "nostr" in labels:
score += 2
if "infrastructure" in labels:
score += 1
# Keyword-based scoring
for kw in HARD_KEYWORDS:
if kw in title:
score += 2
break
for kw in EASY_KEYWORDS:
if kw in title:
score -= 2
break
return max(0, min(10, score))
def get_agent_for_difficulty(score, current_loads):
"""Pick the best agent for a given difficulty score."""
if score >= 8:
tier = "heavy"
elif score >= 4:
tier = "medium"
else:
tier = "grunt"
candidates = AGENT_TIERS[tier]
# Pick the agent with the most capacity
best = None
best_capacity = -1
for agent in candidates:
max_c = MAX_CONCURRENT.get(agent, 3)
current = current_loads.get(agent, 0)
capacity = max_c - current
if capacity > best_capacity:
best_capacity = capacity
best = agent
if best_capacity <= 0:
# All agents in tier are full, try next tier down
fallback_order = ["medium", "grunt"] if tier == "heavy" else ["grunt"]
for fb_tier in fallback_order:
for agent in AGENT_TIERS[fb_tier]:
max_c = MAX_CONCURRENT.get(agent, 3)
current = current_loads.get(agent, 0)
if max_c - current > 0:
return agent
return None
return best
def auto_assign():
"""Scan repos for unassigned issues and assign to appropriate agents."""
print("=== AUTO-ASSIGN ===")
# Get current agent loads (open issues per agent)
current_loads = defaultdict(int)
all_unassigned = []
for repo in REPOS:
issues = api("GET", f"/repos/{repo}/issues?state=open&type=issues&limit=50")
if isinstance(issues, dict) and "error" in issues:
print(f" ERROR fetching {repo}: {issues['error']}")
continue
for issue in issues:
assignees = [a["login"] for a in (issue.get("assignees") or [])]
if assignees:
for a in assignees:
current_loads[a] += 1
else:
issue["_repo"] = repo
all_unassigned.append(issue)
print(f" Agent loads: {dict(current_loads)}")
print(f" Unassigned issues: {len(all_unassigned)}")
assigned_count = 0
for issue in all_unassigned:
title = issue["title"].lower()
# Skip EPICs — those need human decision
if "[epic]" in title or "epic:" in title:
print(f" SKIP #{issue['number']} (EPIC): {issue['title'][:60]}")
continue
# Skip META/audit/showcase
if "[meta]" in title or "[audit]" in title or "[showcase]" in title:
print(f" SKIP #{issue['number']} (meta): {issue['title'][:60]}")
continue
score = score_difficulty(issue)
agent = get_agent_for_difficulty(score, current_loads)
if agent is None:
print(f" SKIP #{issue['number']} (all agents full): {issue['title'][:60]}")
continue
# Assign
repo = issue["_repo"]
result = api("PATCH", f"/repos/{repo}/issues/{issue['number']}", {
"assignees": [agent]
})
if "error" not in result:
current_loads[agent] += 1
assigned_count += 1
tier = "HEAVY" if score >= 8 else "MEDIUM" if score >= 4 else "GRUNT"
print(f" ASSIGN #{issue['number']} -> {agent} (score={score} {tier}): {issue['title'][:50]}")
else:
print(f" ERROR assigning #{issue['number']}: {result['error']}")
print(f" Assigned {assigned_count} issues this cycle.")
return assigned_count
def quality_score():
"""Calculate merge rate and quality metrics per agent over last 7 days."""
print("\n=== QUALITY SCORING ===")
since = (datetime.now(timezone.utc) - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ")
agent_stats = defaultdict(lambda: {"merged": 0, "closed_unmerged": 0, "open": 0, "total": 0})
for repo in REPOS:
# Merged PRs
merged = api("GET", f"/repos/{repo}/pulls?state=closed&sort=updated&limit=50")
if isinstance(merged, dict) and "error" in merged:
continue
for pr in merged:
if pr.get("updated_at", "") < since:
continue
agent = pr["user"]["login"]
agent_stats[agent]["total"] += 1
if pr.get("merged"):
agent_stats[agent]["merged"] += 1
else:
agent_stats[agent]["closed_unmerged"] += 1
# Open PRs
open_prs = api("GET", f"/repos/{repo}/pulls?state=open&limit=50")
if isinstance(open_prs, dict) and "error" in open_prs:
continue
for pr in open_prs:
agent = pr["user"]["login"]
agent_stats[agent]["open"] += 1
agent_stats[agent]["total"] += 1
scorecards = {}
for agent, stats in sorted(agent_stats.items()):
total = stats["total"]
if total == 0:
continue
merge_rate = stats["merged"] / max(total, 1) * 100
# Determine tier adjustment
if merge_rate >= 80:
recommendation = "PROMOTE — high merge rate"
elif merge_rate < 40 and total >= 3:
recommendation = "DEMOTE — low merge rate"
else:
recommendation = "HOLD — acceptable"
scorecards[agent] = {
"merged": stats["merged"],
"closed_unmerged": stats["closed_unmerged"],
"open": stats["open"],
"total": total,
"merge_rate": round(merge_rate, 1),
"recommendation": recommendation,
"updated": datetime.now(timezone.utc).isoformat(),
}
print(f" {agent:15s} merged={stats['merged']:3d} rejected={stats['closed_unmerged']:3d} open={stats['open']:3d} rate={merge_rate:5.1f}% {recommendation}")
# Save scorecards
with open(SCORECARD_FILE, "w") as f:
json.dump(scorecards, f, indent=2)
print(f" Scorecards saved to {SCORECARD_FILE}")
return scorecards
def credit_monitor():
"""Track daily usage per agent and alert on approaching limits."""
print("\n=== CREDIT MONITORING ===")
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
daily_counts = defaultdict(int)
for repo in REPOS:
# Count PRs created today per agent
prs = api("GET", f"/repos/{repo}/pulls?state=all&sort=created&limit=50")
if isinstance(prs, dict) and "error" in prs:
continue
for pr in prs:
created = pr.get("created_at", "")[:10]
if created == today:
agent = pr["user"]["login"]
daily_counts[agent] += 1
alerts = []
for agent, count in sorted(daily_counts.items()):
limit = CREDIT_LIMITS.get(agent)
if limit:
pct = count / limit * 100
status = f"{count}/{limit} ({pct:.0f}%)"
if pct >= 80:
alert = f"WARNING: {agent} at {status} daily credits"
alerts.append({"agent": agent, "type": "credit_limit", "message": alert, "time": datetime.now(timezone.utc).isoformat()})
print(f" ⚠️ {alert}")
else:
print(f" {agent:15s} {status}")
else:
print(f" {agent:15s} {count} PRs today (no credit limit)")
# Check loop health via log files
loop_logs = {
"claude": "claude-loop.log",
"gemini": "gemini-loop.log",
"kimi": "kimi-loop.log",
}
for agent, logfile in loop_logs.items():
logpath = os.path.join(LOG_DIR, logfile)
if not os.path.exists(logpath):
continue
# Count errors in last 50 lines
try:
with open(logpath) as f:
lines = f.readlines()[-50:]
errors = sum(1 for l in lines if "FAIL" in l or "ERROR" in l or "rate" in l.lower())
if errors >= 10:
alert = f"WARNING: {agent} loop has {errors} errors in last 50 log lines (possible rate limit)"
alerts.append({"agent": agent, "type": "error_spike", "message": alert, "time": datetime.now(timezone.utc).isoformat()})
print(f" ⚠️ {alert}")
except:
pass
# Save alerts
existing = []
if os.path.exists(ALERTS_FILE):
try:
with open(ALERTS_FILE) as f:
existing = json.load(f)
except:
pass
existing.extend(alerts)
# Keep last 100 alerts
existing = existing[-100:]
with open(ALERTS_FILE, "w") as f:
json.dump(existing, f, indent=2)
if not alerts:
print(" No alerts. All systems nominal.")
return alerts
def main():
os.makedirs(LOG_DIR, exist_ok=True)
mode = sys.argv[1] if len(sys.argv) > 1 else "all"
if mode in ("all", "assign"):
auto_assign()
if mode in ("all", "score"):
quality_score()
if mode in ("all", "credits"):
credit_monitor()
if __name__ == "__main__":
main()