add vision model tool, cli updates for exclusive and inclusive toolsets

This commit is contained in:
Teknium
2025-08-04 00:14:16 -07:00
parent bf4223f381
commit cde7e64418
5 changed files with 1034 additions and 41 deletions

View File

@@ -20,11 +20,13 @@ Usage:
"""
import json
import asyncio
from typing import Dict, Any, List
# Import toolsets
from web_tools import web_search_tool, web_extract_tool, web_crawl_tool, check_tavily_api_key
from terminal_tool import terminal_tool, check_hecate_requirements, TERMINAL_TOOL_DESCRIPTION
from vision_tools import vision_analyze_tool, check_vision_requirements
def get_web_tool_definitions() -> List[Dict[str, Any]]:
"""
@@ -158,30 +160,234 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]:
}
]
def get_tool_definitions() -> List[Dict[str, Any]]:
def get_vision_tool_definitions() -> List[Dict[str, Any]]:
    """
    Get tool definitions for vision tools in OpenAI's expected format.

    Returns:
        List[Dict]: List of vision tool definitions compatible with OpenAI API
    """
    # Single tool today; keep the list shape so more vision tools can be appended.
    return [
        {
            "type": "function",
            "function": {
                "name": "vision_analyze",
                "description": "Analyze images from URLs using AI vision. Provides comprehensive image description and answers specific questions about the image content. Perfect for understanding visual content, reading text in images, identifying objects, analyzing scenes, and extracting visual information.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "image_url": {
                            "type": "string",
                            "description": "The URL of the image to analyze (must be publicly accessible HTTP/HTTPS URL)"
                        },
                        "question": {
                            "type": "string",
                            "description": "Your specific question or request about the image to resolve. The AI will automatically provide a complete image description AND answer your specific question. Examples: 'What text can you read?', 'What architectural style is this?', 'Describe the mood and emotions', 'What safety hazards do you see?'"
                        },
                        "model": {
                            "type": "string",
                            "description": "The vision model to use for analysis (optional, default: gemini-2.5-flash)",
                            "default": "gemini-2.5-flash"
                        }
                    },
                    "required": ["image_url", "question"]
                }
            }
        }
    ]
def get_all_tool_names() -> List[str]:
    """
    Get the names of all available tools across all toolsets.

    Availability is decided per-toolset by its requirement check (API key,
    package install, ...), so the result reflects the current environment.

    Returns:
        List[str]: List of all tool names
    """
    tool_names = []
    # Web tools
    if check_tavily_api_key():
        tool_names.extend(["web_search", "web_extract", "web_crawl"])
    # Terminal tools
    if check_hecate_requirements():
        tool_names.extend(["terminal"])
    # Vision tools
    if check_vision_requirements():
        tool_names.extend(["vision_analyze"])
    # Future toolsets can be added here:
    # if check_file_tools():
    #     tool_names.extend(["file_read", "file_write"])
    return tool_names
def get_toolset_for_tool(tool_name: str) -> str:
    """
    Resolve the toolset that owns a given tool.

    Args:
        tool_name (str): Name of the tool

    Returns:
        str: Name of the toolset, or "unknown" if not found
    """
    # Ownership registry: every web tool maps to web_tools, plus the singletons.
    owners = {name: "web_tools" for name in ("web_search", "web_extract", "web_crawl")}
    owners["terminal"] = "terminal_tools"
    owners["vision_analyze"] = "vision_tools"
    # Future tools can be added here
    return owners.get(tool_name, "unknown")
def get_tool_definitions(
    enabled_tools: List[str] = None,
    disabled_tools: List[str] = None,
    enabled_toolsets: List[str] = None,
    disabled_toolsets: List[str] = None
) -> List[Dict[str, Any]]:
    """
    Get tool definitions for model API calls with optional filtering.

    This function aggregates tool definitions from all available toolsets
    and applies filtering based on the provided parameters.

    Filter Priority (higher priority overrides lower):
    1. enabled_tools (highest priority - only these tools, overrides everything)
    2. disabled_tools (applied after toolset filtering)
    3. enabled_toolsets (only tools from these toolsets)
    4. disabled_toolsets (exclude tools from these toolsets)

    Args:
        enabled_tools (List[str]): Only include these specific tools. If provided,
            ONLY these tools will be included (overrides all other filters)
        disabled_tools (List[str]): Exclude these specific tools (applied after toolset filtering)
        enabled_toolsets (List[str]): Only include tools from these toolsets
        disabled_toolsets (List[str]): Exclude tools from these toolsets

    Returns:
        List[Dict]: Filtered list of tool definitions

    Examples:
        # Only web tools
        tools = get_tool_definitions(enabled_toolsets=["web_tools"])

        # All tools except terminal
        tools = get_tool_definitions(disabled_tools=["terminal"])

        # Only specific tools (overrides toolset filters)
        tools = get_tool_definitions(enabled_tools=["web_search", "web_extract"])

        # Conflicting filters (enabled_tools wins)
        tools = get_tool_definitions(enabled_toolsets=["web_tools"], enabled_tools=["terminal"])
        # Result: Only terminal tool (enabled_tools overrides enabled_toolsets)
    """
    # Detect and warn about potential conflicts
    conflicts_detected = False
    if enabled_tools and (enabled_toolsets or disabled_toolsets or disabled_tools):
        print("⚠️ enabled_tools overrides all other filters")
        conflicts_detected = True
    if enabled_toolsets and disabled_toolsets:
        # Check for overlap
        enabled_set = set(enabled_toolsets)
        disabled_set = set(disabled_toolsets)
        overlap = enabled_set & disabled_set
        if overlap:
            print(f"⚠️ Conflicting toolsets: {overlap} in both enabled and disabled")
            print(f" → enabled_toolsets takes priority")
            conflicts_detected = True
    if enabled_tools and disabled_tools:
        # Check for overlap
        enabled_set = set(enabled_tools)
        disabled_set = set(disabled_tools)
        overlap = enabled_set & disabled_set
        if overlap:
            print(f"⚠️ Conflicting tools: {overlap} in both enabled and disabled")
            print(f" → enabled_tools takes priority")
            conflicts_detected = True
    all_tools = []
    # Collect all available tools from each toolset; unavailable toolsets
    # (missing API key / package) contribute an empty list.
    toolset_tools = {
        "web_tools": get_web_tool_definitions() if check_tavily_api_key() else [],
        "terminal_tools": get_terminal_tool_definitions() if check_hecate_requirements() else [],
        "vision_tools": get_vision_tool_definitions() if check_vision_requirements() else []
        # Future toolsets can be added here:
        # "file_tools": get_file_tool_definitions() if check_file_tools() else [],
    }
    # HIGHEST PRIORITY: enabled_tools (overrides everything)
    if enabled_tools:
        if conflicts_detected:
            print(f"🎯 Using only enabled_tools: {enabled_tools}")
        # Collect all available tools first
        all_available_tools = []
        for tools in toolset_tools.values():
            all_available_tools.extend(tools)
        # Only include specifically enabled tools
        tool_names_to_include = set(enabled_tools)
        filtered_tools = [
            tool for tool in all_available_tools
            if tool["function"]["name"] in tool_names_to_include
        ]
        # Warn about requested tools that aren't available
        found_tools = {tool["function"]["name"] for tool in filtered_tools}
        missing_tools = tool_names_to_include - found_tools
        if missing_tools:
            print(f"⚠️ Requested tools not available: {missing_tools}")
        return filtered_tools
    # Apply toolset-level filtering first
    if enabled_toolsets:
        # Only include tools from enabled toolsets
        for toolset_name in enabled_toolsets:
            if toolset_name in toolset_tools:
                all_tools.extend(toolset_tools[toolset_name])
            else:
                print(f"⚠️ Unknown toolset: {toolset_name}")
    elif disabled_toolsets:
        # Include all tools except from disabled toolsets
        for toolset_name, tools in toolset_tools.items():
            if toolset_name not in disabled_toolsets:
                all_tools.extend(tools)
    else:
        # Include all available tools
        for tools in toolset_tools.values():
            all_tools.extend(tools)
    # Apply tool-level filtering (disabled_tools)
    if disabled_tools:
        tool_names_to_exclude = set(disabled_tools)
        original_tools = [tool["function"]["name"] for tool in all_tools]
        all_tools = [
            tool for tool in all_tools
            if tool["function"]["name"] not in tool_names_to_exclude
        ]
        # Show what was actually filtered out.
        # FIX: removed the unused `remaining_tools` set that was built here
        # and never read; `actually_excluded` alone determines the message.
        actually_excluded = set(original_tools) & tool_names_to_exclude
        if actually_excluded:
            print(f"🚫 Excluded tools: {actually_excluded}")
    return all_tools
def handle_web_function_call(function_name: str, function_args: Dict[str, Any]) -> str:
"""
@@ -206,13 +412,15 @@ def handle_web_function_call(function_name: str, function_args: Dict[str, Any])
# Limit URLs to prevent abuse
urls = urls[:5] if isinstance(urls, list) else []
format = function_args.get("format")
return web_extract_tool(urls, format)
# Run async function in event loop
return asyncio.run(web_extract_tool(urls, format))
elif function_name == "web_crawl":
url = function_args.get("url", "")
instructions = function_args.get("instructions")
depth = function_args.get("depth", "basic")
return web_crawl_tool(url, instructions, depth)
# Run async function in event loop
return asyncio.run(web_crawl_tool(url, instructions, depth))
else:
return json.dumps({"error": f"Unknown web function: {function_name}"})
@@ -240,6 +448,33 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A
else:
return json.dumps({"error": f"Unknown terminal function: {function_name}"})
def handle_vision_function_call(function_name: str, function_args: Dict[str, Any]) -> str:
    """
    Handle function calls for vision tools.

    Args:
        function_name (str): Name of the vision function to call
        function_args (Dict): Arguments for the function

    Returns:
        str: Function result as JSON string
    """
    # Guard clause: reject anything that is not a known vision function.
    if function_name != "vision_analyze":
        return json.dumps({"error": f"Unknown vision function: {function_name}"})
    url = function_args.get("image_url", "")
    user_question = function_args.get("question", "")
    vision_model = function_args.get("model", "gemini-2.5-flash")
    # Automatically prepend full description request to user's question
    prompt = f"Fully describe and explain everything about this image\n\n{user_question}"
    # Run async function in event loop
    return asyncio.run(vision_analyze_tool(url, prompt, vision_model))
def handle_function_call(function_name: str, function_args: Dict[str, Any]) -> str:
"""
Main function call dispatcher that routes calls to appropriate toolsets.
@@ -267,6 +502,10 @@ def handle_function_call(function_name: str, function_args: Dict[str, Any]) -> s
elif function_name in ["terminal"]:
return handle_terminal_function_call(function_name, function_args)
# Route vision tools
elif function_name in ["vision_analyze"]:
return handle_vision_function_call(function_name, function_args)
# Future toolsets can be routed here:
# elif function_name in ["file_read_tool", "file_write_tool"]:
# return handle_file_function_call(function_name, function_args)
@@ -302,6 +541,12 @@ def get_available_toolsets() -> Dict[str, Dict[str, Any]]:
"tools": ["terminal_tool"],
"description": "Execute commands with optional interactive session support on Linux VMs",
"requirements": ["MORPH_API_KEY environment variable", "hecate package"]
},
"vision_tools": {
"available": check_vision_requirements(),
"tools": ["vision_analyze_tool"],
"description": "Analyze images from URLs using AI vision for comprehensive understanding",
"requirements": ["NOUS_API_KEY environment variable"]
}
# Future toolsets can be added here
}
@@ -317,7 +562,8 @@ def check_toolset_requirements() -> Dict[str, bool]:
"""
return {
"web_tools": check_tavily_api_key(),
"terminal_tools": check_hecate_requirements()
"terminal_tools": check_hecate_requirements(),
"vision_tools": check_vision_requirements()
}
if __name__ == "__main__":
@@ -334,13 +580,20 @@ if __name__ == "__main__":
status = "" if available else ""
print(f" {status} {toolset}: {'Available' if available else 'Missing requirements'}")
# Show available tools
# Show all available tool names
all_tool_names = get_all_tool_names()
print(f"\n🔧 Available Tools ({len(all_tool_names)} total):")
for tool_name in all_tool_names:
toolset = get_toolset_for_tool(tool_name)
print(f" 📌 {tool_name} (from {toolset})")
# Show available tools with full definitions
tools = get_tool_definitions()
print(f"\n🔧 Available Tools ({len(tools)} total):")
print(f"\n📝 Tool Definitions ({len(tools)} loaded):")
for tool in tools:
func_name = tool["function"]["name"]
desc = tool["function"]["description"]
print(f" 📌 {func_name}: {desc[:80]}{'...' if len(desc) > 80 else ''}")
print(f" 🔹 {func_name}: {desc[:60]}{'...' if len(desc) > 60 else ''}")
# Show toolset info
toolsets = get_available_toolsets()
@@ -351,7 +604,26 @@ if __name__ == "__main__":
if not info["available"]:
print(f" Requirements: {', '.join(info['requirements'])}")
print("\n💡 Usage Example:")
print("\n💡 Usage Examples:")
print(" from model_tools import get_tool_definitions, handle_function_call")
print(" # All tools")
print(" tools = get_tool_definitions()")
print(" result = handle_function_call('web_search_tool', {'query': 'Python'})")
print(" # Only web tools")
print(" tools = get_tool_definitions(enabled_toolsets=['web_tools'])")
print(" # Specific tools only")
print(" tools = get_tool_definitions(enabled_tools=['web_search', 'terminal'])")
print(" # All except terminal")
print(" tools = get_tool_definitions(disabled_tools=['terminal'])")
# Example filtering
print(f"\n🧪 Filtering Examples:")
web_only = get_tool_definitions(enabled_toolsets=["web_tools"])
print(f" Web tools only: {len(web_only)} tools")
if len(all_tool_names) > 1:
specific_tools = get_tool_definitions(enabled_tools=["web_search"])
print(f" Only web_search: {len(specific_tools)} tool(s)")
if "terminal" in all_tool_names:
no_terminal = get_tool_definitions(disabled_tools=["terminal"])
print(f" All except terminal: {len(no_terminal)} tools")

View File

@@ -45,7 +45,11 @@ class AIAgent:
api_key: str = None,
model: str = "gpt-4",
max_iterations: int = 10,
tool_delay: float = 1.0
tool_delay: float = 1.0,
enabled_tools: List[str] = None,
disabled_tools: List[str] = None,
enabled_toolsets: List[str] = None,
disabled_toolsets: List[str] = None
):
"""
Initialize the AI Agent.
@@ -56,11 +60,21 @@ class AIAgent:
model (str): Model name to use (default: "gpt-4")
max_iterations (int): Maximum number of tool calling iterations (default: 10)
tool_delay (float): Delay between tool calls in seconds (default: 1.0)
enabled_tools (List[str]): Only enable these specific tools (optional)
disabled_tools (List[str]): Disable these specific tools (optional)
enabled_toolsets (List[str]): Only enable tools from these toolsets (optional)
disabled_toolsets (List[str]): Disable tools from these toolsets (optional)
"""
self.model = model
self.max_iterations = max_iterations
self.tool_delay = tool_delay
# Store tool filtering options
self.enabled_tools = enabled_tools
self.disabled_tools = disabled_tools
self.enabled_toolsets = enabled_toolsets
self.disabled_toolsets = disabled_toolsets
# Initialize OpenAI client
client_kwargs = {}
if base_url:
@@ -78,15 +92,37 @@ class AIAgent:
except Exception as e:
raise RuntimeError(f"Failed to initialize OpenAI client: {e}")
# Get available tools
self.tools = get_tool_definitions()
print(f"🛠️ Loaded {len(self.tools)} tools")
# Get available tools with filtering
self.tools = get_tool_definitions(
enabled_tools=enabled_tools,
disabled_tools=disabled_tools,
enabled_toolsets=enabled_toolsets,
disabled_toolsets=disabled_toolsets
)
# Show tool configuration
if self.tools:
tool_names = [tool["function"]["name"] for tool in self.tools]
print(f"🛠️ Loaded {len(self.tools)} tools: {', '.join(tool_names)}")
# Show filtering info if applied
if enabled_tools:
print(f" ✅ Enabled tools: {', '.join(enabled_tools)}")
if disabled_tools:
print(f" ❌ Disabled tools: {', '.join(disabled_tools)}")
if enabled_toolsets:
print(f" ✅ Enabled toolsets: {', '.join(enabled_toolsets)}")
if disabled_toolsets:
print(f" ❌ Disabled toolsets: {', '.join(disabled_toolsets)}")
else:
print("🛠️ No tools loaded (all tools filtered out or unavailable)")
# Check tool requirements
requirements = check_toolset_requirements()
missing_reqs = [name for name, available in requirements.items() if not available]
if missing_reqs:
print(f"⚠️ Some tools may not work due to missing requirements: {missing_reqs}")
if self.tools:
requirements = check_toolset_requirements()
missing_reqs = [name for name, available in requirements.items() if not available]
if missing_reqs:
print(f"⚠️ Some tools may not work due to missing requirements: {missing_reqs}")
def create_system_message(self, custom_system: str = None) -> str:
"""
@@ -282,7 +318,12 @@ def main(
model: str = "claude-opus-4-20250514",
api_key: str = None,
base_url: str = "https://api.anthropic.com/v1/",
max_turns: int = 10
max_turns: int = 10,
enabled_tools: str = None,
disabled_tools: str = None,
enabled_toolsets: str = None,
disabled_toolsets: str = None,
list_tools: bool = False
):
"""
Main function for running the agent directly.
@@ -293,17 +334,80 @@ def main(
api_key (str): API key for authentication. Uses ANTHROPIC_API_KEY env var if not provided.
base_url (str): Base URL for the model API. Defaults to https://api.anthropic.com/v1/
max_turns (int): Maximum number of API call iterations. Defaults to 10.
enabled_tools (str): Comma-separated list of tools to enable (e.g., "web_search,terminal")
disabled_tools (str): Comma-separated list of tools to disable (e.g., "terminal")
enabled_toolsets (str): Comma-separated list of toolsets to enable (e.g., "web_tools")
disabled_toolsets (str): Comma-separated list of toolsets to disable (e.g., "terminal_tools")
list_tools (bool): Just list available tools and exit
"""
print("🤖 AI Agent with Tool Calling")
print("=" * 50)
# Handle tool listing
if list_tools:
from model_tools import get_all_tool_names, get_toolset_for_tool, get_available_toolsets
print("📋 Available Tools & Toolsets:")
print("-" * 30)
# Show toolsets
toolsets = get_available_toolsets()
print("📦 Toolsets:")
for name, info in toolsets.items():
status = "" if info["available"] else ""
print(f" {status} {name}: {info['description']}")
if not info["available"]:
print(f" Requirements: {', '.join(info['requirements'])}")
# Show individual tools
all_tools = get_all_tool_names()
print(f"\n🔧 Individual Tools ({len(all_tools)} available):")
for tool_name in all_tools:
toolset = get_toolset_for_tool(tool_name)
print(f" 📌 {tool_name} (from {toolset})")
print(f"\n💡 Usage Examples:")
print(f" # Run with only web tools")
print(f" python run_agent.py --enabled_toolsets=web_tools --query='search for Python news'")
print(f" # Run with specific tools only")
print(f" python run_agent.py --enabled_tools=web_search,web_extract --query='research topic'")
print(f" # Run without terminal tools")
print(f" python run_agent.py --disabled_tools=terminal --query='web research only'")
return
# Parse tool selection arguments
enabled_tools_list = None
disabled_tools_list = None
enabled_toolsets_list = None
disabled_toolsets_list = None
if enabled_tools:
enabled_tools_list = [t.strip() for t in enabled_tools.split(",")]
print(f"🎯 Enabled tools: {enabled_tools_list}")
if disabled_tools:
disabled_tools_list = [t.strip() for t in disabled_tools.split(",")]
print(f"🚫 Disabled tools: {disabled_tools_list}")
if enabled_toolsets:
enabled_toolsets_list = [t.strip() for t in enabled_toolsets.split(",")]
print(f"🎯 Enabled toolsets: {enabled_toolsets_list}")
if disabled_toolsets:
disabled_toolsets_list = [t.strip() for t in disabled_toolsets.split(",")]
print(f"🚫 Disabled toolsets: {disabled_toolsets_list}")
# Initialize agent with provided parameters
try:
agent = AIAgent(
base_url=base_url,
model=model,
api_key=api_key,
max_iterations=max_turns
max_iterations=max_turns,
enabled_tools=enabled_tools_list,
disabled_tools=disabled_tools_list,
enabled_toolsets=enabled_toolsets_list,
disabled_toolsets=disabled_toolsets_list
)
except RuntimeError as e:
print(f"❌ Failed to initialize agent: {e}")

View File

@@ -1,6 +1,14 @@
export WEB_TOOLS_DEBUG=true
python run_agent.py \
--query "search up the latest docs on huggingface datasets in python 3.13 and write me basic example that's not in their docs. profile its performance" \
--query "Tell me about this animal pictured: https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQi1nkrYXY-ijQv5aCxkwooyg2roNFxj0ewJA&s" \
--max_turns 30 \
--model claude-sonnet-4-20250514 \
--base_url https://api.anthropic.com/v1/ \
--api_key $ANTHROPIC_API_KEY
--api_key $ANTHROPIC_API_KEY \
--enabled_toolsets=vision_tools
#Possible Toolsets:
#web_tools
#vision_tools
#terminal_tools

346
vision_tools.py Normal file
View File

@@ -0,0 +1,346 @@
#!/usr/bin/env python3
"""
Vision Tools Module
This module provides vision analysis tools that work with image URLs.
Uses Gemini Flash via Nous Research API for intelligent image understanding.
Available tools:
- vision_analyze_tool: Analyze images from URLs with custom prompts
Features:
- Comprehensive image description
- Context-aware analysis based on user queries
- Proper error handling and validation
- Debug logging support
Usage:
from vision_tools import vision_analyze_tool
import asyncio
# Analyze an image
result = await vision_analyze_tool(
image_url="https://example.com/image.jpg",
user_prompt="What architectural style is this building?"
)
"""
import json
import os
import asyncio
import uuid
import datetime
from pathlib import Path
from typing import Dict, Any, Optional
from openai import AsyncOpenAI
# Initialize Nous Research API client for vision processing
nous_client = AsyncOpenAI(
api_key=os.getenv("NOUS_API_KEY"),
base_url="https://inference-api.nousresearch.com/v1"
)
# Configuration for vision processing
DEFAULT_VISION_MODEL = "gemini-2.5-flash"
# Debug mode configuration
DEBUG_MODE = os.getenv("VISION_TOOLS_DEBUG", "false").lower() == "true"
DEBUG_SESSION_ID = str(uuid.uuid4())
DEBUG_LOG_PATH = Path("./logs")
DEBUG_DATA = {
"session_id": DEBUG_SESSION_ID,
"start_time": datetime.datetime.now().isoformat(),
"debug_enabled": DEBUG_MODE,
"tool_calls": []
} if DEBUG_MODE else None
# Create logs directory if debug mode is enabled
if DEBUG_MODE:
DEBUG_LOG_PATH.mkdir(exist_ok=True)
print(f"🐛 Vision debug mode enabled - Session ID: {DEBUG_SESSION_ID}")
def _log_debug_call(tool_name: str, call_data: Dict[str, Any]) -> None:
    """
    Append one tool-call record to the in-memory debug structure.

    No-op unless debug mode is active and the debug store exists.

    Args:
        tool_name (str): Name of the tool being called
        call_data (Dict[str, Any]): Data about the call including parameters and results
    """
    if DEBUG_MODE and DEBUG_DATA:
        # call_data entries intentionally win over the base keys on collision.
        record = {"timestamp": datetime.datetime.now().isoformat(), "tool_name": tool_name}
        record.update(call_data)
        DEBUG_DATA["tool_calls"].append(record)
def _save_debug_log() -> None:
    """
    Persist the current debug data as a JSON file under ./logs.

    No-op unless debug mode is active. Write failures are printed rather
    than raised so debug logging never breaks the vision tools themselves.
    """
    if not (DEBUG_MODE and DEBUG_DATA):
        return
    try:
        target = DEBUG_LOG_PATH / f"vision_tools_debug_{DEBUG_SESSION_ID}.json"
        # Stamp final session metadata just before writing.
        DEBUG_DATA["end_time"] = datetime.datetime.now().isoformat()
        DEBUG_DATA["total_calls"] = len(DEBUG_DATA["tool_calls"])
        with open(target, 'w', encoding='utf-8') as fh:
            json.dump(DEBUG_DATA, fh, indent=2, ensure_ascii=False)
        print(f"🐛 Vision debug log saved: {target}")
    except Exception as e:
        print(f"❌ Error saving vision debug log: {str(e)}")
def _validate_image_url(url: str) -> bool:
"""
Basic validation of image URL format.
Args:
url (str): The URL to validate
Returns:
bool: True if URL appears to be valid, False otherwise
"""
if not url or not isinstance(url, str):
return False
# Check if it's a valid URL format
if not (url.startswith('http://') or url.startswith('https://')):
return False
# Check for common image extensions (optional, as URLs may not have extensions)
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg']
return True # Allow all HTTP/HTTPS URLs for flexibility
async def vision_analyze_tool(
    image_url: str,
    user_prompt: str,
    model: str = DEFAULT_VISION_MODEL
) -> str:
    """
    Analyze an image from a URL using vision AI.

    This tool processes images using Gemini Flash via Nous Research API.
    The user_prompt parameter is expected to be pre-formatted by the calling
    function (typically model_tools.py) to include both full description
    requests and specific questions.

    Args:
        image_url (str): The URL of the image to analyze
        user_prompt (str): The pre-formatted prompt for the vision model
        model (str): The vision model to use (default: gemini-2.5-flash)

    Returns:
        str: JSON string containing the analysis results with the following structure:
            {
                "success": bool,
                "analysis": str (defaults to error message if None)
            }

    Raises:
        Exception: If analysis fails or API key is not set
    """
    # Per-call debug record; persisted only when VISION_TOOLS_DEBUG is on.
    debug_call_data = {
        "parameters": {
            "image_url": image_url,
            "user_prompt": user_prompt,
            "model": model
        },
        "error": None,
        "success": False,
        "analysis_length": 0,
        "model_used": model
    }
    try:
        print(f"🔍 Analyzing image from URL: {image_url[:60]}{'...' if len(image_url) > 60 else ''}")
        print(f"📝 User prompt: {user_prompt[:100]}{'...' if len(user_prompt) > 100 else ''}")
        # Validate image URL
        if not _validate_image_url(image_url):
            raise ValueError("Invalid image URL format. Must start with http:// or https://")
        # Check API key availability
        if not os.getenv("NOUS_API_KEY"):
            raise ValueError("NOUS_API_KEY environment variable not set")
        # Use the prompt as provided (model_tools.py now handles full description formatting)
        comprehensive_prompt = user_prompt
        # Prepare the message with image URL format
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": comprehensive_prompt
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": image_url
                        }
                    }
                ]
            }
        ]
        print(f"🧠 Processing image with {model}...")
        # Call the vision API
        response = await nous_client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=0.1,  # Low temperature for consistent analysis
            max_tokens=2000  # Generous limit for detailed analysis
        )
        # Extract the analysis.
        # FIX: message.content can legitimately be None (e.g. refusals or
        # empty completions); calling .strip() on it raised AttributeError and
        # routed every such response through the generic error path. Coalesce
        # to "" first so the `analysis or ...` fallback below handles it.
        analysis = (response.choices[0].message.content or "").strip()
        analysis_length = len(analysis)
        print(f"✅ Image analysis completed ({analysis_length} characters)")
        # Prepare successful response
        result = {
            "success": True,
            "analysis": analysis or "There was a problem with the request and the image could not be analyzed."
        }
        debug_call_data["success"] = True
        debug_call_data["analysis_length"] = analysis_length
        # Log debug information
        _log_debug_call("vision_analyze_tool", debug_call_data)
        _save_debug_log()
        return json.dumps(result, indent=2)
    except Exception as e:
        error_msg = f"Error analyzing image: {str(e)}"
        print(f"{error_msg}")
        # Prepare error response
        result = {
            "success": False,
            "analysis": "There was a problem with the request and the image could not be analyzed."
        }
        debug_call_data["error"] = error_msg
        _log_debug_call("vision_analyze_tool", debug_call_data)
        _save_debug_log()
        return json.dumps(result, indent=2)
def check_nous_api_key() -> bool:
    """
    Report whether the Nous Research API key is configured.

    Returns:
        bool: True when NOUS_API_KEY is set to a non-empty value, False otherwise
    """
    # Missing and empty both count as "not set".
    return os.environ.get("NOUS_API_KEY", "") != ""
def check_vision_requirements() -> bool:
    """
    Report whether the vision toolset is usable in this environment.

    The only current requirement is a configured Nous Research API key;
    extend this function if vision tools ever gain more dependencies.

    Returns:
        bool: True if requirements are met, False otherwise
    """
    return check_nous_api_key()
def get_debug_session_info() -> Dict[str, Any]:
    """
    Return a snapshot of the current vision-tools debug session.

    Returns:
        Dict[str, Any]: Keys ``enabled``, ``session_id``, ``log_path`` and
            ``total_calls``; id/path are None and the count 0 when debugging
            is disabled.
    """
    if DEBUG_MODE and DEBUG_DATA:
        log_file = DEBUG_LOG_PATH / f"vision_tools_debug_{DEBUG_SESSION_ID}.json"
        return {
            "enabled": True,
            "session_id": DEBUG_SESSION_ID,
            "log_path": str(log_file),
            "total_calls": len(DEBUG_DATA["tool_calls"])
        }
    return {
        "enabled": False,
        "session_id": None,
        "log_path": None,
        "total_calls": 0
    }
if __name__ == "__main__":
    """
    Simple test/demo when run directly
    """
    print("👁️ Vision Tools Module")
    print("=" * 40)
    # Check if API key is available
    api_available = check_nous_api_key()
    if not api_available:
        # Hard exit: nothing below is useful without a key.
        print("❌ NOUS_API_KEY environment variable not set")
        print("Please set your API key: export NOUS_API_KEY='your-key-here'")
        print("Get API key at: https://inference-api.nousresearch.com/")
        exit(1)
    else:
        print("✅ Nous Research API key found")
        print("🛠️ Vision tools ready for use!")
        print(f"🧠 Using model: {DEFAULT_VISION_MODEL}")
    # Show debug mode status (driven by VISION_TOOLS_DEBUG at import time)
    if DEBUG_MODE:
        print(f"🐛 Debug mode ENABLED - Session ID: {DEBUG_SESSION_ID}")
        print(f" Debug logs will be saved to: ./logs/vision_tools_debug_{DEBUG_SESSION_ID}.json")
    else:
        print("🐛 Debug mode disabled (set VISION_TOOLS_DEBUG=true to enable)")
    # Print a copy-pasteable usage snippet for the async entry point.
    print("\nBasic usage:")
    print(" from vision_tools import vision_analyze_tool")
    print(" import asyncio")
    print("")
    print(" async def main():")
    print(" result = await vision_analyze_tool(")
    print(" image_url='https://example.com/image.jpg',")
    print(" user_prompt='What do you see in this image?'")
    print(" )")
    print(" print(result)")
    print(" asyncio.run(main())")
    # Sample prompts illustrating the kinds of questions the tool handles.
    print("\nExample prompts:")
    print(" - 'What architectural style is this building?'")
    print(" - 'Describe the emotions and mood in this image'")
    print(" - 'What text can you read in this image?'")
    print(" - 'Identify any safety hazards visible'")
    print(" - 'What products or brands are shown?'")
    # How to enable the JSON debug log described above.
    print("\nDebug mode:")
    print(" # Enable debug logging")
    print(" export VISION_TOOLS_DEBUG=true")
    print(" # Debug logs capture all vision analysis calls and results")
    print(" # Logs saved to: ./logs/vision_tools_debug_UUID.json")

View File

@@ -19,6 +19,11 @@ LLM Processing:
- Uses Nous Research API with Gemini 2.5 Flash for intelligent content extraction
- Extracts key excerpts and creates markdown summaries to reduce token usage
Debug Mode:
- Set WEB_TOOLS_DEBUG=true to enable detailed logging
- Creates web_tools_debug_UUID.json in ./logs directory
- Captures all tool calls, results, and compression metrics
Usage:
from web_tools import web_search_tool, web_extract_tool, web_crawl_tool
@@ -40,6 +45,9 @@ import json
import os
import re
import asyncio
import uuid
import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
from tavily import TavilyClient
from openai import AsyncOpenAI
@@ -57,6 +65,66 @@ nous_client = AsyncOpenAI(
DEFAULT_SUMMARIZER_MODEL = "gemini-2.5-flash"
DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION = 5000
# Debug mode configuration
DEBUG_MODE = os.getenv("WEB_TOOLS_DEBUG", "false").lower() == "true"
DEBUG_SESSION_ID = str(uuid.uuid4())
DEBUG_LOG_PATH = Path("./logs")
DEBUG_DATA = {
"session_id": DEBUG_SESSION_ID,
"start_time": datetime.datetime.now().isoformat(),
"debug_enabled": DEBUG_MODE,
"tool_calls": []
} if DEBUG_MODE else None
# Create logs directory if debug mode is enabled
if DEBUG_MODE:
DEBUG_LOG_PATH.mkdir(exist_ok=True)
print(f"🐛 Debug mode enabled - Session ID: {DEBUG_SESSION_ID}")
def _log_debug_call(tool_name: str, call_data: Dict[str, Any]) -> None:
    """
    Record one web-tool invocation in the in-memory debug structure.

    Does nothing unless debug mode is enabled and the debug store exists.

    Args:
        tool_name (str): Name of the tool being called
        call_data (Dict[str, Any]): Data about the call including parameters and results
    """
    if DEBUG_MODE and DEBUG_DATA:
        # Entries from call_data intentionally override the base keys on collision.
        entry = {"timestamp": datetime.datetime.now().isoformat(), "tool_name": tool_name}
        entry.update(call_data)
        DEBUG_DATA["tool_calls"].append(entry)
def _save_debug_log() -> None:
    """
    Persist the accumulated web-tools debug data as JSON under ./logs.

    No-op unless debug mode is active; write failures are printed rather
    than raised so debug logging never breaks the tools themselves.
    """
    if not (DEBUG_MODE and DEBUG_DATA):
        return
    try:
        target = DEBUG_LOG_PATH / f"web_tools_debug_{DEBUG_SESSION_ID}.json"
        # Stamp final session metadata just before writing.
        DEBUG_DATA["end_time"] = datetime.datetime.now().isoformat()
        DEBUG_DATA["total_calls"] = len(DEBUG_DATA["tool_calls"])
        with open(target, 'w', encoding='utf-8') as fh:
            json.dump(DEBUG_DATA, fh, indent=2, ensure_ascii=False)
        print(f"🐛 Debug log saved: {target}")
    except Exception as e:
        print(f"❌ Error saving debug log: {str(e)}")
async def process_content_with_llm(
content: str,
@@ -208,21 +276,51 @@ def web_search_tool(query: str, limit: int = 5) -> str:
Raises:
Exception: If search fails or API key is not set
"""
debug_call_data = {
"parameters": {
"query": query,
"limit": limit
},
"error": None,
"results_count": 0,
"original_response_size": 0,
"final_response_size": 0
}
try:
print(f"🔍 Searching the web for: '{query}' (limit: {limit})")
# Use Tavily's search functionality
response = tavily_client.search(query=query, max_results=limit, search_depth="advanced")
print(f"✅ Found {len(response.get('results', []))} results")
results_count = len(response.get('results', []))
print(f"✅ Found {results_count} results")
# Capture debug information
debug_call_data["results_count"] = results_count
debug_call_data["original_response_size"] = len(json.dumps(response))
result_json = json.dumps(response, indent=2)
# Clean base64 images from search results
return clean_base64_images(result_json)
cleaned_result = clean_base64_images(result_json)
debug_call_data["final_response_size"] = len(cleaned_result)
debug_call_data["compression_applied"] = "base64_image_removal"
# Log debug information
_log_debug_call("web_search_tool", debug_call_data)
_save_debug_log()
return cleaned_result
except Exception as e:
error_msg = f"Error searching web: {str(e)}"
print(f"{error_msg}")
debug_call_data["error"] = error_msg
_log_debug_call("web_search_tool", debug_call_data)
_save_debug_log()
return json.dumps({"error": error_msg})
@@ -253,17 +351,39 @@ async def web_extract_tool(
Raises:
Exception: If extraction fails or API key is not set
"""
debug_call_data = {
"parameters": {
"urls": urls,
"format": format,
"use_llm_processing": use_llm_processing,
"model": model,
"min_length": min_length
},
"error": None,
"pages_extracted": 0,
"pages_processed_with_llm": 0,
"original_response_size": 0,
"final_response_size": 0,
"compression_metrics": [],
"processing_applied": []
}
try:
print(f"📄 Extracting content from {len(urls)} URL(s)")
# Use Tavily's extract functionality
response = tavily_client.extract(urls=urls, format=format)
print(f"✅ Extracted content from {len(response.get('results', []))} pages")
pages_extracted = len(response.get('results', []))
print(f"✅ Extracted content from {pages_extracted} pages")
debug_call_data["pages_extracted"] = pages_extracted
debug_call_data["original_response_size"] = len(json.dumps(response))
# Process each result with LLM if enabled
if use_llm_processing and os.getenv("NOUS_API_KEY"):
print("🧠 Processing extracted content with LLM...")
debug_call_data["processing_applied"].append("llm_processing")
for result in response.get('results', []):
url = result.get('url', 'Unknown URL')
@@ -271,24 +391,48 @@ async def web_extract_tool(
raw_content = result.get('raw_content', '') or result.get('content', '')
if raw_content:
original_size = len(raw_content)
# Process content with LLM
processed = await process_content_with_llm(
raw_content, url, title, model, min_length
)
if processed:
processed_size = len(processed)
compression_ratio = processed_size / original_size if original_size > 0 else 1.0
# Capture compression metrics
debug_call_data["compression_metrics"].append({
"url": url,
"original_size": original_size,
"processed_size": processed_size,
"compression_ratio": compression_ratio,
"model_used": model
})
# Replace content with processed version
result['content'] = processed
# Keep raw content in separate field for reference
result['raw_content'] = raw_content
debug_call_data["pages_processed_with_llm"] += 1
print(f" 📝 {url} (processed)")
else:
debug_call_data["compression_metrics"].append({
"url": url,
"original_size": original_size,
"processed_size": original_size,
"compression_ratio": 1.0,
"model_used": None,
"reason": "content_too_short"
})
print(f" 📝 {url} (no processing - content too short)")
else:
print(f" ⚠️ {url} (no content to process)")
else:
if use_llm_processing and not os.getenv("NOUS_API_KEY"):
print("⚠️ LLM processing requested but NOUS_API_KEY not set, returning raw content")
debug_call_data["processing_applied"].append("llm_processing_unavailable")
# Print summary of extracted pages for debugging (original behavior)
for result in response.get('results', []):
@@ -298,11 +442,25 @@ async def web_extract_tool(
result_json = json.dumps(response, indent=2)
# Clean base64 images from extracted content
return clean_base64_images(result_json)
cleaned_result = clean_base64_images(result_json)
debug_call_data["final_response_size"] = len(cleaned_result)
debug_call_data["processing_applied"].append("base64_image_removal")
# Log debug information
_log_debug_call("web_extract_tool", debug_call_data)
_save_debug_log()
return cleaned_result
except Exception as e:
error_msg = f"Error extracting content: {str(e)}"
print(f"{error_msg}")
debug_call_data["error"] = error_msg
_log_debug_call("web_extract_tool", debug_call_data)
_save_debug_log()
return json.dumps({"error": error_msg})
@@ -336,6 +494,24 @@ async def web_crawl_tool(
Raises:
Exception: If crawling fails or API key is not set
"""
debug_call_data = {
"parameters": {
"url": url,
"instructions": instructions,
"depth": depth,
"use_llm_processing": use_llm_processing,
"model": model,
"min_length": min_length
},
"error": None,
"pages_crawled": 0,
"pages_processed_with_llm": 0,
"original_response_size": 0,
"final_response_size": 0,
"compression_metrics": [],
"processing_applied": []
}
try:
instructions_text = f" with instructions: '{instructions}'" if instructions else ""
print(f"🕷️ Crawling {url}{instructions_text}")
@@ -348,11 +524,16 @@ async def web_crawl_tool(
extract_depth=depth
)
print(f"✅ Crawled {len(response.get('results', []))} pages")
pages_crawled = len(response.get('results', []))
print(f"✅ Crawled {pages_crawled} pages")
debug_call_data["pages_crawled"] = pages_crawled
debug_call_data["original_response_size"] = len(json.dumps(response))
# Process each result with LLM if enabled
if use_llm_processing and os.getenv("NOUS_API_KEY"):
print("🧠 Processing crawled content with LLM...")
debug_call_data["processing_applied"].append("llm_processing")
for result in response.get('results', []):
page_url = result.get('url', 'Unknown URL')
@@ -360,24 +541,48 @@ async def web_crawl_tool(
content = result.get('content', '')
if content:
original_size = len(content)
# Process content with LLM
processed = await process_content_with_llm(
content, page_url, title, model, min_length
)
if processed:
processed_size = len(processed)
compression_ratio = processed_size / original_size if original_size > 0 else 1.0
# Capture compression metrics
debug_call_data["compression_metrics"].append({
"url": page_url,
"original_size": original_size,
"processed_size": processed_size,
"compression_ratio": compression_ratio,
"model_used": model
})
# Keep original content in raw_content field
result['raw_content'] = content
# Replace content with processed version
result['content'] = processed
debug_call_data["pages_processed_with_llm"] += 1
print(f" 🌐 {page_url} (processed)")
else:
debug_call_data["compression_metrics"].append({
"url": page_url,
"original_size": original_size,
"processed_size": original_size,
"compression_ratio": 1.0,
"model_used": None,
"reason": "content_too_short"
})
print(f" 🌐 {page_url} (no processing - content too short)")
else:
print(f" ⚠️ {page_url} (no content to process)")
else:
if use_llm_processing and not os.getenv("NOUS_API_KEY"):
print("⚠️ LLM processing requested but NOUS_API_KEY not set, returning raw content")
debug_call_data["processing_applied"].append("llm_processing_unavailable")
# Print summary of crawled pages for debugging (original behavior)
for result in response.get('results', []):
@@ -387,11 +592,25 @@ async def web_crawl_tool(
result_json = json.dumps(response, indent=2)
# Clean base64 images from crawled content
return clean_base64_images(result_json)
cleaned_result = clean_base64_images(result_json)
debug_call_data["final_response_size"] = len(cleaned_result)
debug_call_data["processing_applied"].append("base64_image_removal")
# Log debug information
_log_debug_call("web_crawl_tool", debug_call_data)
_save_debug_log()
return cleaned_result
except Exception as e:
error_msg = f"Error crawling website: {str(e)}"
print(f"{error_msg}")
debug_call_data["error"] = error_msg
_log_debug_call("web_crawl_tool", debug_call_data)
_save_debug_log()
return json.dumps({"error": error_msg})
@@ -416,6 +635,33 @@ def check_nous_api_key() -> bool:
return bool(os.getenv("NOUS_API_KEY"))
def get_debug_session_info() -> Dict[str, Any]:
    """
    Describe the current debug session.

    Returns:
        Dict[str, Any]: Dictionary containing debug session information:
            - enabled: Whether debug mode is enabled
            - session_id: Current session UUID (None when disabled)
            - log_path: Path where debug logs are saved (None when disabled)
            - total_calls: Number of tool calls logged so far (0 when disabled)
    """
    if DEBUG_MODE and DEBUG_DATA:
        return {
            "enabled": True,
            "session_id": DEBUG_SESSION_ID,
            "log_path": str(DEBUG_LOG_PATH / f"web_tools_debug_{DEBUG_SESSION_ID}.json"),
            "total_calls": len(DEBUG_DATA["tool_calls"]),
        }
    return {
        "enabled": False,
        "session_id": None,
        "log_path": None,
        "total_calls": 0,
    }
if __name__ == "__main__":
"""
Simple test/demo when run directly
@@ -451,6 +697,13 @@ if __name__ == "__main__":
print("🧠 LLM content processing available with Gemini 2.5 Flash")
print(f" Default min length for processing: {DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION} chars")
# Show debug mode status
if DEBUG_MODE:
print(f"🐛 Debug mode ENABLED - Session ID: {DEBUG_SESSION_ID}")
print(f" Debug logs will be saved to: ./logs/web_tools_debug_{DEBUG_SESSION_ID}.json")
else:
print("🐛 Debug mode disabled (set WEB_TOOLS_DEBUG=true to enable)")
print("\nBasic usage:")
print(" from web_tools import web_search_tool, web_extract_tool, web_crawl_tool")
print(" import asyncio")
@@ -480,4 +733,14 @@ if __name__ == "__main__":
print(" # Disable LLM processing")
print(" raw_content = await web_extract_tool(['https://example.com'], use_llm_processing=False)")
print("\nDebug mode:")
print(" # Enable debug logging")
print(" export WEB_TOOLS_DEBUG=true")
print(" # Debug logs capture:")
print(" # - All tool calls with parameters")
print(" # - Original API responses")
print(" # - LLM compression metrics")
print(" # - Final processed results")
print(" # Logs saved to: ./logs/web_tools_debug_UUID.json")
print(f"\n📝 Run 'python test_web_tools_llm.py' to test LLM processing capabilities")