Compare commits
1 Commits
step35/338
...
step35/498
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c36777486 |
172
bin/load_cap_enforcer.py
Executable file
172
bin/load_cap_enforcer.py
Executable file
@@ -0,0 +1,172 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Open-Load Cap Enforcer — AUDIT-B3 (#498)
|
||||
|
||||
Scans multiple repos, caps per-agent open issue load at 25, auto-unassigns overflow
|
||||
(oldest first), comments unassigned issues, and generates a summary table.
|
||||
|
||||
Usage:
|
||||
python load_cap_enforcer.py --dry-run # preview only
|
||||
python load_cap_enforcer.py # execute changes
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
ORG = "Timmy_Foundation"
|
||||
REPOS = ["timmy-home", "timmy-config", "the-nexus", "hermes-agent"]
|
||||
CAP = 25
|
||||
|
||||
|
||||
def load_token() -> str:
|
||||
try:
|
||||
with open(TOKEN_PATH) as f:
|
||||
return f.read().strip()
|
||||
except FileNotFoundError:
|
||||
sys.exit(f"ERROR: Gitea token not found at {TOKEN_PATH}")
|
||||
|
||||
|
||||
def api_request(method: str, endpoint: str, token: str, payload: Dict | None = None) -> dict | None:
|
||||
url = f"{GITEA_BASE}{endpoint}"
|
||||
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
|
||||
data = json.dumps(payload).encode("utf-8") if payload else None
|
||||
req = urllib.request.Request(url, data=data, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode("utf-8"))
|
||||
except urllib.error.HTTPError as e:
|
||||
body = e.read().decode() if e.fp else "No error body"
|
||||
print(f"API Error {e.code}: {body}")
|
||||
return None
|
||||
|
||||
|
||||
def get_all_issues(repo: str, token: str) -> List[Dict]:
|
||||
issues = []
|
||||
page = 1
|
||||
while True:
|
||||
endpoint = f"/repos/{ORG}/{repo}/issues?state=open&page={page}&limit=50"
|
||||
batch = api_request("GET", endpoint, token)
|
||||
if not batch:
|
||||
break
|
||||
issues.extend(batch)
|
||||
if len(batch) < 50:
|
||||
break
|
||||
page += 1
|
||||
time.sleep(0.2)
|
||||
return issues
|
||||
|
||||
|
||||
def parse_iso(s: str) -> datetime:
|
||||
return datetime.fromisoformat(s.replace("Z", "+00:00"))
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="Enforce open-issue load cap per agent")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Preview without making changes")
|
||||
args = parser.parse_args()
|
||||
|
||||
token = load_token()
|
||||
print(f"{'DRY RUN — ' if args.dry_run else ''}Scanning {len(REPOS)} repos...")
|
||||
|
||||
agent_issues: Dict[str, List[Tuple[datetime, int, str, str]]] = {}
|
||||
total_issues = 0
|
||||
for repo in REPOS:
|
||||
issues = get_all_issues(repo, token)
|
||||
total_issues += len(issues)
|
||||
for issue in issues:
|
||||
assignee = issue.get("assignee")
|
||||
if assignee:
|
||||
username = assignee.get("login") or assignee.get("username", "unknown")
|
||||
created = parse_iso(issue["created_at"])
|
||||
agent_issues.setdefault(username, []).append((created, issue["number"], issue["title"], repo))
|
||||
print(f" {repo}: {len(issues)} open issues")
|
||||
|
||||
print(f"\nTotal open issues scanned: {total_issues}")
|
||||
print("\n=== BEFORE TABLE ===")
|
||||
print(f"{'Agent':<20} {'Before':>6}")
|
||||
for agent, issues in sorted(agent_issues.items(), key=lambda x: -len(x[1])):
|
||||
print(f" {agent:<18} {len(issues):>6}")
|
||||
|
||||
plan: Dict[str, Tuple[List[Tuple[datetime, int, str, str]], int, int]] = {}
|
||||
grand_total_unassign = 0
|
||||
for agent, issues in agent_issues.items():
|
||||
count = len(issues)
|
||||
if count > CAP:
|
||||
sorted_issues = sorted(issues, key=lambda x: x[0])
|
||||
overflow = sorted_issues[: count - CAP]
|
||||
plan[agent] = (overflow, count, CAP)
|
||||
grand_total_unassign += len(overflow)
|
||||
|
||||
if not plan:
|
||||
print("\nNo agents exceed the cap. All clear.")
|
||||
return
|
||||
|
||||
print(f"\n=== ACTION PLAN (cap={CAP}) ===")
|
||||
print(f"{'Agent':<20} {'Before':>6} {'After':>6} {'→ Unassign':>10}")
|
||||
for agent, (overflow, before, after) in sorted(plan.items(), key=lambda x: -x[1][1]):
|
||||
print(f" {agent:<18} {before:>6} {after:>6} {len(overflow):>10}")
|
||||
|
||||
if args.dry_run:
|
||||
print(f"\nDRY RUN — would unassign {grand_total_unassign} issues across {len(plan)} agents.")
|
||||
return
|
||||
|
||||
print(f"\n=== EXECUTING — unassigning {grand_total_unassign} issues ===")
|
||||
summary_lines = [
|
||||
"# Open-Load Cap Enforcement — Run @ " + datetime.utcnow().isoformat() + "Z",
|
||||
"",
|
||||
"## Summary",
|
||||
"- **Agents capped:** " + str(len(plan)),
|
||||
"- **Total issues unassigned:** " + str(grand_total_unassign),
|
||||
"",
|
||||
"| Agent | Before | After | Unassigned |",
|
||||
"|-------|--------|-------|------------|",
|
||||
]
|
||||
|
||||
for agent, (overflow, before, after) in sorted(plan.items(), key=lambda x: -x[1][1]):
|
||||
unassigned_numbers: List[Tuple[int, str]] = []
|
||||
for created, number, title, repo in overflow:
|
||||
endpoint = f"/repos/{ORG}/{repo}/issues/{number}"
|
||||
result = api_request("PATCH", endpoint, token, {"assignee": None})
|
||||
if result:
|
||||
print(f" Unassigned {repo}#{number} from @{agent} — '{title[:50]}'")
|
||||
unassigned_numbers.append((number, repo))
|
||||
else:
|
||||
print(f" FAILED to unassign {repo}#{number}")
|
||||
|
||||
comment_body = (
|
||||
"Unassigned from @" + agent + " due to load cap enforcement "
|
||||
"(AUDIT-B3 / #498). This agent had >25 open issues and exceeded the cap. "
|
||||
"Issue is now available for pickup."
|
||||
)
|
||||
for number, repo in unassigned_numbers:
|
||||
endpoint = f"/repos/{ORG}/{repo}/issues/{number}/comments"
|
||||
api_request("POST", endpoint, token, {"body": comment_body})
|
||||
time.sleep(0.2)
|
||||
|
||||
summary_lines.append(
|
||||
f"| {agent} | {before} | {after} | {len(unassigned_numbers)} |"
|
||||
)
|
||||
print(f" @{agent}: {len(unassigned_numbers)} issues unassigned and commented")
|
||||
|
||||
summary = "\n".join(summary_lines)
|
||||
print("\n=== FINAL SUMMARY ===")
|
||||
print(summary)
|
||||
|
||||
parent_url = f"/repos/{ORG}/timmy-home/issues/495/comments"
|
||||
result = api_request("POST", parent_url, token, {"body": summary})
|
||||
if result:
|
||||
print("\nSummary posted to timmy-home#495")
|
||||
else:
|
||||
print("\nWARNING: Could not post summary to timmy-home#495")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user