#!/usr/bin/env python3
"""
Workforce Manager - Epic #204 / Milestone #218

Reads fleet routing, Wolf evaluation scores, and open Gitea issues across
Timmy_Foundation repos. Assigns each issue to the best-available agent,
tracks success rates, and dispatches work.

Usage:
    python workforce-manager.py                # Scan, assign, dispatch
    python workforce-manager.py --dry-run      # Show assignments without dispatching
    python workforce-manager.py --status       # Show agent status and open issue count
    python workforce-manager.py --cron         # Run silently, save to log
"""

import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional

try:
    import requests
except ImportError:
    # Do not exit at import time: main() reports the missing dependency and
    # returns exit status 1, so the command-line behaviour is unchanged while
    # the pure scoring helpers stay importable without the HTTP client.
    requests = None


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

FLEET_ROUTING_PATH = Path.home() / ".hermes" / "fleet-routing.json"
WOLF_RESULTS_DIR = Path.home() / ".hermes" / "wolf" / "results"
GITEA_TOKEN_PATH = Path.home() / ".hermes" / "gitea_token_vps"
GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
WORKFORCE_STATE_PATH = Path.home() / ".hermes" / "workforce-state.json"
ORG_NAME = "Timmy_Foundation"

# Role-to-agent-role mapping heuristics
ROLE_KEYWORDS = {
    "code-generation": [
        "code", "implement", "feature", "function", "class", "script",
        "build", "create", "add", "module", "component",
    ],
    "issue-triage": [
        "triage", "categorize", "tag", "label", "organize",
        "backlog", "sort", "prioritize", "review issue",
    ],
    "on-request-queries": [
        "query", "search", "lookup", "find", "check",
        "info", "report", "status",
    ],
    "devops": [
        "deploy", "ci", "cd", "pipeline", "docker", "container",
        "server", "infrastructure", "config", "nginx", "cron",
        "setup", "install", "environment", "provision",
        "build", "release", "workflow",
    ],
    "documentation": [
        "doc", "readme", "document", "write", "guide",
        "spec", "wiki", "changelog", "tutorial",
        "explain", "describe",
    ],
    "code-review": [
        "review", "refactor", "fix", "bug", "debug",
        "test", "lint", "style", "improve",
        "clean up", "optimize", "performance",
    ],
    "triage-routing": [
        "route", "assign", "triage", "dispatch",
        "organize", "categorize",
    ],
    "small-tasks": [
        "small", "quick", "minor", "typo", "label",
        "update", "rename", "cleanup",
    ],
    "inactive": [],
    "unknown": [],
}

# Priority keywords (higher = more urgent, route to more capable agent)
PRIORITY_KEYWORDS = {
    "critical": 5,
    "urgent": 4,
    "block": 4,
    "bug": 3,
    "fix": 3,
    "security": 5,
    "deploy": 2,
    "feature": 1,
    "enhancement": 1,
    "documentation": 1,
    "cleanup": 0,
}

# Cost tier priority (lower index = prefer first)
TIER_ORDER = ["free", "cheap", "prepaid", "unknown"]

# Soft cap on how many issues a single agent receives within one run.
MAX_PER_AGENT_PER_RUN = 5


# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------

def load_json(path: Path) -> Any:
    """Return the parsed JSON content of *path*, or None if unavailable.

    A missing, unreadable or malformed file is logged as a warning and
    reported as None so that one bad file does not abort the whole run.
    """
    if not path.exists():
        logging.warning("File not found: %s", path)
        return None
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logging.warning("Could not read JSON from %s: %s", path, exc)
        return None


def load_fleet_routing() -> List[dict]:
    """Return the ``agents`` list from the fleet routing file ([] if absent)."""
    data = load_json(FLEET_ROUTING_PATH)
    if isinstance(data, dict) and isinstance(data.get("agents"), list):
        return data["agents"]
    return []


def load_wolf_scores() -> Dict[str, dict]:
    """Load Wolf evaluation scores from results directory.

    Result files are read in sorted filename order; when a model appears in
    several files the entry from the last file wins.
    """
    scores: Dict[str, dict] = {}
    if not WOLF_RESULTS_DIR.exists():
        return scores
    for result_file in sorted(WOLF_RESULTS_DIR.glob("*.json")):
        data = load_json(result_file)
        if not isinstance(data, dict):
            continue
        for entry in data.get("model_scores") or []:
            if not isinstance(entry, dict):
                continue
            model = entry.get("model", "")
            if model:
                scores[model] = entry
    return scores


def load_workforce_state() -> dict:
    """Return the persisted workforce state, always with the expected keys.

    The stored file is merged over the defaults so that an empty, partial or
    corrupt state file cannot cause a KeyError later in the run.
    """
    state: dict = {"assignments": [], "agent_stats": {}, "last_run": None}
    if WORKFORCE_STATE_PATH.exists():
        loaded = load_json(WORKFORCE_STATE_PATH)
        if isinstance(loaded, dict):
            state.update(loaded)
    if not isinstance(state["assignments"], list):
        state["assignments"] = []
    if not isinstance(state["agent_stats"], dict):
        state["agent_stats"] = {}
    return state


def save_workforce_state(state: dict) -> None:
    """Write *state* to the workforce state file.

    The data is written to a sibling temporary file first and then moved
    into place, so an interrupted run does not leave a truncated state file.
    """
    WORKFORCE_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = WORKFORCE_STATE_PATH.with_name(WORKFORCE_STATE_PATH.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    tmp_path.replace(WORKFORCE_STATE_PATH)
    logging.info("Workforce state saved to %s", WORKFORCE_STATE_PATH)


# ---------------------------------------------------------------------------
# Gitea API
# ---------------------------------------------------------------------------

class GiteaAPI:
    """Thin wrapper for Gitea REST API.

    Every request carries the token header, is bounded by ``timeout``
    seconds, and raises (via ``raise_for_status``) on an HTTP error status.
    """

    # Page size requested from list endpoints.
    # NOTE(review): pagination stops on a page shorter than this, which
    # assumes the server does not cap pages below 50 - confirm against the
    # server's configured maximum page size.
    PAGE_SIZE = 50

    def __init__(self, token: str, base_url: str = GITEA_API_BASE, timeout: float = 30.0):
        """Create a session authenticated with *token* against *base_url*.

        Raises:
            RuntimeError: if the ``requests`` package is not installed.
        """
        if requests is None:
            raise RuntimeError("requests is required. pip install requests")
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"token {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        })

    def _request(self, method: str, path: str, **kwargs: Any) -> Any:
        """Send one request and return the decoded JSON body."""
        r = self.session.request(
            method, f"{self.base_url}{path}", timeout=self.timeout, **kwargs
        )
        r.raise_for_status()
        return r.json()

    def _get(self, path: str, params: Optional[dict] = None) -> Any:
        return self._request("GET", path, params=params)

    def _post(self, path: str, data: dict) -> Any:
        return self._request("POST", path, json=data)

    def _patch(self, path: str, data: dict) -> Any:
        return self._request("PATCH", path, json=data)

    def _iter_pages(self, path: str, params: Optional[dict] = None) -> Iterator[List[dict]]:
        """Yield successive result pages of a list endpoint until exhausted."""
        page = 1
        while True:
            query = dict(params or {}, limit=self.PAGE_SIZE, page=page)
            batch = self._get(path, params=query)
            if not batch:
                return
            yield batch
            if len(batch) < self.PAGE_SIZE:
                return
            page += 1

    def get_org_repos(self, org: str) -> List[dict]:
        """Return all repositories of *org*, following pagination."""
        repos: List[dict] = []
        for batch in self._iter_pages(f"/orgs/{org}/repos"):
            repos.extend(batch)
        return repos

    def get_open_issues(self, owner: str, repo: str, page: int = 1) -> List[dict]:
        """Return one page of open issues (pull requests excluded)."""
        params = {"state": "open", "type": "issues", "limit": self.PAGE_SIZE, "page": page}
        return self._get(f"/repos/{owner}/{repo}/issues", params=params)

    def get_all_open_issues(self, org: str) -> List[dict]:
        """Fetch all open issues across all org repos.

        A failure while reading one repository is logged and that repository
        is skipped (pages already fetched for it are kept); the remaining
        repositories are still scanned.
        """
        all_issues: List[dict] = []
        for repo in self.get_org_repos(org):
            name = repo.get("name")
            if not name:
                continue
            loaded = 0
            try:
                pages = self._iter_pages(
                    f"/repos/{org}/{name}/issues",
                    {"state": "open", "type": "issues"},
                )
                for batch in pages:
                    for issue in batch:
                        # Make sure the repo can be recovered from the issue
                        # later even if the payload omits it.
                        if not issue.get("repository"):
                            issue["repository"] = {"name": name}
                    all_issues.extend(batch)
                    loaded += len(batch)
                logging.info("Loaded %d open issues from %s/%s", loaded, org, name)
            except Exception as exc:  # best effort: keep scanning other repos
                logging.warning("Failed to load issues from %s/%s: %s", org, name, exc)
        return all_issues

    def add_issue_comment(self, owner: str, repo: str, issue_num: int, body: str) -> dict:
        """Post *body* as a new comment on the issue."""
        return self._post(f"/repos/{owner}/{repo}/issues/{issue_num}/comments", {"body": body})

    def add_issue_label(self, owner: str, repo: str, issue_num: int, label: str) -> dict:
        """Attach *label* to the issue."""
        return self._post(
            f"/repos/{owner}/{repo}/issues/{issue_num}/labels",
            {"labels": [label]},
        )

    def assign_issue(self, owner: str, repo: str, issue_num: int, assignee: str) -> dict:
        """Set *assignee* as the issue's assignee list."""
        return self._patch(
            f"/repos/{owner}/{repo}/issues/{issue_num}",
            {"assignees": [assignee]},
        )


# ---------------------------------------------------------------------------
# Scoring & Assignment Logic
# ---------------------------------------------------------------------------

def _issue_label_names(issue: dict) -> List[str]:
    """Return the issue's label names lower-cased; tolerates null values."""
    return [
        (label.get("name") or "").lower()
        for label in (issue.get("labels") or [])
        if isinstance(label, dict)
    ]


def _issue_repo_name(issue: dict) -> str:
    """Return the repository name of *issue*, or "" if it cannot be found.

    Prefers ``issue["repository"]["name"]`` and falls back to the last path
    segment of ``issue["repository_url"]``.
    """
    repo = (issue.get("repository") or {}).get("name") or ""
    if not repo:
        repo_url = issue.get("repository_url") or ""
        if repo_url:
            repo = repo_url.rstrip("/").split("/")[-1]
    return repo


def _find_wolf_entry(model: str, wolf_scores: dict) -> Optional[dict]:
    """Return the first Wolf entry whose model name contains *model*."""
    if not model:
        return None
    needle = model.lower()
    for wolf_model, entry in wolf_scores.items():
        if needle in wolf_model.lower():
            return entry
    return None


def classify_issue(issue: dict) -> str:
    """Determine the best agent role for an issue based on title/body.

    Each role keyword found as a substring of title + body + label names
    scores 2; each label that contains any keyword of the role adds 3. The
    highest-scoring role wins (earliest in ROLE_KEYWORDS on a tie), with
    "small-tasks" as the default when nothing matches.
    """
    title = (issue.get("title", "") or "").lower()
    body = (issue.get("body", "") or "").lower()
    labels = _issue_label_names(issue)
    text = f"{title} {body}" + " " + " ".join(labels)

    best_role = "small-tasks"  # default
    best_score = 0

    for role, keywords in ROLE_KEYWORDS.items():
        if not keywords:
            continue
        score = sum(2 for kw in keywords if kw in text)
        # Boost if a matching label exists
        score += sum(3 for label in labels if any(kw in label for kw in keywords))
        if score > best_score:
            best_score = score
            best_role = role

    return best_role


def compute_priority(issue: dict) -> int:
    """Compute issue priority from keywords.

    Sums the PRIORITY_KEYWORDS weight of every keyword that occurs as a
    substring of the lower-cased title and body.
    """
    title = (issue.get("title", "") or "").lower()
    body = (issue.get("body", "") or "").lower()
    text = f"{title} {body}"
    return sum(weight for keyword, weight in PRIORITY_KEYWORDS.items() if keyword in text)


def score_agent_for_issue(agent: dict, role: str, wolf_scores: dict, priority: int) -> float:
    """Score how well an agent matches an issue. Higher is better.

    Components: role match (exact or a related role), Wolf quality bonus,
    cost-tier fit for the priority, activity bonus and a small bonus for the
    number of repos the agent works on. Rounded to 3 decimals.
    """
    score = 0.0

    # Primary: role match
    agent_role = agent.get("role", "unknown")
    if agent_role == role:
        score += 10.0
    elif agent_role == "small-tasks" and role in ("issue-triage", "on-request-queries"):
        score += 6.0
    elif agent_role == "triage-routing" and role in ("issue-triage", "triage-routing"):
        score += 8.0
    elif agent_role == "code-generation" and role in ("code-review", "devops"):
        score += 4.0

    # Wolf quality bonus
    wolf_entry = _find_wolf_entry(agent.get("model", ""), wolf_scores)
    if wolf_entry and wolf_entry.get("success"):
        score += (wolf_entry.get("total") or 0) * 3.0

    # Cost efficiency: prefer free/cheap for low priority
    tier = agent.get("tier", "unknown")
    tier_idx = TIER_ORDER.index(tier) if tier in TIER_ORDER else 3
    if priority <= 1 and tier in ("free", "cheap"):
        score += 4.0
    elif priority >= 3 and tier in ("prepaid",):
        score += 3.0
    else:
        score += (3 - tier_idx) * 1.0

    # Activity bonus
    if agent.get("active", False):
        score += 2.0

    # Repo familiarity bonus: more repos slightly better
    repo_count = agent.get("repo_count") or 0
    score += min(repo_count * 0.2, 2.0)

    return round(score, 3)


def find_best_agent(agents: List[dict], role: str, wolf_scores: dict, priority: int,
                    exclude: Optional[List[str]] = None) -> Optional[dict]:
    """Find the best agent for the given role and priority.

    Inactive agents and agents named in *exclude* are skipped. Returns None
    when no candidate remains; on equal scores the agent listed first wins.
    """
    excluded = set(exclude or [])
    candidates = [
        agent for agent in agents
        if agent.get("name") not in excluded and agent.get("active", False)
    ]
    if not candidates:
        return None
    # max() keeps the first of several equally scored agents.
    return max(
        candidates,
        key=lambda agent: score_agent_for_issue(agent, role, wolf_scores, priority),
    )


# ---------------------------------------------------------------------------
# Dispatch
# ---------------------------------------------------------------------------

def dispatch_assignment(api: GiteaAPI, issue: dict, agent: dict, dry_run: bool = False) -> dict:
    """Assign an issue to an agent and optionally post a comment.

    Returns a result dict with at least ``success``, ``repo``,
    ``issue_number`` and ``assignee`` (so failures are attributed to the
    right agent), plus ``error`` on failure and ``dry_run``/``comment`` when
    *dry_run* is set. No API call is made in dry-run mode. API errors are
    caught and reported in the result rather than raised.
    """
    owner = ORG_NAME
    repo = _issue_repo_name(issue)
    issue_num = issue.get("number")
    agent_name = agent.get("name", "unknown")

    if not repo:
        return {
            "success": False,
            "repo": repo,
            "issue_number": issue_num,
            "assignee": agent_name,
            "error": "Cannot determine repository for issue",
        }

    comment_body = (
        f"🤖 **Workforce Manager assigned this issue to: @{agent_name}**\n\n"
        f"- **Agent:** {agent_name}\n"
        f"- **Model:** {agent.get('model', 'unknown')}\n"
        f"- **Role:** {agent.get('role', 'unknown')}\n"
        f"- **Tier:** {agent.get('tier', 'unknown')}\n"
        f"- **Assigned at:** {datetime.now(timezone.utc).isoformat()}\n\n"
        f"*Automated assignment by Workforce Manager (Epic #204)*"
    )

    if dry_run:
        return {
            "success": True,
            "dry_run": True,
            "repo": repo,
            "issue_number": issue_num,
            "assignee": agent_name,
            "comment": comment_body,
        }

    try:
        api.assign_issue(owner, repo, issue_num, agent_name)
        api.add_issue_comment(owner, repo, issue_num, comment_body)
    except Exception as exc:  # report any dispatch failure in the result
        return {
            "success": False,
            "repo": repo,
            "issue_number": issue_num,
            "assignee": agent_name,
            "error": str(exc),
        }
    return {
        "success": True,
        "repo": repo,
        "issue_number": issue_num,
        "issue_title": issue.get("title", ""),
        "assignee": agent_name,
    }


# ---------------------------------------------------------------------------
# State Tracking
# ---------------------------------------------------------------------------

def update_agent_stats(state: dict, result: dict) -> None:
    """Update per-agent success tracking.

    Mutates ``state["agent_stats"]`` in place: bumps the counters of the
    result's assignee, appends the issue to its history and recomputes the
    success rate.
    """
    agent_name = result.get("assignee", "unknown")
    stats = state.setdefault("agent_stats", {}).setdefault(agent_name, {})
    # Fill in whatever a previously persisted (possibly partial) record lacks.
    stats.setdefault("total_assigned", 0)
    stats.setdefault("successful", 0)
    stats.setdefault("failed", 0)
    stats.setdefault("success_rate", 0.0)
    stats.setdefault("last_assignment", None)
    stats.setdefault("assigned_issues", [])

    now = datetime.now(timezone.utc).isoformat()
    stats["total_assigned"] += 1
    stats["last_assignment"] = now
    stats["assigned_issues"].append({
        "repo": result.get("repo", ""),
        "issue_number": result.get("issue_number"),
        "success": result.get("success", False),
        "timestamp": now,
    })

    if result.get("success"):
        stats["successful"] += 1
    else:
        stats["failed"] += 1

    total = stats["successful"] + stats["failed"]
    stats["success_rate"] = round(stats["successful"] / total, 3) if total > 0 else 0.0


def print_status(state: dict, agents: List[dict], issues_count: int) -> None:
    """Print workforce status.

    Shows the fleet summary per tier, one row per active agent (name with
    Wolf badge, model, role, tier, then success statistics if any) and the
    issue/assignment totals.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Workforce Manager Status - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
    print(banner)

    # Fleet summary
    active = [a for a in agents if a.get("active")]
    print(f"\nFleet: {len(active)} active agents, {len(agents)} total")
    tier_counts: Dict[str, int] = {}
    for a in active:
        tier = str(a.get("tier") or "unknown")
        tier_counts[tier] = tier_counts.get(tier, 0) + 1
    for tier, count in sorted(tier_counts.items()):
        print(f"  {tier}: {count} agents")

    # Agent scores
    wolf = load_wolf_scores()
    print("\nAgent Details:")
    print(f"  {'Name':<25} {'Model':<30} {'Role':<18} {'Tier':<10}")
    for a in active:
        name = str(a.get("name") or "unknown")
        model = str(a.get("model") or "")
        stats = state.get("agent_stats", {}).get(name, {})
        rate = stats.get("success_rate", 0.0)
        total = stats.get("total_assigned", 0)

        name_str = name
        wolf_entry = _find_wolf_entry(model, wolf)
        if wolf_entry and wolf_entry.get("success"):
            name_str += f" [wolf:{wolf_entry.get('total')}]"

        # One cell per header column; statistics trail the row so they do
        # not push the fixed-width columns out of alignment.
        row = (
            f"  {name_str:<25} {model:<30} "
            f"{str(a.get('role', 'unknown')):<18} {str(a.get('tier', '?')):<10}"
        )
        if total > 0:
            row += f" (s/r: {rate}, n={total})"
        print(row.rstrip())

    print(f"\nOpen Issues: {issues_count}")
    print(f"Assignments Made: {len(state.get('assignments', []))}")
    if state.get("last_run"):
        print(f"Last Run: {state['last_run']}")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def _pick_agent(agents: List[dict], role: str, wolf_scores: dict, priority: int,
                repo_excluded: List[str], capped: List[str]) -> Optional[dict]:
    """Pick an agent, relaxing exclusions step by step until one is found.

    Order: honour both the per-repo and the per-run cap exclusions, then
    only the cap, then no exclusions at all.
    """
    attempts = (
        list(set(repo_excluded) | set(capped)),
        list(capped),
        [],
    )
    for exclude in attempts:
        agent = find_best_agent(agents, role, wolf_scores, priority, exclude=exclude)
        if agent:
            return agent
    return None


def main() -> int:
    """Run the workforce manager CLI and return the process exit status."""
    if requests is None:
        print("FATAL: requests is required. pip install requests", file=sys.stderr)
        return 1

    parser = argparse.ArgumentParser(description="Workforce Manager - Assign Gitea issues to AI agents")
    parser.add_argument("--dry-run", action="store_true", help="Show assignments without dispatching")
    parser.add_argument("--status", action="store_true", help="Show workforce status only")
    parser.add_argument("--cron", action="store_true", help="Run silently for cron scheduling")
    parser.add_argument("--label", type=str, help="Only process issues with this label")
    parser.add_argument("--max-issues", type=int, default=100, help="Max issues to process per run")
    args = parser.parse_args()

    # Setup logging
    logging.basicConfig(
        level=logging.WARNING if args.cron else logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )
    logging.info("Workforce Manager starting")

    # Load data
    agents = load_fleet_routing()
    if not agents:
        logging.error("No agents found in fleet-routing.json")
        return 1
    logging.info("Loaded %d agents from fleet routing", len(agents))

    wolf_scores = load_wolf_scores()
    if wolf_scores:
        logging.info("Loaded %d model scores from Wolf results", len(wolf_scores))

    state = load_workforce_state()

    # Load Gitea token
    if not GITEA_TOKEN_PATH.exists():
        logging.error("Gitea token not found at %s", GITEA_TOKEN_PATH)
        return 1
    token = GITEA_TOKEN_PATH.read_text().strip()
    if not token:
        logging.error("Gitea token file %s is empty", GITEA_TOKEN_PATH)
        return 1

    api = GiteaAPI(token)

    # Status-only mode
    if args.status:
        # Quick open issue count
        try:
            repos = api.get_org_repos(ORG_NAME)
        except requests.RequestException as exc:
            logging.error("Failed to list repos of %s: %s", ORG_NAME, exc)
            return 1
        total = sum((r.get("open_issues_count") or 0) for r in repos)
        print_status(state, agents, total)
        return 0

    # Fetch open issues
    if not args.cron:
        print(f"Scanning open issues across {ORG_NAME} repos...")

    try:
        issues = api.get_all_open_issues(ORG_NAME)
    except requests.RequestException as exc:
        logging.error("Failed to list repos of %s: %s", ORG_NAME, exc)
        return 1

    # Filter by label (case-insensitive substring match on the label name)
    if args.label:
        wanted = args.label.lower()
        issues = [
            i for i in issues
            if any(wanted in name for name in _issue_label_names(i))
        ]
        logging.info("Filtered to %d issues with label '%s'", len(issues), args.label)
    else:
        logging.info("Found %d open issues", len(issues))

    # Skip issues already assigned. Only successful past dispatches count,
    # so an assignment that failed earlier is retried on the next run.
    already_assigned = {
        (a.get("repo"), a.get("issue_number"))
        for a in state["assignments"]
        if a.get("success")
    }
    issues = [
        i for i in issues
        if not i.get("assignee")
        and (_issue_repo_name(i), i.get("number")) not in already_assigned
    ]
    logging.info("%d unassigned issues to process", len(issues))

    # Sort by priority (stable, highest first) and keep the first max-issues
    prioritized = sorted(
        ((compute_priority(i), i) for i in issues),
        key=lambda pair: pair[0],
        reverse=True,
    )[:args.max_issues]

    # Assign issues
    assignments = []
    agents_per_repo: Dict[str, List[str]] = {}  # repo -> agents used this run
    run_counts: Dict[str, int] = {}  # agent -> issues handed out this run

    for priority, issue in prioritized:
        role = classify_issue(issue)
        repo = _issue_repo_name(issue)
        title = (issue.get("title") or "")
        number = issue.get("number")

        # Avoid assigning same agent twice to same repo in one run, and
        # steer away from agents that already reached the per-run cap.
        capped = [
            name for name, count in run_counts.items()
            if count >= MAX_PER_AGENT_PER_RUN
        ]
        agent = _pick_agent(
            agents, role, wolf_scores, priority,
            agents_per_repo.get(repo, []), capped,
        )
        if not agent:
            logging.warning("No suitable agent for issue #%s: %s (role=%s)",
                            number, title, role)
            continue

        result = dispatch_assignment(api, issue, agent, dry_run=args.dry_run)
        assignments.append(result)
        if not args.dry_run:
            # A dry run must not touch the persisted statistics.
            update_agent_stats(state, result)

        agent_name = agent.get("name", "unknown")
        agents_per_repo.setdefault(repo, []).append(agent_name)
        run_counts[agent_name] = run_counts.get(agent_name, 0) + 1

        if args.dry_run:
            status_str = "DRY"
        else:
            status_str = "OK" if result.get("success") else "FAIL"
        if not args.cron:
            print(f"  [{status_str}] #{number}: {title[:60]} → @{agent_name} ({role}, p={priority})")
        elif status_str == "FAIL":
            logging.warning("Assignment failed for %s#%s → @%s: %s",
                            repo, number, agent_name, result.get("error"))

    # Save state (never for a dry run, which would otherwise mark the
    # previewed issues as assigned and hide them from the real run)
    if not args.dry_run:
        now = datetime.now(timezone.utc).isoformat()
        state["assignments"].extend([{
            "repo": a.get("repo"),
            "issue_number": a.get("issue_number"),
            "assignee": a.get("assignee"),
            "success": a.get("success", False),
            "timestamp": now,
        } for a in assignments])
        state["last_run"] = now
        save_workforce_state(state)

    # Summary
    ok = sum(1 for a in assignments if a.get("success"))
    fail = len(assignments) - ok
    logging.info("Done: %d assigned, %d succeeded, %d failed", len(assignments), ok, fail)

    if not args.cron:
        print(f"\n{'=' * 60}")
        print(f"Summary: {len(assignments)} assignments, {ok} OK, {fail} failed")
        # Show agent stats
        for name, stats in state.get("agent_stats", {}).items():
            if stats.get("total_assigned", 0) > 0:
                print(f"  @{name}: {stats.get('successful', 0)}/{stats['total_assigned']} ({stats.get('success_rate', 0):.0%} success)")
        print(f"{'=' * 60}")

    return 0


if __name__ == "__main__":
    sys.exit(main())