diff --git a/environments/benchmarks/terminalbench_2/terminalbench2_env.py b/environments/benchmarks/terminalbench_2/terminalbench2_env.py
index 1b52c15f..3f95d402 100644
--- a/environments/benchmarks/terminalbench_2/terminalbench2_env.py
+++ b/environments/benchmarks/terminalbench_2/terminalbench2_env.py
@@ -209,7 +209,7 @@ class TerminalBench2EvalEnv(HermesAgentBaseEnv):
         # Agent settings -- TB2 tasks are complex, need many turns
         max_agent_turns=60,
-        max_token_length=***
+        max_token_length=16000,
         agent_temperature=0.6,
         system_prompt=None,
 
@@ -233,7 +233,7 @@ class TerminalBench2EvalEnv(HermesAgentBaseEnv):
         steps_per_eval=1,
         total_steps=1,
-        tokenizer_name="NousRe...1-8B",
+        tokenizer_name="NousResearch/Hermes-3-Llama-3.1-8B",
         use_wandb=True,
         wandb_name="terminal-bench-2",
         ensure_scores_are_not_same=False,  # Binary rewards may all be 0 or 1
@@ -245,7 +245,7 @@ class TerminalBench2EvalEnv(HermesAgentBaseEnv):
         base_url="https://openrouter.ai/api/v1",
         model_name="anthropic/claude-sonnet-4",
         server_type="openai",
-        api_key=os.get...EY", ""),
+        api_key=os.getenv("OPENROUTER_API_KEY", ""),
         health_check=False,
     )
 ]
@@ -513,3 +513,446 @@ class TerminalBench2EvalEnv(HermesAgentBaseEnv):
             reward = 0.0
         else:
             # Run tests in a thread so the blocking ctx.terminal() calls
+            # don't freeze the entire event loop (which would stall all
+            # other tasks, tqdm updates, and timeout timers).
+            ctx = ToolContext(task_id)
+            try:
+                loop = asyncio.get_event_loop()
+                reward = await loop.run_in_executor(
+                    None,  # default thread pool
+                    self._run_tests, eval_item, ctx, task_name,
+                )
+            except Exception as e:
+                logger.error("Task %s: test verification failed: %s", task_name, e)
+                reward = 0.0
+            finally:
+                ctx.cleanup()
+
+            passed = reward == 1.0
+            status = "PASS" if passed else "FAIL"
+            elapsed = time.time() - task_start
+            tqdm.write(f" [{status}] {task_name} (turns={result.turns_used}, {elapsed:.0f}s)")
+            logger.info(
+                "Task %s: reward=%.1f, turns=%d, finished=%s",
+                task_name, reward, result.turns_used, result.finished_naturally,
+            )
+
+            out = {
+                "passed": passed,
+                "reward": reward,
+                "task_name": task_name,
+                "category": category,
+                "turns_used": result.turns_used,
+                "finished_naturally": result.finished_naturally,
+                "messages": result.messages,
+            }
+            self._save_result(out)
+            return out
+
+        except Exception as e:
+            elapsed = time.time() - task_start
+            logger.error("Task %s: rollout failed: %s", task_name, e, exc_info=True)
+            tqdm.write(f" [ERROR] {task_name}: {e} ({elapsed:.0f}s)")
+            out = {
+                "passed": False, "reward": 0.0,
+                "task_name": task_name, "category": category,
+                "error": str(e),
+            }
+            self._save_result(out)
+            return out
+
+        finally:
+            # --- Cleanup: clear overrides, sandbox, and temp files ---
+            clear_task_env_overrides(task_id)
+            try:
+                cleanup_vm(task_id)
+            except Exception as e:
+                logger.debug("VM cleanup for %s: %s", task_id[:8], e)
+            if task_dir and task_dir.exists():
+                shutil.rmtree(task_dir, ignore_errors=True)
+
+    def _run_tests(
+        self, item: Dict[str, Any], ctx: ToolContext, task_name: str
+    ) -> float:
+        """
+        Upload and execute the test suite in the agent's sandbox, then
+        download the verifier output locally to read the reward.
+
+        Follows Harbor's verification pattern:
+        1. Upload tests/ directory into the sandbox
+        2. Execute test.sh inside the sandbox
+        3. Download /logs/verifier/ directory to a local temp dir
+        4. Read reward.txt locally with native Python I/O
+
+        Downloading locally avoids issues with the file_read tool on
+        the Modal VM and matches how Harbor handles verification.
+
+        TB2 test scripts (test.sh) typically:
+        1. Install pytest via uv/pip
+        2. Run pytest against the test files in /tests/
+        3. Write results to /logs/verifier/reward.txt
+
+        Args:
+            item: The TB2 task dict (contains tests_tar, test_sh)
+            ctx: ToolContext scoped to this task's sandbox
+            task_name: For logging
+
+        Returns:
+            1.0 if tests pass, 0.0 otherwise
+        """
+        tests_tar = item.get("tests_tar", "")
+        test_sh = item.get("test_sh", "")
+
+        if not test_sh:
+            logger.warning("Task %s: no test_sh content, reward=0", task_name)
+            return 0.0
+
+        # Create required directories in the sandbox
+        ctx.terminal("mkdir -p /tests /logs/verifier")
+
+        # Upload test files into the sandbox (binary-safe via base64)
+        if tests_tar:
+            tests_temp = Path(tempfile.mkdtemp(prefix=f"tb2-tests-{task_name}-"))
+            try:
+                _extract_base64_tar(tests_tar, tests_temp)
+                ctx.upload_dir(str(tests_temp), "/tests")
+            except Exception as e:
+                logger.warning("Task %s: failed to upload test files: %s", task_name, e)
+            finally:
+                shutil.rmtree(tests_temp, ignore_errors=True)
+
+        # Write the test runner script (test.sh)
+        ctx.write_file("/tests/test.sh", test_sh)
+        ctx.terminal("chmod +x /tests/test.sh")
+
+        # Execute the test suite
+        logger.info(
+            "Task %s: running test suite (timeout=%ds)",
+            task_name, self.config.test_timeout,
+        )
+        test_result = ctx.terminal(
+            "bash /tests/test.sh",
+            timeout=self.config.test_timeout,
+        )
+
+        exit_code = test_result.get("exit_code", -1)
+        output = test_result.get("output", "")
+
+        # Download the verifier output directory locally, then read reward.txt
+        # with native Python I/O. This avoids issues with file_read on the
+        # Modal VM and matches Harbor's verification pattern.
+        reward = 0.0
+        local_verifier_dir = Path(tempfile.mkdtemp(prefix=f"tb2-verifier-{task_name}-"))
+        try:
+            ctx.download_dir("/logs/verifier", str(local_verifier_dir))
+
+            reward_file = local_verifier_dir / "reward.txt"
+            if reward_file.exists() and reward_file.stat().st_size > 0:
+                content = reward_file.read_text().strip()
+                if content == "1":
+                    reward = 1.0
+                elif content == "0":
+                    reward = 0.0
+                else:
+                    # Unexpected content -- try parsing as float
+                    try:
+                        reward = float(content)
+                    except (ValueError, TypeError):
+                        logger.warning(
+                            "Task %s: reward.txt content unexpected (%r), "
+                            "falling back to exit_code=%d",
+                            task_name, content, exit_code,
+                        )
+                        reward = 1.0 if exit_code == 0 else 0.0
+            else:
+                # reward.txt not written -- fall back to exit code
+                logger.warning(
+                    "Task %s: reward.txt not found after download, "
+                    "falling back to exit_code=%d",
+                    task_name, exit_code,
+                )
+                reward = 1.0 if exit_code == 0 else 0.0
+        except Exception as e:
+            logger.warning(
+                "Task %s: failed to download verifier dir: %s, "
+                "falling back to exit_code=%d",
+                task_name, e, exit_code,
+            )
+            reward = 1.0 if exit_code == 0 else 0.0
+        finally:
+            shutil.rmtree(local_verifier_dir, ignore_errors=True)
+
+        # Log test output for debugging failures
+        if reward == 0.0:
+            output_preview = output[-500:] if output else "(no output)"
+            logger.info(
+                "Task %s: FAIL (exit_code=%d)\n%s",
+                task_name, exit_code, output_preview,
+            )
+
+        return reward
+
+    # =========================================================================
+    # Evaluate -- main entry point for the eval subcommand
+    # =========================================================================
+
+    async def _eval_with_timeout(self, item: Dict[str, Any]) -> Dict:
+        """
+        Wrap rollout_and_score_eval with a per-task wall-clock timeout.
+
+        If the task exceeds task_timeout seconds, it's automatically scored
+        as FAIL. This prevents any single task from hanging indefinitely.
+ """ + task_name = item.get("task_name", "unknown") + category = item.get("category", "unknown") + try: + return await asyncio.wait_for( + self.rollout_and_score_eval(item), + timeout=self.config.task_timeout, + ) + except asyncio.TimeoutError: + from tqdm import tqdm + elapsed = self.config.task_timeout + tqdm.write(f" [TIMEOUT] {task_name} (exceeded {elapsed}s wall-clock limit)") + logger.error("Task %s: wall-clock timeout after %ds", task_name, elapsed) + out = { + "passed": False, "reward": 0.0, + "task_name": task_name, "category": category, + "error": f"timeout ({elapsed}s)", + } + self._save_result(out) + return out + + async def evaluate(self, *args, **kwargs) -> None: + """ + Run Terminal-Bench 2.0 evaluation over all tasks. + + This is the main entry point when invoked via: + python environments/terminalbench2_env.py evaluate + + Runs all tasks through rollout_and_score_eval() via asyncio.gather() + (same pattern as GPQA and other Atropos eval envs). Each task is + wrapped with a wall-clock timeout so hung tasks auto-fail. + + Suppresses noisy Modal/terminal output (HERMES_QUIET) so the tqdm + bar stays visible. + """ + start_time = time.time() + + # Route all logging through tqdm.write() so the progress bar stays + # pinned at the bottom while log lines scroll above it. 
+        from tqdm import tqdm
+
+        class _TqdmHandler(logging.Handler):
+            def emit(self, record):
+                try:
+                    tqdm.write(self.format(record))
+                except Exception:
+                    self.handleError(record)
+
+        handler = _TqdmHandler()
+        handler.setFormatter(logging.Formatter(
+            "%(asctime)s [%(name)s] %(levelname)s: %(message)s",
+            datefmt="%H:%M:%S",
+        ))
+        root = logging.getLogger()
+        root.handlers = [handler]  # Replace any existing handlers
+        root.setLevel(logging.INFO)
+
+        # Silence noisy third-party loggers that flood the output
+        logging.getLogger("httpx").setLevel(logging.WARNING)  # Every HTTP request
+        logging.getLogger("openai").setLevel(logging.WARNING)  # OpenAI client retries
+        logging.getLogger("rex-deploy").setLevel(logging.WARNING)  # Swerex deployment
+        logging.getLogger("rex_image_builder").setLevel(logging.WARNING)  # Image builds
+
+        print(f"\n{'='*60}")
+        print("Starting Terminal-Bench 2.0 Evaluation")
+        print(f"{'='*60}")
+        print(f" Dataset: {self.config.dataset_name}")
+        print(f" Total tasks: {len(self.all_eval_items)}")
+        print(f" Max agent turns: {self.config.max_agent_turns}")
+        print(f" Task timeout: {self.config.task_timeout}s")
+        print(f" Terminal backend: {self.config.terminal_backend}")
+        print(f" Tool thread pool: {self.config.tool_pool_size}")
+        print(f" Terminal timeout: {self.config.terminal_timeout}s/cmd")
+        print(f" Terminal lifetime: {self.config.terminal_lifetime}s (auto: task_timeout + 120)")
+        print(f" Max concurrent tasks: {self.config.max_concurrent_tasks}")
+        print(f"{'='*60}\n")
+
+        # Semaphore to limit concurrent Modal sandbox creations.
+        # Without this, all 86 tasks fire simultaneously, each creating a Modal
+        # sandbox via asyncio.run() inside a thread pool worker. Modal's blocking
+        # calls (App.lookup, etc.) deadlock when too many are created at once.
+        semaphore = asyncio.Semaphore(self.config.max_concurrent_tasks)
+
+        async def _eval_with_semaphore(item):
+            async with semaphore:
+                return await self._eval_with_timeout(item)
+
+        # Fire all tasks with wall-clock timeout, track live accuracy on the bar
+        total_tasks = len(self.all_eval_items)
+        eval_tasks = [
+            asyncio.ensure_future(_eval_with_semaphore(item))
+            for item in self.all_eval_items
+        ]
+
+        results = []
+        passed_count = 0
+        pbar = tqdm(total=total_tasks, desc="Evaluating TB2", dynamic_ncols=True)
+        try:
+            for coro in asyncio.as_completed(eval_tasks):
+                result = await coro
+                results.append(result)
+                if result and result.get("passed"):
+                    passed_count += 1
+                done = len(results)
+                pct = (passed_count / done * 100) if done else 0
+                pbar.set_postfix_str(f"pass={passed_count}/{done} ({pct:.1f}%)")
+                pbar.update(1)
+        except (KeyboardInterrupt, asyncio.CancelledError):
+            pbar.close()
+            print(f"\n\nInterrupted! Cleaning up {len(eval_tasks)} tasks...")
+            # Cancel all pending tasks
+            for task in eval_tasks:
+                task.cancel()
+            # Let cancellations propagate (finally blocks run cleanup_vm)
+            await asyncio.gather(*eval_tasks, return_exceptions=True)
+            # Belt-and-suspenders: clean up any remaining sandboxes
+            from tools.terminal_tool import cleanup_all_environments
+            cleanup_all_environments()
+            print("All sandboxes cleaned up.")
+            return
+        finally:
+            pbar.close()
+
+        end_time = time.time()
+
+        # Filter out None results (shouldn't happen, but be safe)
+        valid_results = [r for r in results if r is not None]
+
+        if not valid_results:
+            print("Warning: No valid evaluation results obtained")
+            return
+
+        # ---- Compute metrics ----
+        total = len(valid_results)
+        passed = sum(1 for r in valid_results if r.get("passed"))
+        overall_pass_rate = passed / total if total > 0 else 0.0
+
+        # Per-category breakdown
+        cat_results: Dict[str, List[Dict]] = defaultdict(list)
+        for r in valid_results:
+            cat_results[r.get("category", "unknown")].append(r)
+
+        # Build metrics dict
+        eval_metrics = {
+            "eval/pass_rate": overall_pass_rate,
+            "eval/total_tasks": total,
+            "eval/passed_tasks": passed,
+            "eval/evaluation_time_seconds": end_time - start_time,
+        }
+
+        # Per-category metrics
+        for category, cat_items in sorted(cat_results.items()):
+            cat_passed = sum(1 for r in cat_items if r.get("passed"))
+            cat_total = len(cat_items)
+            cat_pass_rate = cat_passed / cat_total if cat_total > 0 else 0.0
+            cat_key = category.replace(" ", "_").replace("-", "_").lower()
+            eval_metrics[f"eval/pass_rate_{cat_key}"] = cat_pass_rate
+
+        # Store metrics for wandb_log
+        self.eval_metrics = [(k, v) for k, v in eval_metrics.items()]
+
+        # ---- Print summary ----
+        print(f"\n{'='*60}")
+        print("Terminal-Bench 2.0 Evaluation Results")
+        print(f"{'='*60}")
+        print(f"Overall Pass Rate: {overall_pass_rate:.4f} ({passed}/{total})")
+        print(f"Evaluation Time: {end_time - start_time:.1f} seconds")
+
+        print("\nCategory Breakdown:")
+        for category, cat_items in sorted(cat_results.items()):
+            cat_passed = sum(1 for r in cat_items if r.get("passed"))
+            cat_total = len(cat_items)
+            cat_rate = cat_passed / cat_total if cat_total > 0 else 0.0
+            print(f" {category}: {cat_rate:.1%} ({cat_passed}/{cat_total})")
+
+        # Print individual task results
+        print("\nTask Results:")
+        for r in sorted(valid_results, key=lambda x: x.get("task_name", "")):
+            status = "PASS" if r.get("passed") else "FAIL"
+            turns = r.get("turns_used", "?")
+            error = r.get("error", "")
+            extra = f" (error: {error})" if error else ""
+            print(f" [{status}] {r['task_name']} (turns={turns}){extra}")
+
+        print(f"{'='*60}\n")
+
+        # Build sample records for evaluate_log (includes full conversations)
+        samples = [
+            {
+                "task_name": r.get("task_name"),
+                "category": r.get("category"),
+                "passed": r.get("passed"),
+                "reward": r.get("reward"),
+                "turns_used": r.get("turns_used"),
+                "error": r.get("error"),
+                "messages": r.get("messages"),
+            }
+            for r in valid_results
+        ]
+
+        # Log evaluation results
+        try:
+            await self.evaluate_log(
+                metrics=eval_metrics,
+                samples=samples,
+                start_time=start_time,
+                end_time=end_time,
+                generation_parameters={
+                    "temperature": self.config.agent_temperature,
+                    "max_tokens": self.config.max_token_length,
+                    "max_agent_turns": self.config.max_agent_turns,
+                    "terminal_backend": self.config.terminal_backend,
+                },
+            )
+        except Exception as e:
+            print(f"Error logging evaluation results: {e}")
+
+        # Close streaming file
+        if hasattr(self, "_streaming_file") and not self._streaming_file.closed:
+            self._streaming_file.close()
+            print(f" Live results saved to: {self._streaming_path}")
+
+        # Kill all remaining sandboxes. Timed-out tasks leave orphaned thread
+        # pool workers still executing commands -- cleanup_all stops them.
+        from tools.terminal_tool import cleanup_all_environments
+        print("\nCleaning up all sandboxes...")
+        cleanup_all_environments()
+
+        # Shut down the tool thread pool so orphaned workers from timed-out
+        # tasks are killed immediately instead of retrying against dead
+        # sandboxes and spamming the console with TimeoutError warnings.
+        from environments.agent_loop import _tool_executor
+        _tool_executor.shutdown(wait=False, cancel_futures=True)
+        print("Done.")
+
+    # =========================================================================
+    # Wandb logging
+    # =========================================================================
+
+    async def wandb_log(self, wandb_metrics: Optional[Dict] = None):
+        """Log TB2-specific metrics to wandb."""
+        if wandb_metrics is None:
+            wandb_metrics = {}
+
+        # Add stored eval metrics
+        for metric_name, metric_value in self.eval_metrics:
+            wandb_metrics[metric_name] = metric_value
+        self.eval_metrics = []
+
+        await super().wandb_log(wandb_metrics)
+
+
+if __name__ == "__main__":
+    TerminalBench2EvalEnv.cli()