diff --git a/tools/rockachopa_priority_watcher.py b/tools/rockachopa_priority_watcher.py new file mode 100644 index 00000000..3cb3fdd0 --- /dev/null +++ b/tools/rockachopa_priority_watcher.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +""" +Rockachopa Priority Watcher + +Never ignore Alexander (Rockachopa) on Gitea. + +Scans all accessible repos for: +1. New comments by Rockachopa +2. New comments containing @Timmy mentions + +Tracks seen comment IDs in a local JSON state file to avoid duplicate alerts. +Designed to run from cron every 3-5 minutes. +""" + +import json +import os +import urllib.request +from datetime import datetime, timedelta +from pathlib import Path + +GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com") +TOKEN_PATHS = ["/root/.gitea_token", os.path.expanduser("~/.gitea_token")] +STATE_PATH = Path(os.environ.get("WATCHER_STATE_PATH", "/root/.rockachopa_watcher_state.json")) +PRIORITY_USERS = {"Rockachopa", "rockachopa"} +MENTION_TARGETS = {"@Timmy", "@timmy"} +ALERT_WEBHOOK = os.environ.get("WATCHER_ALERT_WEBHOOK", "") # e.g. telegram/discard origin + +def _load_token() -> str: + for path in TOKEN_PATHS: + try: + with open(path) as f: + token = f.read().strip() + if token: + return token + except FileNotFoundError: + pass + raise RuntimeError("Gitea token not found") + +def _api_request(path: str): + url = f"{GITEA_URL}/api/v1{path}" + req = urllib.request.Request(url, headers={"Authorization": f"token {_load_token()}"}) + with urllib.request.urlopen(req, timeout=20) as resp: + return json.loads(resp.read().decode()) + +def _load_state() -> dict: + if not STATE_PATH.exists(): + return {"seen_comment_ids": [], "last_run": None} + with open(STATE_PATH) as f: + return json.load(f) + +def _save_state(state: dict): + STATE_PATH.parent.mkdir(parents=True, exist_ok=True) + with open(STATE_PATH, "w") as f: + json.dump(state, f, indent=2) + +def _get_repos() -> list: + """Fetch all repos the token can access.""" + repos = [] + # User repos + page = 1 + while True: + batch = _api_request(f"/user/repos?limit=50&page={page}") + if not batch: + break + repos.extend(batch) + if len(batch) < 50: + break + page += 1 + # Org repos + page = 1 + while True: + batch = _api_request(f"/orgs/Timmy_Foundation/repos?limit=50&page={page}") + if not batch: + break + repos.extend(batch) + if len(batch) < 50: + break + page += 1 + # Deduplicate by full_name + seen = set() + unique = [] + for r in repos: + fn = r["full_name"] + if fn not in seen: + seen.add(fn) + unique.append(r) + return unique + +def _get_recent_comments(repo_full_name: str, since: datetime) -> list: + """Fetch recent issue/PR comments for a repo.""" + comments = [] + # Issue comments + page = 1 + while True: + try: + batch = _api_request(f"/repos/{repo_full_name}/issues/comments?limit=50&page={page}&since={since.isoformat()}") + except Exception: + break + if not batch: + break + comments.extend(batch) + if len(batch) < 50: + break + page += 1 + return comments + +def _format_alert(comment: dict, repo: str) -> str: + user = comment.get("user", {}).get("login", "unknown") + url = comment.get("html_url", "") + body_preview = comment.get("body", "")[:200].replace("\n", " ") + return f"🚨 PRIORITY ALERT\nRepo: {repo}\nUser: {user}\nURL: {url}\nPreview: {body_preview}..." + +def run_watcher(): + state = _load_state() + seen = set(state.get("seen_comment_ids", [])) + last_run = state.get("last_run") + + # Default to checking last 24h on first run + if last_run: + since = datetime.fromisoformat(last_run) + else: + since = datetime.utcnow() - timedelta(hours=24) + + repos = _get_repos() + alerts = [] + new_seen = set(seen) + + for repo in repos: + fn = repo["full_name"] + try: + comments = _get_recent_comments(fn, since) + except Exception as e: + print(f"[SKIP] {fn}: {e}") + continue + + for c in comments: + cid = c.get("id") + if cid in seen: + continue + new_seen.add(cid) + + user = c.get("user", {}).get("login", "") + body = c.get("body", "") + + is_priority_user = user in PRIORITY_USERS + has_mention = any(m in body for m in MENTION_TARGETS) + + if is_priority_user or has_mention: + alerts.append(_format_alert(c, fn)) + + # Output alerts + if alerts: + for alert in alerts: + print(alert) + print("-" * 40) + else: + print(f"[{datetime.utcnow().isoformat()}] No new priority comments.") + + # Save state + state["seen_comment_ids"] = sorted(new_seen)[-5000:] # cap at 5k IDs + state["last_run"] = datetime.utcnow().isoformat() + _save_state(state) + + return alerts + +if __name__ == "__main__": + run_watcher()