#!/usr/bin/env python3
"""
nightly_scheduler.py — Nightly Pipeline Scheduler

Auto-starts batch pipelines when inference is available, respecting
priority ordering, token budgets, and peak-hour pausing.

Usage:
    python3 nightly_scheduler.py            # run scheduler
    python3 nightly_scheduler.py --check    # dry-run: show what would start
    python3 nightly_scheduler.py --status   # show pipeline status
    python3 nightly_scheduler.py --reset    # reset daily budget

Crontab:
    # Run every 30 minutes during off-peak hours (10pm-6am)
    */30 22-5 * * * cd /path/to/timmy-config && python3 pipeline/nightly_scheduler.py >> ~/.hermes/pipeline-logs/nightly.log 2>&1
"""

import argparse
import json
import os
import subprocess
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

# --- Config ---
STATE_FILE = Path.home() / ".hermes" / "pipeline_state.json"
LOG_DIR = Path.home() / ".hermes" / "pipeline-logs"
DAILY_TOKEN_BUDGET = 5_000_000       # 5M tokens per day
PEAK_HOURS = list(range(8, 22))      # 8am-10pm = peak interactive usage
CHECK_INTERVAL = 1800                # 30 minutes (informational; cron drives the cadence)

INFERENCE_ENDPOINTS = [
    {"name": "local_ollama", "url": "http://localhost:11434/v1/models", "type": "local"},
    {"name": "runpod", "url": "https://8lfr3j47a5r3gn-11434.proxy.runpod.net/v1/models", "type": "gpu"},
    {"name": "openrouter", "url": "https://openrouter.ai/api/v1/models", "type": "cloud"},
]

# Pipeline priority order (highest first)
PIPELINE_PRIORITY = [
    {"name": "playground_factory", "script": "pipeline/playground_factory.py", "priority": 1},
    {"name": "training_factory", "script": "pipeline/training_factory.py", "priority": 2},
    {"name": "knowledge_mine", "script": "pipeline/knowledge_mine.py", "priority": 3},
    {"name": "adversary", "script": "pipeline/adversary_runner.py", "priority": 4},
    {"name": "codebase_genome", "script": "pipeline/codebase_genome.py", "priority": 5},
]

# Dependency rules: some pipelines only start after others are running
DEPENDENCY_RULES = {
    "playground_factory": [],                # no deps, start immediately
    "training_factory": [],                  # no deps, start in parallel
    "knowledge_mine": ["training_factory"],  # start after training is running
    "adversary": ["knowledge_mine"],         # start after knowledge mine is running/completed
    "codebase_genome": [],                   # continuous, one repo per night
}


def load_state():
    """Load pipeline state from disk, or return a fresh default state."""
    if STATE_FILE.exists():
        with open(STATE_FILE, encoding="utf-8") as f:
            return json.load(f)
    return {
        "last_run": None,
        "daily_tokens_used": 0,
        "budget_reset_date": None,
        "pipelines": {},
        "active_sessions": [],
    }


def save_state(state):
    """Save pipeline state to disk atomically (temp file + rename).

    A crash mid-write must not corrupt the state file, since every
    scheduler run depends on it parsing cleanly.
    """
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp = STATE_FILE.with_suffix(".json.tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)
    os.replace(tmp, STATE_FILE)  # atomic on POSIX; replaces any existing file


def check_provider(endpoint):
    """Return True if an inference provider responds with HTTP 200.

    Any network/HTTP error is treated as "unavailable" — this is a
    best-effort liveness probe, not an error reporter.
    """
    try:
        # The bearer token is only meaningful for ollama-style endpoints;
        # the other endpoints ignore it for a models listing.
        req = urllib.request.Request(endpoint["url"], headers={"Authorization": "Bearer ollama"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            return resp.status == 200
    except Exception:
        return False


def get_available_providers():
    """Check all inference endpoints and return the names of available ones."""
    return [ep["name"] for ep in INFERENCE_ENDPOINTS if check_provider(ep)]


def is_peak_hours():
    """Return True if the current local hour falls in peak interactive usage."""
    return datetime.now().hour in PEAK_HOURS


def check_token_budget(state):
    """Return True if the daily token budget allows starting new work.

    Resets the budget counter (and persists the reset) on the first call
    of each new calendar day.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    if state.get("budget_reset_date") != today:
        # New day, reset budget
        state["daily_tokens_used"] = 0
        state["budget_reset_date"] = today
        save_state(state)
    return state["daily_tokens_used"] < DAILY_TOKEN_BUDGET


def get_pipeline_status(state, pipeline_name):
    """Return the status record for a pipeline, defaulting to "not_started"."""
    return state.get("pipelines", {}).get(pipeline_name, {
        "status": "not_started",
        "last_run": None,
        "last_success": None,
        "progress": 0,
    })


def check_dependencies(state, pipeline_name):
    """Return True if every dependency of *pipeline_name* is running or completed."""
    deps = DEPENDENCY_RULES.get(pipeline_name, [])
    for dep in deps:
        dep_status = get_pipeline_status(state, dep)
        if dep_status["status"] not in ("running", "completed"):
            return False
    return True


def start_pipeline(pipeline, state, dry_run=False):
    """Start a pipeline process in the background.

    Records the attempt (running / script_missing / failed) in *state*.
    Returns True if the process was started (or would be, in dry-run).
    """
    name = pipeline["name"]
    script = pipeline["script"]

    log(f"Starting pipeline: {name}")

    if dry_run:
        log(f"  DRY RUN — would run: python3 {script}")
        return True

    # Check if script exists
    script_path = Path(script)
    if not script_path.exists():
        log(f"  Script not found: {script_path}")
        # Update state anyway so we track the attempt
        state["pipelines"][name] = {
            "status": "script_missing",
            "last_run": datetime.now(timezone.utc).isoformat(),
            "progress": 0,
        }
        save_state(state)
        return False

    # Run the pipeline script, redirecting its output to a per-run log file.
    log_dir = LOG_DIR / name
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

    try:
        # Open via `with` so the parent's handle is closed right away;
        # the child inherits its own duplicate of the fd and keeps writing.
        # (The original leaked this handle with a bare open().)
        with open(log_file, "w", encoding="utf-8") as log_fh:
            proc = subprocess.Popen(
                ["python3", str(script_path)],
                stdout=log_fh,
                stderr=subprocess.STDOUT,
                cwd=str(script_path.parent.parent),
            )

        state["pipelines"][name] = {
            "status": "running",
            "pid": proc.pid,
            "last_run": datetime.now(timezone.utc).isoformat(),
            "log_file": str(log_file),
            "progress": 0,
        }
        save_state(state)
        log(f"  Started PID {proc.pid}, log: {log_file}")
        return True
    except Exception as e:
        log(f"  Failed to start: {e}")
        state["pipelines"][name] = {
            "status": "failed",
            "last_run": datetime.now(timezone.utc).isoformat(),
            "error": str(e),
        }
        save_state(state)
        return False


def check_running_pipelines(state):
    """Check liveness of "running" pipelines and mark finished ones completed.

    Note: a recycled PID would make a dead pipeline look alive; signal 0
    only tests process existence, not identity.
    """
    for name, info in state.get("pipelines", {}).items():
        if info.get("status") == "running":
            pid = info.get("pid")
            if pid:
                try:
                    os.kill(pid, 0)  # signal 0: existence check, sends nothing
                except ProcessLookupError:
                    # Process finished
                    info["status"] = "completed"
                    info["completed_at"] = datetime.now(timezone.utc).isoformat()
                    log(f"Pipeline {name} completed (PID {pid} exited)")
                    save_state(state)
                except PermissionError:
                    # PID exists but belongs to another user — still running.
                    pass


def run_scheduler(dry_run=False, check_only=False):
    """Main scheduler pass: gate on peak hours, budget, and providers,
    then start at most one ready pipeline in priority order."""
    state = load_state()

    # Record that the scheduler ran, so --status shows a real timestamp.
    state["last_run"] = datetime.now(timezone.utc).isoformat()
    save_state(state)

    log("=" * 50)
    log(f"Pipeline Scheduler — {datetime.now().isoformat()}")
    log(f"Mode: {'CHECK' if check_only else 'DRY RUN' if dry_run else 'LIVE'}")

    # Check peak hours
    if is_peak_hours():
        log("Peak hours detected. Pausing pipeline starts.")
        log("Pipelines will resume at 10pm.")
        return

    # Check token budget
    if not check_token_budget(state):
        log(f"Daily token budget exhausted ({state['daily_tokens_used']}/{DAILY_TOKEN_BUDGET})")
        return
    log(f"Token budget: {state['daily_tokens_used']}/{DAILY_TOKEN_BUDGET}")

    # Check providers
    providers = get_available_providers()
    if not providers:
        log("No inference providers available. Skipping.")
        return
    log(f"Available providers: {', '.join(providers)}")

    # Check running pipelines
    check_running_pipelines(state)

    # Find next pipeline to start
    started = 0
    for pipeline in sorted(PIPELINE_PRIORITY, key=lambda p: p["priority"]):
        name = pipeline["name"]
        status = get_pipeline_status(state, name)

        # Skip if already running or completed
        if status["status"] in ("running", "completed"):
            log(f"  {name}: {status['status']} (skipping)")
            continue

        # Check dependencies
        if not check_dependencies(state, name):
            deps = DEPENDENCY_RULES.get(name, [])
            log(f"  {name}: waiting for dependencies: {deps}")
            continue

        # Start the pipeline
        if check_only:
            log(f"  {name}: READY to start (priority {pipeline['priority']})")
        else:
            if start_pipeline(pipeline, state, dry_run):
                started += 1
                # Only start one pipeline per run to avoid overload
                if started >= 1:
                    log("Started 1 pipeline. Will check again next cycle.")
                    break

    if started == 0 and not check_only:
        log("No pipelines to start. All are running, completed, or blocked.")

    log("=" * 50)


def show_status():
    """Print a one-line status summary for each pipeline."""
    state = load_state()
    print(f"\nPipeline Status — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f"Token budget: {state.get('daily_tokens_used', 0)}/{DAILY_TOKEN_BUDGET}")
    print(f"Last run: {state.get('last_run', 'never')}")
    print()

    for pipeline in sorted(PIPELINE_PRIORITY, key=lambda p: p["priority"]):
        name = pipeline["name"]
        status = get_pipeline_status(state, name)
        st = status["status"]
        icon = {"running": "●", "completed": "✓", "failed": "✗", "not_started": "○", "script_missing": "?"}.get(st, "?")
        print(f"  {icon} {name:25} {st:15} last={(status.get('last_run') or 'never')[:19]}")


def reset_budget():
    """Reset the daily token budget counter and persist the change."""
    state = load_state()
    state["daily_tokens_used"] = 0
    state["budget_reset_date"] = datetime.now().strftime("%Y-%m-%d")
    save_state(state)
    print("Budget reset.")


def log(msg):
    """Log a timestamped message to stdout and to the shared nightly log file."""
    timestamp = datetime.now().strftime("%H:%M:%S")
    line = f"[{timestamp}] {msg}"
    print(line)
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    log_file = LOG_DIR / "nightly.log"
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(line + "\n")


def main():
    """Parse CLI flags and dispatch to status / reset / scheduler."""
    parser = argparse.ArgumentParser(description="Nightly Pipeline Scheduler")
    parser.add_argument("--check", action="store_true", help="Dry-run: show what would start")
    parser.add_argument("--status", action="store_true", help="Show pipeline status")
    parser.add_argument("--reset", action="store_true", help="Reset daily token budget")
    parser.add_argument("--dry-run", action="store_true", help="Dry-run mode")
    args = parser.parse_args()

    if args.status:
        show_status()
    elif args.reset:
        reset_budget()
    else:
        run_scheduler(dry_run=args.dry_run or args.check, check_only=args.check)


if __name__ == "__main__":
    main()