Compare commits
1 Commits
step35/683
...
step35/498
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ee25505420 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -44,6 +44,9 @@ openrouter_key
|
||||
|
||||
# Already separate repos
|
||||
timmy-config/
|
||||
# Exceptions: allow B3 audit enforcement script to live here
|
||||
!timmy-config/bin/
|
||||
!timmy-config/bin/load_cap_enforcer.py
|
||||
timmy-telemetry/
|
||||
nexus-localhost/
|
||||
|
||||
|
||||
122
tests/test_load_cap_enforcer.py
Normal file
122
tests/test_load_cap_enforcer.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""Tests for load_cap_enforcer.py — Audit B3.
|
||||
|
||||
Tests pure/functional helpers:
|
||||
- group_by_assignee
|
||||
- sort_by_created_asc
|
||||
- format_summary_table
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add parent dir to path so we can import from timmy-config/bin
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'timmy-config', 'bin'))
|
||||
|
||||
from load_cap_enforcer import group_by_assignee, sort_by_created_asc, format_summary_table
|
||||
|
||||
|
||||
class TestGroupByAssignee:
|
||||
"""Group issues by assignee login."""
|
||||
|
||||
def test_empty_list(self):
|
||||
result = group_by_assignee([])
|
||||
assert result == {}
|
||||
|
||||
def test_single_assigned_issue(self):
|
||||
issues = [
|
||||
{"number": 1, "title": "Fix bug", "assignee": {"login": "allegro"}}
|
||||
]
|
||||
result = group_by_assignee(issues)
|
||||
assert "allegro" in result
|
||||
assert len(result["allegro"]) == 1
|
||||
assert result["allegro"][0]["number"] == 1
|
||||
|
||||
def test_multiple_assignees(self):
|
||||
issues = [
|
||||
{"number": 1, "assignee": {"login": "allegro"}},
|
||||
{"number": 2, "assignee": {"login": "Timmy"}},
|
||||
{"number": 3, "assignee": {"login": "allegro"}},
|
||||
]
|
||||
result = group_by_assignee(issues)
|
||||
assert set(result.keys()) == {"allegro", "Timmy"}
|
||||
assert len(result["allegro"]) == 2
|
||||
assert len(result["Timmy"]) == 1
|
||||
|
||||
def test_ignores_unassigned(self):
|
||||
issues = [
|
||||
{"number": 1, "assignee": None},
|
||||
{"number": 2, "assignee": {"login": "allegro"}},
|
||||
]
|
||||
result = group_by_assignee(issues)
|
||||
assert "allegro" in result
|
||||
assert len(result) == 1
|
||||
|
||||
def test_ignores_missing_assignee_field(self):
|
||||
issues = [
|
||||
{"number": 1, "title": "Orphan"},
|
||||
]
|
||||
result = group_by_assignee(issues)
|
||||
assert result == {}
|
||||
|
||||
|
||||
class TestSortByCreatedAsc:
|
||||
"""Sort issues oldest-first (FIFO)."""
|
||||
|
||||
def test_empty(self):
|
||||
assert sort_by_created_asc([]) == []
|
||||
|
||||
def test_single(self):
|
||||
issue = [{"created_at": "2026-01-01T00:00:00Z"}]
|
||||
assert sort_by_created_asc(issue) == issue
|
||||
|
||||
def test_multiple(self):
|
||||
issues = [
|
||||
{"number": 1, "created_at": "2026-01-03T00:00:00Z"},
|
||||
{"number": 2, "created_at": "2026-01-01T00:00:00Z"},
|
||||
{"number": 3, "created_at": "2026-01-02T00:00:00Z"},
|
||||
]
|
||||
sorted_issues = sort_by_created_asc(issues)
|
||||
nums = [i["number"] for i in sorted_issues]
|
||||
assert nums == [2, 3, 1]
|
||||
|
||||
def test_missing_created_at_sorted_last(self):
|
||||
# Missing created_at should sort at end
|
||||
issues = [
|
||||
{"number": 1, "created_at": "2026-01-02T00:00:00Z"},
|
||||
{"number": 2}, # no created_at
|
||||
{"number": 3, "created_at": "2026-01-01T00:00:00Z"},
|
||||
]
|
||||
sorted_issues = sort_by_created_asc(issues)
|
||||
nums = [i["number"] for i in sorted_issues]
|
||||
assert nums[0] == 3 # earliest
|
||||
assert 2 in nums # missing sorted last
|
||||
assert nums[-1] == 2
|
||||
|
||||
|
||||
class TestFormatSummaryTable:
|
||||
"""Generate markdown summary table."""
|
||||
|
||||
def test_all_agents(self):
|
||||
before = {"allegro": 30, "Timmy": 35}
|
||||
after = {"allegro": 25, "Timmy": 25}
|
||||
unassigned = {"allegro": 5, "Timmy": 10}
|
||||
table = format_summary_table(before, after, unassigned)
|
||||
lines = table.split("\n")
|
||||
assert lines[0].startswith("**")
|
||||
assert "| Agent |" in lines[2]
|
||||
assert "| allegro | 30 | 25 | 5 |" in lines
|
||||
assert "| Timmy | 35 | 25 | 10 |" in lines
|
||||
|
||||
def test_partial_unassigned(self):
|
||||
before = {"allegro": 28}
|
||||
after = {"allegro": 25}
|
||||
unassigned = {"allegro": 3}
|
||||
table = format_summary_table(before, after, unassigned)
|
||||
assert "| allegro | 28 | 25 | 3 |" in table
|
||||
|
||||
def test_no_unassigned(self):
|
||||
before = {"allegro": 20}
|
||||
after = {"allegro": 20}
|
||||
unassigned = {}
|
||||
table = format_summary_table(before, after, unassigned)
|
||||
assert "| allegro | 20 | 20 | 0 |" in table
|
||||
338
timmy-config/bin/load_cap_enforcer.py
Executable file
338
timmy-config/bin/load_cap_enforcer.py
Executable file
@@ -0,0 +1,338 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Open-Load Cap Enforcement — Audit B3
|
||||
|
||||
Scans multiple Gitea repos for per-agent open-issue load and enforces
|
||||
a hard cap by unassigning oldest overflow issues.
|
||||
|
||||
Repos scanned (cross-repo):
|
||||
- Timmy_Foundation/timmy-home
|
||||
- Timmy_Foundation/timmy-config
|
||||
- Timmy_Foundation/the-nexus
|
||||
- Timmy_Foundation/hermes-agent
|
||||
|
||||
Acceptance criteria (from #498):
|
||||
- Lives in timmy-config/bin/load_cap_enforcer.py
|
||||
- Scans all 4 repos for open issues
|
||||
- Cap per agent at MAX_OPEN_LOAD (default 25)
|
||||
- Unassign oldest overflow issues first
|
||||
- Comment on unassigned issues
|
||||
- Generate summary table
|
||||
|
||||
Usage:
|
||||
python3 timmy-config/bin/load_cap_enforcer.py [--dry-run] [--max-load N] [--repos ...]
|
||||
|
||||
Dry-run mode: only reports, does not modify assignments.
|
||||
Live mode: unassigns overflow and posts comments.
|
||||
|
||||
Exit codes:
|
||||
0 — success (no errors)
|
||||
1 — API error or permission failure
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
# Configuration
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
|
||||
ORG = "Timmy_Foundation"
|
||||
|
||||
# Default repos to scan (configurable via CLI)
|
||||
DEFAULT_REPOS = ["timmy-home", "timmy-config", "the-nexus", "hermes-agent"]
|
||||
|
||||
# Default per-agent open issue cap
|
||||
DEFAULT_MAX_OPEN_LOAD = 25
|
||||
|
||||
# Comment posted on unassigned issues
|
||||
UNASSIGN_COMMENT = (
|
||||
"Unassigned from @{assignee} due to load cap. "
|
||||
"Available for pickup."
|
||||
)
|
||||
|
||||
|
||||
def load_token() -> str:
|
||||
"""Load Gitea API token from file."""
|
||||
try:
|
||||
with open(TOKEN_PATH, "r") as f:
|
||||
return f.read().strip()
|
||||
except FileNotFoundError:
|
||||
print(f"ERROR: Gitea token not found at {TOKEN_PATH}", file=sys.stderr)
|
||||
print(" Set up token: https://forge.alexanderwhitestone.com/user/settings/applications", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def api_request(
|
||||
token: str, endpoint: str, method: str = "GET", body: Optional[Dict] = None
|
||||
) -> Any:
|
||||
"""Make authenticated Gitea API request."""
|
||||
url = f"{GITEA_BASE}{endpoint}"
|
||||
headers = {"Authorization": f"token {token}", "Accept": "application/json"}
|
||||
|
||||
data = json.dumps(body).encode("utf-8") if body else None
|
||||
|
||||
req = urllib.request.Request(url, headers=headers, data=data, method=method)
|
||||
req.add_header("Content-Type", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read())
|
||||
except urllib.error.HTTPError as e:
|
||||
error_body = e.read().decode("utf-8", errors="replace")
|
||||
print(f"API ERROR {e.code} on {endpoint}: {error_body[:200]}", file=sys.stderr)
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"Request failed for {endpoint}: {e}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
|
||||
def get_all_open_issues(token: str, repo: str) -> List[Dict]:
|
||||
"""Fetch all open issues (not PRs) for a repo, handling pagination."""
|
||||
all_issues = []
|
||||
page = 1
|
||||
per_page = 100
|
||||
|
||||
while True:
|
||||
endpoint = f"/repos/{ORG}/{repo}/issues?state=open&limit={per_page}&page={page}"
|
||||
issues = api_request(token, endpoint)
|
||||
if not isinstance(issues, list):
|
||||
break
|
||||
# Filter out pull requests (they have "pull_request" key)
|
||||
for issue in issues:
|
||||
if "pull_request" not in issue:
|
||||
all_issues.append(issue)
|
||||
if len(issues) < per_page:
|
||||
break
|
||||
page += 1
|
||||
|
||||
return all_issues
|
||||
|
||||
|
||||
def group_by_assignee(issues: List[Dict]) -> Dict[str, List[Dict]]:
|
||||
"""Group issues by assignee login."""
|
||||
by_assignee: Dict[str, List[Dict]] = defaultdict(list)
|
||||
for issue in issues:
|
||||
assignee = issue.get("assignee")
|
||||
if assignee:
|
||||
login = assignee.get("login")
|
||||
if login:
|
||||
by_assignee[login].append(issue)
|
||||
return dict(by_assignee)
|
||||
|
||||
|
||||
def _issue_sort_key(issue: Dict) -> Any:
|
||||
"""Key function for sorting: missing created_at sorts at the end."""
|
||||
created = issue.get("created_at")
|
||||
# Treat None/missing as max date so they sort last
|
||||
return created if created is not None else "9999-12-31T23:59:59Z"
|
||||
|
||||
|
||||
def sort_by_created_asc(issues: List[Dict]) -> List[Dict]:
|
||||
"""Sort issues oldest-first (FIFO unassignment)."""
|
||||
return sorted(issues, key=_issue_sort_key)
|
||||
|
||||
|
||||
def unassign_issue(token: str, repo: str, issue_number: int) -> bool:
|
||||
"""Remove assignee from an issue."""
|
||||
endpoint = f"/repos/{ORG}/{repo}/issues/{issue_number}"
|
||||
body = {"assignee": None}
|
||||
result = api_request(token, endpoint, method="PATCH", body=body)
|
||||
return result is not None
|
||||
|
||||
|
||||
def comment_on_issue(token: str, repo: str, issue_number: int, body: str) -> bool:
|
||||
"""Post a comment on an issue."""
|
||||
endpoint = f"/repos/{ORG}/{repo}/issues/{issue_number}/comments"
|
||||
result = api_request(token, endpoint, method="POST", body={"body": body})
|
||||
return result is not None
|
||||
|
||||
|
||||
def format_summary_table(
|
||||
before: Dict[str, int], after: Dict[str, int], unassigned: Dict[str, int]
|
||||
) -> str:
|
||||
"""Generate markdown summary table."""
|
||||
all_agents = sorted(set(before.keys()) | set(after.keys()))
|
||||
lines = [
|
||||
"**Open-Load Cap Enforcement — Summary**",
|
||||
"",
|
||||
"| Agent | Before | After | Unassigned |",
|
||||
"|-------|--------|-------|------------|",
|
||||
]
|
||||
for agent in all_agents:
|
||||
b = before.get(agent, 0)
|
||||
a = after.get(agent, 0)
|
||||
u = unassigned.get(agent, 0)
|
||||
lines.append(f"| {agent} | {b} | {a} | {u} |")
|
||||
lines.append("")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def enforce_cap(
|
||||
token: str,
|
||||
repos: List[str],
|
||||
max_load: int,
|
||||
dry_run: bool = True,
|
||||
) -> Dict[str, Any]:
|
||||
"""Main enforcement logic.
|
||||
|
||||
Returns stats dict for summary reporting.
|
||||
"""
|
||||
# --- Step 1: collect all open issues across repos ---
|
||||
all_issues: List[Dict] = []
|
||||
for repo in repos:
|
||||
repo_issues = get_all_open_issues(token, repo)
|
||||
print(f" {repo}: {len(repo_issues)} open issues")
|
||||
all_issues.extend(repo_issues)
|
||||
|
||||
print(f"Total open issues scanned: {len(all_issues)}")
|
||||
|
||||
# --- Step 2: group by assignee ---
|
||||
by_assignee = group_by_assignee(all_issues)
|
||||
|
||||
# Record before counts
|
||||
before_counts: Dict[str, int] = {
|
||||
agent: len(issues) for agent, issues in by_assignee.items()
|
||||
}
|
||||
|
||||
print(f"\nAgents with assigned open issues:")
|
||||
for agent, count in sorted(before_counts.items()):
|
||||
print(f" {agent}: {count}")
|
||||
|
||||
# --- Step 3: identify overflow per agent ---
|
||||
overflow: Dict[str, List[Dict]] = {}
|
||||
for agent, issues in by_assignee.items():
|
||||
if len(issues) > max_load:
|
||||
# Sort oldest-first, take the excess
|
||||
sorted_issues = sort_by_created_asc(issues)
|
||||
overflow[agent] = sorted_issues[max_load:]
|
||||
|
||||
if not overflow:
|
||||
print(f"\nNo agents exceed cap of {max_load}. Nothing to do.")
|
||||
return {
|
||||
"repos": repos,
|
||||
"max_load": max_load,
|
||||
"before": before_counts,
|
||||
"after": before_counts,
|
||||
"unassigned_counts": {},
|
||||
"unassigned_issues": [],
|
||||
"dry_run": dry_run,
|
||||
}
|
||||
|
||||
print(f"\nAgents over cap (>{max_load}):")
|
||||
for agent, issues in overflow.items():
|
||||
print(f" {agent}: {len(issues)} overflow issues to unassign")
|
||||
|
||||
# --- Step 4: compute after-state ---
|
||||
after_counts = before_counts.copy()
|
||||
unassigned_counts: Dict[str, int] = {}
|
||||
unassigned_issues: List[Dict] = []
|
||||
|
||||
if dry_run:
|
||||
print("\n[DRY RUN] No modifications will be made.")
|
||||
for agent, issues in overflow.items():
|
||||
unassigned_counts[agent] = len(issues)
|
||||
for issue in issues:
|
||||
unassigned_issues.append({
|
||||
"repo": next(r for r in repos if f"/repos/{ORG}/{r}/issues/" in issue["url"]),
|
||||
"number": issue["number"],
|
||||
"title": issue.get("title", ""),
|
||||
"assignee": agent,
|
||||
})
|
||||
else:
|
||||
print("\n[LIVE] Unassigning overflow issues...")
|
||||
for agent, issues in overflow.items():
|
||||
unassigned_counts[agent] = len(issues)
|
||||
for issue in issues:
|
||||
repo = next(r for r in repos if f"/repos/{ORG}/{r}/issues/" in issue["url"])
|
||||
issue_num = issue["number"]
|
||||
title = issue.get("title", "")
|
||||
|
||||
# Unassign
|
||||
if unassign_issue(token, repo, issue_num):
|
||||
print(f" ✓ Unassigned #{issue_num} ({title[:50]}) from {agent}")
|
||||
else:
|
||||
print(f" ✗ Failed to unassign #{issue_num}")
|
||||
|
||||
# Comment
|
||||
comment = UNASSIGN_COMMENT.format(assignee=agent, max_load=max_load).replace("@", "@") # keep @ for GitHub/Gitea mentions
|
||||
if comment_on_issue(token, repo, issue_num, comment):
|
||||
print(f" Commented on #{issue_num}")
|
||||
else:
|
||||
print(f" Failed to comment on #{issue_num}")
|
||||
|
||||
unassigned_issues.append({
|
||||
"repo": repo,
|
||||
"number": issue_num,
|
||||
"title": title,
|
||||
"assignee": agent,
|
||||
})
|
||||
|
||||
after_counts[agent] = max_load
|
||||
|
||||
return {
|
||||
"repos": repos,
|
||||
"max_load": max_load,
|
||||
"before": before_counts,
|
||||
"after": after_counts,
|
||||
"unassigned_counts": unassigned_counts,
|
||||
"unassigned_issues": unassigned_issues,
|
||||
"dry_run": dry_run,
|
||||
}
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--dry-run", action="store_true", default=True, help="Report only, do not modify")
|
||||
parser.add_argument("--live", action="store_true", help="Actually unassign and comment")
|
||||
parser.add_argument("--max-load", type=int, default=DEFAULT_MAX_OPEN_LOAD, help=f"Per-agent cap (default: {DEFAULT_MAX_OPEN_LOAD})")
|
||||
parser.add_argument("--repos", nargs="+", default=DEFAULT_REPOS, help="Repos to scan")
|
||||
args = parser.parse_args()
|
||||
|
||||
# live overrides dry-run
|
||||
dry_run = not args.live
|
||||
|
||||
print("=== Open-Load Cap Enforcement ===")
|
||||
print(f"Repos: {', '.join(args.repos)}")
|
||||
print(f"Max load per agent: {args.max_load}")
|
||||
print(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}")
|
||||
print()
|
||||
|
||||
token = load_token()
|
||||
|
||||
stats = enforce_cap(
|
||||
token=token,
|
||||
repos=args.repos,
|
||||
max_load=args.max_load,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
|
||||
# --- Print summary table ---
|
||||
print("\n" + "=" * 60)
|
||||
print("SUMMARY")
|
||||
print("=" * 60)
|
||||
summary = format_summary_table(
|
||||
stats["before"], stats["after"], stats["unassigned_counts"]
|
||||
)
|
||||
print(summary)
|
||||
|
||||
# Print unassigned issue list
|
||||
if stats["unassigned_issues"]:
|
||||
print("Unassigned issues:")
|
||||
for item in stats["unassigned_issues"]:
|
||||
print(f" {item['repo']}#{item['number']} — {item['title'][:60]} (was @{item['assignee']})")
|
||||
|
||||
print()
|
||||
print(f"Total unassigned in this run: {sum(stats['unassigned_counts'].values())}")
|
||||
print(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user