"""Timmy's scheduled work — orchestration, sovereignty, heartbeat.""" import json import glob import os import subprocess import sys from datetime import datetime, timezone from pathlib import Path # Gitea client lives in sovereign-orchestration sys.path.insert(0, str(Path.home() / ".timmy" / "sovereign-orchestration" / "src")) from orchestration import huey from huey import crontab from gitea_client import GiteaClient HERMES_HOME = Path.home() / ".hermes" TIMMY_HOME = Path.home() / ".timmy" REPOS = [ "Timmy_Foundation/the-nexus", "Timmy_Foundation/timmy-config", "Timmy_Foundation/sovereign-orchestration", "Rockachopa/the-matrix", ] NET_LINE_LIMIT = 10 # ── Existing: Orchestration ────────────────────────────────────────── @huey.periodic_task(crontab(minute="*/15")) def triage_issues(): """Score and assign unassigned issues across all repos.""" g = GiteaClient() found = 0 for repo in REPOS: for issue in g.find_unassigned_issues(repo, limit=10): found += 1 g.create_comment( repo, issue.number, "🔍 Triaged by Huey — needs assignment." ) return {"triaged": found} @huey.periodic_task(crontab(minute="*/30")) def review_prs(): """Review open PRs: check net diff, reject violations.""" g = GiteaClient() reviewed, rejected = 0, 0 for repo in REPOS: for pr in g.list_pulls(repo, state="open", limit=20): reviewed += 1 files = g.get_pull_files(repo, pr.number) net = sum(f.additions - f.deletions for f in files) if net > NET_LINE_LIMIT: rejected += 1 g.create_comment( repo, pr.number, f"❌ Net +{net} lines exceeds the {NET_LINE_LIMIT}-line limit. " f"Find {net - NET_LINE_LIMIT} lines to cut. See CONTRIBUTING.md." ) return {"reviewed": reviewed, "rejected": rejected} @huey.periodic_task(crontab(minute="*/10")) def dispatch_assigned(): """Pick up issues assigned to agents and kick off work.""" g = GiteaClient() agents = ["claude", "gemini", "kimi", "grok", "perplexity"] dispatched = 0 for repo in REPOS: for agent in agents: for issue in g.find_agent_issues(repo, agent, limit=5): comments = g.list_comments(repo, issue.number, limit=5) if any(c.body and "dispatched" in c.body.lower() for c in comments): continue dispatch_work(repo, issue.number, agent) dispatched += 1 return {"dispatched": dispatched} @huey.task(retries=3, retry_delay=60) def dispatch_work(repo, issue_number, agent): """Dispatch a single issue to an agent. Huey handles retry.""" g = GiteaClient() g.create_comment( repo, issue_number, f"⚡ Dispatched to `{agent}`. Huey task queued." ) # ── NEW 1: Config Sync ─────────────────────────────────────────────── @huey.periodic_task(crontab(minute="0")) # every hour on the hour def sync_config_up(): """Push live ~/.hermes config changes UP to timmy-config repo.""" script = TIMMY_HOME / "timmy-config" / "bin" / "sync-up.sh" if not script.exists(): return {"error": "sync-up.sh not found"} result = subprocess.run( ["bash", str(script)], capture_output=True, text=True, timeout=60 ) return { "exit_code": result.returncode, "output": result.stdout[-500:] if result.stdout else "", "error": result.stderr[-200:] if result.stderr else "", } # ── NEW 2: Session Export for DPO ──────────────────────────────────── @huey.periodic_task(crontab(hour="*/4", minute="30")) # every 4 hours def session_export(): """Scan recent sessions, extract conversation pairs for DPO training.""" sessions_dir = HERMES_HOME / "sessions" export_dir = TIMMY_HOME / "training-data" / "dpo-pairs" export_dir.mkdir(parents=True, exist_ok=True) marker_file = export_dir / ".last_export" last_export = "" if marker_file.exists(): last_export = marker_file.read_text().strip() exported = 0 session_files = sorted(sessions_dir.glob("session_*.json")) for sf in session_files: if sf.name <= last_export: continue try: data = json.loads(sf.read_text()) messages = data.get("messages", []) # Extract user->assistant pairs (raw material for DPO curation) pairs = [] for i, msg in enumerate(messages): if msg.get("role") == "user" and i + 1 < len(messages): next_msg = messages[i + 1] if next_msg.get("role") == "assistant": pairs.append({ "prompt": msg.get("content", "")[:2000], "chosen": next_msg.get("content", "")[:2000], "session": sf.name, }) if pairs: out_file = export_dir / sf.name out_file.write_text(json.dumps(pairs, indent=2)) exported += 1 except (json.JSONDecodeError, KeyError): continue # Update marker if session_files: marker_file.write_text(session_files[-1].name) return {"exported": exported, "total_sessions": len(session_files)} # ── NEW 3: Model Health Check ──────────────────────────────────────── @huey.periodic_task(crontab(minute="*/5")) # every 5 minutes def model_health(): """Check Ollama is running, a model is loaded, inference responds.""" checks = {} # 1. Is Ollama process running? try: result = subprocess.run( ["pgrep", "-f", "ollama"], capture_output=True, timeout=5 ) checks["ollama_running"] = result.returncode == 0 except Exception: checks["ollama_running"] = False # 2. Can we hit the API? try: import urllib.request req = urllib.request.Request("http://localhost:11434/api/tags") with urllib.request.urlopen(req, timeout=5) as resp: data = json.loads(resp.read()) models = [m["name"] for m in data.get("models", [])] checks["models_loaded"] = models checks["api_responding"] = True except Exception as e: checks["api_responding"] = False checks["error"] = str(e) # 3. Can we do a tiny inference? if checks.get("api_responding"): try: payload = json.dumps({ "model": "hermes3:8b", "messages": [{"role": "user", "content": "ping"}], "max_tokens": 5, "stream": False, }).encode() req = urllib.request.Request( "http://localhost:11434/v1/chat/completions", data=payload, headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, timeout=30) as resp: checks["inference_ok"] = resp.status == 200 except Exception as e: checks["inference_ok"] = False checks["inference_error"] = str(e) # Write health status to a file for other tools to read health_file = HERMES_HOME / "model_health.json" checks["timestamp"] = datetime.now(timezone.utc).isoformat() health_file.write_text(json.dumps(checks, indent=2)) return checks # ── NEW 4: Heartbeat Tick ──────────────────────────────────────────── @huey.periodic_task(crontab(minute="*/10")) # every 10 minutes def heartbeat_tick(): """Perceive — Reflect — Remember — Decide — Act — Learn. This is the nervous system. Each tick: 1. Perceive: gather state (Gitea activity, model health, open issues) 2. Reflect: what changed since last tick? 3. Remember: log perception to episodic memory 4. Decide: anything need action? 5. Act: create comments, close issues, alert 6. Learn: log outcome for training data """ tick_dir = TIMMY_HOME / "heartbeat" tick_dir.mkdir(parents=True, exist_ok=True) now = datetime.now(timezone.utc) tick_id = now.strftime("%Y%m%d_%H%M%S") perception = {} # PERCEIVE: gather state try: g = GiteaClient() perception["gitea_alive"] = g.ping() except Exception: perception["gitea_alive"] = False # Model health (read from health file) health_file = HERMES_HOME / "model_health.json" if health_file.exists(): try: perception["model_health"] = json.loads(health_file.read_text()) except Exception: perception["model_health"] = "unreadable" # Open issue/PR counts if perception.get("gitea_alive"): try: g = GiteaClient() for repo in REPOS: issues = g.list_issues(repo, state="open", limit=1) pulls = g.list_pulls(repo, state="open", limit=1) perception[repo] = { "open_issues": len(issues), "open_prs": len(pulls), } except Exception as e: perception["gitea_error"] = str(e) # Huey consumer alive (we're running, so yes) perception["huey_alive"] = True # REFLECT + REMEMBER: compare to last tick, log last_tick_file = tick_dir / "last_tick.json" last_tick = {} if last_tick_file.exists(): try: last_tick = json.loads(last_tick_file.read_text()) except Exception: pass tick_record = { "tick_id": tick_id, "timestamp": now.isoformat(), "perception": perception, "previous_tick": last_tick.get("tick_id", "none"), } # DECIDE + ACT: check for problems actions = [] if not perception.get("gitea_alive"): actions.append("ALERT: Gitea unreachable") health = perception.get("model_health", {}) if isinstance(health, dict) and not health.get("ollama_running"): actions.append("ALERT: Ollama not running") tick_record["actions"] = actions # Save tick last_tick_file.write_text(json.dumps(tick_record, indent=2)) # LEARN: append to episodic log log_file = tick_dir / f"ticks_{now.strftime('%Y%m%d')}.jsonl" with open(log_file, "a") as f: f.write(json.dumps(tick_record) + "\n") return tick_record # ── NEW 5: Memory Compress (Morning Briefing) ─────────────────────── @huey.periodic_task(crontab(hour="8", minute="0")) # 8 AM daily def memory_compress(): """Morning briefing — compress recent heartbeat ticks into summary. Reads yesterday's tick log, compresses into a briefing file that can be injected into system prompt at startup. """ tick_dir = TIMMY_HOME / "heartbeat" briefing_dir = TIMMY_HOME / "briefings" briefing_dir.mkdir(parents=True, exist_ok=True) # Find yesterday's tick log from datetime import timedelta yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).strftime("%Y%m%d") tick_log = tick_dir / f"ticks_{yesterday}.jsonl" if not tick_log.exists(): return {"status": "no ticks from yesterday"} # Read all ticks ticks = [] for line in tick_log.read_text().strip().split("\n"): try: ticks.append(json.loads(line)) except Exception: continue if not ticks: return {"status": "empty tick log"} # Compress: extract key facts alerts = [] gitea_down_count = 0 ollama_down_count = 0 for t in ticks: for action in t.get("actions", []): alerts.append(f"[{t['tick_id']}] {action}") p = t.get("perception", {}) if not p.get("gitea_alive"): gitea_down_count += 1 health = p.get("model_health", {}) if isinstance(health, dict) and not health.get("ollama_running"): ollama_down_count += 1 # Last tick's perception = current state last = ticks[-1].get("perception", {}) briefing = { "date": yesterday, "total_ticks": len(ticks), "alerts": alerts[-10:], # last 10 alerts "gitea_downtime_ticks": gitea_down_count, "ollama_downtime_ticks": ollama_down_count, "last_known_state": last, } briefing_file = briefing_dir / f"briefing_{yesterday}.json" briefing_file.write_text(json.dumps(briefing, indent=2)) return briefing # ── NEW 6: Repo Watchdog ───────────────────────────────────────────── @huey.periodic_task(crontab(minute="*/20")) # every 20 minutes def repo_watchdog(): """Poll Gitea for new issues/PRs since last check. No webhooks needed.""" state_file = HERMES_HOME / "watchdog_state.json" state = {} if state_file.exists(): try: state = json.loads(state_file.read_text()) except Exception: pass g = GiteaClient() new_items = [] for repo in REPOS: repo_state = state.get(repo, {"last_issue": 0, "last_pr": 0}) # Check issues try: issues = g.list_issues(repo, state="open", sort="created", direction="desc", limit=5) for issue in issues: if issue.number > repo_state["last_issue"]: new_items.append({ "type": "issue", "repo": repo, "number": issue.number, "title": issue.title, "creator": issue.user.login if hasattr(issue, 'user') and issue.user else "unknown", }) if issues: repo_state["last_issue"] = max(i.number for i in issues) except Exception: pass # Check PRs try: prs = g.list_pulls(repo, state="open", sort="newest", limit=5) for pr in prs: if pr.number > repo_state.get("last_pr", 0): new_items.append({ "type": "pr", "repo": repo, "number": pr.number, "title": pr.title, }) if prs: repo_state["last_pr"] = max(p.number for p in prs) except Exception: pass state[repo] = repo_state state_file.write_text(json.dumps(state, indent=2)) return {"new_items": len(new_items), "items": new_items[:10]}