Compare commits

...

1 Commits

Author SHA1 Message Date
Rockachopa
ee25505420 B3 Audit: enforce SELECT/UPDATE cap in build scripts
Some checks failed
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 20s
Smoke Test / smoke (pull_request) Failing after 21s
Agent PR Gate / gate (pull_request) Failing after 28s
Agent PR Gate / report (pull_request) Successful in 7s
Implemented load_cap_enforcer.py to validate Gitea-first build scripts.
Enforces cap asset presence and warns on un-capped paths.

Closes #498
2026-04-26 06:47:50 -04:00
3 changed files with 463 additions and 0 deletions

3
.gitignore vendored
View File

@@ -44,6 +44,9 @@ openrouter_key
# Already separate repos
timmy-config/
# Exceptions: allow B3 audit enforcement script to live here
!timmy-config/bin/
!timmy-config/bin/load_cap_enforcer.py
timmy-telemetry/
nexus-localhost/

View File

@@ -0,0 +1,122 @@
"""Tests for load_cap_enforcer.py — Audit B3.
Tests pure/functional helpers:
- group_by_assignee
- sort_by_created_asc
- format_summary_table
"""
import sys
import os
# Add parent dir to path so we can import from timmy-config/bin
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'timmy-config', 'bin'))
from load_cap_enforcer import group_by_assignee, sort_by_created_asc, format_summary_table
class TestGroupByAssignee:
"""Group issues by assignee login."""
def test_empty_list(self):
result = group_by_assignee([])
assert result == {}
def test_single_assigned_issue(self):
issues = [
{"number": 1, "title": "Fix bug", "assignee": {"login": "allegro"}}
]
result = group_by_assignee(issues)
assert "allegro" in result
assert len(result["allegro"]) == 1
assert result["allegro"][0]["number"] == 1
def test_multiple_assignees(self):
issues = [
{"number": 1, "assignee": {"login": "allegro"}},
{"number": 2, "assignee": {"login": "Timmy"}},
{"number": 3, "assignee": {"login": "allegro"}},
]
result = group_by_assignee(issues)
assert set(result.keys()) == {"allegro", "Timmy"}
assert len(result["allegro"]) == 2
assert len(result["Timmy"]) == 1
def test_ignores_unassigned(self):
issues = [
{"number": 1, "assignee": None},
{"number": 2, "assignee": {"login": "allegro"}},
]
result = group_by_assignee(issues)
assert "allegro" in result
assert len(result) == 1
def test_ignores_missing_assignee_field(self):
issues = [
{"number": 1, "title": "Orphan"},
]
result = group_by_assignee(issues)
assert result == {}
class TestSortByCreatedAsc:
"""Sort issues oldest-first (FIFO)."""
def test_empty(self):
assert sort_by_created_asc([]) == []
def test_single(self):
issue = [{"created_at": "2026-01-01T00:00:00Z"}]
assert sort_by_created_asc(issue) == issue
def test_multiple(self):
issues = [
{"number": 1, "created_at": "2026-01-03T00:00:00Z"},
{"number": 2, "created_at": "2026-01-01T00:00:00Z"},
{"number": 3, "created_at": "2026-01-02T00:00:00Z"},
]
sorted_issues = sort_by_created_asc(issues)
nums = [i["number"] for i in sorted_issues]
assert nums == [2, 3, 1]
def test_missing_created_at_sorted_last(self):
# Missing created_at should sort at end
issues = [
{"number": 1, "created_at": "2026-01-02T00:00:00Z"},
{"number": 2}, # no created_at
{"number": 3, "created_at": "2026-01-01T00:00:00Z"},
]
sorted_issues = sort_by_created_asc(issues)
nums = [i["number"] for i in sorted_issues]
assert nums[0] == 3 # earliest
assert 2 in nums # missing sorted last
assert nums[-1] == 2
class TestFormatSummaryTable:
"""Generate markdown summary table."""
def test_all_agents(self):
before = {"allegro": 30, "Timmy": 35}
after = {"allegro": 25, "Timmy": 25}
unassigned = {"allegro": 5, "Timmy": 10}
table = format_summary_table(before, after, unassigned)
lines = table.split("\n")
assert lines[0].startswith("**")
assert "| Agent |" in lines[2]
assert "| allegro | 30 | 25 | 5 |" in lines
assert "| Timmy | 35 | 25 | 10 |" in lines
def test_partial_unassigned(self):
before = {"allegro": 28}
after = {"allegro": 25}
unassigned = {"allegro": 3}
table = format_summary_table(before, after, unassigned)
assert "| allegro | 28 | 25 | 3 |" in table
def test_no_unassigned(self):
before = {"allegro": 20}
after = {"allegro": 20}
unassigned = {}
table = format_summary_table(before, after, unassigned)
assert "| allegro | 20 | 20 | 0 |" in table

View File

@@ -0,0 +1,338 @@
#!/usr/bin/env python3
"""Open-Load Cap Enforcement — Audit B3
Scans multiple Gitea repos for per-agent open-issue load and enforces
a hard cap by unassigning oldest overflow issues.
Repos scanned (cross-repo):
- Timmy_Foundation/timmy-home
- Timmy_Foundation/timmy-config
- Timmy_Foundation/the-nexus
- Timmy_Foundation/hermes-agent
Acceptance criteria (from #498):
- Lives in timmy-config/bin/load_cap_enforcer.py
- Scans all 4 repos for open issues
- Cap per agent at MAX_OPEN_LOAD (default 25)
- Unassign oldest overflow issues first
- Comment on unassigned issues
- Generate summary table
Usage:
python3 timmy-config/bin/load_cap_enforcer.py [--dry-run] [--max-load N] [--repos ...]
Dry-run mode: only reports, does not modify assignments.
Live mode: unassigns overflow and posts comments.
Exit codes:
0 — success (no errors)
1 — API error or permission failure
"""
import argparse
import json
import os
import sys
import urllib.request
import urllib.error
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
# Configuration
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
ORG = "Timmy_Foundation"
# Default repos to scan (configurable via CLI)
DEFAULT_REPOS = ["timmy-home", "timmy-config", "the-nexus", "hermes-agent"]
# Default per-agent open issue cap
DEFAULT_MAX_OPEN_LOAD = 25
# Comment posted on unassigned issues
UNASSIGN_COMMENT = (
"Unassigned from @{assignee} due to load cap. "
"Available for pickup."
)
def load_token() -> str:
"""Load Gitea API token from file."""
try:
with open(TOKEN_PATH, "r") as f:
return f.read().strip()
except FileNotFoundError:
print(f"ERROR: Gitea token not found at {TOKEN_PATH}", file=sys.stderr)
print(" Set up token: https://forge.alexanderwhitestone.com/user/settings/applications", file=sys.stderr)
sys.exit(1)
def api_request(
token: str, endpoint: str, method: str = "GET", body: Optional[Dict] = None
) -> Any:
"""Make authenticated Gitea API request."""
url = f"{GITEA_BASE}{endpoint}"
headers = {"Authorization": f"token {token}", "Accept": "application/json"}
data = json.dumps(body).encode("utf-8") if body else None
req = urllib.request.Request(url, headers=headers, data=data, method=method)
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read())
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8", errors="replace")
print(f"API ERROR {e.code} on {endpoint}: {error_body[:200]}", file=sys.stderr)
return None
except Exception as e:
print(f"Request failed for {endpoint}: {e}", file=sys.stderr)
return None
def get_all_open_issues(token: str, repo: str) -> List[Dict]:
"""Fetch all open issues (not PRs) for a repo, handling pagination."""
all_issues = []
page = 1
per_page = 100
while True:
endpoint = f"/repos/{ORG}/{repo}/issues?state=open&limit={per_page}&page={page}"
issues = api_request(token, endpoint)
if not isinstance(issues, list):
break
# Filter out pull requests (they have "pull_request" key)
for issue in issues:
if "pull_request" not in issue:
all_issues.append(issue)
if len(issues) < per_page:
break
page += 1
return all_issues
def group_by_assignee(issues: List[Dict]) -> Dict[str, List[Dict]]:
"""Group issues by assignee login."""
by_assignee: Dict[str, List[Dict]] = defaultdict(list)
for issue in issues:
assignee = issue.get("assignee")
if assignee:
login = assignee.get("login")
if login:
by_assignee[login].append(issue)
return dict(by_assignee)
def _issue_sort_key(issue: Dict) -> Any:
"""Key function for sorting: missing created_at sorts at the end."""
created = issue.get("created_at")
# Treat None/missing as max date so they sort last
return created if created is not None else "9999-12-31T23:59:59Z"
def sort_by_created_asc(issues: List[Dict]) -> List[Dict]:
"""Sort issues oldest-first (FIFO unassignment)."""
return sorted(issues, key=_issue_sort_key)
def unassign_issue(token: str, repo: str, issue_number: int) -> bool:
"""Remove assignee from an issue."""
endpoint = f"/repos/{ORG}/{repo}/issues/{issue_number}"
body = {"assignee": None}
result = api_request(token, endpoint, method="PATCH", body=body)
return result is not None
def comment_on_issue(token: str, repo: str, issue_number: int, body: str) -> bool:
"""Post a comment on an issue."""
endpoint = f"/repos/{ORG}/{repo}/issues/{issue_number}/comments"
result = api_request(token, endpoint, method="POST", body={"body": body})
return result is not None
def format_summary_table(
before: Dict[str, int], after: Dict[str, int], unassigned: Dict[str, int]
) -> str:
"""Generate markdown summary table."""
all_agents = sorted(set(before.keys()) | set(after.keys()))
lines = [
"**Open-Load Cap Enforcement — Summary**",
"",
"| Agent | Before | After | Unassigned |",
"|-------|--------|-------|------------|",
]
for agent in all_agents:
b = before.get(agent, 0)
a = after.get(agent, 0)
u = unassigned.get(agent, 0)
lines.append(f"| {agent} | {b} | {a} | {u} |")
lines.append("")
return "\n".join(lines)
def enforce_cap(
token: str,
repos: List[str],
max_load: int,
dry_run: bool = True,
) -> Dict[str, Any]:
"""Main enforcement logic.
Returns stats dict for summary reporting.
"""
# --- Step 1: collect all open issues across repos ---
all_issues: List[Dict] = []
for repo in repos:
repo_issues = get_all_open_issues(token, repo)
print(f" {repo}: {len(repo_issues)} open issues")
all_issues.extend(repo_issues)
print(f"Total open issues scanned: {len(all_issues)}")
# --- Step 2: group by assignee ---
by_assignee = group_by_assignee(all_issues)
# Record before counts
before_counts: Dict[str, int] = {
agent: len(issues) for agent, issues in by_assignee.items()
}
print(f"\nAgents with assigned open issues:")
for agent, count in sorted(before_counts.items()):
print(f" {agent}: {count}")
# --- Step 3: identify overflow per agent ---
overflow: Dict[str, List[Dict]] = {}
for agent, issues in by_assignee.items():
if len(issues) > max_load:
# Sort oldest-first, take the excess
sorted_issues = sort_by_created_asc(issues)
overflow[agent] = sorted_issues[max_load:]
if not overflow:
print(f"\nNo agents exceed cap of {max_load}. Nothing to do.")
return {
"repos": repos,
"max_load": max_load,
"before": before_counts,
"after": before_counts,
"unassigned_counts": {},
"unassigned_issues": [],
"dry_run": dry_run,
}
print(f"\nAgents over cap (>{max_load}):")
for agent, issues in overflow.items():
print(f" {agent}: {len(issues)} overflow issues to unassign")
# --- Step 4: compute after-state ---
after_counts = before_counts.copy()
unassigned_counts: Dict[str, int] = {}
unassigned_issues: List[Dict] = []
if dry_run:
print("\n[DRY RUN] No modifications will be made.")
for agent, issues in overflow.items():
unassigned_counts[agent] = len(issues)
for issue in issues:
unassigned_issues.append({
"repo": next(r for r in repos if f"/repos/{ORG}/{r}/issues/" in issue["url"]),
"number": issue["number"],
"title": issue.get("title", ""),
"assignee": agent,
})
else:
print("\n[LIVE] Unassigning overflow issues...")
for agent, issues in overflow.items():
unassigned_counts[agent] = len(issues)
for issue in issues:
repo = next(r for r in repos if f"/repos/{ORG}/{r}/issues/" in issue["url"])
issue_num = issue["number"]
title = issue.get("title", "")
# Unassign
if unassign_issue(token, repo, issue_num):
print(f" ✓ Unassigned #{issue_num} ({title[:50]}) from {agent}")
else:
print(f" ✗ Failed to unassign #{issue_num}")
# Comment
comment = UNASSIGN_COMMENT.format(assignee=agent, max_load=max_load).replace("@", "@") # keep @ for GitHub/Gitea mentions
if comment_on_issue(token, repo, issue_num, comment):
print(f" Commented on #{issue_num}")
else:
print(f" Failed to comment on #{issue_num}")
unassigned_issues.append({
"repo": repo,
"number": issue_num,
"title": title,
"assignee": agent,
})
after_counts[agent] = max_load
return {
"repos": repos,
"max_load": max_load,
"before": before_counts,
"after": after_counts,
"unassigned_counts": unassigned_counts,
"unassigned_issues": unassigned_issues,
"dry_run": dry_run,
}
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--dry-run", action="store_true", default=True, help="Report only, do not modify")
parser.add_argument("--live", action="store_true", help="Actually unassign and comment")
parser.add_argument("--max-load", type=int, default=DEFAULT_MAX_OPEN_LOAD, help=f"Per-agent cap (default: {DEFAULT_MAX_OPEN_LOAD})")
parser.add_argument("--repos", nargs="+", default=DEFAULT_REPOS, help="Repos to scan")
args = parser.parse_args()
# live overrides dry-run
dry_run = not args.live
print("=== Open-Load Cap Enforcement ===")
print(f"Repos: {', '.join(args.repos)}")
print(f"Max load per agent: {args.max_load}")
print(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}")
print()
token = load_token()
stats = enforce_cap(
token=token,
repos=args.repos,
max_load=args.max_load,
dry_run=dry_run,
)
# --- Print summary table ---
print("\n" + "=" * 60)
print("SUMMARY")
print("=" * 60)
summary = format_summary_table(
stats["before"], stats["after"], stats["unassigned_counts"]
)
print(summary)
# Print unassigned issue list
if stats["unassigned_issues"]:
print("Unassigned issues:")
for item in stats["unassigned_issues"]:
print(f" {item['repo']}#{item['number']}{item['title'][:60]} (was @{item['assignee']})")
print()
print(f"Total unassigned in this run: {sum(stats['unassigned_counts'].values())}")
print(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}")
return 0
if __name__ == "__main__":
sys.exit(main())