From a4db3fdee5b921f39092d3d61ae50c6963a61231 Mon Sep 17 00:00:00 2001 From: hjc-puro Date: Mon, 3 Nov 2025 17:42:23 -0500 Subject: [PATCH 1/2] fix leakage --- batch_runner.py | 4 ++- model_tools.py | 36 +++++++++++++------------- run_agent.py | 28 +++++++++++++-------- tools/terminal_tool.py | 57 +++++++++++++++++++++--------------------- 4 files changed, 67 insertions(+), 58 deletions(-) diff --git a/batch_runner.py b/batch_runner.py index 4954126f9..788e0b2b0 100644 --- a/batch_runner.py +++ b/batch_runner.py @@ -156,6 +156,7 @@ def _process_single_prompt( print(f" Prompt {prompt_index}: Using toolsets {selected_toolsets}") # Initialize agent with sampled toolsets + # Use prompt_index as task_id to ensure each task gets its own isolated VM agent = AIAgent( base_url=config.get("base_url"), api_key=config.get("api_key"), @@ -164,7 +165,8 @@ def _process_single_prompt( enabled_toolsets=selected_toolsets, save_trajectories=False, # We handle saving ourselves verbose_logging=config.get("verbose", False), - ephemeral_system_prompt=config.get("ephemeral_system_prompt") + ephemeral_system_prompt=config.get("ephemeral_system_prompt"), + task_id=f"task_{prompt_index}" ) # Run the agent diff --git a/model_tools.py b/model_tools.py index 26cf2aae1..261914a5d 100644 --- a/model_tools.py +++ b/model_tools.py @@ -480,14 +480,15 @@ def handle_web_function_call(function_name: str, function_args: Dict[str, Any]) else: return json.dumps({"error": f"Unknown web function: {function_name}"}) -def handle_terminal_function_call(function_name: str, function_args: Dict[str, Any]) -> str: +def handle_terminal_function_call(function_name: str, function_args: Dict[str, Any], task_id: Optional[str] = None) -> str: """ Handle function calls for terminal tools. - + Args: function_name (str): Name of the terminal function to call function_args (Dict): Arguments for the function - + task_id (str): Unique identifier for this task to isolate VMs between concurrent tasks (optional) + Returns: str: Function result as JSON string """ @@ -498,8 +499,8 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A idle_threshold = function_args.get("idle_threshold", 5.0) timeout = function_args.get("timeout") - return terminal_tool(command, input_keys, None, background, idle_threshold, timeout) - + return terminal_tool(command, input_keys, None, background, idle_threshold, timeout, task_id) + else: return json.dumps({"error": f"Unknown terminal function: {function_name}"}) @@ -614,21 +615,22 @@ def handle_image_function_call(function_name: str, function_args: Dict[str, Any] return json.dumps({"error": f"Unknown image generation function: {function_name}"}) -def handle_function_call(function_name: str, function_args: Dict[str, Any]) -> str: +def handle_function_call(function_name: str, function_args: Dict[str, Any], task_id: Optional[str] = None) -> str: """ Main function call dispatcher that routes calls to appropriate toolsets. - + This function determines which toolset a function belongs to and dispatches the call to the appropriate handler. This makes it easy to add new toolsets without changing the main calling interface. - + Args: function_name (str): Name of the function to call function_args (Dict): Arguments for the function - + task_id (str): Unique identifier for this task to isolate VMs between concurrent tasks (optional) + Returns: str: Function result as JSON string - + Raises: None: Returns error as JSON string instead of raising exceptions """ @@ -636,28 +638,28 @@ def handle_function_call(function_name: str, function_args: Dict[str, Any]) -> s # Route web tools if function_name in ["web_search", "web_extract", "web_crawl"]: return handle_web_function_call(function_name, function_args) - + # Route terminal tools elif function_name in ["terminal"]: - return handle_terminal_function_call(function_name, function_args) - + return handle_terminal_function_call(function_name, function_args, task_id) + # Route vision tools elif function_name in ["vision_analyze"]: return handle_vision_function_call(function_name, function_args) - + # Route MoA tools elif function_name in ["mixture_of_agents"]: return handle_moa_function_call(function_name, function_args) - + # Route image generation tools elif function_name in ["image_generate"]: return handle_image_function_call(function_name, function_args) - + else: error_msg = f"Unknown function: {function_name}" print(f"❌ {error_msg}") return json.dumps({"error": error_msg}) - + except Exception as e: error_msg = f"Error executing {function_name}: {str(e)}" print(f"❌ {error_msg}") diff --git a/run_agent.py b/run_agent.py index e828d3e29..c3f17ea63 100644 --- a/run_agent.py +++ b/run_agent.py @@ -54,9 +54,9 @@ class AIAgent: """ def __init__( - self, - base_url: str = None, - api_key: str = None, + self, + base_url: str = None, + api_key: str = None, model: str = "gpt-4", max_iterations: int = 10, tool_delay: float = 1.0, @@ -64,11 +64,12 @@ class AIAgent: disabled_toolsets: List[str] = None, save_trajectories: bool = False, verbose_logging: bool = False, - ephemeral_system_prompt: str = None + ephemeral_system_prompt: str = None, + task_id: str = None ): """ Initialize the AI Agent. - + Args: base_url (str): Base URL for the model API (optional) api_key (str): API key for authentication (optional, uses env var if not provided) @@ -80,6 +81,7 @@ class AIAgent: save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False) verbose_logging (bool): Enable verbose logging for debugging (default: False) ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional) + task_id (str): Unique identifier for this task to isolate VMs between concurrent tasks (optional) """ self.model = model self.max_iterations = max_iterations @@ -87,7 +89,11 @@ class AIAgent: self.save_trajectories = save_trajectories self.verbose_logging = verbose_logging self.ephemeral_system_prompt = ephemeral_system_prompt - + + # Generate unique task_id if not provided to isolate VMs between concurrent tasks + import uuid + self.task_id = task_id or str(uuid.uuid4()) + # Store toolset filtering options self.enabled_toolsets = enabled_toolsets self.disabled_toolsets = disabled_toolsets @@ -469,12 +475,12 @@ class AIAgent: function_args = {} print(f" 📞 Tool {i}: {function_name}({list(function_args.keys())})") - + tool_start_time = time.time() - - # Execute the tool - function_result = handle_function_call(function_name, function_args) - + + # Execute the tool with task_id to isolate VMs between concurrent tasks + function_result = handle_function_call(function_name, function_args, self.task_id) + tool_duration = time.time() - tool_start_time result_preview = function_result[:200] if len(function_result) > 200 else function_result diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 44af7eda4..6b7d9de30 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -75,8 +75,9 @@ When commands enter interactive mode (vim, nano, less, git prompts, package mana # Global state for VM lifecycle management # These persist across tool calls to enable session continuity -_active_instance = None -_active_context = None +# Changed to dictionaries keyed by task_id to prevent leakage between concurrent tasks +_active_instances: Dict[str, Any] = {} +_active_contexts: Dict[str, Any] = {} _instance_lock = threading.Lock() def terminal_tool( @@ -85,23 +86,25 @@ def terminal_tool( session_id: Optional[str] = None, background: bool = False, idle_threshold: float = 5.0, - timeout: Optional[int] = None + timeout: Optional[int] = None, + task_id: Optional[str] = None ) -> str: """ Execute a command on a Morph VM with optional interactive session support. - + This tool uses Hecate's VM lifecycle management to automatically create and manage VMs. VMs are reused within the configured lifetime window and automatically cleaned up after inactivity. - + Args: command: The command to execute (optional if continuing existing session) input_keys: Keystrokes to send to interactive session (e.g., "hello\\n") session_id: ID of existing session to continue (optional) - background: Whether to run the command in the background (default: False) + background: Whether to run the command in the background (default: False) idle_threshold: Seconds to wait for output before considering session idle (default: 5.0) timeout: Command timeout in seconds (optional) - + task_id: Unique identifier for this task to isolate VMs between concurrent tasks (optional) + Returns: str: JSON string containing command output, session info, exit code, and any errors @@ -120,7 +123,7 @@ def terminal_tool( # Run a background task >>> result = terminal_tool(command="sleep 60", background=True) """ - global _active_instance, _active_context + global _active_instances, _active_contexts try: # Import required modules lazily so this module can be imported @@ -135,10 +138,8 @@ def terminal_tool( return json.dumps({ "output": "", "screen": "", - "session_id": None, "exit_code": -1, - "error": f"Terminal tool is disabled due to import error: {import_error}", - "status": "disabled" + "error": f"Terminal tool is disabled due to import error: {import_error}" }) # Get configuration from environment @@ -151,25 +152,27 @@ def terminal_tool( return json.dumps({ "output": "", "screen": "", - "session_id": None, "exit_code": -1, - "error": "MORPH_API_KEY environment variable not set", - "status": "disabled" + "error": "MORPH_API_KEY environment variable not set" }) - # Get or create VM instance and execution context + # Use task_id to isolate VMs between concurrent tasks + # If no task_id provided, use "default" for backward compatibility + effective_task_id = task_id or "default" + + # Get or create VM instance and execution context per task # This is critical for interactive session support - the context must persist! with _instance_lock: - if _active_instance is None: + if effective_task_id not in _active_instances: morph_client = MorphCloudClient(api_key=morph_api_key) - _active_instance = morph_client.instances.start(snapshot_id=snapshot_id) + _active_instances[effective_task_id] = morph_client.instances.start(snapshot_id=snapshot_id) - # Get or create persistent execution context - if _active_context is None: - _active_context = ExecutionContext() + # Get or create persistent execution context per task + if effective_task_id not in _active_contexts: + _active_contexts[effective_task_id] = ExecutionContext() - instance = _active_instance - ctx = _active_context + instance = _active_instances[effective_task_id] + ctx = _active_contexts[effective_task_id] # Build tool input based on provided parameters tool_input = {} @@ -208,15 +211,13 @@ def terminal_tool( ctx=ctx ) - # Format the result with all possible fields + # Format the result with only essential fields for the LLM # Map hecate's "stdout" to "output" for compatibility formatted_result = { "output": result.get("stdout", result.get("output", "")), "screen": result.get("screen", ""), - "session_id": result.get("session_id"), "exit_code": result.get("returncode", result.get("exit_code", -1)), - "error": result.get("error"), - "status": "active" if result.get("session_id") else "ended" + "error": result.get("error") } return json.dumps(formatted_result) @@ -225,10 +226,8 @@ def terminal_tool( return json.dumps({ "output": "", "screen": "", - "session_id": None, "exit_code": -1, - "error": f"Failed to execute terminal command: {str(e)}", - "status": "error" + "error": f"Failed to execute terminal command: {str(e)}" }) def check_hecate_requirements() -> bool: From fbd3a2fdb88e67d5ae3923f074d437bd5adeae73 Mon Sep 17 00:00:00 2001 From: hjc-puro Date: Tue, 4 Nov 2025 03:32:43 -0500 Subject: [PATCH 2/2] prevent leakage of morph instances between tasks --- batch_runner.py | 10 ++- model_tools.py | 2 +- run_agent.py | 39 ++++++----- tools/terminal_tool.py | 153 +++++++++++++++++++++++++++++++++++++++-- 4 files changed, 177 insertions(+), 27 deletions(-) diff --git a/batch_runner.py b/batch_runner.py index 788e0b2b0..5dd35955e 100644 --- a/batch_runner.py +++ b/batch_runner.py @@ -156,7 +156,6 @@ def _process_single_prompt( print(f" Prompt {prompt_index}: Using toolsets {selected_toolsets}") # Initialize agent with sampled toolsets - # Use prompt_index as task_id to ensure each task gets its own isolated VM agent = AIAgent( base_url=config.get("base_url"), api_key=config.get("api_key"), @@ -165,12 +164,11 @@ def _process_single_prompt( enabled_toolsets=selected_toolsets, save_trajectories=False, # We handle saving ourselves verbose_logging=config.get("verbose", False), - ephemeral_system_prompt=config.get("ephemeral_system_prompt"), - task_id=f"task_{prompt_index}" + ephemeral_system_prompt=config.get("ephemeral_system_prompt") ) - - # Run the agent - result = agent.run_conversation(prompt) + + # Run the agent with task_id to ensure each task gets its own isolated VM + result = agent.run_conversation(prompt, task_id=f"task_{prompt_index}") # Extract tool usage statistics tool_stats = _extract_tool_stats(result["messages"]) diff --git a/model_tools.py b/model_tools.py index 261914a5d..cbd45c591 100644 --- a/model_tools.py +++ b/model_tools.py @@ -28,7 +28,7 @@ Usage: import json import asyncio -from typing import Dict, Any, List +from typing import Dict, Any, List, Optional from tools.web_tools import web_search_tool, web_extract_tool, web_crawl_tool, check_firecrawl_api_key from tools.terminal_tool import terminal_tool, check_hecate_requirements, TERMINAL_TOOL_DESCRIPTION diff --git a/run_agent.py b/run_agent.py index c3f17ea63..f06d0d019 100644 --- a/run_agent.py +++ b/run_agent.py @@ -43,6 +43,7 @@ else: # Import our tool system from model_tools import get_tool_definitions, handle_function_call, check_toolset_requirements +from tools.terminal_tool import cleanup_vm class AIAgent: @@ -64,8 +65,7 @@ class AIAgent: disabled_toolsets: List[str] = None, save_trajectories: bool = False, verbose_logging: bool = False, - ephemeral_system_prompt: str = None, - task_id: str = None + ephemeral_system_prompt: str = None ): """ Initialize the AI Agent. @@ -81,7 +81,6 @@ class AIAgent: save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False) verbose_logging (bool): Enable verbose logging for debugging (default: False) ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional) - task_id (str): Unique identifier for this task to isolate VMs between concurrent tasks (optional) """ self.model = model self.max_iterations = max_iterations @@ -90,10 +89,6 @@ class AIAgent: self.verbose_logging = verbose_logging self.ephemeral_system_prompt = ephemeral_system_prompt - # Generate unique task_id if not provided to isolate VMs between concurrent tasks - import uuid - self.task_id = task_id or str(uuid.uuid4()) - # Store toolset filtering options self.enabled_toolsets = enabled_toolsets self.disabled_toolsets = disabled_toolsets @@ -348,22 +343,27 @@ class AIAgent: print(f"⚠️ Failed to save trajectory: {e}") def run_conversation( - self, - user_message: str, - system_message: str = None, - conversation_history: List[Dict[str, Any]] = None + self, + user_message: str, + system_message: str = None, + conversation_history: List[Dict[str, Any]] = None, + task_id: str = None ) -> Dict[str, Any]: """ Run a complete conversation with tool calling until completion. - + Args: user_message (str): The user's message/question system_message (str): Custom system message (optional, overrides ephemeral_system_prompt if provided) conversation_history (List[Dict]): Previous conversation messages (optional) - + task_id (str): Unique identifier for this task to isolate VMs between concurrent tasks (optional, auto-generated if not provided) + Returns: Dict: Complete conversation result with final response and message history """ + # Generate unique task_id if not provided to isolate VMs between concurrent tasks + import uuid + effective_task_id = task_id or str(uuid.uuid4()) # Initialize conversation messages = conversation_history or [] @@ -479,7 +479,7 @@ class AIAgent: tool_start_time = time.time() # Execute the tool with task_id to isolate VMs between concurrent tasks - function_result = handle_function_call(function_name, function_args, self.task_id) + function_result = handle_function_call(function_name, function_args, effective_task_id) tool_duration = time.time() - tool_start_time result_preview = function_result[:200] if len(function_result) > 200 else function_result @@ -543,10 +543,17 @@ class AIAgent: # Determine if conversation completed successfully completed = final_response is not None and api_call_count < self.max_iterations - + # Save trajectory if enabled self._save_trajectory(messages, user_message, completed) - + + # Clean up VM for this task after conversation completes + try: + cleanup_vm(effective_task_id) + except Exception as e: + if self.verbose_logging: + logging.warning(f"Failed to cleanup VM for task {effective_task_id}: {e}") + return { "final_response": final_response, "messages": messages, diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 6b7d9de30..658a6823a 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -4,8 +4,12 @@ Terminal Tool Module This module provides a single terminal tool using Hecate's VM infrastructure. It wraps Hecate's functionality to provide a simple interface for executing commands -on Morph VMs with automatic lifecycle management. VMs live for 5 minutes after last use. -Timer resets with each use. +on Morph VMs with automatic lifecycle management. + +VM Lifecycle: +- VMs have a TTL (time to live) set at creation (default: 20 minutes) +- VMs are also cleaned up locally after 5 minutes of inactivity +- Timer resets with each use Available tool: - terminal_tool: Execute commands with optional interactive session support @@ -24,6 +28,8 @@ import json import os import uuid import threading +import time +import atexit from typing import Optional, Dict, Any # Detailed description for the terminal tool based on Hermes Terminal system prompt @@ -78,7 +84,134 @@ When commands enter interactive mode (vim, nano, less, git prompts, package mana # Changed to dictionaries keyed by task_id to prevent leakage between concurrent tasks _active_instances: Dict[str, Any] = {} _active_contexts: Dict[str, Any] = {} +_last_activity: Dict[str, float] = {} # Track last activity time for each VM _instance_lock = threading.Lock() +_cleanup_thread = None +_cleanup_running = False + +def _cleanup_inactive_vms(vm_lifetime_seconds: int = 300): + """ + Clean up VMs that have been inactive for longer than vm_lifetime_seconds. + This function should be called periodically by a background thread. + + Args: + vm_lifetime_seconds: Maximum lifetime in seconds for inactive VMs (default: 300) + """ + global _active_instances, _active_contexts, _last_activity + + current_time = time.time() + tasks_to_cleanup = [] + + with _instance_lock: + # Find all VMs that have been inactive for too long + for task_id, last_time in list(_last_activity.items()): + if current_time - last_time > vm_lifetime_seconds: + tasks_to_cleanup.append(task_id) + + # Clean up the inactive VMs + for task_id in tasks_to_cleanup: + try: + if task_id in _active_instances: + instance = _active_instances[task_id] + # Terminate the VM instance + if hasattr(instance, 'terminate'): + instance.terminate() + elif hasattr(instance, 'stop'): + instance.stop() + elif hasattr(instance, 'delete'): + instance.delete() + + # Remove from tracking dictionaries + del _active_instances[task_id] + print(f"[VM Cleanup] Terminated inactive VM for task: {task_id}") + + if task_id in _active_contexts: + del _active_contexts[task_id] + + if task_id in _last_activity: + del _last_activity[task_id] + + except Exception as e: + print(f"[VM Cleanup] Error cleaning up VM for task {task_id}: {e}") + +def _cleanup_thread_worker(): + """ + Background thread worker that periodically cleans up inactive VMs. + Runs every 60 seconds. + """ + global _cleanup_running + + while _cleanup_running: + try: + vm_lifetime = int(os.getenv("HECATE_VM_LIFETIME_SECONDS", "300")) + _cleanup_inactive_vms(vm_lifetime) + except Exception as e: + print(f"[VM Cleanup] Error in cleanup thread: {e}") + + # Sleep for 60 seconds, but check every second if we should stop + for _ in range(60): + if not _cleanup_running: + break + time.sleep(1) + +def _start_cleanup_thread(): + """ + Start the background cleanup thread if it's not already running. + """ + global _cleanup_thread, _cleanup_running + + with _instance_lock: + if _cleanup_thread is None or not _cleanup_thread.is_alive(): + _cleanup_running = True + _cleanup_thread = threading.Thread(target=_cleanup_thread_worker, daemon=True) + _cleanup_thread.start() + +def _stop_cleanup_thread(): + """ + Stop the background cleanup thread. + """ + global _cleanup_running + _cleanup_running = False + if _cleanup_thread is not None: + _cleanup_thread.join(timeout=5) + +def cleanup_vm(task_id: str): + """ + Manually clean up a specific VM by task_id. + This should be called when a task is completed. + + Args: + task_id: The task ID of the VM to clean up + """ + global _active_instances, _active_contexts, _last_activity + + with _instance_lock: + try: + if task_id in _active_instances: + instance = _active_instances[task_id] + # Terminate the VM instance + if hasattr(instance, 'terminate'): + instance.terminate() + elif hasattr(instance, 'stop'): + instance.stop() + elif hasattr(instance, 'delete'): + instance.delete() + + # Remove from tracking dictionaries + del _active_instances[task_id] + print(f"[VM Cleanup] Manually terminated VM for task: {task_id}") + + if task_id in _active_contexts: + del _active_contexts[task_id] + + if task_id in _last_activity: + del _last_activity[task_id] + + except Exception as e: + print(f"[VM Cleanup] Error manually cleaning up VM for task {task_id}: {e}") + +# Register cleanup on program exit +atexit.register(_stop_cleanup_thread) def terminal_tool( command: Optional[str] = None, @@ -144,6 +277,7 @@ def terminal_tool( # Get configuration from environment vm_lifetime_seconds = int(os.getenv("HECATE_VM_LIFETIME_SECONDS", "300")) + vm_ttl_seconds = int(os.getenv("HECATE_VM_TTL_SECONDS", "1200")) # 20 minutes default snapshot_id = os.getenv("HECATE_DEFAULT_SNAPSHOT_ID", "snapshot_defv9tjg") # Check API key @@ -160,17 +294,27 @@ def terminal_tool( # If no task_id provided, use "default" for backward compatibility effective_task_id = task_id or "default" + # Start the cleanup thread if not already running + _start_cleanup_thread() + # Get or create VM instance and execution context per task # This is critical for interactive session support - the context must persist! with _instance_lock: if effective_task_id not in _active_instances: morph_client = MorphCloudClient(api_key=morph_api_key) - _active_instances[effective_task_id] = morph_client.instances.start(snapshot_id=snapshot_id) + _active_instances[effective_task_id] = morph_client.instances.start( + snapshot_id=snapshot_id, + ttl_seconds=vm_ttl_seconds, + ttl_action="stop" + ) # Get or create persistent execution context per task if effective_task_id not in _active_contexts: _active_contexts[effective_task_id] = ExecutionContext() + # Update last activity time for this VM (resets the inactivity timer) + _last_activity[effective_task_id] = time.time() + instance = _active_instances[effective_task_id] ctx = _active_contexts[effective_task_id] @@ -303,5 +447,6 @@ if __name__ == "__main__": print("\nEnvironment Variables:") print(f" MORPH_API_KEY: {'Set' if os.getenv('MORPH_API_KEY') else 'Not set'}") print(f" OPENAI_API_KEY: {'Set' if os.getenv('OPENAI_API_KEY') else 'Not set (optional)'}") - print(f" HECATE_VM_LIFETIME_SECONDS: {os.getenv('HECATE_VM_LIFETIME_SECONDS', '300')} (default: 300)") + print(f" HECATE_VM_TTL_SECONDS: {os.getenv('HECATE_VM_TTL_SECONDS', '1200')} (default: 1200 / 20 minutes)") + print(f" HECATE_VM_LIFETIME_SECONDS: {os.getenv('HECATE_VM_LIFETIME_SECONDS', '300')} (default: 300 / 5 minutes)") print(f" HECATE_DEFAULT_SNAPSHOT_ID: {os.getenv('HECATE_DEFAULT_SNAPSHOT_ID', 'snapshot_defv9tjg')} (default: snapshot_defv9tjg)")