diff --git a/batch_runner.py b/batch_runner.py index 9d21aebc..2487d9fb 100644 --- a/batch_runner.py +++ b/batch_runner.py @@ -98,10 +98,9 @@ def _extract_tool_stats(messages: List[Dict[str, Any]]) -> Dict[str, Dict[str, i # Terminal wraps its response in a "content" field if "content" in content_json and isinstance(content_json["content"], dict): inner_content = content_json["content"] - # Check for actual error (non-null error field or non-zero exit code) - has_error = (inner_content.get("error") is not None or - inner_content.get("exit_code", 0) != 0) - if has_error: + # Check for actual error (non-null error field) + # Note: non-zero exit codes are not failures - the model can self-correct + if inner_content.get("error") is not None: is_success = False # Check for "success": false pattern used by some tools diff --git a/model_tools.py b/model_tools.py index 9e5a6e8c..eb27b753 100644 --- a/model_tools.py +++ b/model_tools.py @@ -31,7 +31,9 @@ import asyncio from typing import Dict, Any, List, Optional from tools.web_tools import web_search_tool, web_extract_tool, web_crawl_tool, check_firecrawl_api_key -from tools.terminal_tool import terminal_tool, check_hecate_requirements, TERMINAL_TOOL_DESCRIPTION +from tools.simple_terminal_tool import simple_terminal_tool, check_requirements as check_simple_terminal_requirements, SIMPLE_TERMINAL_TOOL_DESCRIPTION +# Keep old terminal tool for backwards compatibility if needed +# from tools.terminal_tool import terminal_tool, check_hecate_requirements, TERMINAL_TOOL_DESCRIPTION from tools.vision_tools import vision_analyze_tool, check_vision_requirements from tools.mixture_of_agents_tool import mixture_of_agents_tool, check_moa_requirements from tools.image_generation_tool import image_generate_tool, check_image_generation_requirements @@ -111,7 +113,7 @@ def get_web_tool_definitions() -> List[Dict[str, Any]]: def get_terminal_tool_definitions() -> List[Dict[str, Any]]: """ Get tool definitions for terminal tools in OpenAI's expected format. - + Returns: List[Dict]: List of terminal tool definitions compatible with OpenAI API """ @@ -120,7 +122,7 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "terminal", - "description": TERMINAL_TOOL_DESCRIPTION, + "description": SIMPLE_TERMINAL_TOOL_DESCRIPTION, "parameters": { "type": "object", "properties": { @@ -128,28 +130,18 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]: "type": "string", "description": "The command to execute on the VM" }, - "input_keys": { - "type": "string", - "description": "Keystrokes to send to the most recent interactive session (e.g., 'hello\\n' for typing hello + Enter). If no active session exists, this will be ignored." - }, "background": { "type": "boolean", "description": "Whether to run the command in the background (default: false)", "default": False }, - "idle_threshold": { - "type": "number", - "description": "Seconds to wait for output before considering session idle (default: 5.0)", - "default": 5.0, - "minimum": 0.1 - }, "timeout": { "type": "integer", "description": "Command timeout in seconds (optional)", "minimum": 1 } }, - "required": [] + "required": ["command"] } } } @@ -262,11 +254,11 @@ def get_all_tool_names() -> List[str]: # Web tools if check_firecrawl_api_key(): tool_names.extend(["web_search", "web_extract", "web_crawl"]) - - # Terminal tools - if check_hecate_requirements(): + + # Terminal tools + if check_simple_terminal_requirements(): tool_names.extend(["terminal"]) - + # Vision tools if check_vision_requirements(): tool_names.extend(["vision_analyze"]) @@ -346,11 +338,11 @@ def get_tool_definitions( if check_firecrawl_api_key(): for tool in get_web_tool_definitions(): all_available_tools_map[tool["function"]["name"]] = tool - - if check_hecate_requirements(): + + if check_simple_terminal_requirements(): for tool in get_terminal_tool_definitions(): all_available_tools_map[tool["function"]["name"]] = tool - + if check_vision_requirements(): for tool in get_vision_tool_definitions(): all_available_tools_map[tool["function"]["name"]] = tool @@ -494,12 +486,10 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A """ if function_name == "terminal": command = function_args.get("command") - input_keys = function_args.get("input_keys") background = function_args.get("background", False) - idle_threshold = function_args.get("idle_threshold", 5.0) timeout = function_args.get("timeout") - return terminal_tool(command, input_keys, None, background, idle_threshold, timeout, task_id) + return simple_terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id) else: return json.dumps({"error": f"Unknown terminal function: {function_name}"}, ensure_ascii=False) @@ -681,10 +671,10 @@ def get_available_toolsets() -> Dict[str, Dict[str, Any]]: "requirements": ["FIRECRAWL_API_KEY environment variable"] }, "terminal_tools": { - "available": check_hecate_requirements(), - "tools": ["terminal_tool"], - "description": "Execute commands with optional interactive session support on Linux VMs", - "requirements": ["MORPH_API_KEY environment variable", "hecate package"] + "available": check_simple_terminal_requirements(), + "tools": ["simple_terminal_tool"], + "description": "Execute commands on secure Linux VMs without session persistence", + "requirements": ["MORPH_API_KEY environment variable"] }, "vision_tools": { "available": check_vision_requirements(), @@ -711,13 +701,13 @@ def get_available_toolsets() -> Dict[str, Dict[str, Any]]: def check_toolset_requirements() -> Dict[str, bool]: """ Check if all requirements for available toolsets are met. - + Returns: Dict: Status of each toolset's requirements """ return { "web_tools": check_firecrawl_api_key(), - "terminal_tools": check_hecate_requirements(), + "terminal_tools": check_simple_terminal_requirements(), "vision_tools": check_vision_requirements(), "moa_tools": check_moa_requirements(), "image_tools": check_image_generation_requirements() diff --git a/run_agent.py b/run_agent.py index e7abde80..97fb3708 100644 --- a/run_agent.py +++ b/run_agent.py @@ -388,7 +388,7 @@ class AIAgent: while api_call_count < self.max_iterations: api_call_count += 1 - print(f"\nšŸ”„ Making API call #{api_call_count}...") + print(f"\nšŸ”„ Making OpenAI-compatible API call #{api_call_count}...") # Log request details if verbose if self.verbose_logging: @@ -397,8 +397,8 @@ class AIAgent: api_start_time = time.time() retry_count = 0 - max_retries = 3 - + max_retries = 6 # Increased to allow longer backoff periods + while retry_count <= max_retries: try: # Prepare messages for API call @@ -407,30 +407,30 @@ class AIAgent: if active_system_prompt: # Insert system message at the beginning api_messages = [{"role": "system", "content": active_system_prompt}] + api_messages - + # Make API call with tools response = self.client.chat.completions.create( model=self.model, messages=api_messages, tools=self.tools if self.tools else None, - timeout=60.0 # Add explicit timeout + timeout=300.0 # 5 minute timeout for long-running agent tasks ) - + api_duration = time.time() - api_start_time - print(f"ā±ļø API call completed in {api_duration:.2f}s") - + print(f"ā±ļø OpenAI-compatible API call completed in {api_duration:.2f}s") + if self.verbose_logging: logging.debug(f"API Response received - Usage: {response.usage if hasattr(response, 'usage') else 'N/A'}") - + break # Success, exit retry loop - + except Exception as api_error: retry_count += 1 if retry_count > max_retries: raise api_error - - wait_time = min(2 ** retry_count, 10) # Exponential backoff, max 10s - print(f"āš ļø API call failed (attempt {retry_count}/{max_retries}): {str(api_error)[:100]}") + + wait_time = min(2 ** retry_count, 60) # Exponential backoff: 2s, 4s, 8s, 16s, 32s, 60s, 60s + print(f"āš ļø OpenAI-compatible API call failed (attempt {retry_count}/{max_retries}): {str(api_error)[:100]}") print(f"ā³ Retrying in {wait_time}s...") logging.warning(f"API retry {retry_count}/{max_retries} after error: {api_error}") time.sleep(wait_time) @@ -522,11 +522,11 @@ class AIAgent: "content": final_response }) - print(f"šŸŽ‰ Conversation completed after {api_call_count} API call(s)") + print(f"šŸŽ‰ Conversation completed after {api_call_count} OpenAI-compatible API call(s)") break except Exception as e: - error_msg = f"Error during API call #{api_call_count}: {str(e)}" + error_msg = f"Error during OpenAI-compatible API call #{api_call_count}: {str(e)}" print(f"āŒ {error_msg}") if self.verbose_logging: diff --git a/tools/mixture_of_agents_tool.py b/tools/mixture_of_agents_tool.py index a0e8db1e..c94d9e1d 100644 --- a/tools/mixture_of_agents_tool.py +++ b/tools/mixture_of_agents_tool.py @@ -161,11 +161,11 @@ def _construct_aggregator_prompt(system_prompt: str, responses: List[str]) -> st async def _run_reference_model_safe( - model: str, - user_prompt: str, + model: str, + user_prompt: str, temperature: float = REFERENCE_TEMPERATURE, max_tokens: int = 32000, - max_retries: int = 3 + max_retries: int = 6 ) -> tuple[str, str, bool]: """ Run a single reference model with retry logic and graceful failure handling. @@ -212,8 +212,8 @@ async def _run_reference_model_safe( print(f"āš ļø {model} unknown error (attempt {attempt + 1}): {error_str}") if attempt < max_retries - 1: - # Exponential backoff for rate limiting - sleep_time = 2 ** attempt + # Exponential backoff for rate limiting: 2s, 4s, 8s, 16s, 32s, 60s + sleep_time = min(2 ** (attempt + 1), 60) print(f" Retrying in {sleep_time}s...") await asyncio.sleep(sleep_time) else: diff --git a/tools/simple_terminal_tool.py b/tools/simple_terminal_tool.py new file mode 100644 index 00000000..6ebfeeda --- /dev/null +++ b/tools/simple_terminal_tool.py @@ -0,0 +1,395 @@ +#!/usr/bin/env python3 +""" +Simple Terminal Tool Module + +A simplified terminal tool that executes commands on MorphCloud VMs without tmux. +No session persistence, no interactive app support - just simple command execution. + +Features: +- Direct SSH command execution +- Background task support +- VM lifecycle management with TTL +- Automatic cleanup after inactivity + +Usage: + from simple_terminal_tool import simple_terminal_tool + + # Execute a simple command + result = simple_terminal_tool("ls -la") + + # Execute in background + result = simple_terminal_tool("python server.py", background=True) +""" + +import json +import os +import time +import threading +import atexit +from typing import Optional, Dict, Any + +# Tool description for LLM +SIMPLE_TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux VM environment. + +**Environment:** +- Minimal Debian-based OS with internet access +- Automatic VM lifecycle management (creates on-demand, reuses, cleans up) +- Filesystem is persisted between tool calls but environment variables, venvs, etc are reset. + +**Command Execution:** +- Simple commands: Just provide the 'command' parameter +- Background processes: Set 'background': True for servers/long-running tasks +- Command timeout: Optional 'timeout' parameter in seconds + +**Examples:** +- Run command: `{"command": "ls -la"}` +- Background task: `{"command": "source path/to/my/venv/bin/activate && python server.py", "background": True}` +- With timeout: `{"command": "long_task.sh", "timeout": 300}` + +**Best Practices:** +- Run servers/long processes in background +- Monitor disk usage for large tasks +- Install whatever tools you need with sudo apt-get +- Do not be afraid to run pip with --break-system-packages + +**Things to avoid** +- Do NOT use interactive tools such as tmux, vim, nano, python repl - you will get stuck. Even git sometimes becomes interactive if the output is large. If you're not sure pipe to cat. +""" + +# Global state for VM lifecycle management +_active_instances: Dict[str, Any] = {} +_last_activity: Dict[str, float] = {} +_instance_lock = threading.Lock() +_cleanup_thread = None +_cleanup_running = False + + +def _cleanup_inactive_vms(vm_lifetime_seconds: int = 300): + """Clean up VMs that have been inactive for longer than vm_lifetime_seconds.""" + global _active_instances, _last_activity + + current_time = time.time() + tasks_to_cleanup = [] + + with _instance_lock: + for task_id, last_time in list(_last_activity.items()): + if current_time - last_time > vm_lifetime_seconds: + tasks_to_cleanup.append(task_id) + + for task_id in tasks_to_cleanup: + try: + if task_id in _active_instances: + instance = _active_instances[task_id] + if hasattr(instance, 'terminate'): + instance.terminate() + elif hasattr(instance, 'stop'): + instance.stop() + elif hasattr(instance, 'delete'): + instance.delete() + + del _active_instances[task_id] + print(f"[VM Cleanup] Terminated inactive VM for task: {task_id}") + + if task_id in _last_activity: + del _last_activity[task_id] + + except Exception as e: + # 404 errors are benign - VM already cleaned up by TTL + error_str = str(e) + if "404" in error_str or "InstanceNotFoundError" in error_str or "not found" in error_str.lower(): + print(f"[VM Cleanup] VM for task {task_id} already cleaned up (likely TTL expiration)") + else: + print(f"[VM Cleanup] Error cleaning up VM for task {task_id}: {e}") + + +def _cleanup_thread_worker(): + """Background thread worker that periodically cleans up inactive VMs.""" + global _cleanup_running + + while _cleanup_running: + try: + vm_lifetime = int(os.getenv("HECATE_VM_LIFETIME_SECONDS", "300")) + _cleanup_inactive_vms(vm_lifetime) + except Exception as e: + print(f"[VM Cleanup] Error in cleanup thread: {e}") + + for _ in range(60): + if not _cleanup_running: + break + time.sleep(1) + + +def _start_cleanup_thread(): + """Start the background cleanup thread if not already running.""" + global _cleanup_thread, _cleanup_running + + with _instance_lock: + if _cleanup_thread is None or not _cleanup_thread.is_alive(): + _cleanup_running = True + _cleanup_thread = threading.Thread(target=_cleanup_thread_worker, daemon=True) + _cleanup_thread.start() + + +def _stop_cleanup_thread(): + """Stop the background cleanup thread.""" + global _cleanup_running + _cleanup_running = False + if _cleanup_thread is not None: + _cleanup_thread.join(timeout=5) + + +def cleanup_vm(task_id: str): + """Manually clean up a specific VM by task_id.""" + global _active_instances, _last_activity + + with _instance_lock: + try: + if task_id in _active_instances: + instance = _active_instances[task_id] + if hasattr(instance, 'terminate'): + instance.terminate() + elif hasattr(instance, 'stop'): + instance.stop() + elif hasattr(instance, 'delete'): + instance.delete() + + del _active_instances[task_id] + print(f"[VM Cleanup] Manually terminated VM for task: {task_id}") + + if task_id in _last_activity: + del _last_activity[task_id] + + except Exception as e: + # 404 errors are benign - VM already cleaned up by TTL + error_str = str(e) + if "404" in error_str or "InstanceNotFoundError" in error_str or "not found" in error_str.lower(): + print(f"[VM Cleanup] VM for task {task_id} already cleaned up (likely TTL expiration)") + else: + print(f"[VM Cleanup] Error manually cleaning up VM for task {task_id}: {e}") + + +atexit.register(_stop_cleanup_thread) + + +def _execute_ssh_command(instance, command: str, timeout: Optional[int] = None) -> Dict[str, Any]: + """ + Execute a command via SSH on the VM instance. + + Args: + instance: MorphVM instance + command: Command to execute + timeout: Optional timeout in seconds + + Returns: + dict with stdout, stderr, returncode + """ + ssh_context_manager = None + try: + # Use the instance's SSH context manager + ssh_context_manager = instance.ssh() + ssh_context = ssh_context_manager.__enter__() + + # Execute the command + result = ssh_context.run(command, get_pty=False, timeout=timeout or 120) + + # Close the SSH connection + if ssh_context_manager: + try: + ssh_context_manager.__exit__(None, None, None) + except: + pass + + return { + "stdout": result.stdout or "", + "stderr": result.stderr or "", + "returncode": result.returncode + } + + except Exception as e: + # Close connection on error + if ssh_context_manager: + try: + ssh_context_manager.__exit__(None, None, None) + except: + pass + + # Check if it's a timeout + error_str = str(e).lower() + if "timeout" in error_str: + return { + "stdout": "", + "stderr": f"Command timed out after {timeout or 120} seconds", + "returncode": 124 + } + + return { + "stdout": "", + "stderr": f"SSH execution failed: {str(e)}", + "returncode": -1 + } + + +def simple_terminal_tool( + command: str, + background: bool = False, + timeout: Optional[int] = None, + task_id: Optional[str] = None +) -> str: + """ + Execute a command on a MorphCloud VM without session persistence. + + Args: + command: The command to execute + background: Whether to run in background (default: False) + timeout: Command timeout in seconds (default: 120) + task_id: Unique identifier for VM isolation (optional) + + Returns: + str: JSON string with output, exit_code, and error fields + + Examples: + # Execute a simple command + >>> result = simple_terminal_tool(command="ls -la /tmp") + + # Run a background task + >>> result = simple_terminal_tool(command="python server.py", background=True) + + # With custom timeout + >>> result = simple_terminal_tool(command="long_task.sh", timeout=300) + """ + global _active_instances, _last_activity + + try: + # Import required modules + try: + from morphcloud.api import MorphCloudClient + except ImportError as import_error: + return json.dumps({ + "output": "", + "exit_code": -1, + "error": f"Terminal tool disabled: {import_error}", + "status": "disabled" + }, ensure_ascii=False) + + # Get configuration + vm_ttl_seconds = int(os.getenv("HECATE_VM_TTL_SECONDS", "1200")) + snapshot_id = os.getenv("HECATE_DEFAULT_SNAPSHOT_ID", "snapshot_defv9tjg") + + # Check API key + morph_api_key = os.getenv("MORPH_API_KEY") + if not morph_api_key: + return json.dumps({ + "output": "", + "exit_code": -1, + "error": "MORPH_API_KEY environment variable not set", + "status": "disabled" + }, ensure_ascii=False) + + # Use task_id for VM isolation + effective_task_id = task_id or "default" + + # Start cleanup thread + _start_cleanup_thread() + + # Get or create VM instance + with _instance_lock: + if effective_task_id not in _active_instances: + morph_client = MorphCloudClient(api_key=morph_api_key) + _active_instances[effective_task_id] = morph_client.instances.start( + snapshot_id=snapshot_id, + ttl_seconds=vm_ttl_seconds, + ttl_action="stop" + ) + + # Update last activity time + _last_activity[effective_task_id] = time.time() + instance = _active_instances[effective_task_id] + + # Wait for instance to be ready + instance.wait_until_ready() + + # Prepare command for execution + if background: + # Run in background with nohup and redirect output + exec_command = f"nohup {command} > /tmp/bg_output.log 2>&1 &" + result = _execute_ssh_command(instance, exec_command, timeout=10) + + # For background tasks, return immediately with info + if result["returncode"] == 0: + return json.dumps({ + "output": "Background task started successfully", + "exit_code": 0, + "error": None + }, ensure_ascii=False) + else: + return json.dumps({ + "output": result["stdout"], + "exit_code": result["returncode"], + "error": result["stderr"] + }, ensure_ascii=False) + else: + # Run foreground command + result = _execute_ssh_command(instance, command, timeout=timeout) + + # Combine stdout and stderr for output + output = result["stdout"] + if result["stderr"] and result["returncode"] != 0: + output = f"{output}\n{result['stderr']}" if output else result["stderr"] + + return json.dumps({ + "output": output.strip(), + "exit_code": result["returncode"], + "error": result["stderr"] if result["returncode"] != 0 else None + }, ensure_ascii=False) + + except Exception as e: + return json.dumps({ + "output": "", + "exit_code": -1, + "error": f"Failed to execute command: {str(e)}", + "status": "error" + }, ensure_ascii=False) + + +def check_requirements() -> bool: + """Check if all requirements for the simple terminal tool are met.""" + required_vars = ["MORPH_API_KEY"] + missing_required = [var for var in required_vars if not os.getenv(var)] + + if missing_required: + print(f"Missing required environment variables: {', '.join(missing_required)}") + return False + + try: + from morphcloud.api import MorphCloudClient + return True + except Exception as e: + print(f"MorphCloud not available: {e}") + return False + + +if __name__ == "__main__": + """Simple test when run directly.""" + print("Simple Terminal Tool Module") + print("=" * 40) + + if not check_requirements(): + print("Requirements not met. Please check the messages above.") + exit(1) + + print("All requirements met!") + print("\nAvailable Tool:") + print(" - simple_terminal_tool: Execute commands without session persistence") + + print("\nUsage Examples:") + print(" # Execute a command") + print(" result = simple_terminal_tool(command='ls -la')") + print(" ") + print(" # Run a background task") + print(" result = simple_terminal_tool(command='python server.py', background=True)") + + print("\nEnvironment Variables:") + print(f" MORPH_API_KEY: {'Set' if os.getenv('MORPH_API_KEY') else 'Not set'}") + print(f" HECATE_VM_TTL_SECONDS: {os.getenv('HECATE_VM_TTL_SECONDS', '1200')} (default: 1200 / 20 minutes)") + print(f" HECATE_VM_LIFETIME_SECONDS: {os.getenv('HECATE_VM_LIFETIME_SECONDS', '300')} (default: 300 / 5 minutes)") + print(f" HECATE_DEFAULT_SNAPSHOT_ID: {os.getenv('HECATE_DEFAULT_SNAPSHOT_ID', 'snapshot_defv9tjg')}") diff --git a/tools/web_tools.py b/tools/web_tools.py index e3a65f71..3f7df9f4 100644 --- a/tools/web_tools.py +++ b/tools/web_tools.py @@ -184,10 +184,10 @@ Your goal is to preserve ALL important information while reducing length. Never Create a markdown summary that captures all key information in a well-organized, scannable format. Include important quotes and code snippets in their original formatting. Focus on actionable information, specific details, and unique insights.""" # Call the LLM asynchronously with retry logic for flaky API - max_retries = 3 + max_retries = 6 retry_delay = 2 # Start with 2 seconds last_error = None - + for attempt in range(max_retries): try: response = await nous_client.chat.completions.create( @@ -206,7 +206,7 @@ Create a markdown summary that captures all key information in a well-organized, print(f"āš ļø LLM API call failed (attempt {attempt + 1}/{max_retries}): {str(api_error)[:100]}") print(f" Retrying in {retry_delay}s...") await asyncio.sleep(retry_delay) - retry_delay *= 2 # Exponential backoff: 2s, 4s, 8s + retry_delay = min(retry_delay * 2, 60) # Exponential backoff: 2s, 4s, 8s, 16s, 32s, 60s else: # All retries exhausted raise last_error