diff --git a/channel_directory.json b/channel_directory.json index 42323e3a..4bb69aa2 100644 --- a/channel_directory.json +++ b/channel_directory.json @@ -1,5 +1,5 @@ { - "updated_at": "2026-03-24T15:41:38.471593", + "updated_at": "2026-03-25T19:19:11.011202", "platforms": { "discord": [ { @@ -21,7 +21,14 @@ "thread_id": null } ], - "telegram": [], + "telegram": [ + { + "id": "-1003664764329", + "name": "Timmy Time", + "type": "group", + "thread_id": null + } + ], "whatsapp": [], "signal": [], "email": [], diff --git a/tasks.py b/tasks.py index 7ff35538..4fddfe4f 100644 --- a/tasks.py +++ b/tasks.py @@ -1,6 +1,11 @@ -"""Timmy's scheduled work — triage, PR review, dispatch.""" +"""Timmy's scheduled work — orchestration, sovereignty, heartbeat.""" +import json +import glob +import os +import subprocess import sys +from datetime import datetime, timezone from pathlib import Path # Gitea client lives in sovereign-orchestration @@ -10,6 +15,8 @@ from orchestration import huey from huey import crontab from gitea_client import GiteaClient +HERMES_HOME = Path.home() / ".hermes" +TIMMY_HOME = Path.home() / ".timmy" REPOS = [ "Timmy_Foundation/the-nexus", "Timmy_Foundation/autolora", @@ -18,6 +25,8 @@ REPOS = [ NET_LINE_LIMIT = 10 +# ── Existing: Orchestration ────────────────────────────────────────── + @huey.periodic_task(crontab(minute="*/15")) def triage_issues(): """Score and assign unassigned issues across all repos.""" @@ -78,3 +87,344 @@ def dispatch_work(repo, issue_number, agent): repo, issue_number, f"⚡ Dispatched to `{agent}`. Huey task queued." ) + + +# ── NEW 1: Config Sync ─────────────────────────────────────────────── + +@huey.periodic_task(crontab(minute="0")) # every hour on the hour +def sync_config_up(): + """Push live ~/.hermes config changes UP to timmy-config repo.""" + script = TIMMY_HOME / "timmy-config" / "bin" / "sync-up.sh" + if not script.exists(): + return {"error": "sync-up.sh not found"} + result = subprocess.run( + ["bash", str(script)], + capture_output=True, text=True, timeout=60 + ) + return { + "exit_code": result.returncode, + "output": result.stdout[-500:] if result.stdout else "", + "error": result.stderr[-200:] if result.stderr else "", + } + + +# ── NEW 2: Session Export for DPO ──────────────────────────────────── + +@huey.periodic_task(crontab(hour="*/4", minute="30")) # every 4 hours +def session_export(): + """Scan recent sessions, extract conversation pairs for DPO training.""" + sessions_dir = HERMES_HOME / "sessions" + export_dir = TIMMY_HOME / "training-data" / "dpo-pairs" + export_dir.mkdir(parents=True, exist_ok=True) + + marker_file = export_dir / ".last_export" + last_export = "" + if marker_file.exists(): + last_export = marker_file.read_text().strip() + + exported = 0 + session_files = sorted(sessions_dir.glob("session_*.json")) + + for sf in session_files: + if sf.name <= last_export: + continue + try: + data = json.loads(sf.read_text()) + messages = data.get("messages", []) + # Extract user->assistant pairs (raw material for DPO curation) + pairs = [] + for i, msg in enumerate(messages): + if msg.get("role") == "user" and i + 1 < len(messages): + next_msg = messages[i + 1] + if next_msg.get("role") == "assistant": + pairs.append({ + "prompt": msg.get("content", "")[:2000], + "chosen": next_msg.get("content", "")[:2000], + "session": sf.name, + }) + if pairs: + out_file = export_dir / sf.name + out_file.write_text(json.dumps(pairs, indent=2)) + exported += 1 + except (json.JSONDecodeError, KeyError): + continue + + # Update marker + if session_files: + marker_file.write_text(session_files[-1].name) + + return {"exported": exported, "total_sessions": len(session_files)} + + +# ── NEW 3: Model Health Check ──────────────────────────────────────── + +@huey.periodic_task(crontab(minute="*/5")) # every 5 minutes +def model_health(): + """Check Ollama is running, a model is loaded, inference responds.""" + checks = {} + + # 1. Is Ollama process running? + try: + result = subprocess.run( + ["pgrep", "-f", "ollama"], + capture_output=True, timeout=5 + ) + checks["ollama_running"] = result.returncode == 0 + except Exception: + checks["ollama_running"] = False + + # 2. Can we hit the API? + try: + import urllib.request + req = urllib.request.Request("http://localhost:11434/api/tags") + with urllib.request.urlopen(req, timeout=5) as resp: + data = json.loads(resp.read()) + models = [m["name"] for m in data.get("models", [])] + checks["models_loaded"] = models + checks["api_responding"] = True + except Exception as e: + checks["api_responding"] = False + checks["error"] = str(e) + + # 3. Can we do a tiny inference? + if checks.get("api_responding"): + try: + payload = json.dumps({ + "model": "hermes3:8b", + "messages": [{"role": "user", "content": "ping"}], + "max_tokens": 5, + "stream": False, + }).encode() + req = urllib.request.Request( + "http://localhost:11434/v1/chat/completions", + data=payload, + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=30) as resp: + checks["inference_ok"] = resp.status == 200 + except Exception as e: + checks["inference_ok"] = False + checks["inference_error"] = str(e) + + # Write health status to a file for other tools to read + health_file = HERMES_HOME / "model_health.json" + checks["timestamp"] = datetime.now(timezone.utc).isoformat() + health_file.write_text(json.dumps(checks, indent=2)) + + return checks + + +# ── NEW 4: Heartbeat Tick ──────────────────────────────────────────── + +@huey.periodic_task(crontab(minute="*/10")) # every 10 minutes +def heartbeat_tick(): + """Perceive — Reflect — Remember — Decide — Act — Learn. + + This is the nervous system. Each tick: + 1. Perceive: gather state (Gitea activity, model health, open issues) + 2. Reflect: what changed since last tick? + 3. Remember: log perception to episodic memory + 4. Decide: anything need action? + 5. Act: create comments, close issues, alert + 6. Learn: log outcome for training data + """ + tick_dir = TIMMY_HOME / "heartbeat" + tick_dir.mkdir(parents=True, exist_ok=True) + + now = datetime.now(timezone.utc) + tick_id = now.strftime("%Y%m%d_%H%M%S") + + perception = {} + + # PERCEIVE: gather state + try: + g = GiteaClient() + perception["gitea_alive"] = g.ping() + except Exception: + perception["gitea_alive"] = False + + # Model health (read from health file) + health_file = HERMES_HOME / "model_health.json" + if health_file.exists(): + try: + perception["model_health"] = json.loads(health_file.read_text()) + except Exception: + perception["model_health"] = "unreadable" + + # Open issue/PR counts + if perception.get("gitea_alive"): + try: + g = GiteaClient() + for repo in REPOS: + issues = g.list_issues(repo, state="open", limit=1) + pulls = g.list_pulls(repo, state="open", limit=1) + perception[repo] = { + "open_issues": len(issues), + "open_prs": len(pulls), + } + except Exception as e: + perception["gitea_error"] = str(e) + + # Huey consumer alive (we're running, so yes) + perception["huey_alive"] = True + + # REFLECT + REMEMBER: compare to last tick, log + last_tick_file = tick_dir / "last_tick.json" + last_tick = {} + if last_tick_file.exists(): + try: + last_tick = json.loads(last_tick_file.read_text()) + except Exception: + pass + + tick_record = { + "tick_id": tick_id, + "timestamp": now.isoformat(), + "perception": perception, + "previous_tick": last_tick.get("tick_id", "none"), + } + + # DECIDE + ACT: check for problems + actions = [] + if not perception.get("gitea_alive"): + actions.append("ALERT: Gitea unreachable") + health = perception.get("model_health", {}) + if isinstance(health, dict) and not health.get("ollama_running"): + actions.append("ALERT: Ollama not running") + + tick_record["actions"] = actions + + # Save tick + last_tick_file.write_text(json.dumps(tick_record, indent=2)) + + # LEARN: append to episodic log + log_file = tick_dir / f"ticks_{now.strftime('%Y%m%d')}.jsonl" + with open(log_file, "a") as f: + f.write(json.dumps(tick_record) + "\n") + + return tick_record + + +# ── NEW 5: Memory Compress (Morning Briefing) ─────────────────────── + +@huey.periodic_task(crontab(hour="8", minute="0")) # 8 AM daily +def memory_compress(): + """Morning briefing — compress recent heartbeat ticks into summary. + + Reads yesterday's tick log, compresses into a briefing file + that can be injected into system prompt at startup. + """ + tick_dir = TIMMY_HOME / "heartbeat" + briefing_dir = TIMMY_HOME / "briefings" + briefing_dir.mkdir(parents=True, exist_ok=True) + + # Find yesterday's tick log + from datetime import timedelta + yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y%m%d") + tick_log = tick_dir / f"ticks_{yesterday}.jsonl" + + if not tick_log.exists(): + return {"status": "no ticks from yesterday"} + + # Read all ticks + ticks = [] + for line in tick_log.read_text().strip().split("\n"): + try: + ticks.append(json.loads(line)) + except Exception: + continue + + if not ticks: + return {"status": "empty tick log"} + + # Compress: extract key facts + alerts = [] + gitea_down_count = 0 + ollama_down_count = 0 + + for t in ticks: + for action in t.get("actions", []): + alerts.append(f"[{t['tick_id']}] {action}") + p = t.get("perception", {}) + if not p.get("gitea_alive"): + gitea_down_count += 1 + health = p.get("model_health", {}) + if isinstance(health, dict) and not health.get("ollama_running"): + ollama_down_count += 1 + + # Last tick's perception = current state + last = ticks[-1].get("perception", {}) + + briefing = { + "date": yesterday, + "total_ticks": len(ticks), + "alerts": alerts[-10:], # last 10 alerts + "gitea_downtime_ticks": gitea_down_count, + "ollama_downtime_ticks": ollama_down_count, + "last_known_state": last, + } + + briefing_file = briefing_dir / f"briefing_{yesterday}.json" + briefing_file.write_text(json.dumps(briefing, indent=2)) + + return briefing + + +# ── NEW 6: Repo Watchdog ───────────────────────────────────────────── + +@huey.periodic_task(crontab(minute="*/20")) # every 20 minutes +def repo_watchdog(): + """Poll Gitea for new issues/PRs since last check. No webhooks needed.""" + state_file = HERMES_HOME / "watchdog_state.json" + + state = {} + if state_file.exists(): + try: + state = json.loads(state_file.read_text()) + except Exception: + pass + + g = GiteaClient() + new_items = [] + + for repo in REPOS: + repo_state = state.get(repo, {"last_issue": 0, "last_pr": 0}) + + # Check issues + try: + issues = g.list_issues(repo, state="open", sort="created", direction="desc", limit=5) + for issue in issues: + if issue.number > repo_state["last_issue"]: + new_items.append({ + "type": "issue", + "repo": repo, + "number": issue.number, + "title": issue.title, + "creator": issue.user.login if hasattr(issue, 'user') and issue.user else "unknown", + }) + if issues: + repo_state["last_issue"] = max(i.number for i in issues) + except Exception: + pass + + # Check PRs + try: + prs = g.list_pulls(repo, state="open", sort="newest", limit=5) + for pr in prs: + if pr.number > repo_state.get("last_pr", 0): + new_items.append({ + "type": "pr", + "repo": repo, + "number": pr.number, + "title": pr.title, + }) + if prs: + repo_state["last_pr"] = max(p.number for p in prs) + except Exception: + pass + + state[repo] = repo_state + + state_file.write_text(json.dumps(state, indent=2)) + + return {"new_items": len(new_items), "items": new_items[:10]}