Adds tools/rockachopa_priority_watcher.py: - Scans all accessible repos for new comments by Rockachopa - Scans for @Timmy mentions in recent comments - Deduplicates via JSON state file - Designed for 3-5 minute cron execution
168 lines
4.8 KiB
Python
168 lines
4.8 KiB
Python
#!/usr/bin/env python3
"""
Rockachopa Priority Watcher

Never ignore Alexander (Rockachopa) on Gitea.

Scans all accessible repos for:
1. New comments by Rockachopa
2. New comments containing @Timmy mentions

Tracks seen comment IDs in a local JSON state file to avoid duplicate alerts.
Designed to run from cron every 3-5 minutes.
"""
|
|
|
|
import json
import os
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path
|
|
|
|
# Base URL of the Gitea instance; override with the GITEA_URL environment variable.
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")

# Candidate API-token files, tried in the order listed.
TOKEN_PATHS = [
    "/root/.gitea_token",
    os.path.expanduser("~/.gitea_token"),
]

# JSON file that stores already-seen comment IDs and the last run timestamp;
# override with the WATCHER_STATE_PATH environment variable.
STATE_PATH = Path(
    os.getenv("WATCHER_STATE_PATH", "/root/.rockachopa_watcher_state.json")
)

# Logins whose comments always raise an alert.
PRIORITY_USERS = {"Rockachopa", "rockachopa"}

# Substrings that mark a comment body as a mention worth alerting on.
MENTION_TARGETS = {"@Timmy", "@timmy"}

# Optional alert webhook URL from WATCHER_ALERT_WEBHOOK.
# NOTE(review): the original note read "telegram/discard origin" -- presumably
# a Telegram/Discord endpoint; confirm. Nothing in this file reads it yet.
ALERT_WEBHOOK = os.getenv("WATCHER_ALERT_WEBHOOK", "")
|
|
|
|
def _load_token() -> str:
    """Return the first non-empty Gitea API token found in ``TOKEN_PATHS``.

    Candidate files are tried in order. A candidate that is missing,
    unreadable (for example ``/root/.gitea_token`` when not running as root),
    not a regular file, or empty is skipped so later candidates still get a
    chance.

    Raises:
        RuntimeError: If no candidate file yields a token.
    """
    for path in TOKEN_PATHS:
        try:
            with open(path, encoding="utf-8") as f:
                token = f.read().strip()
        except OSError:
            # Covers FileNotFoundError, PermissionError, IsADirectoryError...
            # This candidate is unusable; move on to the next one.
            continue
        if token:
            return token
    raise RuntimeError("Gitea token not found")
|
|
|
|
def _api_request(path: str):
    """GET ``path`` from the Gitea v1 API and return the decoded JSON body.

    Args:
        path: API path relative to ``/api/v1``, including any query string,
            e.g. ``"/user/repos?limit=50&page=1"``.

    Returns:
        The parsed JSON payload.

    Raises:
        RuntimeError: If no API token can be loaded.
        urllib.error.URLError: On network or HTTP errors (``HTTPError`` is a
            subclass).
        ValueError: If the response body is not valid JSON.
    """
    # Tolerate a trailing slash in GITEA_URL so the URL never contains "//api".
    base_url = GITEA_URL.rstrip("/")
    url = f"{base_url}/api/v1{path}"
    req = urllib.request.Request(url, headers={"Authorization": f"token {_load_token()}"})
    # The 20 s timeout keeps a stalled server from hanging the cron job.
    with urllib.request.urlopen(req, timeout=20) as resp:
        return json.loads(resp.read().decode("utf-8"))
|
|
|
|
def _load_state() -> dict:
    """Load the watcher state from ``STATE_PATH``.

    Returns:
        The persisted state dict, or a fresh
        ``{"seen_comment_ids": [], "last_run": None}`` when the file is
        missing, unreadable, not valid JSON, or not a JSON object.
    """
    fresh = {"seen_comment_ids": [], "last_run": None}
    try:
        with open(STATE_PATH, encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        # First run: nothing has been persisted yet.
        return fresh
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError. A truncated or corrupt file
        # must not wedge every future cron run, so start over instead.
        print(f"[WARN] Ignoring unreadable state file {STATE_PATH}: {exc}")
        return fresh
    if not isinstance(state, dict):
        print(f"[WARN] Ignoring malformed state file {STATE_PATH}")
        return fresh
    return state
|
|
|
|
def _save_state(state: dict) -> None:
    """Persist ``state`` to ``STATE_PATH`` as indented JSON.

    The data is first written to a sibling ``.tmp`` file and then moved into
    place with ``os.replace``, so a failure or interruption while writing
    cannot truncate the existing state file.

    Raises:
        TypeError: If ``state`` contains values that are not JSON serializable.
        OSError: If the directory or file cannot be written.
    """
    STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = STATE_PATH.with_name(STATE_PATH.name + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, STATE_PATH)
    finally:
        # After a successful replace the temp file is gone; this only fires
        # when the dump or the replace failed part-way.
        if tmp_path.exists():
            tmp_path.unlink()
|
|
|
|
def _get_repos() -> list:
    """Return every repository the token can access, without duplicates.

    Pages through the authenticated user's repositories and then the
    ``Timmy_Foundation`` organisation's repositories, 50 per request, and
    keeps the first occurrence of each ``full_name``.
    """
    collected = []
    # User repos first, then org repos.
    for endpoint in ("/user/repos", "/orgs/Timmy_Foundation/repos"):
        page_no = 1
        while True:
            chunk = _api_request(f"{endpoint}?limit=50&page={page_no}")
            if not chunk:
                break
            collected.extend(chunk)
            # A short page means the listing is exhausted.
            if len(chunk) < 50:
                break
            page_no += 1

    # Deduplicate by full_name; dicts keep insertion order, so the first
    # occurrence of each repository wins.
    by_name = {}
    for repo in collected:
        by_name.setdefault(repo["full_name"], repo)
    return list(by_name.values())
|
|
|
|
def _get_recent_comments(repo_full_name: str, since: datetime) -> list:
    """Fetch recent issue/PR comments for a repo.

    Args:
        repo_full_name: Repository in ``owner/name`` form.
        since: Lower bound passed to the API's ``since`` filter. A naive value
            is taken to be UTC.

    Returns:
        The comment dicts from every page fetched. If a request fails, the
        error is reported and whatever was collected so far is returned.
    """
    # The API declares `since` as a date-time (RFC 3339), which requires a UTC
    # offset. A bare isoformat() of a naive datetime has none, and an aware
    # one contains "+", which a query string decodes as a space. Send an
    # explicit, URL-encoded "...Z" timestamp instead.
    if since.tzinfo is None:
        since = since.replace(tzinfo=timezone.utc)
    since_param = since.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    comments = []
    page = 1
    while True:
        query = urllib.parse.urlencode({"limit": 50, "page": page, "since": since_param})
        try:
            batch = _api_request(f"/repos/{repo_full_name}/issues/comments?{query}")
        except Exception as exc:
            # Best effort: keep what we already have, but say why we stopped
            # instead of silently returning a short (or empty) result.
            print(f"[WARN] {repo_full_name}: comment fetch failed on page {page}: {exc}")
            break
        if not batch:
            break
        comments.extend(batch)
        # A short page means the listing is exhausted.
        if len(batch) < 50:
            break
        page += 1
    return comments
|
|
|
|
def _format_alert(comment: dict, repo: str) -> str:
    """Render a comment as a multi-line alert message.

    Args:
        comment: Comment dict; the ``user``, ``html_url`` and ``body`` keys
            are read, and each may be missing or null.
        repo: Name of the repository the comment belongs to.

    Returns:
        The alert text: a header line followed by the repo, the author login
        (``unknown`` if unavailable), the comment URL, and the first 200
        characters of the body with newlines replaced by spaces.
    """
    # `or` covers keys that are present but null, which dict.get's default
    # alone does not.
    author = comment.get("user") or {}
    login = author.get("login", "unknown")
    user = "unknown" if login is None else login
    url = comment.get("html_url") or ""
    body = comment.get("body") or ""
    body_preview = body[:200].replace("\n", " ")
    return f"🚨 PRIORITY ALERT\nRepo: {repo}\nUser: {user}\nURL: {url}\nPreview: {body_preview}..."
|
|
|
|
def run_watcher():
    """Scan all accessible repos once and report new priority comments.

    A comment is reported when its ID has not been seen before and it was
    either written by one of ``PRIORITY_USERS`` or contains one of
    ``MENTION_TARGETS`` (both compared case-insensitively). Alerts are printed
    to stdout; afterwards the seen IDs (capped at 5000) and the scan start
    time are saved for the next run.

    Returns:
        The list of formatted alert strings produced by this run.
    """
    state = _load_state()
    seen = set(state.get("seen_comment_ids") or [])

    # Capture the scan start *before* any request is made and persist that as
    # last_run. Stamping the end of the scan would skip comments posted while
    # the scan was in progress; the overlap is harmless because seen IDs are
    # de-duplicated.
    run_started = datetime.now(timezone.utc)

    since = None
    last_run = state.get("last_run")
    if last_run:
        try:
            since = datetime.fromisoformat(last_run)
        except (TypeError, ValueError):
            since = None  # unusable timestamp: fall back to the default window
    if since is None:
        # Default to checking the last 24h on first run.
        since = run_started - timedelta(hours=24)
    elif since.tzinfo is None:
        # State files written with naive timestamps hold UTC values.
        since = since.replace(tzinfo=timezone.utc)

    # Fold case once so e.g. "ROCKACHOPA" or "@TIMMY" still match.
    priority_users = {u.casefold() for u in PRIORITY_USERS}
    mention_targets = {m.casefold() for m in MENTION_TARGETS}

    repos = _get_repos()
    alerts = []
    new_seen = set(seen)

    for repo in repos:
        fn = repo["full_name"]
        try:
            comments = _get_recent_comments(fn, since)
        except Exception as e:
            # One failing repo must not abort the scan of the others.
            print(f"[SKIP] {fn}: {e}")
            continue

        for c in comments:
            cid = c.get("id")
            # Entries without an ID cannot be de-duplicated (and None would
            # break the sort below); checking new_seen also drops repeats
            # within this run.
            if cid is None or cid in new_seen:
                continue
            new_seen.add(cid)

            # `or` guards against keys that are present but null.
            user = (c.get("user") or {}).get("login") or ""
            body = (c.get("body") or "").casefold()

            is_priority_user = user.casefold() in priority_users
            has_mention = any(m in body for m in mention_targets)

            if is_priority_user or has_mention:
                alerts.append(_format_alert(c, fn))

    # Output alerts
    if alerts:
        for alert in alerts:
            print(alert)
            print("-" * 40)
    else:
        print(f"[{datetime.now(timezone.utc).isoformat()}] No new priority comments.")

    # Save state
    state["seen_comment_ids"] = sorted(new_seen)[-5000:]  # cap at 5k IDs
    state["last_run"] = run_started.isoformat()
    _save_state(state)

    return alerts
|
|
|
|
if __name__ == "__main__":
    # Script entry point: perform a single scan per invocation.
    run_watcher()
|