Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
848640f9f0 feat: PR triage automation with auto-merge (#659)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 32s
Smoke Test / smoke (pull_request) Failing after 22s
Validate Config / YAML Lint (pull_request) Failing after 18s
Validate Config / JSON Validate (pull_request) Successful in 21s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m1s
Validate Config / Shell Script Lint (pull_request) Failing after 1m0s
Validate Config / Cron Syntax Check (pull_request) Successful in 11s
PR Checklist / pr-checklist (pull_request) Failing after 3m45s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 10s
Validate Config / Playbook Schema Validation (pull_request) Successful in 20s
Validate Config / Python Test Suite (pull_request) Has been cancelled
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Enhances scripts/pr_triage.py with:

- Auto-merge for safe PRs (--auto-merge, --dry-run)
- --all-repos for org-wide triage
- Title-similarity duplicate detection (SequenceMatcher > 0.85)
- Risk scoring (age + staleness + no refs + conflicts)
- is_safe_to_merge() guard: training-data only, mergeable,
  not stale, risk < 50

19 tests in tests/test_pr_triage.py covering categorization,
ref extraction, duplicate detection, health assessment, and
auto-merge safety checks.
2026-04-14 22:26:24 -04:00
Alexander Whitestone
0fcef1839e feat: PR triage automation script (#659)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 30s
PR Checklist / pr-checklist (pull_request) Failing after 3s
Smoke Test / smoke (pull_request) Failing after 27s
Validate Config / YAML Lint (pull_request) Failing after 17s
Validate Config / JSON Validate (pull_request) Successful in 22s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m28s
Validate Config / Shell Script Lint (pull_request) Failing after 1m18s
Validate Config / Cron Syntax Check (pull_request) Successful in 10s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 13s
Validate Config / Playbook Schema Validation (pull_request) Successful in 22s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
- scripts/pr_triage.py: fetch, categorize, deduplicate, report
- Categories: training-data, bug-fix, feature, maintenance, documentation, testing, infrastructure
- Duplicate detection: PRs referencing same issue
- Health checks: stale (>7d), closed issue refs, mergeable status
- Markdown report + JSON output
- 8 tests pass
2026-04-14 22:20:06 -04:00
8 changed files with 523 additions and 448 deletions

View File

@@ -1,443 +0,0 @@
#!/usr/bin/env python3
"""
Model Fallback Verification Script
Issue #514: [Robustness] Model fallback verification — test before trusting
Tests model switches with verification prompts, validates context windows,
and ensures at least one viable model is available before starting loops.
Usage:
python3 model-fallback-verify.py # Run full verification
python3 model-fallback-verify.py check <model> # Test specific model
python3 model-fallback-verify.py context <model> # Check context window
python3 model-fallback-verify.py list # List available models
"""
import os, sys, json, yaml, urllib.request
from datetime import datetime, timezone
from pathlib import Path
# Configuration
# HERMES_HOME may be overridden via the environment; defaults to ~/.hermes.
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
CONFIG_FILE = HERMES_HOME / "config.yaml"  # main Hermes configuration file
LOG_DIR = HERMES_HOME / "logs"
LOG_FILE = LOG_DIR / "model-verify.log"  # append-only verification log
MIN_CONTEXT_WINDOW = 64 * 1024  # 64K tokens minimum for a model to count as viable
# Provider endpoints
# Each entry supplies: the API base URL, a callable building auth headers from
# an API key, and the chat-completion path appended to the base URL.
# "custom" has no base_url; callers must supply one explicitly.
PROVIDER_CONFIGS = {
    "openrouter": {
        "base_url": "https://openrouter.ai/api/v1",
        "headers": lambda api_key: {"Authorization": "Bearer " + api_key},
        "chat_url": "/chat/completions",
    },
    "anthropic": {
        "base_url": "https://api.anthropic.com/v1",
        # Anthropic uses x-api-key plus a pinned API-version header.
        "headers": lambda api_key: {"x-api-key": api_key, "anthropic-version": "2023-06-01"},
        "chat_url": "/messages",
    },
    "nous": {
        "base_url": "https://inference.nousresearch.com/v1",
        "headers": lambda api_key: {"Authorization": "Bearer " + api_key},
        "chat_url": "/chat/completions",
    },
    "kimi-coding": {
        "base_url": "https://api.kimi.com/coding/v1",
        "headers": lambda api_key: {"x-api-key": api_key, "x-api-provider": "kimi-coding"},
        "chat_url": "/chat/completions",
    },
    "custom": {
        "base_url": None,  # supplied per-deployment (e.g. from config.yaml)
        "headers": lambda api_key: {"Authorization": "Bearer " + api_key},
        "chat_url": "/chat/completions",
    },
}
# Known context windows for common models
# Token limits keyed by the exact model id as it appears in config.
# Models not listed here fall back to the substring heuristics in
# get_context_window(), which defaults to 32768.
KNOWN_CONTEXT_WINDOWS = {
    "claude-opus-4-6": 200000,
    "claude-sonnet-4": 200000,
    "claude-3.5-sonnet": 200000,
    "gpt-4o": 128000,
    "gpt-4": 128000,
    "gpt-3.5-turbo": 16385,
    "qwen3:30b": 32768,
    "qwen2.5:7b": 32768,
    "hermes4:14b": 32768,
    "gemma3:1b": 8192,
    "gemma4": 32768,
    "phi3:3.8b": 128000,
    "kimi-k2.5": 128000,
    "google/gemini-2.5-pro": 1048576,
    "xiaomi/mimo-v2-pro": 131072,
    "deepseek/deepseek-r1": 131072,
    "deepseek/deepseek-chat-v3-0324": 131072,
}
def log(msg):
    """Append *msg* (UTC-timestamped) to the log file; echo unless --quiet is given."""
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    entry = "[" + stamp + "] " + msg
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    with open(LOG_FILE, "a") as fh:
        fh.write(entry + "\n")
    # --quiet anywhere on the command line suppresses console echo.
    if "--quiet" not in sys.argv:
        print(entry)
def load_config():
    """Parse and return Hermes config.yaml, or None when the file is absent."""
    if not CONFIG_FILE.exists():
        return None
    with open(CONFIG_FILE) as fh:
        return yaml.safe_load(fh)
def get_provider_api_key(provider):
    """Look up <PROVIDER>_API_KEY, preferring ~/.hermes/.env over the environment."""
    key_name = provider.upper() + "_API_KEY"
    env_file = HERMES_HOME / ".env"
    if env_file.exists():
        with open(env_file) as fh:
            for raw in fh:
                raw = raw.strip()
                if raw.startswith(key_name + "="):
                    # Strip optional single/double quotes around the value.
                    return raw.split("=", 1)[1].strip().strip("'\"")
    return os.environ.get(key_name)
def get_ollama_models():
    """Return the names of locally available Ollama models ([] on any failure).

    Queries the /api/tags endpoint on OLLAMA_HOST (default localhost:11434).
    Any failure — host down, timeout, malformed JSON — yields an empty list.
    """
    ollama_host = os.environ.get("OLLAMA_HOST", "localhost:11434")
    try:
        resp = urllib.request.urlopen("http://" + ollama_host + "/api/tags", timeout=5)
        data = json.loads(resp.read())
        return [m["name"] for m in data.get("models", [])]
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit; Exception keeps the best-effort behavior without
        # masking interpreter-level interrupts.
        return []
def test_model(model, provider, api_key=None, base_url=None):
    """
    Test a model with a verification prompt.
    Returns (success, response, error_message)

    A model passes when it echoes "VERIFIED" (case-insensitive) back.
    Models containing ":" are routed to a local Ollama instance; everything
    else goes through PROVIDER_CONFIGS. HTTP 429 is treated as success
    (rate-limited, but the model clearly exists).
    """
    if provider == "ollama" or ":" in model:
        # Local Ollama model
        ollama_host = os.environ.get("OLLAMA_HOST", "localhost:11434")
        try:
            body = json.dumps({
                "model": model,
                "prompt": "Say exactly VERIFIED and nothing else.",
                "stream": False,
                "options": {"num_predict": 10}  # cap generation — one word expected
            }).encode()
            req = urllib.request.Request(
                "http://" + ollama_host + "/api/generate",
                data=body,
                headers={"Content-Type": "application/json"}
            )
            resp = urllib.request.urlopen(req, timeout=30)
            result = json.loads(resp.read())
            response_text = result.get("response", "").strip()
            if "VERIFIED" in response_text.upper():
                return True, response_text, None
            return False, response_text, "Unexpected response: " + response_text[:100]
        except Exception as e:
            return False, "", "Ollama error: " + str(e)[:200]
    # Cloud provider
    config = PROVIDER_CONFIGS.get(provider)
    if not config:
        return False, "", "Unknown provider: " + provider
    # An explicit base_url (e.g. for "custom") overrides the provider default.
    url = base_url or config["base_url"]
    if not url:
        return False, "", "No base URL for provider: " + provider
    headers = config["headers"](api_key or "")
    headers["Content-Type"] = "application/json"
    try:
        body = json.dumps({
            "model": model,
            "max_tokens": 20,
            "messages": [{"role": "user", "content": "Say exactly VERIFIED and nothing else."}]
        }).encode()
        req = urllib.request.Request(
            url + config["chat_url"],
            data=body,
            headers=headers
        )
        resp = urllib.request.urlopen(req, timeout=30)
        result = json.loads(resp.read())
        # Anthropic returns content blocks; OpenAI-compatible APIs return choices.
        if provider == "anthropic":
            content = result.get("content", [{}])[0].get("text", "")
        else:
            choices = result.get("choices", [{}])
            content = choices[0].get("message", {}).get("content", "") if choices else ""
        if "VERIFIED" in content.upper():
            return True, content, None
        return False, content, "Unexpected response: " + content[:100]
    except urllib.error.HTTPError as e:
        error_body = e.read().decode() if e.fp else str(e)
        if e.code == 404:
            return False, "", "Model not found (404): " + error_body[:200]
        elif e.code == 429:
            # Rate limiting still proves the model id is valid.
            return True, "", "Rate limited but model exists"
        elif e.code >= 500:
            return False, "", "Server error (" + str(e.code) + "): " + error_body[:200]
        else:
            return False, "", "HTTP " + str(e.code) + ": " + error_body[:200]
    except Exception as e:
        return False, "", "Request error: " + str(e)[:200]
def get_context_window(model, provider):
    """
    Get the context window size for a model.
    Returns (window_size, source)

    Exact matches in KNOWN_CONTEXT_WINDOWS win; otherwise the model family
    is inferred from a name substring; otherwise a 32768-token default.
    """
    try:
        return KNOWN_CONTEXT_WINDOWS[model], "known"
    except KeyError:
        pass
    name = model.lower()
    # Checked in order — first matching family wins.
    for family, window in (
        ("claude", 200000),
        ("gpt-4", 128000),
        ("gemini", 1048576),
        ("qwen", 32768),
        ("gemma", 8192),
        ("phi", 128000),
    ):
        if family in name:
            return window, "inferred (" + family + ")"
    return 32768, "default"
def verify_model(model, provider, api_key=None, base_url=None):
    """
    Full verification of a model: test prompt + context window.
    Returns dict with verification results; a model is "viable" when it
    responded correctly AND its context window meets MIN_CONTEXT_WINDOW.
    """
    responded, reply, failure = test_model(model, provider, api_key, base_url)
    window, source = get_context_window(model, provider)
    big_enough = window >= MIN_CONTEXT_WINDOW
    return {
        "model": model,
        "provider": provider,
        "tested": True,
        "responded": responded,
        "response": reply[:200] if reply else "",
        "error": failure,
        "context_window": window,
        "context_source": source,
        "meets_minimum": big_enough,
        "viable": responded and big_enough,
    }
def get_fallback_chain(config):
    """Build the ordered fallback chain: primary model, auxiliaries, local Ollama."""
    chain = []
    model_config = config.get("model", {})
    if isinstance(model_config, dict):
        primary = model_config.get("default", "")
        primary_provider = model_config.get("provider", "")
        if primary and primary_provider:
            chain.append({"model": primary, "provider": primary_provider, "role": "primary"})
    elif model_config:
        # Legacy scalar form: a bare model name with no provider information.
        chain.append({"model": str(model_config), "provider": "unknown", "role": "primary"})
    for aux_name, aux_config in config.get("auxiliary", {}).items():
        if not isinstance(aux_config, dict):
            continue
        aux_model = aux_config.get("model", "")
        aux_provider = aux_config.get("provider", "")
        # "auto" auxiliaries have no fixed provider, so they cannot be verified.
        if aux_model and aux_provider and aux_provider != "auto":
            chain.append({"model": aux_model, "provider": aux_provider, "role": "auxiliary:" + aux_name})
    # Up to three local Ollama models serve as last-resort fallbacks.
    for local_model in get_ollama_models()[:3]:
        if all(entry["model"] != local_model for entry in chain):
            chain.append({"model": local_model, "provider": "ollama", "role": "local-fallback"})
    return chain
def run_verification():
    """Run full model fallback verification.

    Loads config.yaml, builds the fallback chain, verifies every entry, and
    logs a PASS/FAIL line per model. Returns a dict with "success", the raw
    per-model "results", the "viable" subset, and (on success) the "primary"
    viable model.
    """
    log("=== Model Fallback Verification ===")
    config = load_config()
    if not config:
        log("ERROR: No config.yaml found")
        return {"success": False, "error": "No config file"}
    chain = get_fallback_chain(config)
    if not chain:
        log("ERROR: No models configured")
        return {"success": False, "error": "No models in chain"}
    results = []
    viable_models = []
    for entry in chain:
        model = entry["model"]
        provider = entry["provider"]
        role = entry["role"]
        # Ollama is local and keyless; everything else needs a provider key.
        api_key = get_provider_api_key(provider) if provider != "ollama" else None
        base_url = None
        if provider == "custom":
            # Custom provider endpoint comes from the vision auxiliary config.
            provider_config = config.get("auxiliary", {}).get("vision", {})
            base_url = provider_config.get("base_url")
        log("Testing [" + role + "] " + model + " (" + provider + ")...")
        result = verify_model(model, provider, api_key, base_url)
        result["role"] = role
        results.append(result)
        status = "PASS" if result["viable"] else "FAIL"
        details = []
        if not result["responded"]:
            details.append("no response: " + str(result["error"]))
        if not result["meets_minimum"]:
            details.append("context " + str(result["context_window"]) + " < " + str(MIN_CONTEXT_WINDOW))
        log(" [" + status + "] " + model + " - " + (", ".join(details) if details else "verified"))
        if result["viable"]:
            viable_models.append(result)
    log("=== Results: " + str(len(viable_models)) + "/" + str(len(results)) + " models viable ===")
    if not viable_models:
        # Nothing usable — dump per-model diagnostics so the failure is actionable.
        log("CRITICAL: No viable models found!")
        for r in results:
            log(" - " + r["model"] + " (" + r["provider"] + "): responded=" + str(r["responded"]) + ", context=" + str(r["context_window"]))
        return {"success": False, "results": results, "viable": []}
    log("Viable models (in priority order):")
    for i, r in enumerate(viable_models, 1):
        log(" " + str(i) + ". " + r["model"] + " (" + r["provider"] + ") - context: " + str(r["context_window"]) + " tokens [" + r["role"] + "]")
    return {
        "success": True,
        "results": results,
        "viable": viable_models,
        "primary": viable_models[0] if viable_models else None,
    }
def check_single_model(model):
    """Verify a single model and print a PASS/FAIL summary.

    The provider is inferred from the model name: ":" means a local Ollama
    model, "/" an OpenRouter-style path, anything else is unknown.

    Returns:
        bool: True when the model responded and meets the context minimum.
    """
    if ":" in model:
        provider = "ollama"
    elif "/" in model:
        provider = "openrouter"
    else:
        provider = "unknown"
    # NOTE: the previous version loaded config.yaml here into an unused local;
    # the dead load (and its file I/O) has been removed.
    api_key = get_provider_api_key(provider) if provider != "ollama" else None
    result = verify_model(model, provider, api_key)
    if result["viable"]:
        print("PASS: " + model)
        print(" Context window: " + str(result["context_window"]) + " tokens")
        print(" Response: " + result["response"][:100])
    else:
        print("FAIL: " + model)
        if result["error"]:
            print(" Error: " + str(result["error"]))
        if not result["meets_minimum"]:
            print(" Context window: " + str(result["context_window"]) + " < " + str(MIN_CONTEXT_WINDOW) + " minimum")
    return result["viable"]
def check_context_window(model):
    """Print the context-window report for *model*; return True when it meets the minimum."""
    provider = "ollama" if ":" in model else ("openrouter" if "/" in model else "unknown")
    window, source = get_context_window(model, provider)
    meets = window >= MIN_CONTEXT_WINDOW
    print("Model: " + model)
    print("Provider: " + provider)
    print("Context window: " + str(window) + " tokens (" + source + ")")
    print("Minimum (" + str(MIN_CONTEXT_WINDOW) + "): " + ("PASS" if meets else "FAIL"))
    return meets
def list_models():
    """Print configured fallback-chain models, then any local Ollama models."""
    cfg = load_config() or {}
    print("Configured models:")
    for entry in get_fallback_chain(cfg):
        print(" " + entry["model"].ljust(30) + " " + entry["provider"].ljust(15) + " [" + entry["role"] + "]")
    local = get_ollama_models()
    if local:
        print("")
        print("Ollama models:")
        for name in local:
            print(" " + name)
def main():
    """CLI dispatch: no arguments runs full verification; else check/context/list/test."""
    args = sys.argv[1:]
    if not args:
        sys.exit(0 if run_verification()["success"] else 1)
    cmd = args[0]
    if cmd == "check" and len(args) > 1:
        sys.exit(0 if check_single_model(args[1]) else 1)
    if cmd == "context" and len(args) > 1:
        sys.exit(0 if check_context_window(args[1]) else 1)
    if cmd == "list":
        list_models()
    elif cmd == "test":
        # "test" is an alias for the default full-verification run.
        sys.exit(0 if run_verification()["success"] else 1)
    else:
        print("Usage:")
        print(" model-fallback-verify.py Run full verification")
        print(" model-fallback-verify.py check <model> Test specific model")
        print(" model-fallback-verify.py context <model> Check context window")
        print(" model-fallback-verify.py list List available models")
        sys.exit(1)
if __name__ == "__main__":
    main()

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
Full Nostr agent-to-agent communication demo - FINAL WORKING
"""

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
"""
Soul Eval Gate — The Conscience of the Training Pipeline

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

362
scripts/pr_triage.py Executable file
View File

@@ -0,0 +1,362 @@
#!/usr/bin/env python3
"""PR Triage Automation -- Categorize, deduplicate, report, auto-merge (#659).
Enhancements over base implementation:
- Auto-merge for safe PRs (training data with passing tests)
- --all-repos flag for org-wide triage
- JSON output with structured data
- Age-based risk scoring
- Better duplicate detection (title similarity)
- Tests in tests/test_pr_triage.py
Usage:
python scripts/pr_triage.py --repo hermes-agent
python scripts/pr_triage.py --repo hermes-agent --json
python scripts/pr_triage.py --repo hermes-agent --auto-merge --dry-run
python scripts/pr_triage.py --all-repos --owner Timmy_Foundation
"""
import argparse
import json
import os
import re
import sys
import subprocess
from collections import Counter, defaultdict
from datetime import datetime, timezone
from difflib import SequenceMatcher
from urllib.request import Request, urlopen
from urllib.error import HTTPError
def _token():
t = os.environ.get("GITEA_TOKEN", "")
if not t:
p = os.path.expanduser("~/.config/gitea/token")
if os.path.exists(p):
t = open(p).read().strip()
return t
def _api(url, token, method="GET", data=None):
    """Call the Gitea API and return the decoded JSON body, or None on HTTP error."""
    headers = {"Authorization": "token " + token, "Accept": "application/json"}
    payload = None
    if data:
        payload = json.dumps(data).encode()
        headers["Content-Type"] = "application/json"
    request = Request(url, data=payload, headers=headers, method=method)
    try:
        return json.loads(urlopen(request, timeout=30).read())
    except HTTPError:
        # Callers treat None as "request failed" (e.g. end of pagination).
        return None
def fetch_prs(base, token, owner, repo):
    """Return every open PR for owner/repo, following pagination (50 per page)."""
    collected = []
    page = 1
    while True:
        batch = _api(f"{base}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50&page={page}", token)
        if not batch:
            return collected
        collected.extend(batch)
        if len(batch) < 50:
            # Short page means we have reached the end.
            return collected
        page += 1
def fetch_issues(base, token, owner, repo):
    """Return open issues keyed by number, excluding PRs, following pagination."""
    issues_by_number = {}
    page = 1
    while True:
        batch = _api(f"{base}/api/v1/repos/{owner}/{repo}/issues?state=open&limit=50&page={page}", token)
        if not batch:
            return issues_by_number
        for item in batch:
            # Gitea's issues endpoint also returns pull requests; keep real issues only.
            if "pull_request" not in item:
                issues_by_number[item["number"]] = item
        if len(batch) < 50:
            return issues_by_number
        page += 1
def fetch_repos(base, token, owner):
    """Return the names of every repository in the *owner* org, following pagination."""
    names = []
    page = 1
    while True:
        batch = _api(f"{base}/api/v1/orgs/{owner}/repos?limit=50&page={page}", token)
        if not batch:
            return names
        names.extend(entry["name"] for entry in batch)
        if len(batch) < 50:
            return names
        page += 1
def categorize(pr):
    """Bucket a PR into a category via keyword search over title, body, and labels.

    The first matching keyword (checked in a fixed priority order) wins;
    PRs with no matching keyword fall into "other".
    """
    label_text = " ".join(lbl.get("name", "") for lbl in pr.get("labels", []))
    haystack = (pr.get("title", "") + " " + pr.get("body", "") + " " + label_text).lower()
    keyword_map = (
        ("training data", "training-data"), ("dpo", "training-data"), ("grpo", "training-data"),
        ("fix:", "bug-fix"), ("bug", "bug-fix"), ("hotfix", "bug-fix"),
        ("feat:", "feature"), ("feature", "feature"), ("enhancement", "feature"),
        ("refactor", "maintenance"), ("cleanup", "maintenance"), ("chore:", "maintenance"),
        ("doc", "documentation"), ("test", "testing"), ("ci", "infrastructure"),
        ("infra", "infrastructure"), ("deploy", "infrastructure"),
    )
    return next((cat for kw, cat in keyword_map if kw in haystack), "other")
def refs(pr):
    """Extract issue numbers referenced as #N in the PR title or body, in order."""
    text = pr.get("title", "") + " " + pr.get("body", "")
    return [int(num) for num in re.findall(r"#(\d+)", text)]
def find_duplicates(prs):
    """Group PRs that look like duplicates of each other.

    Two signals are used:
    - "ref": two or more PRs reference the same issue number.
    - "similarity": a pair of titles with SequenceMatcher ratio > 0.85.

    Each PR group/pair is reported at most once (tracked via `seen`).
    Returns a list of group dicts.

    NOTE: the original also built a `by_title` normalized-title grouping that
    was never read anywhere; that dead code has been removed (behavior is
    unchanged — title duplicates were and are found by SequenceMatcher).
    """
    by_ref = defaultdict(list)
    for p in prs:
        for r in refs(p):
            by_ref[r].append(p)
    dup_groups = []
    seen = set()
    # Ref-based duplicates
    for r, group in by_ref.items():
        if len(group) > 1:
            key = tuple(sorted(p["number"] for p in group))
            if key not in seen:
                seen.add(key)
                dup_groups.append({"type": "ref", "ref": r, "prs": group})
    # Title-similarity duplicates (threshold 0.85)
    for i, p1 in enumerate(prs):
        for p2 in prs[i + 1:]:
            key = tuple(sorted([p1["number"], p2["number"]]))
            if key in seen:
                continue
            sim = SequenceMatcher(None, p1.get("title", "").lower(), p2.get("title", "").lower()).ratio()
            if sim > 0.85:
                seen.add(key)
                dup_groups.append({"type": "similarity", "similarity": round(sim, 2), "prs": [p1, p2]})
    return dup_groups
def health(pr, issues):
    """Assess one PR: category, issue refs, age/staleness, and a 0-100 risk score.

    *issues* maps open issue numbers to issue dicts; referenced numbers absent
    from it are reported as closed_issues.
    """
    referenced = refs(pr)
    now = datetime.now(timezone.utc)
    created = datetime.fromisoformat(pr["created_at"].replace("Z", "+00:00"))
    updated = datetime.fromisoformat(pr["updated_at"].replace("Z", "+00:00"))
    age_days = (now - created).days
    stale_days = (now - updated).days
    # Risk: capped age (max 30) + capped staleness (max 40)
    #       + 10 for no issue refs + 15 for merge conflicts, clamped to 100.
    risk = min(age_days, 30) + min(stale_days * 2, 40)
    if not referenced:
        risk += 10
    if pr.get("mergeable") is False:
        risk += 15
    risk = min(risk, 100)
    return {
        "pr": pr["number"],
        "title": pr["title"],
        "head": pr["head"]["ref"],
        "category": categorize(pr),
        "refs": referenced,
        "open_issues": [n for n in referenced if n in issues],
        "closed_issues": [n for n in referenced if n not in issues],
        "age_days": age_days,
        "stale_days": stale_days,
        "risk_score": risk,
        "mergeable": pr.get("mergeable"),
        "author": pr.get("user", {}).get("login", ""),
        "labels": [lbl.get("name", "") for lbl in pr.get("labels", [])],
    }
def is_safe_to_merge(h):
    """Determine if a PR is safe to auto-merge.

    Returns (ok, reason). Only fresh, low-risk, conflict-free
    training-data PRs qualify; the first failing check wins.
    """
    checks = (
        (h["category"] != "training-data", "not training-data"),
        (h["mergeable"] is False, "has conflicts"),
        (h["mergeable"] is None, "mergeable status unknown"),
        (h["stale_days"] > 30, f"too stale ({h['stale_days']}d)"),
        (h["risk_score"] > 50, f"risk too high ({h['risk_score']})"),
    )
    for failed, reason in checks:
        if failed:
            return False, reason
    return True, "safe"
def auto_merge(base, token, owner, repo, pr_num, dry_run=True):
    """Merge a PR via the Gitea merge endpoint; with dry_run, report without acting."""
    if dry_run:
        return {"merged": False, "dry_run": True, "pr": pr_num}
    merge_payload = {
        "MergeTitleField": "auto",
        "MergeMessageField": "auto",
        "Do": "merge",
    }
    outcome = _api(
        f"{base}/api/v1/repos/{owner}/{repo}/pulls/{pr_num}/merge",
        token, method="POST", data=merge_payload,
    )
    # _api returns None on HTTP error, so "merged" reflects API success.
    return {"merged": outcome is not None, "pr": pr_num, "result": outcome}
def report(repo, checks, dups):
    """Render triage results for one repo as a Markdown report string.

    Args:
        repo: repository name (used only in the heading).
        checks: list of per-PR health dicts from health().
        dups: duplicate groups from find_duplicates().
    """
    lines = [
        f"# PR Triage -- {repo}",
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        f"Open PRs: {len(checks)}",
        "",
        "## Summary",
        "",
    ]
    # Category breakdown table, most common category first.
    cats = Counter(h["category"] for h in checks)
    lines.append("| Category | Count |")
    lines.append("|----------|-------|")
    for c, n in cats.most_common():
        lines.append(f"| {c} | {n} |")
    # Headline buckets: stale, high-risk, and auto-mergeable PRs.
    stale = [h for h in checks if h["stale_days"] > 7]
    high_risk = [h for h in checks if h["risk_score"] > 50]
    safe_merge = [h for h in checks if is_safe_to_merge(h)[0]]
    lines.extend([
        "",
        f"Stale (>7d): {len(stale)}",
        f"High risk (>50): {len(high_risk)}",
        f"Safe to merge: {len(safe_merge)}",
        f"Duplicate groups: {len(dups)}",
        "",
    ])
    if safe_merge:
        lines.append("## Safe to Auto-Merge")
        for h in safe_merge:
            ok, reason = is_safe_to_merge(h)
            lines.append(f"- #{h['pr']}: {h['title'][:60]} ({reason})")
        lines.append("")
    if dups:
        lines.append("## Duplicates")
        for g in dups:
            pr_nums = [str(p["number"]) for p in g["prs"]]
            lines.append(f"[{g['type']}] PRs {', '.join('#' + n for n in pr_nums)}:")
            for p in g["prs"]:
                lines.append(f" - #{p['number']}: {p['title']}")
            lines.append("")
    if stale:
        lines.append("## Stale (>7d)")
        # Most stale first.
        for h in sorted(stale, key=lambda x: x["stale_days"], reverse=True):
            lines.append(f"- #{h['pr']}: {h['title'][:50]} -- {h['stale_days']}d (risk: {h['risk_score']})")
        lines.append("")
    lines.append("## All PRs")
    lines.append("| # | Title | Category | Age | Stale | Risk | Merge |")
    lines.append("|---|-------|----------|-----|-------|------|-------|")
    for h in sorted(checks, key=lambda x: x["pr"]):
        # Merge column: Y / N / ? (mergeable True / False / unknown).
        m = "Y" if h["mergeable"] else ("N" if h["mergeable"] is False else "?")
        s = f"{h['stale_days']}d" if h["stale_days"] > 7 else "-"
        lines.append(f"| {h['pr']} | {h['title'][:45]} | {h['category']} | {h['age_days']}d | {s} | {h['risk_score']} | {m} |")
    return "\n".join(lines)
def main():
    """CLI entry point: triage one repo (or every org repo) and report/auto-merge."""
    p = argparse.ArgumentParser(description="PR Triage Automation")
    p.add_argument("--base-url", default="https://forge.alexanderwhitestone.com")
    p.add_argument("--owner", default="Timmy_Foundation")
    p.add_argument("--repo", default="")
    p.add_argument("--all-repos", action="store_true", help="Triage all org repos")
    p.add_argument("--json", action="store_true", dest="js")
    p.add_argument("--output", default="")
    p.add_argument("--auto-merge", action="store_true", help="Auto-merge safe PRs")
    p.add_argument("--dry-run", action="store_true", help="Show what would be merged without merging")
    a = p.parse_args()
    token = _token()
    if not token:
        print("No token"); sys.exit(1)
    if a.all_repos:
        # Org-wide mode: aggregate health checks and duplicates across repos,
        # then print a summary (auto-merge is single-repo only).
        repos = fetch_repos(a.base_url, token, a.owner)
        all_checks = []
        all_dups = []
        for repo in repos:
            prs = fetch_prs(a.base_url, token, a.owner, repo)
            issues = fetch_issues(a.base_url, token, a.owner, repo)
            checks = [health(pr, issues) for pr in prs]
            dups = find_duplicates(prs)
            for c in checks:
                c["repo"] = repo  # tag each check with its repo for the combined output
            all_checks.extend(checks)
            all_dups.extend(dups)
        if a.js:
            print(json.dumps({"repos": repos, "prs": all_checks, "duplicates_count": len(all_dups)}, indent=2))
        else:
            print(f"Org-wide triage: {len(repos)} repos, {len(all_checks)} PRs, {len(all_dups)} duplicate groups")
            cats = Counter(h["category"] for h in all_checks)
            for c, n in cats.most_common():
                print(f" {c}: {n}")
        return
    repo = a.repo
    if not repo:
        # No --repo: try to infer owner/repo from the current checkout's origin remote.
        try:
            remote = subprocess.check_output(["git", "remote", "get-url", "origin"], text=True).strip()
            m = re.search(r"[/:](\w[\w-]*)/(\w[\w-]*?)(?:\.git)?$", remote)
            if m:
                a.owner, repo = m.group(1), m.group(2)
        except Exception:
            # Not a git checkout (or git missing) — fall through to the error below.
            pass
    if not repo:
        print("No repo specified"); sys.exit(1)
    print(f"Triaging {a.owner}/{repo}...", file=sys.stderr)
    prs = fetch_prs(a.base_url, token, a.owner, repo)
    issues = fetch_issues(a.base_url, token, a.owner, repo)
    checks = [health(pr, issues) for pr in prs]
    dups = find_duplicates(prs)
    # Auto-merge
    merge_results = []
    if a.auto_merge or a.dry_run:
        safe = [h for h in checks if is_safe_to_merge(h)[0]]
        if safe:
            print(f"Auto-merge: {len(safe)} safe PRs ({'dry-run' if a.dry_run else 'live'})", file=sys.stderr)
        for h in safe:
            result = auto_merge(a.base_url, token, a.owner, repo, h["pr"], dry_run=a.dry_run)
            merge_results.append(result)
            status = "WOULD MERGE" if a.dry_run else ("MERGED" if result["merged"] else "FAILED")
            print(f" #{h['pr']}: {status}", file=sys.stderr)
    if a.js:
        out = {
            "repo": repo, "prs": checks,
            "duplicates": [{"type": g["type"], "prs": [p["number"] for p in g["prs"]]} for g in dups],
            "merge_results": merge_results,
        }
        print(json.dumps(out, indent=2))
    else:
        r = report(repo, checks, dups)
        print(r)
        if a.output:
            with open(a.output, "w") as f:
                f.write(r)
    # Final one-line summary goes to stderr so stdout stays machine-readable.
    print(f"\n{len(checks)} PRs, {len(dups)} duplicate groups, {len(merge_results)} merges",
          file=sys.stderr)
if __name__ == "__main__":
    main()

View File

@@ -1,4 +1,3 @@
#!/usr/bin/env python3
import json
from hermes_tools import browser_navigate, browser_vision

161
tests/test_pr_triage.py Normal file
View File

@@ -0,0 +1,161 @@
"""Tests for PR triage automation (#659)."""
from __future__ import annotations
import pytest
from datetime import datetime, timezone, timedelta
from scripts.pr_triage import categorize, refs, find_duplicates, health, is_safe_to_merge
class TestCategorize:
    """PR categorization from title/body/labels."""

    def test_training_data(self):
        # "DPO" and "training data" keywords both map to training-data.
        pr = {"title": "Add DPO training data", "body": "", "labels": []}
        assert categorize(pr) == "training-data"

    def test_bug_fix(self):
        pr = {"title": "fix: resolve crash on startup", "body": "", "labels": []}
        assert categorize(pr) == "bug-fix"

    def test_feature(self):
        pr = {"title": "feat: add dark mode", "body": "", "labels": []}
        assert categorize(pr) == "feature"

    def test_maintenance(self):
        pr = {"title": "refactor: simplify auth flow", "body": "", "labels": []}
        assert categorize(pr) == "maintenance"

    def test_other(self):
        # No category keyword present — falls through to "other".
        pr = {"title": "Update readme", "body": "", "labels": []}
        assert categorize(pr) == "other"
class TestRefs:
    """Issue reference extraction."""

    def test_extracts_from_title(self):
        pr = {"title": "fix: resolve #123", "body": ""}
        assert refs(pr) == [123]

    def test_extracts_from_body(self):
        pr = {"title": "Fix", "body": "Closes #456, refs #789"}
        assert refs(pr) == [456, 789]

    def test_no_refs(self):
        pr = {"title": "Fix", "body": "No issue refs"}
        assert refs(pr) == []

    def test_multiple_refs(self):
        # Title refs come first, then body refs — document order is preserved.
        pr = {"title": "#1 and #2", "body": "Also #3"}
        assert refs(pr) == [1, 2, 3]
class TestFindDuplicates:
    """Duplicate PR detection."""

    def test_ref_based_duplicates(self):
        # Both PRs reference issue #100, so they form one "ref" group.
        prs = [
            {"number": 1, "title": "Fix #100", "body": "Closes #100"},
            {"number": 2, "title": "Fix #100 too", "body": "Closes #100"},
        ]
        dups = find_duplicates(prs)
        assert len(dups) == 1
        assert dups[0]["type"] == "ref"

    def test_title_similarity_duplicates(self):
        # Identical titles exceed the 0.85 SequenceMatcher threshold.
        prs = [
            {"number": 1, "title": "feat: add dark mode support", "body": ""},
            {"number": 2, "title": "feat: add dark mode support", "body": "different body"},
        ]
        dups = find_duplicates(prs)
        assert len(dups) >= 1
        assert any(d["type"] == "similarity" for d in dups)

    def test_no_duplicates(self):
        # Different refs and dissimilar titles — no groups expected.
        prs = [
            {"number": 1, "title": "Fix auth bug", "body": "Closes #100"},
            {"number": 2, "title": "Add dark mode", "body": "Closes #200"},
        ]
        dups = find_duplicates(prs)
        assert len(dups) == 0
class TestHealth:
    """PR health assessment."""

    def _make_pr(self, **overrides):
        """Build a minimal, fresh, mergeable PR dict; overrides replace fields."""
        now = datetime.now(timezone.utc).isoformat()
        pr = {
            "number": 1,
            "title": "test",
            "body": "Closes #100",
            "created_at": now,
            "updated_at": now,
            "head": {"ref": "fix/test"},
            "mergeable": True,
            "user": {"login": "agent"},
            "labels": [],
        }
        pr.update(overrides)
        return pr

    def test_basic_health(self):
        pr = self._make_pr()
        # Issue #100 is open (present in the issues map), so it is an open ref.
        h = health(pr, {100: {"number": 100}})
        assert h["pr"] == 1
        assert h["refs"] == [100]
        assert h["open_issues"] == [100]
        assert h["age_days"] == 0

    def test_stale_detection(self):
        # A PR untouched for 30 days should be flagged stale with elevated risk.
        old = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
        pr = self._make_pr(created_at=old, updated_at=old)
        h = health(pr, {})
        assert h["stale_days"] >= 29
        assert h["risk_score"] > 30
class TestIsSafeToMerge:
    """Auto-merge safety checks."""

    def _make_health(self, **overrides):
        """Build a health dict that passes every safety check; overrides replace fields."""
        h = {
            "pr": 1, "title": "test", "head": "fix/test",
            "category": "training-data", "refs": [100],
            "open_issues": [100], "closed_issues": [],
            "age_days": 1, "stale_days": 1,
            "risk_score": 10, "mergeable": True,
            "author": "agent", "labels": [],
        }
        h.update(overrides)
        return h

    def test_safe_training_data(self):
        h = self._make_health()
        ok, reason = is_safe_to_merge(h)
        assert ok

    def test_unsafe_not_training(self):
        # Only training-data PRs are ever auto-merged.
        h = self._make_health(category="bug-fix")
        ok, reason = is_safe_to_merge(h)
        assert not ok
        assert "not training-data" in reason

    def test_unsafe_conflicts(self):
        h = self._make_health(mergeable=False)
        ok, reason = is_safe_to_merge(h)
        assert not ok
        assert "conflicts" in reason

    def test_unsafe_too_stale(self):
        # 31 days exceeds the 30-day staleness cutoff.
        h = self._make_health(stale_days=31)
        ok, reason = is_safe_to_merge(h)
        assert not ok
        assert "stale" in reason

    def test_unsafe_high_risk(self):
        # 60 exceeds the risk threshold of 50.
        h = self._make_health(risk_score=60)
        ok, reason = is_safe_to_merge(h)
        assert not ok
        assert "risk" in reason