Files
timmy-home/scripts/telegram_thread_reporter.py
Bezalel 39540a2a8c feat(infra): auto-restart agent, backup pipeline, Telegram thread reporter (#560, #561, #895)
- scripts/auto_restart_agent.sh — monitor and restart dead processes (3-attempt backoff)
- scripts/backup_pipeline.sh — daily backups with retention + offsite rsync hook
- scripts/telegram_thread_reporter.py — route messages to ops/burn/main threads
- infrastructure/cron/*.crontab — scheduling for new automations
2026-04-07 15:43:21 +00:00

60 lines
2.0 KiB
Python

#!/usr/bin/env python3
"""
telegram_thread_reporter.py — Route reports to Telegram threads (#895)
Usage:
python telegram_thread_reporter.py --topic ops --message "Heartbeat OK"
python telegram_thread_reporter.py --topic burn --message "Burn cycle done"
python telegram_thread_reporter.py --topic main --message "Escalation!"
"""
import argparse
import os
import sys
import urllib.request
import urllib.parse
import json
DEFAULT_THREADS = {
"ops": os.environ.get("TELEGRAM_OPS_THREAD_ID"),
"burn": os.environ.get("TELEGRAM_BURN_THREAD_ID"),
"main": None, # main channel = no thread id
}
def send_message(bot_token: str, chat_id: str, text: str, thread_id: str | None = None):
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
data = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"}
if thread_id:
data["message_thread_id"] = thread_id
payload = urllib.parse.urlencode(data).encode("utf-8")
req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/x-www-form-urlencoded"})
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode("utf-8"))
except Exception as e:
return {"ok": False, "error": str(e)}
def main():
parser = argparse.ArgumentParser(description="Telegram thread reporter")
parser.add_argument("--topic", required=True, choices=["ops", "burn", "main"])
parser.add_argument("--message", required=True)
args = parser.parse_args()
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN")
chat_id = os.environ.get("TELEGRAM_CHAT_ID")
if not bot_token or not chat_id:
print("Missing TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID", file=sys.stderr)
sys.exit(1)
thread_id = DEFAULT_THREADS.get(args.topic)
result = send_message(bot_token, chat_id, args.message, thread_id)
if result.get("ok"):
print(f"Sent to {args.topic}")
else:
print(f"Failed: {result}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()