diff --git a/model_tools.py b/model_tools.py index 65580d7c7..023b0feff 100644 --- a/model_tools.py +++ b/model_tools.py @@ -20,11 +20,13 @@ Usage: """ import json +import asyncio from typing import Dict, Any, List # Import toolsets from web_tools import web_search_tool, web_extract_tool, web_crawl_tool, check_tavily_api_key from terminal_tool import terminal_tool, check_hecate_requirements, TERMINAL_TOOL_DESCRIPTION +from vision_tools import vision_analyze_tool, check_vision_requirements def get_web_tool_definitions() -> List[Dict[str, Any]]: """ @@ -158,30 +160,234 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]: } ] -def get_tool_definitions() -> List[Dict[str, Any]]: + +def get_vision_tool_definitions() -> List[Dict[str, Any]]: """ - Get all available tool definitions for model API calls. - - This function aggregates tool definitions from all available toolsets. - Currently includes web tools, but can be extended to include other toolsets. + Get tool definitions for vision tools in OpenAI's expected format. Returns: - List[Dict]: Complete list of all available tool definitions + List[Dict]: List of vision tool definitions compatible with OpenAI API """ - tools = [] + return [ + { + "type": "function", + "function": { + "name": "vision_analyze", + "description": "Analyze images from URLs using AI vision. Provides comprehensive image description and answers specific questions about the image content. Perfect for understanding visual content, reading text in images, identifying objects, analyzing scenes, and extracting visual information.", + "parameters": { + "type": "object", + "properties": { + "image_url": { + "type": "string", + "description": "The URL of the image to analyze (must be publicly accessible HTTP/HTTPS URL)" + }, + "question": { + "type": "string", + "description": "Your specific question or request about the image to resolve. The AI will automatically provide a complete image description AND answer your specific question. 
Examples: 'What text can you read?', 'What architectural style is this?', 'Describe the mood and emotions', 'What safety hazards do you see?'" + }, + "model": { + "type": "string", + "description": "The vision model to use for analysis (optional, default: gemini-2.5-flash)", + "default": "gemini-2.5-flash" + } + }, + "required": ["image_url", "question"] + } + } + } + ] + + +def get_all_tool_names() -> List[str]: + """ + Get the names of all available tools across all toolsets. - # Add web tools - tools.extend(get_web_tool_definitions()) + Returns: + List[str]: List of all tool names + """ + tool_names = [] - # Add terminal tools - tools.extend(get_terminal_tool_definitions()) + # Web tools + if check_tavily_api_key(): + tool_names.extend(["web_search", "web_extract", "web_crawl"]) + + # Terminal tools + if check_hecate_requirements(): + tool_names.extend(["terminal"]) + + # Vision tools + if check_vision_requirements(): + tool_names.extend(["vision_analyze"]) # Future toolsets can be added here: - # tools.extend(get_file_tool_definitions()) - # tools.extend(get_code_tool_definitions()) - # tools.extend(get_database_tool_definitions()) + # if check_file_tools(): + # tool_names.extend(["file_read", "file_write"]) - return tools + return tool_names + + +def get_toolset_for_tool(tool_name: str) -> str: + """ + Get the toolset that a tool belongs to. 
+ + Args: + tool_name (str): Name of the tool + + Returns: + str: Name of the toolset, or "unknown" if not found + """ + toolset_mapping = { + "web_search": "web_tools", + "web_extract": "web_tools", + "web_crawl": "web_tools", + "terminal": "terminal_tools", + "vision_analyze": "vision_tools" + # Future tools can be added here + } + + return toolset_mapping.get(tool_name, "unknown") + + +def get_tool_definitions( + enabled_tools: List[str] = None, + disabled_tools: List[str] = None, + enabled_toolsets: List[str] = None, + disabled_toolsets: List[str] = None +) -> List[Dict[str, Any]]: + """ + Get tool definitions for model API calls with optional filtering. + + This function aggregates tool definitions from all available toolsets + and applies filtering based on the provided parameters. + + Filter Priority (higher priority overrides lower): + 1. enabled_tools (highest priority - only these tools, overrides everything) + 2. disabled_tools (applied after toolset filtering) + 3. enabled_toolsets (only tools from these toolsets) + 4. disabled_toolsets (exclude tools from these toolsets) + + Args: + enabled_tools (List[str]): Only include these specific tools. 
If provided, + ONLY these tools will be included (overrides all other filters) + disabled_tools (List[str]): Exclude these specific tools (applied after toolset filtering) + enabled_toolsets (List[str]): Only include tools from these toolsets + disabled_toolsets (List[str]): Exclude tools from these toolsets + + Returns: + List[Dict]: Filtered list of tool definitions + + Examples: + # Only web tools + tools = get_tool_definitions(enabled_toolsets=["web_tools"]) + + # All tools except terminal + tools = get_tool_definitions(disabled_tools=["terminal"]) + + # Only specific tools (overrides toolset filters) + tools = get_tool_definitions(enabled_tools=["web_search", "web_extract"]) + + # Conflicting filters (enabled_tools wins) + tools = get_tool_definitions(enabled_toolsets=["web_tools"], enabled_tools=["terminal"]) + # Result: Only terminal tool (enabled_tools overrides enabled_toolsets) + """ + # Detect and warn about potential conflicts + conflicts_detected = False + + if enabled_tools and (enabled_toolsets or disabled_toolsets or disabled_tools): + print("⚠️ enabled_tools overrides all other filters") + conflicts_detected = True + + if enabled_toolsets and disabled_toolsets: + # Check for overlap + enabled_set = set(enabled_toolsets) + disabled_set = set(disabled_toolsets) + overlap = enabled_set & disabled_set + if overlap: + print(f"⚠️ Conflicting toolsets: {overlap} in both enabled and disabled") + print(f" → enabled_toolsets takes priority") + conflicts_detected = True + + if enabled_tools and disabled_tools: + # Check for overlap + enabled_set = set(enabled_tools) + disabled_set = set(disabled_tools) + overlap = enabled_set & disabled_set + if overlap: + print(f"⚠️ Conflicting tools: {overlap} in both enabled and disabled") + print(f" → enabled_tools takes priority") + conflicts_detected = True + + all_tools = [] + + # Collect all available tools from each toolset + toolset_tools = { + "web_tools": get_web_tool_definitions() if check_tavily_api_key() else 
[], + "terminal_tools": get_terminal_tool_definitions() if check_hecate_requirements() else [], + "vision_tools": get_vision_tool_definitions() if check_vision_requirements() else [] + # Future toolsets can be added here: + # "file_tools": get_file_tool_definitions() if check_file_tools() else [], + } + + # HIGHEST PRIORITY: enabled_tools (overrides everything) + if enabled_tools: + if conflicts_detected: + print(f"🎯 Using only enabled_tools: {enabled_tools}") + + # Collect all available tools first + all_available_tools = [] + for tools in toolset_tools.values(): + all_available_tools.extend(tools) + + # Only include specifically enabled tools + tool_names_to_include = set(enabled_tools) + filtered_tools = [ + tool for tool in all_available_tools + if tool["function"]["name"] in tool_names_to_include + ] + + # Warn about requested tools that aren't available + found_tools = {tool["function"]["name"] for tool in filtered_tools} + missing_tools = tool_names_to_include - found_tools + if missing_tools: + print(f"⚠️ Requested tools not available: {missing_tools}") + + return filtered_tools + + # Apply toolset-level filtering first + if enabled_toolsets: + # Only include tools from enabled toolsets + for toolset_name in enabled_toolsets: + if toolset_name in toolset_tools: + all_tools.extend(toolset_tools[toolset_name]) + else: + print(f"⚠️ Unknown toolset: {toolset_name}") + elif disabled_toolsets: + # Include all tools except from disabled toolsets + for toolset_name, tools in toolset_tools.items(): + if toolset_name not in disabled_toolsets: + all_tools.extend(tools) + else: + # Include all available tools + for tools in toolset_tools.values(): + all_tools.extend(tools) + + # Apply tool-level filtering (disabled_tools) + if disabled_tools: + tool_names_to_exclude = set(disabled_tools) + original_tools = [tool["function"]["name"] for tool in all_tools] + + all_tools = [ + tool for tool in all_tools + if tool["function"]["name"] not in tool_names_to_exclude + ] + + # 
Show what was actually filtered out + remaining_tools = {tool["function"]["name"] for tool in all_tools} + actually_excluded = set(original_tools) & tool_names_to_exclude + if actually_excluded: + print(f"🚫 Excluded tools: {actually_excluded}") + + return all_tools def handle_web_function_call(function_name: str, function_args: Dict[str, Any]) -> str: """ @@ -206,13 +412,15 @@ def handle_web_function_call(function_name: str, function_args: Dict[str, Any]) # Limit URLs to prevent abuse urls = urls[:5] if isinstance(urls, list) else [] format = function_args.get("format") - return web_extract_tool(urls, format) + # Run async function in event loop + return asyncio.run(web_extract_tool(urls, format)) elif function_name == "web_crawl": url = function_args.get("url", "") instructions = function_args.get("instructions") depth = function_args.get("depth", "basic") - return web_crawl_tool(url, instructions, depth) + # Run async function in event loop + return asyncio.run(web_crawl_tool(url, instructions, depth)) else: return json.dumps({"error": f"Unknown web function: {function_name}"}) @@ -240,6 +448,33 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A else: return json.dumps({"error": f"Unknown terminal function: {function_name}"}) + +def handle_vision_function_call(function_name: str, function_args: Dict[str, Any]) -> str: + """ + Handle function calls for vision tools. 
+ + Args: + function_name (str): Name of the vision function to call + function_args (Dict): Arguments for the function + + Returns: + str: Function result as JSON string + """ + if function_name == "vision_analyze": + image_url = function_args.get("image_url", "") + question = function_args.get("question", "") + model = function_args.get("model", "gemini-2.5-flash") + + # Automatically prepend full description request to user's question + full_prompt = f"Fully describe and explain everything about this image\n\n{question}" + + # Run async function in event loop + return asyncio.run(vision_analyze_tool(image_url, full_prompt, model)) + + else: + return json.dumps({"error": f"Unknown vision function: {function_name}"}) + + def handle_function_call(function_name: str, function_args: Dict[str, Any]) -> str: """ Main function call dispatcher that routes calls to appropriate toolsets. @@ -267,6 +502,10 @@ def handle_function_call(function_name: str, function_args: Dict[str, Any]) -> s elif function_name in ["terminal"]: return handle_terminal_function_call(function_name, function_args) + # Route vision tools + elif function_name in ["vision_analyze"]: + return handle_vision_function_call(function_name, function_args) + # Future toolsets can be routed here: # elif function_name in ["file_read_tool", "file_write_tool"]: # return handle_file_function_call(function_name, function_args) @@ -302,6 +541,12 @@ def get_available_toolsets() -> Dict[str, Dict[str, Any]]: "tools": ["terminal_tool"], "description": "Execute commands with optional interactive session support on Linux VMs", "requirements": ["MORPH_API_KEY environment variable", "hecate package"] + }, + "vision_tools": { + "available": check_vision_requirements(), + "tools": ["vision_analyze_tool"], + "description": "Analyze images from URLs using AI vision for comprehensive understanding", + "requirements": ["NOUS_API_KEY environment variable"] } # Future toolsets can be added here } @@ -317,7 +562,8 @@ def 
check_toolset_requirements() -> Dict[str, bool]: """ return { "web_tools": check_tavily_api_key(), - "terminal_tools": check_hecate_requirements() + "terminal_tools": check_hecate_requirements(), + "vision_tools": check_vision_requirements() } if __name__ == "__main__": @@ -334,13 +580,20 @@ if __name__ == "__main__": status = "✅" if available else "❌" print(f" {status} {toolset}: {'Available' if available else 'Missing requirements'}") - # Show available tools + # Show all available tool names + all_tool_names = get_all_tool_names() + print(f"\n🔧 Available Tools ({len(all_tool_names)} total):") + for tool_name in all_tool_names: + toolset = get_toolset_for_tool(tool_name) + print(f" 📌 {tool_name} (from {toolset})") + + # Show available tools with full definitions tools = get_tool_definitions() - print(f"\n🔧 Available Tools ({len(tools)} total):") + print(f"\n📝 Tool Definitions ({len(tools)} loaded):") for tool in tools: func_name = tool["function"]["name"] desc = tool["function"]["description"] - print(f" 📌 {func_name}: {desc[:80]}{'...' if len(desc) > 80 else ''}") + print(f" 🔹 {func_name}: {desc[:60]}{'...' 
if len(desc) > 60 else ''}") # Show toolset info toolsets = get_available_toolsets() @@ -351,7 +604,26 @@ if __name__ == "__main__": if not info["available"]: print(f" Requirements: {', '.join(info['requirements'])}") - print("\n💡 Usage Example:") + print("\n💡 Usage Examples:") print(" from model_tools import get_tool_definitions, handle_function_call") + print(" # All tools") print(" tools = get_tool_definitions()") - print(" result = handle_function_call('web_search_tool', {'query': 'Python'})") + print(" # Only web tools") + print(" tools = get_tool_definitions(enabled_toolsets=['web_tools'])") + print(" # Specific tools only") + print(" tools = get_tool_definitions(enabled_tools=['web_search', 'terminal'])") + print(" # All except terminal") + print(" tools = get_tool_definitions(disabled_tools=['terminal'])") + + # Example filtering + print(f"\n🧪 Filtering Examples:") + web_only = get_tool_definitions(enabled_toolsets=["web_tools"]) + print(f" Web tools only: {len(web_only)} tools") + + if len(all_tool_names) > 1: + specific_tools = get_tool_definitions(enabled_tools=["web_search"]) + print(f" Only web_search: {len(specific_tools)} tool(s)") + + if "terminal" in all_tool_names: + no_terminal = get_tool_definitions(disabled_tools=["terminal"]) + print(f" All except terminal: {len(no_terminal)} tools") diff --git a/run_agent.py b/run_agent.py index f289e2ac6..0520b14f4 100644 --- a/run_agent.py +++ b/run_agent.py @@ -45,7 +45,11 @@ class AIAgent: api_key: str = None, model: str = "gpt-4", max_iterations: int = 10, - tool_delay: float = 1.0 + tool_delay: float = 1.0, + enabled_tools: List[str] = None, + disabled_tools: List[str] = None, + enabled_toolsets: List[str] = None, + disabled_toolsets: List[str] = None ): """ Initialize the AI Agent. 
@@ -56,11 +60,21 @@ class AIAgent: model (str): Model name to use (default: "gpt-4") max_iterations (int): Maximum number of tool calling iterations (default: 10) tool_delay (float): Delay between tool calls in seconds (default: 1.0) + enabled_tools (List[str]): Only enable these specific tools (optional) + disabled_tools (List[str]): Disable these specific tools (optional) + enabled_toolsets (List[str]): Only enable tools from these toolsets (optional) + disabled_toolsets (List[str]): Disable tools from these toolsets (optional) """ self.model = model self.max_iterations = max_iterations self.tool_delay = tool_delay + # Store tool filtering options + self.enabled_tools = enabled_tools + self.disabled_tools = disabled_tools + self.enabled_toolsets = enabled_toolsets + self.disabled_toolsets = disabled_toolsets + # Initialize OpenAI client client_kwargs = {} if base_url: @@ -78,15 +92,37 @@ class AIAgent: except Exception as e: raise RuntimeError(f"Failed to initialize OpenAI client: {e}") - # Get available tools - self.tools = get_tool_definitions() - print(f"🛠️ Loaded {len(self.tools)} tools") + # Get available tools with filtering + self.tools = get_tool_definitions( + enabled_tools=enabled_tools, + disabled_tools=disabled_tools, + enabled_toolsets=enabled_toolsets, + disabled_toolsets=disabled_toolsets + ) + + # Show tool configuration + if self.tools: + tool_names = [tool["function"]["name"] for tool in self.tools] + print(f"🛠️ Loaded {len(self.tools)} tools: {', '.join(tool_names)}") + + # Show filtering info if applied + if enabled_tools: + print(f" ✅ Enabled tools: {', '.join(enabled_tools)}") + if disabled_tools: + print(f" ❌ Disabled tools: {', '.join(disabled_tools)}") + if enabled_toolsets: + print(f" ✅ Enabled toolsets: {', '.join(enabled_toolsets)}") + if disabled_toolsets: + print(f" ❌ Disabled toolsets: {', '.join(disabled_toolsets)}") + else: + print("🛠️ No tools loaded (all tools filtered out or unavailable)") # Check tool requirements - 
requirements = check_toolset_requirements() - missing_reqs = [name for name, available in requirements.items() if not available] - if missing_reqs: - print(f"⚠️ Some tools may not work due to missing requirements: {missing_reqs}") + if self.tools: + requirements = check_toolset_requirements() + missing_reqs = [name for name, available in requirements.items() if not available] + if missing_reqs: + print(f"⚠️ Some tools may not work due to missing requirements: {missing_reqs}") def create_system_message(self, custom_system: str = None) -> str: """ @@ -282,7 +318,12 @@ def main( model: str = "claude-opus-4-20250514", api_key: str = None, base_url: str = "https://api.anthropic.com/v1/", - max_turns: int = 10 + max_turns: int = 10, + enabled_tools: str = None, + disabled_tools: str = None, + enabled_toolsets: str = None, + disabled_toolsets: str = None, + list_tools: bool = False ): """ Main function for running the agent directly. @@ -293,17 +334,80 @@ def main( api_key (str): API key for authentication. Uses ANTHROPIC_API_KEY env var if not provided. base_url (str): Base URL for the model API. Defaults to https://api.anthropic.com/v1/ max_turns (int): Maximum number of API call iterations. Defaults to 10. 
+ enabled_tools (str): Comma-separated list of tools to enable (e.g., "web_search,terminal") + disabled_tools (str): Comma-separated list of tools to disable (e.g., "terminal") + enabled_toolsets (str): Comma-separated list of toolsets to enable (e.g., "web_tools") + disabled_toolsets (str): Comma-separated list of toolsets to disable (e.g., "terminal_tools") + list_tools (bool): Just list available tools and exit """ print("🤖 AI Agent with Tool Calling") print("=" * 50) + # Handle tool listing + if list_tools: + from model_tools import get_all_tool_names, get_toolset_for_tool, get_available_toolsets + + print("📋 Available Tools & Toolsets:") + print("-" * 30) + + # Show toolsets + toolsets = get_available_toolsets() + print("📦 Toolsets:") + for name, info in toolsets.items(): + status = "✅" if info["available"] else "❌" + print(f" {status} {name}: {info['description']}") + if not info["available"]: + print(f" Requirements: {', '.join(info['requirements'])}") + + # Show individual tools + all_tools = get_all_tool_names() + print(f"\n🔧 Individual Tools ({len(all_tools)} available):") + for tool_name in all_tools: + toolset = get_toolset_for_tool(tool_name) + print(f" 📌 {tool_name} (from {toolset})") + + print(f"\n💡 Usage Examples:") + print(f" # Run with only web tools") + print(f" python run_agent.py --enabled_toolsets=web_tools --query='search for Python news'") + print(f" # Run with specific tools only") + print(f" python run_agent.py --enabled_tools=web_search,web_extract --query='research topic'") + print(f" # Run without terminal tools") + print(f" python run_agent.py --disabled_tools=terminal --query='web research only'") + return + + # Parse tool selection arguments + enabled_tools_list = None + disabled_tools_list = None + enabled_toolsets_list = None + disabled_toolsets_list = None + + if enabled_tools: + enabled_tools_list = [t.strip() for t in enabled_tools.split(",")] + print(f"🎯 Enabled tools: {enabled_tools_list}") + + if disabled_tools: + 
disabled_tools_list = [t.strip() for t in disabled_tools.split(",")] + print(f"🚫 Disabled tools: {disabled_tools_list}") + + if enabled_toolsets: + enabled_toolsets_list = [t.strip() for t in enabled_toolsets.split(",")] + print(f"🎯 Enabled toolsets: {enabled_toolsets_list}") + + if disabled_toolsets: + disabled_toolsets_list = [t.strip() for t in disabled_toolsets.split(",")] + print(f"🚫 Disabled toolsets: {disabled_toolsets_list}") + # Initialize agent with provided parameters try: agent = AIAgent( base_url=base_url, model=model, api_key=api_key, - max_iterations=max_turns + max_iterations=max_turns, + enabled_tools=enabled_tools_list, + disabled_tools=disabled_tools_list, + enabled_toolsets=enabled_toolsets_list, + disabled_toolsets=disabled_toolsets_list ) except RuntimeError as e: print(f"❌ Failed to initialize agent: {e}") diff --git a/test_run.sh b/test_run.sh index 424333f73..73be27b9f 100644 --- a/test_run.sh +++ b/test_run.sh @@ -1,6 +1,14 @@ +export WEB_TOOLS_DEBUG=true + python run_agent.py \ - --query "search up the latest docs on huggingface datasets in python 3.13 and write me basic example that's not in their docs. profile its performance" \ + --query "Tell me about this animal pictured: https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQi1nkrYXY-ijQv5aCxkwooyg2roNFxj0ewJA&s" \ --max_turns 30 \ --model claude-sonnet-4-20250514 \ --base_url https://api.anthropic.com/v1/ \ - --api_key $ANTHROPIC_API_KEY \ No newline at end of file + --api_key $ANTHROPIC_API_KEY \ + --enabled_toolsets=vision_tools + +#Possible Toolsets: +#web_tools +#vision_tools +#terminal_tools \ No newline at end of file diff --git a/vision_tools.py b/vision_tools.py new file mode 100644 index 000000000..3183713bd --- /dev/null +++ b/vision_tools.py @@ -0,0 +1,346 @@ +#!/usr/bin/env python3 +""" +Vision Tools Module + +This module provides vision analysis tools that work with image URLs. +Uses Gemini Flash via Nous Research API for intelligent image understanding. 
+ +Available tools: +- vision_analyze_tool: Analyze images from URLs with custom prompts + +Features: +- Comprehensive image description +- Context-aware analysis based on user queries +- Proper error handling and validation +- Debug logging support + +Usage: + from vision_tools import vision_analyze_tool + import asyncio + + # Analyze an image + result = await vision_analyze_tool( + image_url="https://example.com/image.jpg", + user_prompt="What architectural style is this building?" + ) +""" + +import json +import os +import asyncio +import uuid +import datetime +from pathlib import Path +from typing import Dict, Any, Optional +from openai import AsyncOpenAI + +# Initialize Nous Research API client for vision processing +nous_client = AsyncOpenAI( + api_key=os.getenv("NOUS_API_KEY"), + base_url="https://inference-api.nousresearch.com/v1" +) + +# Configuration for vision processing +DEFAULT_VISION_MODEL = "gemini-2.5-flash" + +# Debug mode configuration +DEBUG_MODE = os.getenv("VISION_TOOLS_DEBUG", "false").lower() == "true" +DEBUG_SESSION_ID = str(uuid.uuid4()) +DEBUG_LOG_PATH = Path("./logs") +DEBUG_DATA = { + "session_id": DEBUG_SESSION_ID, + "start_time": datetime.datetime.now().isoformat(), + "debug_enabled": DEBUG_MODE, + "tool_calls": [] +} if DEBUG_MODE else None + +# Create logs directory if debug mode is enabled +if DEBUG_MODE: + DEBUG_LOG_PATH.mkdir(exist_ok=True) + print(f"🐛 Vision debug mode enabled - Session ID: {DEBUG_SESSION_ID}") + + +def _log_debug_call(tool_name: str, call_data: Dict[str, Any]) -> None: + """ + Log a debug call entry to the global debug data structure. 
+ + Args: + tool_name (str): Name of the tool being called + call_data (Dict[str, Any]): Data about the call including parameters and results + """ + if not DEBUG_MODE or not DEBUG_DATA: + return + + call_entry = { + "timestamp": datetime.datetime.now().isoformat(), + "tool_name": tool_name, + **call_data + } + + DEBUG_DATA["tool_calls"].append(call_entry) + + +def _save_debug_log() -> None: + """ + Save the current debug data to a JSON file in the logs directory. + """ + if not DEBUG_MODE or not DEBUG_DATA: + return + + try: + debug_filename = f"vision_tools_debug_{DEBUG_SESSION_ID}.json" + debug_filepath = DEBUG_LOG_PATH / debug_filename + + # Update end time + DEBUG_DATA["end_time"] = datetime.datetime.now().isoformat() + DEBUG_DATA["total_calls"] = len(DEBUG_DATA["tool_calls"]) + + with open(debug_filepath, 'w', encoding='utf-8') as f: + json.dump(DEBUG_DATA, f, indent=2, ensure_ascii=False) + + print(f"🐛 Vision debug log saved: {debug_filepath}") + + except Exception as e: + print(f"❌ Error saving vision debug log: {str(e)}") + + +def _validate_image_url(url: str) -> bool: + """ + Basic validation of image URL format. + + Args: + url (str): The URL to validate + + Returns: + bool: True if URL appears to be valid, False otherwise + """ + if not url or not isinstance(url, str): + return False + + # Check if it's a valid URL format + if not (url.startswith('http://') or url.startswith('https://')): + return False + + # Check for common image extensions (optional, as URLs may not have extensions) + image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg'] + + return True # Allow all HTTP/HTTPS URLs for flexibility + + +async def vision_analyze_tool( + image_url: str, + user_prompt: str, + model: str = DEFAULT_VISION_MODEL +) -> str: + """ + Analyze an image from a URL using vision AI. + + This tool processes images using Gemini Flash via Nous Research API. 
+ The user_prompt parameter is expected to be pre-formatted by the calling + function (typically model_tools.py) to include both full description + requests and specific questions. + + Args: + image_url (str): The URL of the image to analyze + user_prompt (str): The pre-formatted prompt for the vision model + model (str): The vision model to use (default: gemini-2.5-flash) + + Returns: + str: JSON string containing the analysis results with the following structure: + { + "success": bool, + "analysis": str (defaults to error message if None) + } + + Raises: + Exception: If analysis fails or API key is not set + """ + debug_call_data = { + "parameters": { + "image_url": image_url, + "user_prompt": user_prompt, + "model": model + }, + "error": None, + "success": False, + "analysis_length": 0, + "model_used": model + } + + try: + print(f"🔍 Analyzing image from URL: {image_url[:60]}{'...' if len(image_url) > 60 else ''}") + print(f"📝 User prompt: {user_prompt[:100]}{'...' if len(user_prompt) > 100 else ''}") + + # Validate image URL + if not _validate_image_url(image_url): + raise ValueError("Invalid image URL format. 
Must start with http:// or https://") + + # Check API key availability + if not os.getenv("NOUS_API_KEY"): + raise ValueError("NOUS_API_KEY environment variable not set") + + # Use the prompt as provided (model_tools.py now handles full description formatting) + comprehensive_prompt = user_prompt + + # Prepare the message with image URL format + messages = [ + { + "role": "user", + "content": [ + { + "type": "text", + "text": comprehensive_prompt + }, + { + "type": "image_url", + "image_url": { + "url": image_url + } + } + ] + } + ] + + print(f"🧠 Processing image with {model}...") + + # Call the vision API + response = await nous_client.chat.completions.create( + model=model, + messages=messages, + temperature=0.1, # Low temperature for consistent analysis + max_tokens=2000 # Generous limit for detailed analysis + ) + + # Extract the analysis + analysis = response.choices[0].message.content.strip() + analysis_length = len(analysis) + + print(f"✅ Image analysis completed ({analysis_length} characters)") + + # Prepare successful response + result = { + "success": True, + "analysis": analysis or "There was a problem with the request and the image could not be analyzed." + } + + debug_call_data["success"] = True + debug_call_data["analysis_length"] = analysis_length + + # Log debug information + _log_debug_call("vision_analyze_tool", debug_call_data) + _save_debug_log() + + return json.dumps(result, indent=2) + + except Exception as e: + error_msg = f"Error analyzing image: {str(e)}" + print(f"❌ {error_msg}") + + # Prepare error response + result = { + "success": False, + "analysis": "There was a problem with the request and the image could not be analyzed." + } + + debug_call_data["error"] = error_msg + _log_debug_call("vision_analyze_tool", debug_call_data) + _save_debug_log() + + return json.dumps(result, indent=2) + + +def check_nous_api_key() -> bool: + """ + Check if the Nous Research API key is available in environment variables. 
+ + Returns: + bool: True if API key is set, False otherwise + """ + return bool(os.getenv("NOUS_API_KEY")) + + +def check_vision_requirements() -> bool: + """ + Check if all requirements for vision tools are met. + + Returns: + bool: True if requirements are met, False otherwise + """ + return check_nous_api_key() + + +def get_debug_session_info() -> Dict[str, Any]: + """ + Get information about the current debug session. + + Returns: + Dict[str, Any]: Dictionary containing debug session information + """ + if not DEBUG_MODE or not DEBUG_DATA: + return { + "enabled": False, + "session_id": None, + "log_path": None, + "total_calls": 0 + } + + return { + "enabled": True, + "session_id": DEBUG_SESSION_ID, + "log_path": str(DEBUG_LOG_PATH / f"vision_tools_debug_{DEBUG_SESSION_ID}.json"), + "total_calls": len(DEBUG_DATA["tool_calls"]) + } + + +if __name__ == "__main__": + """ + Simple test/demo when run directly + """ + print("👁️ Vision Tools Module") + print("=" * 40) + + # Check if API key is available + api_available = check_nous_api_key() + + if not api_available: + print("❌ NOUS_API_KEY environment variable not set") + print("Please set your API key: export NOUS_API_KEY='your-key-here'") + print("Get API key at: https://inference-api.nousresearch.com/") + exit(1) + else: + print("✅ Nous Research API key found") + + print("🛠️ Vision tools ready for use!") + print(f"🧠 Using model: {DEFAULT_VISION_MODEL}") + + # Show debug mode status + if DEBUG_MODE: + print(f"🐛 Debug mode ENABLED - Session ID: {DEBUG_SESSION_ID}") + print(f" Debug logs will be saved to: ./logs/vision_tools_debug_{DEBUG_SESSION_ID}.json") + else: + print("🐛 Debug mode disabled (set VISION_TOOLS_DEBUG=true to enable)") + + print("\nBasic usage:") + print(" from vision_tools import vision_analyze_tool") + print(" import asyncio") + print("") + print(" async def main():") + print(" result = await vision_analyze_tool(") + print(" image_url='https://example.com/image.jpg',") + print(" user_prompt='What 
do you see in this image?'") + print(" )") + print(" print(result)") + print(" asyncio.run(main())") + + print("\nExample prompts:") + print(" - 'What architectural style is this building?'") + print(" - 'Describe the emotions and mood in this image'") + print(" - 'What text can you read in this image?'") + print(" - 'Identify any safety hazards visible'") + print(" - 'What products or brands are shown?'") + + print("\nDebug mode:") + print(" # Enable debug logging") + print(" export VISION_TOOLS_DEBUG=true") + print(" # Debug logs capture all vision analysis calls and results") + print(" # Logs saved to: ./logs/vision_tools_debug_UUID.json") diff --git a/web_tools.py b/web_tools.py index 71b2bb224..38eb0b93d 100644 --- a/web_tools.py +++ b/web_tools.py @@ -19,6 +19,11 @@ LLM Processing: - Uses Nous Research API with Gemini 2.5 Flash for intelligent content extraction - Extracts key excerpts and creates markdown summaries to reduce token usage +Debug Mode: +- Set WEB_TOOLS_DEBUG=true to enable detailed logging +- Creates web_tools_debug_UUID.json in ./logs directory +- Captures all tool calls, results, and compression metrics + Usage: from web_tools import web_search_tool, web_extract_tool, web_crawl_tool @@ -40,6 +45,9 @@ import json import os import re import asyncio +import uuid +import datetime +from pathlib import Path from typing import List, Dict, Any, Optional from tavily import TavilyClient from openai import AsyncOpenAI @@ -57,6 +65,66 @@ nous_client = AsyncOpenAI( DEFAULT_SUMMARIZER_MODEL = "gemini-2.5-flash" DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION = 5000 +# Debug mode configuration +DEBUG_MODE = os.getenv("WEB_TOOLS_DEBUG", "false").lower() == "true" +DEBUG_SESSION_ID = str(uuid.uuid4()) +DEBUG_LOG_PATH = Path("./logs") +DEBUG_DATA = { + "session_id": DEBUG_SESSION_ID, + "start_time": datetime.datetime.now().isoformat(), + "debug_enabled": DEBUG_MODE, + "tool_calls": [] +} if DEBUG_MODE else None + +# Create logs directory if debug mode is enabled +if 
DEBUG_MODE: + DEBUG_LOG_PATH.mkdir(exist_ok=True) + print(f"🐛 Debug mode enabled - Session ID: {DEBUG_SESSION_ID}") + + +def _log_debug_call(tool_name: str, call_data: Dict[str, Any]) -> None: + """ + Log a debug call entry to the global debug data structure. + + Args: + tool_name (str): Name of the tool being called + call_data (Dict[str, Any]): Data about the call including parameters and results + """ + if not DEBUG_MODE or not DEBUG_DATA: + return + + call_entry = { + "timestamp": datetime.datetime.now().isoformat(), + "tool_name": tool_name, + **call_data + } + + DEBUG_DATA["tool_calls"].append(call_entry) + + +def _save_debug_log() -> None: + """ + Save the current debug data to a JSON file in the logs directory. + """ + if not DEBUG_MODE or not DEBUG_DATA: + return + + try: + debug_filename = f"web_tools_debug_{DEBUG_SESSION_ID}.json" + debug_filepath = DEBUG_LOG_PATH / debug_filename + + # Update end time + DEBUG_DATA["end_time"] = datetime.datetime.now().isoformat() + DEBUG_DATA["total_calls"] = len(DEBUG_DATA["tool_calls"]) + + with open(debug_filepath, 'w', encoding='utf-8') as f: + json.dump(DEBUG_DATA, f, indent=2, ensure_ascii=False) + + print(f"🐛 Debug log saved: {debug_filepath}") + + except Exception as e: + print(f"❌ Error saving debug log: {str(e)}") + async def process_content_with_llm( content: str, @@ -208,21 +276,51 @@ def web_search_tool(query: str, limit: int = 5) -> str: Raises: Exception: If search fails or API key is not set """ + debug_call_data = { + "parameters": { + "query": query, + "limit": limit + }, + "error": None, + "results_count": 0, + "original_response_size": 0, + "final_response_size": 0 + } + try: print(f"🔍 Searching the web for: '{query}' (limit: {limit})") # Use Tavily's search functionality response = tavily_client.search(query=query, max_results=limit, search_depth="advanced") - print(f"✅ Found {len(response.get('results', []))} results") + results_count = len(response.get('results', [])) + print(f"✅ Found 
{results_count} results") + + # Capture debug information + debug_call_data["results_count"] = results_count + debug_call_data["original_response_size"] = len(json.dumps(response)) result_json = json.dumps(response, indent=2) # Clean base64 images from search results - return clean_base64_images(result_json) + cleaned_result = clean_base64_images(result_json) + + debug_call_data["final_response_size"] = len(cleaned_result) + debug_call_data["compression_applied"] = "base64_image_removal" + + # Log debug information + _log_debug_call("web_search_tool", debug_call_data) + _save_debug_log() + + return cleaned_result except Exception as e: error_msg = f"Error searching web: {str(e)}" print(f"❌ {error_msg}") + + debug_call_data["error"] = error_msg + _log_debug_call("web_search_tool", debug_call_data) + _save_debug_log() + return json.dumps({"error": error_msg}) @@ -253,17 +351,39 @@ async def web_extract_tool( Raises: Exception: If extraction fails or API key is not set """ + debug_call_data = { + "parameters": { + "urls": urls, + "format": format, + "use_llm_processing": use_llm_processing, + "model": model, + "min_length": min_length + }, + "error": None, + "pages_extracted": 0, + "pages_processed_with_llm": 0, + "original_response_size": 0, + "final_response_size": 0, + "compression_metrics": [], + "processing_applied": [] + } + try: print(f"📄 Extracting content from {len(urls)} URL(s)") # Use Tavily's extract functionality response = tavily_client.extract(urls=urls, format=format) - print(f"✅ Extracted content from {len(response.get('results', []))} pages") + pages_extracted = len(response.get('results', [])) + print(f"✅ Extracted content from {pages_extracted} pages") + + debug_call_data["pages_extracted"] = pages_extracted + debug_call_data["original_response_size"] = len(json.dumps(response)) # Process each result with LLM if enabled if use_llm_processing and os.getenv("NOUS_API_KEY"): print("🧠 Processing extracted content with LLM...") + 
debug_call_data["processing_applied"].append("llm_processing") for result in response.get('results', []): url = result.get('url', 'Unknown URL') @@ -271,24 +391,48 @@ async def web_extract_tool( raw_content = result.get('raw_content', '') or result.get('content', '') if raw_content: + original_size = len(raw_content) + # Process content with LLM processed = await process_content_with_llm( raw_content, url, title, model, min_length ) if processed: + processed_size = len(processed) + compression_ratio = processed_size / original_size if original_size > 0 else 1.0 + + # Capture compression metrics + debug_call_data["compression_metrics"].append({ + "url": url, + "original_size": original_size, + "processed_size": processed_size, + "compression_ratio": compression_ratio, + "model_used": model + }) + # Replace content with processed version result['content'] = processed # Keep raw content in separate field for reference result['raw_content'] = raw_content + debug_call_data["pages_processed_with_llm"] += 1 print(f" 📝 {url} (processed)") else: + debug_call_data["compression_metrics"].append({ + "url": url, + "original_size": original_size, + "processed_size": original_size, + "compression_ratio": 1.0, + "model_used": None, + "reason": "content_too_short" + }) print(f" 📝 {url} (no processing - content too short)") else: print(f" ⚠️ {url} (no content to process)") else: if use_llm_processing and not os.getenv("NOUS_API_KEY"): print("⚠️ LLM processing requested but NOUS_API_KEY not set, returning raw content") + debug_call_data["processing_applied"].append("llm_processing_unavailable") # Print summary of extracted pages for debugging (original behavior) for result in response.get('results', []): @@ -298,11 +442,25 @@ async def web_extract_tool( result_json = json.dumps(response, indent=2) # Clean base64 images from extracted content - return clean_base64_images(result_json) + cleaned_result = clean_base64_images(result_json) + + debug_call_data["final_response_size"] = 
len(cleaned_result) + debug_call_data["processing_applied"].append("base64_image_removal") + + # Log debug information + _log_debug_call("web_extract_tool", debug_call_data) + _save_debug_log() + + return cleaned_result except Exception as e: error_msg = f"Error extracting content: {str(e)}" print(f"❌ {error_msg}") + + debug_call_data["error"] = error_msg + _log_debug_call("web_extract_tool", debug_call_data) + _save_debug_log() + return json.dumps({"error": error_msg}) @@ -336,6 +494,24 @@ async def web_crawl_tool( Raises: Exception: If crawling fails or API key is not set """ + debug_call_data = { + "parameters": { + "url": url, + "instructions": instructions, + "depth": depth, + "use_llm_processing": use_llm_processing, + "model": model, + "min_length": min_length + }, + "error": None, + "pages_crawled": 0, + "pages_processed_with_llm": 0, + "original_response_size": 0, + "final_response_size": 0, + "compression_metrics": [], + "processing_applied": [] + } + try: instructions_text = f" with instructions: '{instructions}'" if instructions else "" print(f"🕷️ Crawling {url}{instructions_text}") @@ -348,11 +524,16 @@ async def web_crawl_tool( extract_depth=depth ) - print(f"✅ Crawled {len(response.get('results', []))} pages") + pages_crawled = len(response.get('results', [])) + print(f"✅ Crawled {pages_crawled} pages") + + debug_call_data["pages_crawled"] = pages_crawled + debug_call_data["original_response_size"] = len(json.dumps(response)) # Process each result with LLM if enabled if use_llm_processing and os.getenv("NOUS_API_KEY"): print("🧠 Processing crawled content with LLM...") + debug_call_data["processing_applied"].append("llm_processing") for result in response.get('results', []): page_url = result.get('url', 'Unknown URL') @@ -360,24 +541,48 @@ async def web_crawl_tool( content = result.get('content', '') if content: + original_size = len(content) + # Process content with LLM processed = await process_content_with_llm( content, page_url, title, model, 
min_length ) if processed: + processed_size = len(processed) + compression_ratio = processed_size / original_size if original_size > 0 else 1.0 + + # Capture compression metrics + debug_call_data["compression_metrics"].append({ + "url": page_url, + "original_size": original_size, + "processed_size": processed_size, + "compression_ratio": compression_ratio, + "model_used": model + }) + # Keep original content in raw_content field result['raw_content'] = content # Replace content with processed version result['content'] = processed + debug_call_data["pages_processed_with_llm"] += 1 print(f" 🌐 {page_url} (processed)") else: + debug_call_data["compression_metrics"].append({ + "url": page_url, + "original_size": original_size, + "processed_size": original_size, + "compression_ratio": 1.0, + "model_used": None, + "reason": "content_too_short" + }) print(f" 🌐 {page_url} (no processing - content too short)") else: print(f" ⚠️ {page_url} (no content to process)") else: if use_llm_processing and not os.getenv("NOUS_API_KEY"): print("⚠️ LLM processing requested but NOUS_API_KEY not set, returning raw content") + debug_call_data["processing_applied"].append("llm_processing_unavailable") # Print summary of crawled pages for debugging (original behavior) for result in response.get('results', []): @@ -387,11 +592,25 @@ async def web_crawl_tool( result_json = json.dumps(response, indent=2) # Clean base64 images from crawled content - return clean_base64_images(result_json) + cleaned_result = clean_base64_images(result_json) + + debug_call_data["final_response_size"] = len(cleaned_result) + debug_call_data["processing_applied"].append("base64_image_removal") + + # Log debug information + _log_debug_call("web_crawl_tool", debug_call_data) + _save_debug_log() + + return cleaned_result except Exception as e: error_msg = f"Error crawling website: {str(e)}" print(f"❌ {error_msg}") + + debug_call_data["error"] = error_msg + _log_debug_call("web_crawl_tool", debug_call_data) + 
_save_debug_log() + return json.dumps({"error": error_msg}) @@ -416,6 +635,33 @@ def check_nous_api_key() -> bool: return bool(os.getenv("NOUS_API_KEY")) +def get_debug_session_info() -> Dict[str, Any]: + """ + Get information about the current debug session. + + Returns: + Dict[str, Any]: Dictionary containing debug session information: + - enabled: Whether debug mode is enabled + - session_id: Current session UUID (if enabled) + - log_path: Path where debug logs are saved (if enabled) + - total_calls: Number of tool calls logged so far (if enabled) + """ + if not DEBUG_MODE or not DEBUG_DATA: + return { + "enabled": False, + "session_id": None, + "log_path": None, + "total_calls": 0 + } + + return { + "enabled": True, + "session_id": DEBUG_SESSION_ID, + "log_path": str(DEBUG_LOG_PATH / f"web_tools_debug_{DEBUG_SESSION_ID}.json"), + "total_calls": len(DEBUG_DATA["tool_calls"]) + } + + if __name__ == "__main__": """ Simple test/demo when run directly @@ -451,6 +697,13 @@ if __name__ == "__main__": print("🧠 LLM content processing available with Gemini 2.5 Flash") print(f" Default min length for processing: {DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION} chars") + # Show debug mode status + if DEBUG_MODE: + print(f"🐛 Debug mode ENABLED - Session ID: {DEBUG_SESSION_ID}") + print(f" Debug logs will be saved to: ./logs/web_tools_debug_{DEBUG_SESSION_ID}.json") + else: + print("🐛 Debug mode disabled (set WEB_TOOLS_DEBUG=true to enable)") + print("\nBasic usage:") print(" from web_tools import web_search_tool, web_extract_tool, web_crawl_tool") print(" import asyncio") @@ -480,4 +733,14 @@ if __name__ == "__main__": print(" # Disable LLM processing") print(" raw_content = await web_extract_tool(['https://example.com'], use_llm_processing=False)") + print("\nDebug mode:") + print(" # Enable debug logging") + print(" export WEB_TOOLS_DEBUG=true") + print(" # Debug logs capture:") + print(" # - All tool calls with parameters") + print(" # - Original API responses") + print(" # - LLM 
compression metrics") + print(" # - Final processed results") + print(" # Logs saved to: ./logs/web_tools_debug_UUID.json") + print("\n📝 Run 'python test_web_tools_llm.py' to test LLM processing capabilities")