diff --git a/scripts/nightly-pipeline-scheduler.sh b/scripts/nightly-pipeline-scheduler.sh new file mode 100644 index 00000000..2d2fe8e3 --- /dev/null +++ b/scripts/nightly-pipeline-scheduler.sh @@ -0,0 +1,383 @@ +#!/usr/bin/env bash +# nightly-pipeline-scheduler.sh — Auto-start batch pipelines when inference is available. +# +# Checks provider health, pipeline progress, token budget, and interactive load. +# Starts the highest-priority incomplete pipeline that can run. +# +# Usage: +# ./scripts/nightly-pipeline-scheduler.sh # Normal run +# ./scripts/nightly-pipeline-scheduler.sh --dry-run # Show what would start +# ./scripts/nightly-pipeline-scheduler.sh --status # Pipeline status report + +set -euo pipefail + +# --- Configuration --- +HERMES_HOME="${HERMES_HOME:-$HOME/.hermes}" +BUDGET_FILE="${HERMES_HOME}/pipeline_budget.json" +STATE_FILE="${HERMES_HOME}/pipeline_state.json" +LOG_FILE="${HERMES_HOME}/logs/pipeline-scheduler.log" +TOKEN_DAILY_LIMIT="${PIPELINE_TOKEN_LIMIT:-500000}" +PEAK_HOURS_START="${PIPELINE_PEAK_START:-9}" +PEAK_HOURS_END="${PIPELINE_PEAK_END:-18}" + +# Pipeline definitions (priority order) +# Each pipeline: name, script, max_tokens, dependencies +PIPELINES=( + "playground-factory|scripts/pipeline_playground_factory.sh|100000|none" + "training-factory|scripts/pipeline_training_factory.sh|150000|none" + "knowledge-mine|scripts/pipeline_knowledge_mine.sh|80000|training-factory" + "adversary|scripts/pipeline_adversary.sh|50000|knowledge-mine" + "codebase-genome|scripts/pipeline_codebase_genome.sh|120000|none" +) + +# --- Colors --- +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[0;33m' +CYAN='\033[0;36m' +NC='\033[0m' + +# --- Helpers --- +now_hour() { date +%-H; } +is_peak_hours() { + local h=$(now_hour) + [[ $h -ge $PEAK_HOURS_START && $h -lt $PEAK_HOURS_END ]] +} + +ensure_dirs() { + mkdir -p "$(dirname "$LOG_FILE")" "$(dirname "$BUDGET_FILE")" "$(dirname "$STATE_FILE")" +} + +log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"; } + +get_budget_used_today() { + if [[ -f "$BUDGET_FILE" ]]; then + local today=$(date +%Y-%m-%d) + python3 -c " +import json, sys +with open('$BUDGET_FILE') as f: + d = json.load(f) +print(d.get('daily', {}).get('$today', {}).get('tokens_used', 0)) +" 2>/dev/null || echo 0 + else + echo 0 + fi +} + +get_budget_remaining() { + local used=$(get_budget_used_today) + echo $((TOKEN_DAILY_LIMIT - used)) +} + +update_budget() { + local pipeline="$1" + local tokens="$2" + local today=$(date +%Y-%m-%d) + python3 -c " +import json, os +path = '$BUDGET_FILE' +d = {} +if os.path.exists(path): + with open(path) as f: + d = json.load(f) +daily = d.setdefault('daily', {}) +day = daily.setdefault('$today', {'tokens_used': 0, 'pipelines': {}}) +day['tokens_used'] = day.get('tokens_used', 0) + $tokens +day['pipelines']['$pipeline'] = day['pipelines'].get('$pipeline', 0) + $tokens +with open(path, 'w') as f: + json.dump(d, f, indent=2) +" +} + +get_pipeline_state() { + if [[ -f "$STATE_FILE" ]]; then + cat "$STATE_FILE" + else + echo "{}" + fi +} + +set_pipeline_state() { + local pipeline="$1" + local state="$2" # running, complete, failed, skipped + python3 -c " +import json, os +path = '$STATE_FILE' +d = {} +if os.path.exists(path): + with open(path) as f: + d = json.load(f) +d['$pipeline'] = {'state': '$state', 'updated': '$(date -Iseconds)'} +with open(path, 'w') as f: + json.dump(d, f, indent=2) +" +} + +is_pipeline_complete() { + local pipeline="$1" + python3 -c " +import json, os +path = '$STATE_FILE' +if not os.path.exists(path): + print('false') +else: + with open(path) as f: + d = json.load(f) + state = d.get('$pipeline', {}).get('state', 'not_started') + print('true' if state == 'complete' else 'false') +" 2>/dev/null || echo false +} + +is_pipeline_running() { + local pipeline="$1" + python3 -c " +import json, os +path = '$STATE_FILE' +if not os.path.exists(path): + print('false') +else: + with open(path) as f: + d = json.load(f) + state = d.get('$pipeline', {}).get('state', 'not_started') + print('true' if state == 'running' else 'false') +" 2>/dev/null || echo false +} + +check_dependency() { + local dep="$1" + if [[ "$dep" == "none" ]]; then + return 0 + fi + # For knowledge-mine: training-factory must be running or complete + if [[ "$dep" == "training-factory" ]]; then + local state=$(python3 -c " +import json, os +path = '$STATE_FILE' +if not os.path.exists(path): + print('not_started') +else: + with open(path) as f: + d = json.load(f) + print(d.get('training-factory', {}).get('state', 'not_started')) +" 2>/dev/null || echo "not_started") + [[ "$state" == "running" || "$state" == "complete" ]] + return $? + fi + # For adversary: knowledge-mine must be at least 50% done + # Simplified: check if it's running (we'd need progress tracking for 50%) + if [[ "$dep" == "knowledge-mine" ]]; then + local state=$(python3 -c " +import json, os +path = '$STATE_FILE' +if not os.path.exists(path): + print('not_started') +else: + with open(path) as f: + d = json.load(f) + print(d.get('knowledge-mine', {}).get('state', 'not_started')) +" 2>/dev/null || echo "not_started") + [[ "$state" == "running" || "$state" == "complete" ]] + return $? + fi + return 0 +} + +check_inference_available() { + # Check if any inference provider is responding + # 1. Check OpenRouter + local or_ok=$(curl -s -o /dev/null -w "%{http_code}" \ + --connect-timeout 5 "https://openrouter.ai/api/v1/models" 2>/dev/null || echo "000") + + # 2. Check local Ollama + local ollama_ok=$(curl -s -o /dev/null -w "%{http_code}" \ + --connect-timeout 5 "http://localhost:11434/api/tags" 2>/dev/null || echo "000") + + # 3. Check RunPod (if configured) + local runpod_ok="000" + if [[ -n "${RUNPOD_ENDPOINT:-}" ]]; then + runpod_ok=$(curl -s -o /dev/null -w "%{http_code}" \ + --connect-timeout 5 "$RUNPOD_ENDPOINT/health" 2>/dev/null || echo "000") + fi + + if [[ "$or_ok" == "200" || "$ollama_ok" == "200" || "$runpod_ok" == "200" ]]; then + return 0 + fi + return 1 +} + +check_interactive_load() { + # Check if there are active interactive sessions (don't fight with live users) + # Look for tmux panes with active hermes sessions + local active=$(tmux list-panes -a -F '#{pane_pid} #{pane_current_command}' 2>/dev/null \ + | grep -c "hermes\|python3" || echo 0) + + # If more than 3 interactive sessions, skip pipeline start + if [[ $active -gt 3 ]]; then + return 1 + fi + return 0 +} + +start_pipeline() { + local name="$1" + local script="$2" + local max_tokens="$3" + local budget_remaining="$4" + local mode="${5:-run}" + + if [[ "$budget_remaining" -lt "$max_tokens" ]]; then + log "SKIP $name: insufficient budget ($budget_remaining < $max_tokens tokens)" + return 1 + fi + + if [[ ! -f "$script" ]]; then + log "SKIP $name: script not found ($script)" + return 1 + fi + + if [[ "$mode" == "dry-run" ]]; then + log "DRY-RUN: Would start $name (budget: $budget_remaining, needs: $max_tokens)" + return 0 + fi + + log "START $name (budget: $budget_remaining, max_tokens: $max_tokens)" + set_pipeline_state "$name" "running" + + # Run in background, capture output + local log_path="${HERMES_HOME}/logs/pipeline-${name}.log" + bash "$script" --max-tokens "$max_tokens" >> "$log_path" 2>&1 & + local pid=$! + + # Wait a moment to check if it started OK + sleep 2 + if kill -0 $pid 2>/dev/null; then + log "RUNNING $name (PID: $pid, log: $log_path)" + # Record the PID + python3 -c " +import json, os +path = '$STATE_FILE' +d = {} +if os.path.exists(path): + with open(path) as f: + d = json.load(f) +d['$name']['pid'] = $pid +with open(path, 'w') as f: + json.dump(d, f, indent=2) +" + return 0 + else + log "FAIL $name: script exited immediately" + set_pipeline_state "$name" "failed" + return 1 + fi +} + +# --- Main --- +main() { + local mode="${1:-run}" + ensure_dirs + + log "=== Pipeline Scheduler ($mode) ===" + + # Check 1: Is inference available? + if ! check_inference_available; then + log "No inference provider available. Skipping all pipelines." + exit 0 + fi + log "Inference: AVAILABLE" + + # Check 2: Is it peak hours? + if is_peak_hours && [[ "$mode" != "--force" ]]; then + local h=$(now_hour) + log "Peak hours ($h:00). Skipping pipeline start. Use --force to override." + exit 0 + fi + log "Off-peak: OK" + + # Check 3: Interactive load + if ! check_interactive_load && [[ "$mode" != "--force" ]]; then + log "High interactive load. Skipping pipeline start." + exit 0 + fi + log "Interactive load: OK" + + # Check 4: Token budget + local budget=$(get_budget_remaining) + log "Token budget remaining: $budget / $TOKEN_DAILY_LIMIT" + + if [[ $budget -le 0 ]]; then + log "Daily token budget exhausted. Stopping." + exit 0 + fi + + # Check 5: Pipeline status + if [[ "$mode" == "--status" ]]; then + echo -e "${CYAN}Pipeline Status:${NC}" + echo "────────────────────────────────────────────────────" + for entry in "${PIPELINES[@]}"; do + IFS='|' read -r name script max_tokens dep <<< "$entry" + local state=$(python3 -c " +import json, os +path = '$STATE_FILE' +if not os.path.exists(path): + print('not_started') +else: + with open(path) as f: + d = json.load(f) + print(d.get('$name', {}).get('state', 'not_started')) +" 2>/dev/null || echo "not_started") + + local color=$NC + case "$state" in + running) color=$YELLOW ;; + complete) color=$GREEN ;; + failed) color=$RED ;; + esac + printf " %-25s %b%s%b (max: %s tokens, dep: %s)\n" "$name" "$color" "$state" "$NC" "$max_tokens" "$dep" + done + echo "────────────────────────────────────────────────────" + echo " Budget: $budget / $TOKEN_DAILY_LIMIT tokens remaining" + echo " Peak hours: $PEAK_HOURS_START:00 - $PEAK_HOURS_END:00" + exit 0 + fi + + # Find and start the highest-priority incomplete pipeline + local started=0 + for entry in "${PIPELINES[@]}"; do + IFS='|' read -r name script max_tokens dep <<< "$entry" + + # Skip if already running or complete + if [[ "$(is_pipeline_running $name)" == "true" ]]; then + log "SKIP $name: already running" + continue + fi + if [[ "$(is_pipeline_complete $name)" == "true" ]]; then + log "SKIP $name: already complete" + continue + fi + + # Check dependency + if ! check_dependency "$dep"; then + log "SKIP $name: dependency $dep not met" + continue + fi + + # Try to start + if start_pipeline "$name" "$script" "$max_tokens" "$budget" "$mode"; then + started=1 + # Only start one pipeline per run (let it claim tokens before next check) + # Exception: playground-factory and training-factory can run in parallel + if [[ "$name" != "playground-factory" && "$name" != "training-factory" ]]; then + break + fi + fi + done + + if [[ $started -eq 0 ]]; then + log "No pipelines to start (all complete, running, or blocked)." + fi + + log "=== Pipeline Scheduler done ===" +} + +main "$@"