Compare commits

...

3 Commits

Author SHA1 Message Date
Alexander Whitestone
49296d538e feat: nightly pipeline scheduler — auto-start when inference available (#624)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 36s
PR Checklist / pr-checklist (pull_request) Failing after 6m56s
Smoke Test / smoke (pull_request) Failing after 34s
Validate Config / YAML Lint (pull_request) Failing after 21s
Validate Config / JSON Validate (pull_request) Successful in 9s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m40s
Validate Config / Shell Script Lint (pull_request) Failing after 48s
Validate Config / Cron Syntax Check (pull_request) Successful in 18s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 20s
Validate Config / Playbook Schema Validation (pull_request) Successful in 38s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
Scheduler that auto-starts batch pipelines when inference is available.

Features:
- Checks inference provider availability (local Ollama, RunPod, OpenRouter)
- Priority ordering: playground > training > knowledge > adversary > genome
- Dependency rules (e.g., knowledge_mine waits for training_factory)
- Daily token budget (5M default, configurable)
- Peak-hour pausing (8am-10pm = interactive mode, no pipelines)
- State persistence via ~/.hermes/pipeline_state.json
- One pipeline per cycle to avoid overload

Usage:
  python3 pipeline/nightly_scheduler.py --status
  python3 pipeline/nightly_scheduler.py --check      # dry-run
  python3 pipeline/nightly_scheduler.py              # live

Cron: */30 22-5 * * * pipeline/nightly_scheduler.py

Closes #624
2026-04-15 08:14:00 -04:00
817785d763 Merge pull request 'feat: training data augmentation — paraphrase and translate pairs (#695)' (#732) from fix/695 into main 2026-04-15 11:56:28 +00:00
Alexander Whitestone
3603030235 feat: training data augmentation — paraphrase and translate pairs (#695)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 22s
Smoke Test / smoke (pull_request) Failing after 18s
Validate Config / YAML Lint (pull_request) Failing after 23s
Validate Config / JSON Validate (pull_request) Successful in 21s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m54s
Validate Config / Shell Script Lint (pull_request) Failing after 54s
Validate Config / Cron Syntax Check (pull_request) Successful in 16s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 16s
Validate Config / Playbook Schema Validation (pull_request) Successful in 23s
PR Checklist / pr-checklist (pull_request) Failing after 11m2s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
augment_pairs.py: generates paraphrases and translations for any
JSONL training file.

Features:
- Auto-detects text field (rich, terse, text, content, lyric_line, etc.)
- N paraphrases per entry (template-based, or LLM with --llm-endpoint)
- Translations to ES, FR, DE (template dictionary, or LLM)
- Outputs augmented JSONL alongside originals
- Marks each augmented entry with _augmentation, _original, _language

Usage:
  python3 augment_pairs.py --input data.jsonl
  python3 augment_pairs.py --input data.jsonl --paraphrases 5 --langs es,fr
  python3 augment_pairs.py --input data.jsonl --llm-endpoint http://localhost:11434/v1

Closes #695
2026-04-15 07:51:38 -04:00
2 changed files with 460 additions and 0 deletions

331
pipeline/nightly_scheduler.py Executable file
View File

@@ -0,0 +1,331 @@
#!/usr/bin/env python3
"""
nightly_scheduler.py — Nightly Pipeline Scheduler
Auto-starts batch pipelines when inference is available, respecting
priority ordering, token budgets, and peak-hour pausing.
Usage:
python3 nightly_scheduler.py # run scheduler
python3 nightly_scheduler.py --check # dry-run: show what would start
python3 nightly_scheduler.py --status # show pipeline status
python3 nightly_scheduler.py --reset # reset daily budget
Crontab:
# Run every 30 minutes during off-peak hours (10pm-6am)
*/30 22-5 * * * cd /path/to/timmy-config && python3 pipeline/nightly_scheduler.py >> ~/.hermes/pipeline-logs/nightly.log 2>&1
"""
import json
import os
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone, timedelta
from pathlib import Path
# --- Config ---
STATE_FILE = Path.home() / ".hermes" / "pipeline_state.json"
LOG_DIR = Path.home() / ".hermes" / "pipeline-logs"
DAILY_TOKEN_BUDGET = 5_000_000 # 5M tokens per day
PEAK_HOURS = list(range(8, 22)) # 8am-10pm = peak interactive usage
CHECK_INTERVAL = 1800 # 30 minutes
INFERENCE_ENDPOINTS = [
{"name": "local_ollama", "url": "http://localhost:11434/v1/models", "type": "local"},
{"name": "runpod", "url": "https://8lfr3j47a5r3gn-11434.proxy.runpod.net/v1/models", "type": "gpu"},
{"name": "openrouter", "url": "https://openrouter.ai/api/v1/models", "type": "cloud"},
]
# Pipeline priority order (highest first)
PIPELINE_PRIORITY = [
{"name": "playground_factory", "script": "pipeline/playground_factory.py", "priority": 1},
{"name": "training_factory", "script": "pipeline/training_factory.py", "priority": 2},
{"name": "knowledge_mine", "script": "pipeline/knowledge_mine.py", "priority": 3},
{"name": "adversary", "script": "pipeline/adversary_runner.py", "priority": 4},
{"name": "codebase_genome", "script": "pipeline/codebase_genome.py", "priority": 5},
]
# Dependency rules: some pipelines only start after others are running
DEPENDENCY_RULES = {
"playground_factory": [], # no deps, start immediately
"training_factory": [], # no deps, start in parallel
"knowledge_mine": ["training_factory"], # start after training is running
"adversary": ["knowledge_mine"], # start after knowledge is halfway
"codebase_genome": [], # continuous, one repo per night
}
def load_state():
"""Load pipeline state from disk."""
if STATE_FILE.exists():
with open(STATE_FILE) as f:
return json.load(f)
return {
"last_run": None,
"daily_tokens_used": 0,
"budget_reset_date": None,
"pipelines": {},
"active_sessions": [],
}
def save_state(state):
"""Save pipeline state to disk."""
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(STATE_FILE, "w") as f:
json.dump(state, f, indent=2)
def check_provider(endpoint):
"""Check if an inference provider is available."""
try:
req = urllib.request.Request(endpoint["url"], headers={"Authorization": "Bearer ollama"})
with urllib.request.urlopen(req, timeout=10) as resp:
return resp.status == 200
except Exception:
return False
def get_available_providers():
"""Check all inference endpoints and return available ones."""
available = []
for ep in INFERENCE_ENDPOINTS:
if check_provider(ep):
available.append(ep["name"])
return available
def is_peak_hours():
"""Check if current time is during peak interactive usage."""
now = datetime.now()
return now.hour in PEAK_HOURS
def check_token_budget(state):
"""Check if daily token budget allows starting new work."""
today = datetime.now().strftime("%Y-%m-%d")
if state.get("budget_reset_date") != today:
# New day, reset budget
state["daily_tokens_used"] = 0
state["budget_reset_date"] = today
save_state(state)
return state["daily_tokens_used"] < DAILY_TOKEN_BUDGET
def get_pipeline_status(state, pipeline_name):
"""Get the status of a specific pipeline."""
return state.get("pipelines", {}).get(pipeline_name, {
"status": "not_started",
"last_run": None,
"last_success": None,
"progress": 0,
})
def check_dependencies(state, pipeline_name):
"""Check if pipeline dependencies are satisfied."""
deps = DEPENDENCY_RULES.get(pipeline_name, [])
for dep in deps:
dep_status = get_pipeline_status(state, dep)
if dep_status["status"] not in ("running", "completed"):
return False
return True
def start_pipeline(pipeline, state, dry_run=False):
"""Start a pipeline process."""
name = pipeline["name"]
script = pipeline["script"]
log(f"Starting pipeline: {name}")
if dry_run:
log(f" DRY RUN — would run: python3 {script}")
return True
# Check if script exists
script_path = Path(script)
if not script_path.exists():
log(f" Script not found: {script_path}")
# Update state anyway so we track the attempt
state["pipelines"][name] = {
"status": "script_missing",
"last_run": datetime.now(timezone.utc).isoformat(),
"progress": 0,
}
save_state(state)
return False
# Run the pipeline script
import subprocess
log_dir = LOG_DIR / name
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
try:
proc = subprocess.Popen(
["python3", str(script_path)],
stdout=open(log_file, "w"),
stderr=subprocess.STDOUT,
cwd=str(Path(script).parent.parent),
)
state["pipelines"][name] = {
"status": "running",
"pid": proc.pid,
"last_run": datetime.now(timezone.utc).isoformat(),
"log_file": str(log_file),
"progress": 0,
}
save_state(state)
log(f" Started PID {proc.pid}, log: {log_file}")
return True
except Exception as e:
log(f" Failed to start: {e}")
state["pipelines"][name] = {
"status": "failed",
"last_run": datetime.now(timezone.utc).isoformat(),
"error": str(e),
}
save_state(state)
return False
def check_running_pipelines(state):
"""Check status of running pipelines and update state."""
import subprocess
for name, info in state.get("pipelines", {}).items():
if info.get("status") == "running":
pid = info.get("pid")
if pid:
try:
os.kill(pid, 0) # Check if process exists
except ProcessLookupError:
# Process finished
info["status"] = "completed"
info["completed_at"] = datetime.now(timezone.utc).isoformat()
log(f"Pipeline {name} completed (PID {pid} exited)")
save_state(state)
def run_scheduler(dry_run=False, check_only=False):
"""Main scheduler loop."""
state = load_state()
log("=" * 50)
log(f"Pipeline Scheduler — {datetime.now().isoformat()}")
log(f"Mode: {'CHECK' if check_only else 'DRY RUN' if dry_run else 'LIVE'}")
# Check peak hours
if is_peak_hours():
log("Peak hours detected. Pausing pipeline starts.")
log("Pipelines will resume at 10pm.")
return
# Check token budget
if not check_token_budget(state):
log(f"Daily token budget exhausted ({state['daily_tokens_used']}/{DAILY_TOKEN_BUDGET})")
return
log(f"Token budget: {state['daily_tokens_used']}/{DAILY_TOKEN_BUDGET}")
# Check providers
providers = get_available_providers()
if not providers:
log("No inference providers available. Skipping.")
return
log(f"Available providers: {', '.join(providers)}")
# Check running pipelines
check_running_pipelines(state)
# Find next pipeline to start
started = 0
for pipeline in sorted(PIPELINE_PRIORITY, key=lambda p: p["priority"]):
name = pipeline["name"]
status = get_pipeline_status(state, name)
# Skip if already running or completed
if status["status"] in ("running", "completed"):
log(f" {name}: {status['status']} (skipping)")
continue
# Check dependencies
if not check_dependencies(state, name):
deps = DEPENDENCY_RULES.get(name, [])
log(f" {name}: waiting for dependencies: {deps}")
continue
# Start the pipeline
if check_only:
log(f" {name}: READY to start (priority {pipeline['priority']})")
else:
if start_pipeline(pipeline, state, dry_run):
started += 1
# Only start one pipeline per run to avoid overload
if started >= 1:
log("Started 1 pipeline. Will check again next cycle.")
break
if started == 0 and not check_only:
log("No pipelines to start. All are running, completed, or blocked.")
log("=" * 50)
def show_status():
"""Show current pipeline status."""
state = load_state()
print(f"\nPipeline Status — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print(f"Token budget: {state.get('daily_tokens_used', 0)}/{DAILY_TOKEN_BUDGET}")
print(f"Last run: {state.get('last_run', 'never')}")
print()
for pipeline in sorted(PIPELINE_PRIORITY, key=lambda p: p["priority"]):
name = pipeline["name"]
status = get_pipeline_status(state, name)
st = status["status"]
icon = {"running": "", "completed": "", "failed": "", "not_started": "", "script_missing": "?"}.get(st, "?")
print(f" {icon} {name:25} {st:15} last={(status.get('last_run') or 'never')[:19]}")
def reset_budget():
"""Reset daily token budget."""
state = load_state()
state["daily_tokens_used"] = 0
state["budget_reset_date"] = datetime.now().strftime("%Y-%m-%d")
save_state(state)
print("Budget reset.")
def log(msg):
"""Log to stdout and file."""
timestamp = datetime.now().strftime("%H:%M:%S")
line = f"[{timestamp}] {msg}"
print(line)
LOG_DIR.mkdir(parents=True, exist_ok=True)
log_file = LOG_DIR / "nightly.log"
with open(log_file, "a") as f:
f.write(line + "\n")
def main():
import argparse
parser = argparse.ArgumentParser(description="Nightly Pipeline Scheduler")
parser.add_argument("--check", action="store_true", help="Dry-run: show what would start")
parser.add_argument("--status", action="store_true", help="Show pipeline status")
parser.add_argument("--reset", action="store_true", help="Reset daily token budget")
parser.add_argument("--dry-run", action="store_true", help="Dry-run mode")
args = parser.parse_args()
if args.status:
show_status()
elif args.reset:
reset_budget()
else:
run_scheduler(dry_run=args.dry_run or args.check, check_only=args.check)
if __name__ == "__main__":
main()

129
training/scripts/augment_pairs.py Executable file
View File

@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""
augment_pairs.py — Training data augmentation: paraphrase and translate.
Usage:
python3 augment_pairs.py --input data.jsonl
python3 augment_pairs.py --input data.jsonl --paraphrases 3 --langs es,fr,de
python3 augment_pairs.py --input data.jsonl --llm-endpoint http://localhost:11434/v1
"""
import json, os, sys, re, random
from pathlib import Path
random.seed(42)
PARAPHRASE_TRANSFORMS = [
lambda s: re.sub(r"(\w+), (\w+)", r"\2, \1", s, count=1),
lambda s: f"A beautifully rendered scene: {s[0].lower()}{s[1:]}" if len(s) > 10 else s,
lambda s: s.replace("A ", "The ").replace("An ", "The ") if s.startswith(("A ", "An ")) else f"Here, {s[0].lower()}{s[1:]}",
lambda s: f"In a cinematic frame: {s}" if len(s) > 20 else s,
lambda s: s if ", " not in s else ", ".join(s.split(", ")[:2]),
]
TRANSLATIONS = {
"es": {"the":"el","a":"un","is":"es","in":"en","of":"de","and":"y","with":"con","scene":"escena","light":"luz","dark":"oscuro","warm":"cálido","rain":"lluvia","sun":"sol","moon":"luna","sky":"cielo","forest":"bosque","mountain":"montaña","ocean":"océano","golden":"dorado","blue":"azul","red":"rojo","green":"verde","silence":"silencio","dream":"sueño","love":"amor","hope":"esperanza","fear":"miedo","joy":"alegría","peace":"paz","beautiful":"hermoso","sad":"triste","shadow":"sombra","color":"color","silver":"plateado","white":"blanco","black":"negro","portray":"retrato"},
"fr": {"the":"le","a":"un","is":"est","in":"dans","of":"de","and":"et","with":"avec","scene":"scène","light":"lumière","dark":"sombre","warm":"chaud","rain":"pluie","sun":"soleil","moon":"lune","sky":"ciel","forest":"forêt","mountain":"montagne","ocean":"océan","golden":"doré","blue":"bleu","red":"rouge","green":"vert","silence":"silence","dream":"rêve","love":"amour","hope":"espoir","fear":"peur","joy":"joie","peace":"paix","beautiful":"beau","sad":"triste","shadow":"ombre","color":"couleur","silver":"argenté","white":"blanc","black":"noir"},
"de": {"the":"der","a":"ein","is":"ist","in":"in","of":"von","and":"und","with":"mit","scene":"Szene","light":"Licht","dark":"dunkel","warm":"warm","rain":"Regen","sun":"Sonne","moon":"Mond","sky":"Himmel","forest":"Wald","mountain":"Berg","ocean":"Ozean","golden":"golden","blue":"blau","red":"rot","green":"grün","silence":"Stille","dream":"Traum","love":"Liebe","hope":"Hoffnung","fear":"Angst","joy":"Freude","peace":"Frieden","beautiful":"schön","sad":"traurig","shadow":"Schatten","color":"Farbe","silver":"silbern","white":"weiß","black":"schwarz"},
}
LANG_NAMES = {"es": "Spanish", "fr": "French", "de": "German"}
def detect_text_field(entry):
for f in ["rich","terse","text","content","lyric_line","description","scene_description","prompt","scene"]:
if f in entry and isinstance(entry[f], str) and len(entry[f]) > 5:
return f
for k, v in entry.items():
if isinstance(v, str) and len(v) > 5:
return k
return None
def paraphrase(text):
t = random.choice(PARAPHRASE_TRANSFORMS)(text)
if t == text:
t = text.replace(" and ", " & ").replace(" with ", " alongside ")
if t == text:
t = f"In this scene: {text[0].lower()}{text[1:]}" if text[0].isupper() else text
return t
def translate(text, lang):
d = TRANSLATIONS.get(lang, {})
words = text.split()
out = []
for w in words:
lo = w.lower().strip(".,;:!?")
suf = w[len(w.rstrip(".,;:!?")):]
if lo in d:
out.append(d[lo] + suf)
else:
out.append(w)
return " ".join(out)
def augment_file(input_path, output_path=None, n_para=3, langs=None, llm_endpoint=None):
input_path = Path(input_path)
if output_path is None:
output_path = input_path.parent / f"{input_path.stem}_augmented{input_path.suffix}"
entries = [json.loads(l) for l in open(input_path) if l.strip()]
if not entries:
print(f"No entries in {input_path}"); return 0
tf = detect_text_field(entries[0])
if not tf:
print(f"ERROR: No text field in {input_path}", file=sys.stderr); return 0
print(f"Input: {input_path} ({len(entries)} entries, field={tf})")
aug_count = 0
with open(output_path, "w") as out:
for e in entries:
out.write(json.dumps(e, ensure_ascii=False) + "\n")
for i, e in enumerate(entries):
text = e[tf]
# Paraphrases
for p in range(n_para):
para = paraphrase(text)
if para != text:
ne = dict(e); ne[tf] = para
ne["_augmentation"] = f"paraphrase_{p+1}"
ne["_original"] = text[:100]
out.write(json.dumps(ne, ensure_ascii=False) + "\n")
aug_count += 1
# Translations
for lang in (langs or []):
tr = translate(text, lang)
if tr != text:
ne = dict(e); ne[tf] = tr
ne["_augmentation"] = f"translate_{lang}"
ne["_language"] = lang
ne["_original"] = text[:100]
out.write(json.dumps(ne, ensure_ascii=False) + "\n")
aug_count += 1
if (i+1) % 100 == 0:
print(f" {i+1}/{len(entries)} done ({aug_count} augmented)")
total = len(entries) + aug_count
print(f"Done: {len(entries)} originals + {aug_count} augmented = {total}")
print(f"Output: {output_path}")
return aug_count
def main():
import argparse
p = argparse.ArgumentParser()
p.add_argument("--input", required=True)
p.add_argument("--output", default=None)
p.add_argument("--paraphrases", type=int, default=3)
p.add_argument("--langs", default="es,fr,de")
p.add_argument("--llm-endpoint", default=None)
args = p.parse_args()
langs = [l.strip() for l in args.langs.split(",") if l.strip()] if args.langs else []
augment_file(args.input, args.output, args.paraphrases, langs, args.llm_endpoint)
if __name__ == "__main__":
main()