Compare commits

..

2 Commits

View File

@@ -1,633 +0,0 @@
#!/usr/bin/env python3
"""
Workforce Manager - Epic #204 / Milestone #218
Reads fleet routing, Wolf evaluation scores, and open Gitea issues across
Timmy_Foundation repos. Assigns each issue to the best-available agent,
tracks success rates, and dispatches work.
Usage:
python workforce-manager.py # Scan, assign, dispatch
python workforce-manager.py --dry-run # Show assignments without dispatching
python workforce-manager.py --status # Show agent status and open issue count
python workforce-manager.py --cron # Run silently, save to log
"""
import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
import requests
except ImportError:
print("FATAL: requests is required. pip install requests", file=sys.stderr)
sys.exit(1)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
FLEET_ROUTING_PATH = Path.home() / ".hermes" / "fleet-routing.json"
WOLF_RESULTS_DIR = Path.home() / ".hermes" / "wolf" / "results"
GITEA_TOKEN_PATH = Path.home() / ".hermes" / "gitea_token_vps"
GITEA_API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
WORKFORCE_STATE_PATH = Path.home() / ".hermes" / "workforce-state.json"
ORG_NAME = "Timmy_Foundation"
# Role-to-agent-role mapping heuristics
ROLE_KEYWORDS = {
"code-generation": [
"code", "implement", "feature", "function", "class", "script",
"build", "create", "add", "module", "component",
],
"issue-triage": [
"triage", "categorize", "tag", "label", "organize",
"backlog", "sort", "prioritize", "review issue",
],
"on-request-queries": [
"query", "search", "lookup", "find", "check",
"info", "report", "status",
],
"devops": [
"deploy", "ci", "cd", "pipeline", "docker", "container",
"server", "infrastructure", "config", "nginx", "cron",
"setup", "install", "environment", "provision",
"build", "release", "workflow",
],
"documentation": [
"doc", "readme", "document", "write", "guide",
"spec", "wiki", "changelog", "tutorial",
"explain", "describe",
],
"code-review": [
"review", "refactor", "fix", "bug", "debug",
"test", "lint", "style", "improve",
"clean up", "optimize", "performance",
],
"triage-routing": [
"route", "assign", "triage", "dispatch",
"organize", "categorize",
],
"small-tasks": [
"small", "quick", "minor", "typo", "label",
"update", "rename", "cleanup",
],
"inactive": [],
"unknown": [],
}
# Priority keywords (higher = more urgent, route to more capable agent)
PRIORITY_KEYWORDS = {
"critical": 5,
"urgent": 4,
"block": 4,
"bug": 3,
"fix": 3,
"security": 5,
"deploy": 2,
"feature": 1,
"enhancement": 1,
"documentation": 1,
"cleanup": 0,
}
# Cost tier priority (lower index = prefer first)
TIER_ORDER = ["free", "cheap", "prepaid", "unknown"]
# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------
def load_json(path: Path) -> Any:
if not path.exists():
logging.warning("File not found: %s", path)
return None
with open(path) as f:
return json.load(f)
def load_fleet_routing() -> List[dict]:
data = load_json(FLEET_ROUTING_PATH)
if data and "agents" in data:
return data["agents"]
return []
def load_wolf_scores() -> Dict[str, dict]:
"""Load Wolf evaluation scores from results directory."""
scores: Dict[str, dict] = {}
if not WOLF_RESULTS_DIR.exists():
return scores
for f in sorted(WOLF_RESULTS_DIR.glob("*.json")):
data = load_json(f)
if data and "model_scores" in data:
for entry in data["model_scores"]:
model = entry.get("model", "")
if model:
scores[model] = entry
return scores
def load_workforce_state() -> dict:
if WORKFORCE_STATE_PATH.exists():
return load_json(WORKFORCE_STATE_PATH) or {}
return {"assignments": [], "agent_stats": {}, "last_run": None}
def save_workforce_state(state: dict) -> None:
WORKFORCE_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(WORKFORCE_STATE_PATH, "w") as f:
json.dump(state, f, indent=2)
logging.info("Workforce state saved to %s", WORKFORCE_STATE_PATH)
# ---------------------------------------------------------------------------
# Gitea API
# ---------------------------------------------------------------------------
class GiteaAPI:
"""Thin wrapper for Gitea REST API."""
def __init__(self, token: str, base_url: str = GITEA_API_BASE):
self.base_url = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"token {token}",
"Accept": "application/json",
"Content-Type": "application/json",
})
def _get(self, path: str, params: Optional[dict] = None) -> Any:
r = self.session.get(f"{self.base_url}{path}", params=params)
r.raise_for_status()
return r.json()
def _post(self, path: str, data: dict) -> Any:
r = self.session.post(f"{self.base_url}{path}", json=data)
r.raise_for_status()
return r.json()
def _patch(self, path: str, data: dict) -> Any:
r = self.session.patch(f"{self.base_url}{path}", json=data)
r.raise_for_status()
return r.json()
def get_org_repos(self, org: str) -> List[dict]:
return self._get(f"/orgs/{org}/repos", params={"limit": 100})
def get_open_issues(self, owner: str, repo: str, page: int = 1) -> List[dict]:
params = {"state": "open", "type": "issues", "limit": 50, "page": page}
return self._get(f"/repos/{owner}/{repo}/issues", params=params)
def get_all_open_issues(self, org: str) -> List[dict]:
"""Fetch all open issues across all org repos."""
repos = self.get_org_repos(org)
all_issues = []
for repo in repos:
name = repo["name"]
try:
# Paginate through all issues
page = 1
while True:
issues = self.get_open_issues(org, name, page=page)
if not issues:
break
all_issues.extend(issues)
if len(issues) < 50:
break
page += 1
logging.info("Loaded %d open issues from %s/%s", len(all_issues), org, name)
except Exception as exc:
logging.warning("Failed to load issues from %s/%s: %s", org, name, exc)
return all_issues
def add_issue_comment(self, owner: str, repo: str, issue_num: int, body: str) -> dict:
return self._post(f"/repos/{owner}/{repo}/issues/{issue_num}/comments", {"body": body})
def add_issue_label(self, owner: str, repo: str, issue_num: int, label: str) -> dict:
return self._post(
f"/repos/{owner}/{repo}/issues/{issue_num}/labels",
{"labels": [label]},
)
def assign_issue(self, owner: str, repo: str, issue_num: int, assignee: str) -> dict:
return self._patch(
f"/repos/{owner}/{repo}/issues/{issue_num}",
{"assignees": [assignee]},
)
# ---------------------------------------------------------------------------
# Scoring & Assignment Logic
# ---------------------------------------------------------------------------
def classify_issue(issue: dict) -> str:
"""Determine the best agent role for an issue based on title/body."""
title = (issue.get("title", "") or "").lower()
body = (issue.get("body", "") or "").lower()
text = f"{title} {body}"
labels = [l.get("name", "").lower() for l in issue.get("labels", [])]
text += " " + " ".join(labels)
best_role = "small-tasks" # default
best_score = 0
for role, keywords in ROLE_KEYWORDS.items():
if not keywords:
continue
score = sum(2 for kw in keywords if kw in text)
# Boost if a matching label exists
for label in labels:
if any(kw in label for kw in keywords):
score += 3
if score > best_score:
best_score = score
best_role = role
return best_role
def compute_priority(issue: dict) -> int:
"""Compute issue priority from keywords."""
title = (issue.get("title", "") or "").lower()
body = (issue.get("body", "") or "").lower()
text = f"{title} {body}"
return sum(v for k, v in PRIORITY_KEYWORDS.items() if k in text)
def score_agent_for_issue(agent: dict, role: str, wolf_scores: dict, priority: int) -> float:
"""Score how well an agent matches an issue. Higher is better."""
score = 0.0
# Primary: role match
agent_role = agent.get("role", "unknown")
if agent_role == role:
score += 10.0
elif agent_role == "small-tasks" and role in ("issue-triage", "on-request-queries"):
score += 6.0
elif agent_role == "triage-routing" and role in ("issue-triage", "triage-routing"):
score += 8.0
elif agent_role == "code-generation" and role in ("code-review", "devops"):
score += 4.0
# Wolf quality bonus
model = agent.get("model", "")
wolf_entry = None
for wm, ws in wolf_scores.items():
if model and model.lower() in wm.lower():
wolf_entry = ws
break
if wolf_entry and wolf_entry.get("success"):
score += wolf_entry.get("total", 0) * 3.0
# Cost efficiency: prefer free/cheap for low priority
tier = agent.get("tier", "unknown")
tier_idx = TIER_ORDER.index(tier) if tier in TIER_ORDER else 3
if priority <= 1 and tier in ("free", "cheap"):
score += 4.0
elif priority >= 3 and tier in ("prepaid",):
score += 3.0
else:
score += (3 - tier_idx) * 1.0
# Activity bonus
if agent.get("active", False):
score += 2.0
# Repo familiarity bonus: more repos slightly better
repo_count = agent.get("repo_count", 0)
score += min(repo_count * 0.2, 2.0)
return round(score, 3)
def find_best_agent(agents: List[dict], role: str, wolf_scores: dict, priority: int,
exclude: Optional[List[str]] = None) -> Optional[dict]:
"""Find the best agent for the given role and priority."""
exclude = exclude or []
candidates = []
for agent in agents:
if agent.get("name") in exclude:
continue
if not agent.get("active", False):
continue
s = score_agent_for_issue(agent, role, wolf_scores, priority)
candidates.append((s, agent))
if not candidates:
return None
candidates.sort(key=lambda x: x[0], reverse=True)
return candidates[0][1]
# ---------------------------------------------------------------------------
# Dispatch
# ---------------------------------------------------------------------------
def dispatch_assignment(api: GiteaAPI, issue: dict, agent: dict, dry_run: bool = False) -> dict:
"""Assign an issue to an agent and optionally post a comment."""
owner = ORG_NAME
repo = issue.get("repository", {}).get("name", "")
# Extract repo from issue repo_url if not in the repository key
if not repo:
repo_url = issue.get("repository_url", "")
if repo_url:
repo = repo_url.rstrip("/").split("/")[-1]
if not repo:
return {"success": False, "error": "Cannot determine repository for issue"}
issue_num = issue.get("number")
agent_name = agent.get("name", "unknown")
comment_body = (
f"🤖 **Workforce Manager assigned this issue to: @{agent_name}**\n\n"
f"- **Agent:** {agent_name}\n"
f"- **Model:** {agent.get('model', 'unknown')}\n"
f"- **Role:** {agent.get('role', 'unknown')}\n"
f"- **Tier:** {agent.get('tier', 'unknown')}\n"
f"- **Assigned at:** {datetime.now(timezone.utc).isoformat()}\n\n"
f"*Automated assignment by Workforce Manager (Epic #204)*"
)
if dry_run:
return {
"success": True,
"dry_run": True,
"repo": repo,
"issue_number": issue_num,
"assignee": agent_name,
"comment": comment_body,
}
try:
api.assign_issue(owner, repo, issue_num, agent_name)
api.add_issue_comment(owner, repo, issue_num, comment_body)
return {
"success": True,
"repo": repo,
"issue_number": issue_num,
"issue_title": issue.get("title", ""),
"assignee": agent_name,
}
except Exception as exc:
return {
"success": False,
"repo": repo,
"issue_number": issue_num,
"error": str(exc),
}
# ---------------------------------------------------------------------------
# State Tracking
# ---------------------------------------------------------------------------
def update_agent_stats(state: dict, result: dict) -> None:
"""Update per-agent success tracking."""
agent_name = result.get("assignee", "unknown")
if "agent_stats" not in state:
state["agent_stats"] = {}
if agent_name not in state["agent_stats"]:
state["agent_stats"][agent_name] = {
"total_assigned": 0,
"successful": 0,
"failed": 0,
"success_rate": 0.0,
"last_assignment": None,
"assigned_issues": [],
}
stats = state["agent_stats"][agent_name]
stats["total_assigned"] += 1
stats["last_assignment"] = datetime.now(timezone.utc).isoformat()
stats["assigned_issues"].append({
"repo": result.get("repo", ""),
"issue_number": result.get("issue_number"),
"success": result.get("success", False),
"timestamp": datetime.now(timezone.utc).isoformat(),
})
if result.get("success"):
stats["successful"] += 1
else:
stats["failed"] += 1
total = stats["successful"] + stats["failed"]
stats["success_rate"] = round(stats["successful"] / total, 3) if total > 0 else 0.0
def print_status(state: dict, agents: List[dict], issues_count: int) -> None:
"""Print workforce status."""
print(f"\n{'=' * 60}")
print(f"Workforce Manager Status - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
print(f"{'=' * 60}")
# Fleet summary
active = [a for a in agents if a.get("active")]
print(f"\nFleet: {len(active)} active agents, {len(agents)} total")
tier_counts = {}
for a in active:
t = a.get("tier", "unknown")
tier_counts[t] = tier_counts.get(t, 0) + 1
for t, c in sorted(tier_counts.items()):
print(f" {t}: {c} agents")
# Agent scores
wolf = load_wolf_scores()
print(f"\nAgent Details:")
print(f" {'Name':<25} {'Model':<30} {'Role':<18} {'Tier':<10}")
for a in agents:
if not a.get("active"):
continue
stats = state.get("agent_stats", {}).get(a["name"], {})
rate = stats.get("success_rate", 0.0)
total = stats.get("total_assigned", 0)
wolf_badge = ""
for wm, ws in wolf.items():
if a["model"] and a["model"].lower() in wm.lower() and ws.get("success"):
wolf_badge = f"[wolf:{ws['total']}]"
break
name_str = f"{a['name']} {wolf_badge}"
if total > 0:
name_str += f" (s/r: {rate}, n={total})"
print(f" {name_str:<45} {a.get('role', 'unknown'):<18} {a.get('tier', '?'):<10}")
print(f"\nOpen Issues: {issues_count}")
print(f"Assignments Made: {len(state.get('assignments', []))}")
if state.get("last_run"):
print(f"Last Run: {state['last_run']}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
parser = argparse.ArgumentParser(description="Workforce Manager - Assign Gitea issues to AI agents")
parser.add_argument("--dry-run", action="store_true", help="Show assignments without dispatching")
parser.add_argument("--status", action="store_true", help="Show workforce status only")
parser.add_argument("--cron", action="store_true", help="Run silently for cron scheduling")
parser.add_argument("--label", type=str, help="Only process issues with this label")
parser.add_argument("--max-issues", type=int, default=100, help="Max issues to process per run")
args = parser.parse_args()
# Setup logging
if args.cron:
logging.basicConfig(level=logging.WARNING, format="%(asctime)s [%(levelname)s] %(message)s")
else:
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logging.info("Workforce Manager starting")
# Load data
agents = load_fleet_routing()
if not agents:
logging.error("No agents found in fleet-routing.json")
return 1
logging.info("Loaded %d agents from fleet routing", len(agents))
wolf_scores = load_wolf_scores()
if wolf_scores:
logging.info("Loaded %d model scores from Wolf results", len(wolf_scores))
state = load_workforce_state()
# Load Gitea token
if GITEA_TOKEN_PATH.exists():
token = GITEA_TOKEN_PATH.read_text().strip()
else:
logging.error("Gitea token not found at %s", GITEA_TOKEN_PATH)
return 1
api = GiteaAPI(token)
# Status-only mode
if args.status:
# Quick open issue count
repos = api.get_org_repos(ORG_NAME)
total = sum(r.get("open_issues_count", 0) for r in repos)
print_status(state, agents, total)
return 0
# Fetch open issues
if not args.cron:
print(f"Scanning open issues across {ORG_NAME} repos...")
issues = api.get_all_open_issues(ORG_NAME)
# Filter by label
if args.label:
issues = [
i for i in issues
if any(args.label in (l.get("name", "") or "").lower() for l in i.get("labels", []))
]
if args.label:
logging.info("Filtered to %d issues with label '%s'", len(issues), args.label)
else:
logging.info("Found %d open issues", len(issues))
# Skip issues already assigned
already_assigned_nums = set()
for a in state.get("assignments", []):
already_assigned_nums.add((a.get("repo"), a.get("issue_number")))
issues = [
i for i in issues
if not i.get("assignee") and
not (i.get("repository", {}).get("name"), i.get("number")) in already_assigned_nums
]
logging.info("%d unassigned issues to process", len(issues))
# Sort by priority
issues_with_priority = [(compute_priority(i), i) for i in issues]
issues_with_priority.sort(key=lambda x: x[0], reverse=True)
issues = [i for _, i in issues_with_priority[:args.max_issues]]
# Assign issues
assignments = []
agent_exclusions: Dict[str, List[str]] = {} # repo -> list of assigned agents per run
global_exclusions: List[str] = [] # agents already at capacity per run
max_per_agent_per_run = 5
for issue in issues:
role = classify_issue(issue)
priority = compute_priority(issue)
repo = issue.get("repository", {}).get("name", "")
# Avoid assigning same agent twice to same repo in one run
repo_excluded = agent_exclusions.get(repo, [])
# Also exclude agents already at assignment cap
cap_excluded = [
name for name, stats in state.get("agent_stats", {}).items()
if stats.get("total_assigned", 0) > max_per_agent_per_run
]
excluded = list(set(repo_excluded + global_exclusions + cap_excluded))
agent = find_best_agent(agents, role, wolf_scores, priority, exclude=excluded)
if not agent:
# Relax exclusions if no agent found
agent = find_best_agent(agents, role, wolf_scores, priority, exclude=[])
if not agent:
logging.warning("No suitable agent for issue #%d: %s (role=%s)",
issue.get("number"), issue.get("title", ""), role)
continue
result = dispatch_assignment(api, issue, agent, dry_run=args.dry_run)
assignments.append(result)
update_agent_stats(state, result)
# Track per-repo exclusions
if repo not in agent_exclusions:
agent_exclusions[repo] = []
agent_exclusions[repo].append(agent["name"])
if args.dry_run:
print(f" [DRY] #{issue['number']}: {issue.get('title','')[:60]} → @{agent['name']} ({role}, p={priority})")
else:
status_str = "OK" if result.get("success") else "FAIL"
print(f" [{status_str}] #{issue['number']}: {issue.get('title','')[:60]} → @{agent['name']} ({role}, p={priority})")
# Save state
state["assignments"].extend([{
"repo": a.get("repo"),
"issue_number": a.get("issue_number"),
"assignee": a.get("assignee"),
"success": a.get("success", False),
"timestamp": datetime.now(timezone.utc).isoformat(),
} for a in assignments])
state["last_run"] = datetime.now(timezone.utc).isoformat()
save_workforce_state(state)
# Summary
ok = sum(1 for a in assignments if a.get("success"))
fail = len(assignments) - ok
logging.info("Done: %d assigned, %d succeeded, %d failed", len(assignments), ok, fail)
if not args.cron:
print(f"\n{'=' * 60}")
print(f"Summary: {len(assignments)} assignments, {ok} OK, {fail} failed")
# Show agent stats
for name, stats in state.get("agent_stats", {}).items():
if stats.get("total_assigned", 0) > 0:
print(f" @{name}: {stats['successful']}/{stats['total_assigned']} ({stats.get('success_rate', 0):.0%} success)")
print(f"{'=' * 60}")
return 0
if __name__ == "__main__":
sys.exit(main())