Co-authored-by: Codex Agent <codex@hermes.local> Co-committed-by: Codex Agent <codex@hermes.local>
156 lines
5.3 KiB
Bash
Executable File
156 lines
5.3 KiB
Bash
Executable File
#!/usr/bin/env bash
|
|
# ── Gitea Workflow Feed ────────────────────────────────────────────────
|
|
# Shows open PRs, review pressure, and issue queues across core repos.
|
|
# ───────────────────────────────────────────────────────────────────────
|
|
|
|
set -euo pipefail
|
|
|
|
B='\033[1m'
|
|
D='\033[2m'
|
|
R='\033[0m'
|
|
C='\033[36m'
|
|
G='\033[32m'
|
|
Y='\033[33m'
|
|
|
|
resolve_gitea_url() {
|
|
if [ -n "${GITEA_URL:-}" ]; then
|
|
printf '%s\n' "${GITEA_URL%/}"
|
|
return 0
|
|
fi
|
|
if [ -f "$HOME/.hermes/gitea_api" ]; then
|
|
python3 - "$HOME/.hermes/gitea_api" <<'PY'
|
|
from pathlib import Path
|
|
import sys
|
|
|
|
raw = Path(sys.argv[1]).read_text().strip().rstrip("/")
|
|
print(raw[:-7] if raw.endswith("/api/v1") else raw)
|
|
PY
|
|
return 0
|
|
fi
|
|
if [ -f "$HOME/.config/gitea/base-url" ]; then
|
|
tr -d '[:space:]' < "$HOME/.config/gitea/base-url"
|
|
return 0
|
|
fi
|
|
echo "ERROR: set GITEA_URL or create ~/.hermes/gitea_api" >&2
|
|
return 1
|
|
}
|
|
|
|
resolve_ops_token() {
|
|
local token_file
|
|
for token_file in \
|
|
"$HOME/.config/gitea/timmy-token" \
|
|
"$HOME/.hermes/gitea_token_vps" \
|
|
"$HOME/.hermes/gitea_token_timmy"; do
|
|
if [ -f "$token_file" ]; then
|
|
tr -d '[:space:]' < "$token_file"
|
|
return 0
|
|
fi
|
|
done
|
|
return 1
|
|
}
|
|
|
|
GITEA_URL="$(resolve_gitea_url)"
|
|
CORE_REPOS="${CORE_REPOS:-Timmy_Foundation/the-nexus Timmy_Foundation/timmy-home Timmy_Foundation/timmy-config Timmy_Foundation/hermes-agent}"
|
|
TOKEN="$(resolve_ops_token || true)"
|
|
[ -z "$TOKEN" ] && echo "WARN: no approved Timmy Gitea token found; feed will use unauthenticated API calls" >&2
|
|
|
|
echo -e "${B}${C} ◈ GITEA WORKFLOW${R} ${D}$(date '+%H:%M:%S')${R}"
|
|
echo -e "${D}────────────────────────────────────────${R}"
|
|
|
|
python3 - "$GITEA_URL" "$TOKEN" "$CORE_REPOS" <<'PY'
|
|
import json
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
|
|
base = sys.argv[1].rstrip("/")
|
|
token = sys.argv[2]
|
|
repos = sys.argv[3].split()
|
|
headers = {"Authorization": f"token {token}"} if token else {}
|
|
|
|
|
|
def fetch(path):
|
|
req = urllib.request.Request(f"{base}{path}", headers=headers)
|
|
with urllib.request.urlopen(req, timeout=5) as resp:
|
|
return json.loads(resp.read().decode())
|
|
|
|
|
|
def short_repo(repo):
|
|
return repo.split("/", 1)[1]
|
|
|
|
|
|
issues = []
|
|
pulls = []
|
|
errors = []
|
|
|
|
for repo in repos:
|
|
try:
|
|
repo_pulls = fetch(f"/api/v1/repos/{repo}/pulls?state=open&limit=20")
|
|
for pr in repo_pulls:
|
|
pr["_repo"] = repo
|
|
pulls.append(pr)
|
|
repo_issues = fetch(f"/api/v1/repos/{repo}/issues?state=open&limit=50&type=issues")
|
|
for issue in repo_issues:
|
|
issue["_repo"] = repo
|
|
issues.append(issue)
|
|
except urllib.error.URLError as exc:
|
|
errors.append(f"{repo}: {exc.reason}")
|
|
except Exception as exc: # pragma: no cover - defensive panel path
|
|
errors.append(f"{repo}: {exc}")
|
|
|
|
print(" \033[1mOpen PRs\033[0m")
|
|
if not pulls:
|
|
print(" (none)")
|
|
else:
|
|
for pr in pulls[:8]:
|
|
print(
|
|
f" #{pr['number']:3d} {short_repo(pr['_repo']):12s} "
|
|
f"{pr['user']['login'][:12]:12s} {pr['title'][:40]}"
|
|
)
|
|
|
|
print("\033[2m────────────────────────────────────────\033[0m")
|
|
print(" \033[1mNeeds Timmy / Allegro Review\033[0m")
|
|
reviewers = []
|
|
for repo in repos:
|
|
try:
|
|
repo_items = fetch(f"/api/v1/repos/{repo}/issues?state=open&limit=50&type=pulls")
|
|
for item in repo_items:
|
|
assignees = [a.get("login", "") for a in (item.get("assignees") or [])]
|
|
if any(name in assignees for name in ("Timmy", "allegro")):
|
|
item["_repo"] = repo
|
|
reviewers.append(item)
|
|
except Exception:
|
|
continue
|
|
|
|
if not reviewers:
|
|
print(" (clear)")
|
|
else:
|
|
for item in reviewers[:8]:
|
|
names = ",".join(a.get("login", "") for a in (item.get("assignees") or []))
|
|
print(
|
|
f" #{item['number']:3d} {short_repo(item['_repo']):12s} "
|
|
f"{names[:18]:18s} {item['title'][:34]}"
|
|
)
|
|
|
|
print("\033[2m────────────────────────────────────────\033[0m")
|
|
print(" \033[1mIssue Queues\033[0m")
|
|
queue_agents = ["allegro", "codex-agent", "groq", "claude", "ezra", "perplexity", "KimiClaw"]
|
|
for agent in queue_agents:
|
|
assigned = [
|
|
issue
|
|
for issue in issues
|
|
if agent in [a.get("login", "") for a in (issue.get("assignees") or [])]
|
|
]
|
|
print(f" {agent:12s} {len(assigned):2d}")
|
|
|
|
unassigned = [issue for issue in issues if not issue.get("assignees")]
|
|
print("\033[2m────────────────────────────────────────\033[0m")
|
|
print(f" Unassigned issues: \033[33m{len(unassigned)}\033[0m")
|
|
|
|
if errors:
|
|
print("\033[2m────────────────────────────────────────\033[0m")
|
|
print(" \033[1mErrors\033[0m")
|
|
for err in errors[:4]:
|
|
print(f" {err}")
|
|
PY
|