Files
hermes-agent/scripts/morning_review_packet_status.py
2026-04-22 10:56:24 -04:00

286 lines
10 KiB
Python

#!/usr/bin/env python3
"""Generate a grounded status report for hermes-agent morning review packet epic #949."""
from __future__ import annotations
import argparse
import base64
import json
import os
import re
import ssl
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Root of the forge REST API; request paths are appended to this verbatim.
BASE_API = "https://forge.alexanderwhitestone.com/api/v1"
# "owner/name" slug of the repository whose issues and pull requests are queried.
REPO = "Timmy_Foundation/hermes-agent"
# File holding the API token; its stripped contents are used to build the auth headers.
TOKEN_PATH = Path("~/.config/gitea/token").expanduser()
# Default output locations used by the CLI when --json-out / --markdown-out are not given.
DEFAULT_JSON_OUT = Path("docs/morning-review-packet-2026-04-21.snapshot.json")
DEFAULT_MARKDOWN_OUT = Path("docs/morning-review-packet-2026-04-21-status.md")
def extract_issue_numbers(text: str) -> list[int]:
    """Return the issue numbers written as ``#<digits>`` in *text*.

    Numbers appear in order of first occurrence and each is listed once.
    A falsy *text* (empty string or ``None``) yields an empty list.
    """
    referenced = (int(hit.group(1)) for hit in re.finditer(r"#(\d+)", text or ""))
    # dict keys keep insertion order, giving an order-preserving de-dup.
    return list(dict.fromkeys(referenced))
def _auth_headers(token: str) -> list[dict[str, str]]:
    """Build the candidate header sets for authenticating with *token*.

    The first entry uses the ``token <value>`` scheme; the second uses HTTP
    Basic with the token as the user name and an empty password. Both ask
    for a JSON response.
    """
    encoded = base64.b64encode(f"{token}:".encode()).decode()
    schemes = (f"token {token}", f"Basic {encoded}")
    return [
        {"Authorization": credential, "Accept": "application/json"}
        for credential in schemes
    ]
def api_get(path: str, *, headers_options: list[dict[str, str]] | None = None) -> Any:
    """GET ``BASE_API + path`` and return the decoded JSON body.

    Args:
        path: API path appended verbatim to ``BASE_API`` (query string included).
        headers_options: Header sets to try in order. When omitted or empty,
            the token is read from ``TOKEN_PATH`` and the sets built by
            ``_auth_headers`` are used.

    Returns:
        The parsed JSON payload of the first attempt that succeeds.

    Raises:
        RuntimeError: If every header set fails. The last underlying error is
            included in the message and attached as ``__cause__``.
    """
    if not headers_options:
        # Only touch the token file when the caller did not supply credentials.
        token = TOKEN_PATH.read_text(encoding="utf-8").strip()
        headers_options = _auth_headers(token)
    ctx = ssl.create_default_context()
    url = f"{BASE_API}{path}"
    last_error: Exception | None = None
    for headers in headers_options:
        try:
            req = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except Exception as exc:  # pragma: no cover - exercised via live CLI use
            # Deliberately broad: any failure simply moves on to the next
            # header set; the last error is reported once all have failed.
            last_error = exc
    raise RuntimeError(f"GET {url} failed: {last_error}") from last_error
def issue_pr_matches(pr: dict[str, Any], issue_num: int) -> bool:
    """Tell whether pull request *pr* refers to issue *issue_num*.

    A PR matches when any of the following holds:

    * its title or body contains ``#<issue_num>`` not adjacent to other digits;
    * its body contains a linking keyword (close/fix/resolve/ref and their
      ``-s``/``-es`` forms, case-insensitive) followed by the number, with or
      without ``#``;
    * its head branch is exactly one of ``fix/<n>``, ``issue-<n>``,
      ``burn/<n>`` or ``fix/issue-<n>``.

    Args:
        pr: Pull request mapping; ``title``, ``body`` and ``head`` may be
            missing or ``None``.
        issue_num: Issue number to look for.

    Returns:
        ``True`` if the PR references the issue, otherwise ``False``.
    """
    title = pr.get("title") or ""
    body = pr.get("body") or ""
    head = (pr.get("head") or {}).get("ref") or ""
    exact_ref = re.compile(rf"(?<!\d)#{issue_num}(?!\d)")
    # The leading \b keeps words that merely end in a keyword ("hotfix 12",
    # "href 12") from being read as "fix 12" / "ref 12".
    keyword_ref = re.compile(
        rf"\b(?:closes|close|fixes|fix|resolves|resolve|refs|ref)\s+#?{issue_num}(?!\d)",
        re.IGNORECASE,
    )
    branch_variants = {
        f"fix/{issue_num}",
        f"issue-{issue_num}",
        f"burn/{issue_num}",
        f"fix/issue-{issue_num}",
    }
    return bool(
        exact_ref.search(title)
        or exact_ref.search(body)
        or keyword_ref.search(body)
        or head in branch_variants
    )
def fetch_open_prs(*, headers_options: list[dict[str, str]]) -> list[dict[str, Any]]:
    """Collect every open pull request of ``REPO`` across all result pages.

    Args:
        headers_options: Header sets forwarded to ``api_get`` for each page.

    Returns:
        The concatenated pull request objects, in the order the API returned them.

    Raises:
        RuntimeError: Propagated from ``api_get`` when a page cannot be fetched.
    """
    prs: list[dict[str, Any]] = []
    page = 1
    while True:
        batch = api_get(
            f"/repos/{REPO}/pulls?state=open&limit=100&page={page}",
            headers_options=headers_options,
        )
        # Stop only on an empty page. A page shorter than the requested limit
        # is not proof of the last page: the server may clamp ``limit`` to a
        # smaller configured maximum, and stopping early would drop PRs.
        if not batch:
            break
        prs.extend(batch)
        page += 1
    return prs
def _issue_summary(issue: dict[str, Any]) -> dict[str, Any]:
    """Keep only the issue fields that are stored in the snapshot."""
    return {field: issue[field] for field in ("number", "title", "state", "html_url")}


def fetch_live_snapshot(epic_issue_num: int = 949) -> dict[str, Any]:
    """Query the forge API and assemble the current state of an epic.

    Child issues are the ``#<n>`` references in the epic body; decomposition
    issues are the ``#<n>`` references in the epic's comments that are neither
    children nor the epic itself. Each child is paired with the open pull
    requests that ``issue_pr_matches`` links to it.

    Args:
        epic_issue_num: Number of the epic issue to inspect.

    Returns:
        A JSON-serialisable mapping with ``generated_at`` (UTC ISO timestamp),
        ``repo``, ``epic``, ``children`` and ``decomposition_issues``.

    Raises:
        RuntimeError: Propagated from ``api_get`` when a request fails.
        OSError: If the token file at ``TOKEN_PATH`` cannot be read.
    """
    token = TOKEN_PATH.read_text(encoding="utf-8").strip()
    headers_options = _auth_headers(token)

    epic = api_get(f"/repos/{REPO}/issues/{epic_issue_num}", headers_options=headers_options)
    comments = api_get(f"/repos/{REPO}/issues/{epic_issue_num}/comments", headers_options=headers_options)

    child_numbers = [n for n in extract_issue_numbers(epic.get("body") or "") if n != epic_issue_num]

    # extract_issue_numbers de-duplicates within one comment only; de-duplicate
    # across comments too so an issue mentioned twice is fetched and listed once.
    excluded = {epic_issue_num, *child_numbers}
    mentioned = (
        n
        for comment in comments
        for n in extract_issue_numbers(comment.get("body") or "")
    )
    decomposition_numbers = [n for n in dict.fromkeys(mentioned) if n not in excluded]

    open_prs = fetch_open_prs(headers_options=headers_options)

    children = []
    for number in child_numbers:
        issue = api_get(f"/repos/{REPO}/issues/{number}", headers_options=headers_options)
        matching_prs = [
            {
                "number": pr["number"],
                "title": pr["title"],
                # ``head`` may be null in the payload; issue_pr_matches
                # tolerates that, so this lookup must as well.
                "head": (pr.get("head") or {}).get("ref") or "",
                "url": pr["html_url"],
            }
            for pr in open_prs
            if issue_pr_matches(pr, number)
        ]
        children.append({**_issue_summary(issue), "open_prs": matching_prs})

    decomposition_issues = [
        _issue_summary(
            api_get(f"/repos/{REPO}/issues/{number}", headers_options=headers_options)
        )
        for number in decomposition_numbers
    ]

    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "repo": REPO,
        "epic": _issue_summary(epic),
        "children": children,
        "decomposition_issues": decomposition_issues,
    }
def summarize_snapshot(snapshot: dict[str, Any]) -> dict[str, int]:
    """Count the child issues of *snapshot* by state and PR coverage.

    Returns a mapping with the total number of children, how many are
    ``open`` or ``closed``, and how many of the open ones do or do not have
    a non-empty ``open_prs`` entry. A missing ``children`` key counts as empty.
    """
    children = snapshot.get("children", [])
    closed_total = 0
    backed_total = 0
    unbacked_total = 0
    for child in children:
        state = child.get("state")
        if state == "closed":
            closed_total += 1
        elif state == "open":
            if child.get("open_prs"):
                backed_total += 1
            else:
                unbacked_total += 1
    return {
        "total_children": len(children),
        "open_children": backed_total + unbacked_total,
        "closed_children": closed_total,
        "open_with_pr": backed_total,
        "open_without_pr": unbacked_total,
    }
def render_markdown(snapshot: dict[str, Any]) -> str:
    """Render *snapshot* as the markdown status report.

    Args:
        snapshot: Mapping with an ``epic`` entry and optional ``children``,
            ``decomposition_issues`` and ``generated_at`` entries.

    Returns:
        The full report text, terminated by a single newline.

    Raises:
        KeyError: If ``epic`` or a required issue / pull request field is missing.
    """
    epic = snapshot["epic"]
    children = snapshot.get("children", [])
    summary = summarize_snapshot(snapshot)
    open_with_pr = [issue for issue in children if issue.get("state") == "open" and issue.get("open_prs")]
    open_without_pr = [issue for issue in children if issue.get("state") == "open" and not issue.get("open_prs")]
    decomposition = snapshot.get("decomposition_issues", [])

    lines = [
        f"# Morning Review Packet Status — #{epic['number']}",
        "",
        f"Generated: {snapshot.get('generated_at', '')}",
        f"Epic: [{epic['title']}]({epic.get('html_url', '')})",
        "",
        "## Summary",
        "",
        f"- Child QA issues tracked: {summary['total_children']}",
        f"- Open child issues: {summary['open_children']}",
        f"- Closed child issues: {summary['closed_children']}",
        f"- Open child issues already backed by PRs: {summary['open_with_pr']}",
        f"- Open child issues still unowned on forge: {summary['open_without_pr']}",
        "",
        "## Child QA Matrix",
        "",
        "| Issue | State | Open PRs | Title |",
        "|------:|-------|----------|-------|",
    ]
    for issue in children:
        pr_text = ", ".join(
            f"[#{pr['number']}]({pr['url']})" for pr in issue.get("open_prs", [])
        )
        # A literal "|" in a title would be read as a column separator and
        # break the table row, so escape it.
        title_cell = str(issue["title"]).replace("|", "\\|")
        lines.append(
            f"| #{issue['number']} | {issue['state']} | {pr_text} | {title_cell} |"
        )

    lines.extend([
        "",
        "## Drift Signals",
        "",
        "forge/main is still catching up to the upstream packet.",
    ])
    if open_with_pr:
        lines.append("")
        lines.append("Active PR-backed child lanes:")
        for issue in open_with_pr:
            pr_numbers = ", ".join(f"#{pr['number']}" for pr in issue.get("open_prs", []))
            lines.append(f"- #{issue['number']} -> {pr_numbers} ({issue['title']})")

    if open_without_pr:
        lines.extend([
            "",
            "## Unowned Open QA Issues",
            "",
        ])
        for issue in open_without_pr:
            lines.append(f"- #{issue['number']} {issue['title']}")

    if decomposition:
        lines.extend([
            "",
            "## Decomposition Follow-Ups",
            "",
        ])
        for issue in decomposition:
            lines.append(f"- #{issue['number']} [{issue['state']}] {issue['title']}")

    lines.extend([
        "",
        "## Conclusion",
        "",
        # Use the snapshot's epic number so the conclusion agrees with the
        # heading when the snapshot is for an epic other than #949.
        f"Refs #{epic['number']} only. This epic remains open until every child QA issue has a truthful PASS/FAIL outcome, attached evidence, and any upstream/main versus forge/main drift is resolved or explicitly documented.",
        "",
        "## Regeneration",
        "",
        "```bash",
        "python3 scripts/morning_review_packet_status.py --fetch-live --json-out docs/morning-review-packet-2026-04-21.snapshot.json --markdown-out docs/morning-review-packet-2026-04-21-status.md",
        "```",
    ])
    return "\n".join(lines) + "\n"
def write_json(path: Path, data: dict[str, Any]) -> None:
    """Write *data* to *path* as 2-space-indented JSON with a trailing newline.

    Missing parent directories of *path* are created first.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2)
    path.write_text(f"{serialized}\n", encoding="utf-8")
def main() -> None:
    """Command-line entry point.

    Obtains a snapshot (live from the API unless ``--snapshot`` is given
    without ``--fetch-live``), writes it to the JSON output path, renders the
    markdown report to its output path, and prints that path.
    """
    parser = argparse.ArgumentParser(description="Generate grounded status docs for epic #949")
    option_specs = (
        ("--fetch-live", {"action": "store_true", "help": "Fetch the current packet state from Forge"}),
        ("--snapshot", {"type": Path, "help": "Read a local JSON snapshot instead of hitting the API"}),
        ("--json-out", {"type": Path, "default": DEFAULT_JSON_OUT, "help": "Path to write JSON snapshot"}),
        ("--markdown-out", {"type": Path, "default": DEFAULT_MARKDOWN_OUT, "help": "Path to write markdown report"}),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    args = parser.parse_args()

    # A local snapshot is used only when one was given and a live fetch was
    # not explicitly requested; every other combination hits the API.
    if args.snapshot and not args.fetch_live:
        snapshot = json.loads(args.snapshot.read_text(encoding="utf-8"))
    else:
        snapshot = fetch_live_snapshot()

    write_json(args.json_out, snapshot)

    report_path = args.markdown_out
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(render_markdown(snapshot), encoding="utf-8")
    print(report_path)


if __name__ == "__main__":
    main()