192 lines
6.3 KiB
Python
Executable File
192 lines
6.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
|
LOW_RISK_PREFIXES = (
|
|
'docs/', 'reports/', 'notes/', 'tickets/', 'research/', 'briefings/',
|
|
'twitter-archive/notes/', 'tests/'
|
|
)
|
|
LOW_RISK_SUFFIXES = {'.md', '.txt', '.jsonl'}
|
|
MEDIUM_RISK_PREFIXES = ('.gitea/workflows/',)
|
|
HIGH_RISK_PREFIXES = (
|
|
'scripts/', 'deploy/', 'infrastructure/', 'metrics/', 'heartbeat/',
|
|
'wizards/', 'evennia/', 'uniwizard/', 'uni-wizard/', 'timmy-local/',
|
|
'evolution/'
|
|
)
|
|
HIGH_RISK_SUFFIXES = {'.py', '.sh', '.ini', '.service'}
|
|
|
|
|
|
def read_changed_files(path):
|
|
return [line.strip() for line in Path(path).read_text(encoding='utf-8').splitlines() if line.strip()]
|
|
|
|
|
|
def classify_risk(files):
|
|
if not files:
|
|
return 'high'
|
|
level = 'low'
|
|
for file_path in files:
|
|
path = file_path.strip()
|
|
suffix = Path(path).suffix.lower()
|
|
if path.startswith(LOW_RISK_PREFIXES):
|
|
continue
|
|
if path.startswith(HIGH_RISK_PREFIXES) or suffix in HIGH_RISK_SUFFIXES:
|
|
return 'high'
|
|
if path.startswith(MEDIUM_RISK_PREFIXES):
|
|
level = 'medium'
|
|
continue
|
|
if path.startswith(LOW_RISK_PREFIXES) or suffix in LOW_RISK_SUFFIXES:
|
|
continue
|
|
level = 'high'
|
|
return level
|
|
|
|
|
|
def validate_pr_body(title, body):
|
|
details = []
|
|
combined = f"{title}\n{body}".strip()
|
|
if not re.search(r'#\d+', combined):
|
|
details.append('PR body/title must include an issue reference like #562.')
|
|
if not re.search(r'(^|\n)\s*(verification|tests?)\s*:', body, re.IGNORECASE):
|
|
details.append('PR body must include a Verification: section.')
|
|
return (len(details) == 0, details)
|
|
|
|
|
|
def build_comment_body(syntax_status, tests_status, criteria_status, risk_level):
|
|
statuses = {
|
|
'syntax': syntax_status,
|
|
'tests': tests_status,
|
|
'criteria': criteria_status,
|
|
}
|
|
all_clean = all(value == 'success' for value in statuses.values())
|
|
action = 'auto-merge' if all_clean and risk_level == 'low' else 'human review'
|
|
lines = [
|
|
'## Agent PR Gate',
|
|
'',
|
|
'| Check | Status |',
|
|
'|-------|--------|',
|
|
f"| Syntax / parse | {syntax_status} |",
|
|
f"| Test suite | {tests_status} |",
|
|
f"| PR criteria | {criteria_status} |",
|
|
f"| Risk level | {risk_level} |",
|
|
'',
|
|
]
|
|
failed = [name for name, value in statuses.items() if value != 'success']
|
|
if failed:
|
|
lines.append('### Failure details')
|
|
for name in failed:
|
|
lines.append(f'- {name} reported failure. Inspect the workflow logs for that step.')
|
|
else:
|
|
lines.append('All automated checks passed.')
|
|
lines.extend([
|
|
'',
|
|
f'Recommendation: {action}.',
|
|
'Low-risk documentation/test-only PRs may be auto-merged. Operational changes stay in human review.',
|
|
])
|
|
return '\n'.join(lines)
|
|
|
|
|
|
def _read_event(event_path):
|
|
data = json.loads(Path(event_path).read_text(encoding='utf-8'))
|
|
pr = data.get('pull_request') or {}
|
|
repo = (data.get('repository') or {}).get('full_name') or os.environ.get('GITHUB_REPOSITORY')
|
|
pr_number = pr.get('number') or data.get('number')
|
|
title = pr.get('title') or ''
|
|
body = pr.get('body') or ''
|
|
return repo, pr_number, title, body
|
|
|
|
|
|
def _request_json(method, url, token, payload=None):
|
|
data = None if payload is None else json.dumps(payload).encode('utf-8')
|
|
headers = {'Authorization': f'token {token}', 'Content-Type': 'application/json'}
|
|
req = urllib.request.Request(url, data=data, headers=headers, method=method)
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
return json.loads(resp.read().decode('utf-8'))
|
|
|
|
|
|
def post_comment(repo, pr_number, token, body):
|
|
url = f'{API_BASE}/repos/{repo}/issues/{pr_number}/comments'
|
|
return _request_json('POST', url, token, {'body': body})
|
|
|
|
|
|
def merge_pr(repo, pr_number, token):
|
|
url = f'{API_BASE}/repos/{repo}/pulls/{pr_number}/merge'
|
|
return _request_json('POST', url, token, {'Do': 'merge'})
|
|
|
|
|
|
def cmd_classify_risk(args):
|
|
files = list(args.files or [])
|
|
if args.files_file:
|
|
files.extend(read_changed_files(args.files_file))
|
|
print(json.dumps({'risk': classify_risk(files), 'files': files}, indent=2))
|
|
return 0
|
|
|
|
|
|
def cmd_validate_pr(args):
|
|
_, _, title, body = _read_event(args.event_path)
|
|
ok, details = validate_pr_body(title, body)
|
|
if ok:
|
|
print('PR body validation passed.')
|
|
return 0
|
|
for detail in details:
|
|
print(detail)
|
|
return 1
|
|
|
|
|
|
def cmd_comment(args):
|
|
repo, pr_number, _, _ = _read_event(args.event_path)
|
|
body = build_comment_body(args.syntax, args.tests, args.criteria, args.risk)
|
|
post_comment(repo, pr_number, args.token, body)
|
|
print(f'Commented on PR #{pr_number} in {repo}.')
|
|
return 0
|
|
|
|
|
|
def cmd_merge(args):
|
|
repo, pr_number, _, _ = _read_event(args.event_path)
|
|
merge_pr(repo, pr_number, args.token)
|
|
print(f'Merged PR #{pr_number} in {repo}.')
|
|
return 0
|
|
|
|
|
|
def build_parser():
|
|
parser = argparse.ArgumentParser(description='Agent PR CI helpers for timmy-home.')
|
|
sub = parser.add_subparsers(dest='command', required=True)
|
|
|
|
classify = sub.add_parser('classify-risk')
|
|
classify.add_argument('--files-file')
|
|
classify.add_argument('files', nargs='*')
|
|
classify.set_defaults(func=cmd_classify_risk)
|
|
|
|
validate = sub.add_parser('validate-pr')
|
|
validate.add_argument('--event-path', required=True)
|
|
validate.set_defaults(func=cmd_validate_pr)
|
|
|
|
comment = sub.add_parser('comment')
|
|
comment.add_argument('--event-path', required=True)
|
|
comment.add_argument('--token', required=True)
|
|
comment.add_argument('--syntax', required=True)
|
|
comment.add_argument('--tests', required=True)
|
|
comment.add_argument('--criteria', required=True)
|
|
comment.add_argument('--risk', required=True)
|
|
comment.set_defaults(func=cmd_comment)
|
|
|
|
merge = sub.add_parser('merge')
|
|
merge.add_argument('--event-path', required=True)
|
|
merge.add_argument('--token', required=True)
|
|
merge.set_defaults(func=cmd_merge)
|
|
return parser
|
|
|
|
|
|
def main(argv=None):
|
|
parser = build_parser()
|
|
args = parser.parse_args(argv)
|
|
return args.func(args)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|