#!/usr/bin/env python3
"""
Workforce Manager - Epic #204 / Milestone #218

Reads fleet routing, Wolf evaluation scores, and open Gitea issues across
Timmy_Foundation repos. Assigns each issue to the best-available agent,
tracks success rates, and dispatches work.

Usage:
    python workforce-manager.py              # Scan, assign, dispatch
    python workforce-manager.py --dry-run    # Show assignments without dispatching
    python workforce-manager.py --status     # Show agent status and open issue count
    python workforce-manager.py --cron       # Run silently, save to log
"""

import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    import requests
except ImportError:
    # Deferred failure: main() prints the FATAL message and exits with code 1.
    # Keeping the import non-fatal lets the pure scoring helpers be imported
    # (and tested) on machines that do not have requests installed.
    requests = None

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

FLEET_ROUTING_PATH = Path.home() / ".hermes" / "fleet-routing.json"
WOLF_RESULTS_DIR = Path.home() / ".hermes" / "wolf" / "results"
GITEA_TOKEN_PATH = Path.home() / ".hermes" / "gitea_token_vps"
GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
WORKFORCE_STATE_PATH = Path.home() / ".hermes" / "workforce-state.json"
ORG_NAME = "Timmy_Foundation"

# Seconds to wait on any single Gitea request before giving up, so an
# unresponsive server cannot hang a cron run indefinitely.
REQUEST_TIMEOUT = 30
# Page size requested when listing issues.
ISSUES_PAGE_SIZE = 50
# Safety bound on pagination loops in case a server ignores the page parameter.
MAX_PAGES = 1000
# Maximum number of issues handed to a single agent within one run.
MAX_ASSIGNMENTS_PER_AGENT_PER_RUN = 5

# Role-to-agent-role mapping heuristics
ROLE_KEYWORDS = {
    "code-generation": [
        "code", "implement", "feature", "function", "class", "script",
        "build", "create", "add", "module", "component",
    ],
    "issue-triage": [
        "triage", "categorize", "tag", "label", "organize",
        "backlog", "sort", "prioritize", "review issue",
    ],
    "on-request-queries": [
        "query", "search", "lookup", "find", "check",
        "info", "report", "status",
    ],
    "devops": [
        "deploy", "ci", "cd", "pipeline", "docker", "container",
        "server", "infrastructure", "config", "nginx", "cron",
        "setup", "install", "environment", "provision",
        "build", "release", "workflow",
    ],
    "documentation": [
        "doc", "readme", "document", "write", "guide",
        "spec", "wiki", "changelog", "tutorial",
        "explain", "describe",
    ],
    "code-review": [
        "review", "refactor", "fix", "bug", "debug",
        "test", "lint", "style", "improve",
        "clean up", "optimize", "performance",
    ],
    "triage-routing": [
        "route", "assign", "triage", "dispatch",
        "organize", "categorize",
    ],
    "small-tasks": [
        "small", "quick", "minor", "typo", "label",
        "update", "rename", "cleanup",
    ],
    "inactive": [],
    "unknown": [],
}

# Priority keywords (higher = more urgent, route to more capable agent)
PRIORITY_KEYWORDS = {
    "critical": 5,
    "urgent": 4,
    "block": 4,
    "bug": 3,
    "fix": 3,
    "security": 5,
    "deploy": 2,
    "feature": 1,
    "enhancement": 1,
    "documentation": 1,
    "cleanup": 0,
}

# Cost tier priority (lower index = prefer first)
TIER_ORDER = ["free", "cheap", "prepaid", "unknown"]


# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------

def load_json(path: Path) -> Any:
    """Parse the JSON file at *path*.

    Returns None (after logging a warning) when the file does not exist.
    Raises OSError if the file cannot be read and json.JSONDecodeError if it
    is not valid JSON.
    """
    if not path.exists():
        logging.warning("File not found: %s", path)
        return None
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def load_fleet_routing() -> List[dict]:
    """Return the "agents" list from the fleet routing file, or [] if absent."""
    data = load_json(FLEET_ROUTING_PATH)
    if isinstance(data, dict) and "agents" in data:
        return data["agents"]
    return []


def load_wolf_scores() -> Dict[str, dict]:
    """Load Wolf evaluation scores from the results directory.

    Returns a mapping of model name to its score entry. Files are read in
    sorted filename order, so an entry in a later file replaces an earlier
    entry for the same model. Unreadable or malformed files are skipped with a
    warning instead of aborting the run.
    """
    scores: Dict[str, dict] = {}
    if not WOLF_RESULTS_DIR.exists():
        return scores
    for result_file in sorted(WOLF_RESULTS_DIR.glob("*.json")):
        try:
            data = load_json(result_file)
        except (OSError, ValueError) as exc:  # JSONDecodeError is a ValueError
            logging.warning("Skipping unreadable Wolf result %s: %s", result_file, exc)
            continue
        if not isinstance(data, dict):
            continue
        for entry in data.get("model_scores") or []:
            model = entry.get("model", "") if isinstance(entry, dict) else ""
            if model:
                scores[model] = entry
    return scores


def load_workforce_state() -> dict:
    """Load persisted state, guaranteeing the keys the rest of the script uses.

    An existing but empty or partial state file is merged over the defaults so
    that "assignments", "agent_stats" and "last_run" are always present.
    """
    state: dict = {"assignments": [], "agent_stats": {}, "last_run": None}
    if WORKFORCE_STATE_PATH.exists():
        loaded = load_json(WORKFORCE_STATE_PATH)
        if isinstance(loaded, dict):
            state.update(loaded)
    # Guard against explicit nulls stored under the container keys.
    if not isinstance(state.get("assignments"), list):
        state["assignments"] = []
    if not isinstance(state.get("agent_stats"), dict):
        state["agent_stats"] = {}
    return state


def save_workforce_state(state: dict) -> None:
    """Write *state* to WORKFORCE_STATE_PATH as indented JSON.

    The data is written to a sibling temporary file and then moved into place
    with os.replace, so an interrupted run does not leave a truncated file.
    """
    WORKFORCE_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = WORKFORCE_STATE_PATH.with_name(WORKFORCE_STATE_PATH.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    os.replace(tmp_path, WORKFORCE_STATE_PATH)
    logging.info("Workforce state saved to %s", WORKFORCE_STATE_PATH)


# ---------------------------------------------------------------------------
# Gitea API
# ---------------------------------------------------------------------------

class GiteaAPI:
    """Thin wrapper for Gitea REST API."""

    def __init__(self, token: str, base_url: str = GITEA_API_BASE):
        """Create a session that authenticates with *token* against *base_url*.

        Raises RuntimeError when the requests package is not installed.
        """
        if requests is None:
            raise RuntimeError("requests is required. pip install requests")
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"token {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        })

    def _get(self, path: str, params: Optional[dict] = None) -> Any:
        """GET *path* and return the decoded JSON body; raises on HTTP errors."""
        r = self.session.get(f"{self.base_url}{path}", params=params, timeout=REQUEST_TIMEOUT)
        r.raise_for_status()
        return r.json()

    def _post(self, path: str, data: dict) -> Any:
        """POST *data* as JSON to *path* and return the decoded JSON body."""
        r = self.session.post(f"{self.base_url}{path}", json=data, timeout=REQUEST_TIMEOUT)
        r.raise_for_status()
        return r.json()

    def _patch(self, path: str, data: dict) -> Any:
        """PATCH *path* with *data* as JSON and return the decoded JSON body."""
        r = self.session.patch(f"{self.base_url}{path}", json=data, timeout=REQUEST_TIMEOUT)
        r.raise_for_status()
        return r.json()

    def get_org_repos(self, org: str) -> List[dict]:
        """Return every repository of *org*, following pagination.

        Pages are requested until an empty one comes back, because the server
        may cap the page size below the requested limit.
        """
        repos: List[dict] = []
        for page in range(1, MAX_PAGES + 1):
            batch = self._get(f"/orgs/{org}/repos", params={"limit": 100, "page": page})
            if not batch:
                break
            repos.extend(batch)
        return repos

    def get_open_issues(self, owner: str, repo: str, page: int = 1) -> List[dict]:
        """Return one page of open issues (pull requests excluded) for a repo."""
        params = {"state": "open", "type": "issues", "limit": ISSUES_PAGE_SIZE, "page": page}
        return self._get(f"/repos/{owner}/{repo}/issues", params=params)

    def get_all_open_issues(self, org: str) -> List[dict]:
        """Fetch all open issues across all org repos.

        A failure while listing one repository is logged and that repository
        is skipped; a failure while listing the organisation's repositories
        propagates to the caller.
        """
        all_issues: List[dict] = []
        for repo in self.get_org_repos(org):
            name = repo.get("name")
            if not name:
                continue
            repo_issues: List[dict] = []
            try:
                # Stop on an empty page rather than a short one: the server's
                # page-size cap may be lower than ISSUES_PAGE_SIZE.
                for page in range(1, MAX_PAGES + 1):
                    issues = self.get_open_issues(org, name, page=page)
                    if not issues:
                        break
                    repo_issues.extend(issues)
            except Exception as exc:  # best-effort per repo: log and move on
                logging.warning("Failed to load issues from %s/%s: %s", org, name, exc)
                continue
            for issue in repo_issues:
                # Make sure downstream code can always resolve the repo name.
                if not issue.get("repository"):
                    issue["repository"] = {"name": name}
            all_issues.extend(repo_issues)
            logging.info("Loaded %d open issues from %s/%s", len(repo_issues), org, name)
        return all_issues

    def add_issue_comment(self, owner: str, repo: str, issue_num: int, body: str) -> dict:
        """Post *body* as a new comment on the given issue."""
        return self._post(f"/repos/{owner}/{repo}/issues/{issue_num}/comments", {"body": body})

    def add_issue_label(self, owner: str, repo: str, issue_num: int, label: str) -> dict:
        """Add *label* to the given issue."""
        return self._post(
            f"/repos/{owner}/{repo}/issues/{issue_num}/labels",
            {"labels": [label]},
        )

    def assign_issue(self, owner: str, repo: str, issue_num: int, assignee: str) -> dict:
        """Set *assignee* as the sole assignee of the given issue."""
        return self._patch(
            f"/repos/{owner}/{repo}/issues/{issue_num}",
            {"assignees": [assignee]},
        )


# ---------------------------------------------------------------------------
# Scoring & Assignment Logic
# ---------------------------------------------------------------------------

def _issue_repo_name(issue: dict) -> str:
    """Return the repository name of *issue*, or "" when it cannot be found.

    Looks at issue["repository"]["name"] first and falls back to the last
    path segment of issue["repository_url"].
    """
    repo = (issue.get("repository") or {}).get("name") or ""
    if not repo:
        repo_url = issue.get("repository_url") or ""
        if repo_url:
            repo = repo_url.rstrip("/").split("/")[-1]
    return repo


def classify_issue(issue: dict) -> str:
    """Determine the best agent role for an issue based on title/body/labels.

    Each keyword found (as a substring) in the lower-cased title, body and
    label names scores 2; each label containing any keyword of the role adds
    3. The highest-scoring role wins, the first such role in ROLE_KEYWORDS
    order on a tie. Returns "small-tasks" when nothing matches.
    """
    title = (issue.get("title", "") or "").lower()
    body = (issue.get("body", "") or "").lower()
    # "labels" may be missing or null in the API payload.
    labels = [(lbl.get("name", "") or "").lower() for lbl in (issue.get("labels") or [])]
    text = f"{title} {body} " + " ".join(labels)

    best_role = "small-tasks"  # default
    best_score = 0
    for role, keywords in ROLE_KEYWORDS.items():
        if not keywords:
            continue
        score = sum(2 for kw in keywords if kw in text)
        # Boost if a matching label exists
        score += sum(3 for label in labels if any(kw in label for kw in keywords))
        if score > best_score:
            best_score = score
            best_role = role
    return best_role


def compute_priority(issue: dict) -> int:
    """Sum the PRIORITY_KEYWORDS weights of every keyword in the title/body."""
    title = (issue.get("title", "") or "").lower()
    body = (issue.get("body", "") or "").lower()
    text = f"{title} {body}"
    return sum(weight for keyword, weight in PRIORITY_KEYWORDS.items() if keyword in text)


def score_agent_for_issue(agent: dict, role: str, wolf_scores: dict, priority: int) -> float:
    """Score how well an agent matches an issue. Higher is better.

    The score is the sum of: a role-match component, a Wolf quality bonus
    (3 x "total" of the first Wolf entry whose model name contains the agent's
    model, if that entry is marked successful), a cost-tier component that
    depends on *priority*, a bonus for active agents, and a small bonus per
    repo capped at 2.0. The result is rounded to 3 decimals.
    """
    score = 0.0

    # Primary: role match
    agent_role = agent.get("role", "unknown")
    if agent_role == role:
        score += 10.0
    elif agent_role == "small-tasks" and role in ("issue-triage", "on-request-queries"):
        score += 6.0
    elif agent_role == "triage-routing" and role in ("issue-triage", "triage-routing"):
        score += 8.0
    elif agent_role == "code-generation" and role in ("code-review", "devops"):
        score += 4.0

    # Wolf quality bonus
    model = agent.get("model", "")
    wolf_entry = None
    if model:
        needle = model.lower()
        for wolf_model, wolf_score in wolf_scores.items():
            if needle in wolf_model.lower():
                wolf_entry = wolf_score
                break
    if wolf_entry and wolf_entry.get("success"):
        score += (wolf_entry.get("total", 0) or 0) * 3.0

    # Cost efficiency: prefer free/cheap for low priority
    tier = agent.get("tier", "unknown")
    tier_idx = TIER_ORDER.index(tier) if tier in TIER_ORDER else 3
    if priority <= 1 and tier in ("free", "cheap"):
        score += 4.0
    elif priority >= 3 and tier in ("prepaid",):
        score += 3.0
    else:
        score += (3 - tier_idx) * 1.0

    # Activity bonus
    if agent.get("active", False):
        score += 2.0

    # Repo familiarity bonus: more repos slightly better
    repo_count = agent.get("repo_count", 0) or 0
    score += min(repo_count * 0.2, 2.0)

    return round(score, 3)


def find_best_agent(agents: List[dict], role: str, wolf_scores: dict, priority: int,
                    exclude: Optional[List[str]] = None) -> Optional[dict]:
    """Return the highest-scoring active agent whose name is not in *exclude*.

    Ties go to the agent that appears first in *agents*. Returns None when no
    agent is eligible.
    """
    excluded = set(exclude or [])
    candidates = [
        agent for agent in agents
        if agent.get("name") not in excluded and agent.get("active", False)
    ]
    if not candidates:
        return None
    # max() returns the first maximal element, matching a stable descending sort.
    return max(candidates, key=lambda a: score_agent_for_issue(a, role, wolf_scores, priority))


# ---------------------------------------------------------------------------
# Dispatch
# ---------------------------------------------------------------------------

def dispatch_assignment(api: GiteaAPI, issue: dict, agent: dict, dry_run: bool = False) -> dict:
    """Assign an issue to an agent and post an explanatory comment.

    Returns a result dict that always carries "success" and "assignee", so the
    outcome can be attributed to the right agent. With *dry_run* nothing is
    sent to Gitea and the would-be comment is returned under "comment".

    The assignment decides success: if the assignment call works but the
    comment cannot be posted, the result is still successful and the failure
    is reported under "comment_error".
    """
    owner = ORG_NAME
    agent_name = agent.get("name", "unknown")
    issue_num = issue.get("number")
    repo = _issue_repo_name(issue)
    if not repo:
        return {
            "success": False,
            "issue_number": issue_num,
            "assignee": agent_name,
            "error": "Cannot determine repository for issue",
        }

    comment_body = (
        f"🤖 **Workforce Manager assigned this issue to: @{agent_name}**\n\n"
        f"- **Agent:** {agent_name}\n"
        f"- **Model:** {agent.get('model', 'unknown')}\n"
        f"- **Role:** {agent.get('role', 'unknown')}\n"
        f"- **Tier:** {agent.get('tier', 'unknown')}\n"
        f"- **Assigned at:** {datetime.now(timezone.utc).isoformat()}\n\n"
        f"*Automated assignment by Workforce Manager (Epic #204)*"
    )

    if dry_run:
        return {
            "success": True,
            "dry_run": True,
            "repo": repo,
            "issue_number": issue_num,
            "assignee": agent_name,
            "comment": comment_body,
        }

    try:
        api.assign_issue(owner, repo, issue_num, agent_name)
    except Exception as exc:  # boundary: report any API failure in the result
        return {
            "success": False,
            "repo": repo,
            "issue_number": issue_num,
            "assignee": agent_name,
            "error": str(exc),
        }

    result = {
        "success": True,
        "repo": repo,
        "issue_number": issue_num,
        "issue_title": issue.get("title", ""),
        "assignee": agent_name,
    }
    try:
        api.add_issue_comment(owner, repo, issue_num, comment_body)
    except Exception as exc:  # the issue is already assigned; do not report failure
        logging.warning("Assigned %s#%s but could not post comment: %s", repo, issue_num, exc)
        result["comment_error"] = str(exc)
    return result


# ---------------------------------------------------------------------------
# State Tracking
# ---------------------------------------------------------------------------

def update_agent_stats(state: dict, result: dict) -> None:
    """Record one dispatch *result* in state["agent_stats"] (mutates *state*).

    The entry is keyed by result["assignee"] ("unknown" if absent) and tracks
    totals, the success rate rounded to 3 decimals, and a per-issue history.
    """
    agent_name = result.get("assignee", "unknown")
    all_stats = state.setdefault("agent_stats", {})
    stats = all_stats.setdefault(agent_name, {
        "total_assigned": 0,
        "successful": 0,
        "failed": 0,
        "success_rate": 0.0,
        "last_assignment": None,
        "assigned_issues": [],
    })

    now = datetime.now(timezone.utc).isoformat()
    succeeded = bool(result.get("success", False))

    stats["total_assigned"] += 1
    stats["last_assignment"] = now
    stats["assigned_issues"].append({
        "repo": result.get("repo", ""),
        "issue_number": result.get("issue_number"),
        "success": succeeded,
        "timestamp": now,
    })
    if succeeded:
        stats["successful"] += 1
    else:
        stats["failed"] += 1

    total = stats["successful"] + stats["failed"]
    stats["success_rate"] = round(stats["successful"] / total, 3) if total > 0 else 0.0


def print_status(state: dict, agents: List[dict], issues_count: int) -> None:
    """Print the fleet summary, per-agent details and run totals to stdout."""
    print(f"\n{'=' * 60}")
    print(f"Workforce Manager Status - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
    print(f"{'=' * 60}")

    # Fleet summary
    active = [a for a in agents if a.get("active")]
    print(f"\nFleet: {len(active)} active agents, {len(agents)} total")
    tier_counts: Dict[str, int] = {}
    for a in active:
        t = a.get("tier", "unknown")
        tier_counts[t] = tier_counts.get(t, 0) + 1
    for t, c in sorted(tier_counts.items()):
        print(f"  {t}: {c} agents")

    # Agent scores
    wolf = load_wolf_scores()
    agent_stats = state.get("agent_stats", {})

    print("\nAgent Details:")
    print(f"  {'Name':<45} {'Model':<30} {'Role':<18} {'Tier':<10}")
    for a in active:
        name = a.get("name", "unknown")
        model = a.get("model") or ""
        stats = agent_stats.get(name, {})
        rate = stats.get("success_rate", 0.0)
        total = stats.get("total_assigned", 0)

        wolf_badge = ""
        if model:
            for wolf_model, wolf_score in wolf.items():
                if model.lower() in wolf_model.lower() and wolf_score.get("success"):
                    wolf_badge = f"[wolf:{wolf_score.get('total', '?')}]"
                    break

        name_str = f"{name} {wolf_badge}"
        if total > 0:
            name_str += f" (s/r: {rate}, n={total})"
        role = a.get("role", "unknown")
        tier = a.get("tier", "?")
        print(f"  {name_str:<45} {model or '?':<30} {role:<18} {tier:<10}")

    print(f"\nOpen Issues: {issues_count}")
    print(f"Assignments Made: {len(state.get('assignments', []))}")
    if state.get("last_run"):
        print(f"Last Run: {state['last_run']}")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> int:
    """Run the workforce manager CLI and return the process exit code.

    Returns 0 on success and 1 when a prerequisite is missing (requests, the
    fleet routing file, the Gitea token) or the Gitea API cannot be reached.
    With --dry-run nothing is sent to Gitea and no state is written.
    """
    if requests is None:
        print("FATAL: requests is required. pip install requests", file=sys.stderr)
        return 1

    parser = argparse.ArgumentParser(description="Workforce Manager - Assign Gitea issues to AI agents")
    parser.add_argument("--dry-run", action="store_true", help="Show assignments without dispatching")
    parser.add_argument("--status", action="store_true", help="Show workforce status only")
    parser.add_argument("--cron", action="store_true", help="Run silently for cron scheduling")
    parser.add_argument("--label", type=str, help="Only process issues with this label")
    parser.add_argument("--max-issues", type=int, default=100, help="Max issues to process per run")
    args = parser.parse_args()

    # Setup logging
    log_level = logging.WARNING if args.cron else logging.INFO
    logging.basicConfig(level=log_level, format="%(asctime)s [%(levelname)s] %(message)s")

    logging.info("Workforce Manager starting")

    # Load data
    agents = load_fleet_routing()
    if not agents:
        logging.error("No agents found in fleet-routing.json")
        return 1
    logging.info("Loaded %d agents from fleet routing", len(agents))

    wolf_scores = load_wolf_scores()
    if wolf_scores:
        logging.info("Loaded %d model scores from Wolf results", len(wolf_scores))

    state = load_workforce_state()

    # Load Gitea token
    if GITEA_TOKEN_PATH.exists():
        token = GITEA_TOKEN_PATH.read_text().strip()
    else:
        logging.error("Gitea token not found at %s", GITEA_TOKEN_PATH)
        return 1

    api = GiteaAPI(token)

    # Status-only mode
    if args.status:
        # Quick open issue count
        try:
            repos = api.get_org_repos(ORG_NAME)
        except (requests.RequestException, ValueError) as exc:
            logging.error("Failed to list repos for %s: %s", ORG_NAME, exc)
            return 1
        total = sum((r.get("open_issues_count") or 0) for r in repos)
        print_status(state, agents, total)
        return 0

    # Fetch open issues
    if not args.cron:
        print(f"Scanning open issues across {ORG_NAME} repos...")

    try:
        issues = api.get_all_open_issues(ORG_NAME)
    except (requests.RequestException, ValueError) as exc:
        logging.error("Failed to list repos for %s: %s", ORG_NAME, exc)
        return 1

    # Filter by label (case-insensitive substring match on label names)
    if args.label:
        wanted = args.label.lower()
        issues = [
            i for i in issues
            if any(wanted in (lbl.get("name", "") or "").lower() for lbl in (i.get("labels") or []))
        ]
        logging.info("Filtered to %d issues with label '%s'", len(issues), args.label)
    else:
        logging.info("Found %d open issues", len(issues))

    # Skip issues already assigned. Only successful past assignments count, so
    # an issue whose dispatch failed is retried on the next run.
    already_assigned = {
        (a.get("repo"), a.get("issue_number"))
        for a in state.get("assignments", [])
        if a.get("success")
    }
    issues = [
        i for i in issues
        if not i.get("assignee")
        and not i.get("assignees")
        and (_issue_repo_name(i), i.get("number")) not in already_assigned
    ]
    logging.info("%d unassigned issues to process", len(issues))

    # Sort by priority (stable, so equal priorities keep their fetch order)
    prioritized = sorted(
        ((compute_priority(i), i) for i in issues),
        key=lambda pair: pair[0],
        reverse=True,
    )[:args.max_issues]

    # Assign issues
    assignments: List[dict] = []
    repo_agents: Dict[str, List[str]] = {}  # repo -> agents already used for it this run
    run_counts: Dict[str, int] = {}         # agent -> dispatch attempts this run
    deferred = 0

    for priority, issue in prioritized:
        role = classify_issue(issue)
        repo = _issue_repo_name(issue)
        title = issue.get("title") or ""
        number = issue.get("number")

        # Agents that reached the per-run cap are never eligible in this run.
        at_cap = [
            name for name, count in run_counts.items()
            if count >= MAX_ASSIGNMENTS_PER_AGENT_PER_RUN
        ]
        # Avoid assigning same agent twice to same repo in one run
        excluded = list(set(repo_agents.get(repo, [])) | set(at_cap))

        agent = find_best_agent(agents, role, wolf_scores, priority, exclude=excluded)
        if not agent:
            # Relax the per-repo spreading rule, but keep the per-run cap.
            agent = find_best_agent(agents, role, wolf_scores, priority, exclude=at_cap)
        if not agent:
            deferred += 1
            logging.warning("No suitable agent for issue #%s: %s (role=%s)", number, title, role)
            continue

        agent_name = agent.get("name", "unknown")
        result = dispatch_assignment(api, issue, agent, dry_run=args.dry_run)
        assignments.append(result)
        if not args.dry_run:
            # A dry run must not alter the persisted success statistics.
            update_agent_stats(state, result)

        # Track per-repo exclusions and per-run load
        repo_agents.setdefault(repo, []).append(agent_name)
        run_counts[agent_name] = run_counts.get(agent_name, 0) + 1

        if args.dry_run:
            print(f"  [DRY] #{number}: {title[:60]} → @{agent_name} ({role}, p={priority})")
        else:
            status_str = "OK" if result.get("success") else "FAIL"
            print(f"  [{status_str}] #{number}: {title[:60]} → @{agent_name} ({role}, p={priority})")

    if deferred:
        logging.info("%d issues deferred to a later run (no agent available)", deferred)

    # Save state (never for a dry run: it would mark issues as already assigned)
    if args.dry_run:
        logging.info("Dry run: workforce state not modified")
    else:
        now = datetime.now(timezone.utc).isoformat()
        state.setdefault("assignments", []).extend([{
            "repo": a.get("repo"),
            "issue_number": a.get("issue_number"),
            "assignee": a.get("assignee"),
            "success": a.get("success", False),
            "timestamp": now,
        } for a in assignments])
        state["last_run"] = now
        save_workforce_state(state)

    # Summary
    ok = sum(1 for a in assignments if a.get("success"))
    fail = len(assignments) - ok

    logging.info("Done: %d assigned, %d succeeded, %d failed", len(assignments), ok, fail)

    if not args.cron:
        print(f"\n{'=' * 60}")
        print(f"Summary: {len(assignments)} assignments, {ok} OK, {fail} failed")
        # Show agent stats
        for name, stats in state.get("agent_stats", {}).items():
            if stats.get("total_assigned", 0) > 0:
                print(f"  @{name}: {stats.get('successful', 0)}/{stats['total_assigned']} "
                      f"({stats.get('success_rate', 0):.0%} success)")
        print(f"{'=' * 60}")

    return 0


if __name__ == "__main__":
    sys.exit(main())