From 852dd8f210ee4490b76b05b9e68e320ba3eb5995 Mon Sep 17 00:00:00 2001 From: Merge Bot Date: Thu, 16 Apr 2026 05:11:26 +0000 Subject: [PATCH] Merge PR #626: pipelines/README.md --- pipelines/README.md | 94 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 pipelines/README.md diff --git a/pipelines/README.md b/pipelines/README.md new file mode 100644 index 00000000..01daf5fd --- /dev/null +++ b/pipelines/README.md @@ -0,0 +1,94 @@ +# Pipeline Infrastructure + +Shared orchestrator for all batch pipelines. + +## Components + +### orchestrator.py +Shared orchestrator providing: +- **Job Queue**: SQLite-backed with priority support +- **Worker Pool**: Configurable parallelism (default 10) +- **Token Budget**: Per-job tracking and limits +- **Checkpointing**: Resume from any point after restart +- **Rate Limiting**: Provider-aware request throttling +- **Retry Logic**: Exponential backoff with configurable retries +- **Reporting**: Generate summary reports + +## Usage + +### Python API +```python +from pipelines.orchestrator import PipelineOrchestrator, JobPriority + +# Create orchestrator +orchestrator = PipelineOrchestrator(max_workers=10) + +# Register pipeline handler +def my_handler(job): + # Process job.task + return {"result": "done"} + +orchestrator.register_handler("my_pipeline", my_handler) + +# Submit jobs +job_id = orchestrator.submit_job( + pipeline="my_pipeline", + task={"action": "process", "data": "..."}, + priority=JobPriority.HIGH, + token_budget=100000 +) + +# Run orchestrator +orchestrator.run() +``` + +### CLI +```bash +# Submit a job +python -m pipelines.orchestrator submit my_pipeline --task '{"action": "process"}' + +# Run orchestrator +python -m pipelines.orchestrator run --workers 10 --max-jobs 100 + +# Check job status +python -m pipelines.orchestrator status + +# Resume paused job +python -m pipelines.orchestrator resume + +# Show stats +python -m pipelines.orchestrator stats + +# Generate report 
python -m pipelines.orchestrator report
```

## Database

Jobs are stored in `~/.hermes/pipelines/orchestrator.db`:
- `jobs` - Job queue and state
- `checkpoints` - Resume points
- `reports` - Generated reports

## Configuration

### Rate Limits
```python
orchestrator.configure_rate_limit("Nous", rpm=60, tpm=1000000)
orchestrator.configure_rate_limit("Anthropic", rpm=50, tpm=800000)
```

### Token Budgets
Default: 1M tokens per job. Override per-job:
```python
orchestrator.submit_job("pipeline", task, token_budget=500000)
```

## Pipelines

All pipelines share this orchestrator:
1. **batch-runner** - Run prompts across datasets
2. **data-gen** - Generate training data
3. **eval-runner** - Run evaluations
4. **trajectory-compress** - Compress trajectories
5. **web-research** - Run web research tasks