diff --git a/mixture_of_agents_tool.py b/mixture_of_agents_tool.py index 35fe4477..206b1496 100644 --- a/mixture_of_agents_tool.py +++ b/mixture_of_agents_tool.py @@ -65,7 +65,7 @@ nous_client = AsyncOpenAI( REFERENCE_MODELS = [ "claude-opus-4-20250514", "gemini-2.5-pro", - "o4-mini", + "gpt-5", "deepseek-r1" ] @@ -164,7 +164,7 @@ async def _run_reference_model_safe( model: str, user_prompt: str, temperature: float = REFERENCE_TEMPERATURE, - max_tokens: int = 128000, + max_tokens: int = 32000, max_retries: int = 3 ) -> tuple[str, str, bool]: """ @@ -184,12 +184,18 @@ async def _run_reference_model_safe( try: print(f"🤖 Querying {model} (attempt {attempt + 1}/{max_retries})") - response = await nous_client.chat.completions.create( - model=model, - messages=[{"role": "user", "content": user_prompt}], - temperature=temperature, - max_tokens=max_tokens - ) + # Build parameters for the API call + api_params = { + "model": model, + "messages": [{"role": "user", "content": user_prompt}] + } + + # GPT models (especially gpt-4o-mini) don't support custom temperature values + # Only include temperature for non-GPT models + if not model.lower().startswith('gpt-'): + api_params["temperature"] = temperature + + response = await nous_client.chat.completions.create(**api_params) content = response.choices[0].message.content.strip() print(f"✅ {model} responded ({len(content)} characters)") @@ -220,7 +226,7 @@ async def _run_aggregator_model( system_prompt: str, user_prompt: str, temperature: float = AGGREGATOR_TEMPERATURE, - max_tokens: int = 16000 + max_tokens: int = None ) -> str: """ Run the aggregator model to synthesize the final response. 
@@ -236,15 +242,21 @@ async def _run_aggregator_model( """ print(f"🧠 Running aggregator model: {AGGREGATOR_MODEL}") - response = await nous_client.chat.completions.create( - model=AGGREGATOR_MODEL, - messages=[ + # Build parameters for the API call + api_params = { + "model": AGGREGATOR_MODEL, + "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} - ], - temperature=temperature, - max_tokens=max_tokens - ) + ] + } + + # GPT models (especially gpt-4o-mini) don't support custom temperature values + # Only include temperature for non-GPT models + if not AGGREGATOR_MODEL.lower().startswith('gpt-'): + api_params["temperature"] = temperature + + response = await nous_client.chat.completions.create(**api_params) content = response.choices[0].message.content.strip() print(f"✅ Aggregation complete ({len(content)} characters)") diff --git a/model_tools.py b/model_tools.py index c5aa0827..83586742 100644 --- a/model_tools.py +++ b/model_tools.py @@ -42,7 +42,7 @@ def get_web_tool_definitions() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "web_search", - "description": "Search the web for information on any topic. Returns relevant results with titles, URLs, content snippets, and answers. Uses advanced search depth for comprehensive results.", + "description": "Search the web for information on any topic. Returns relevant results with titles and URLs. 
Uses advanced search depth for comprehensive results.", "parameters": { "type": "object", "properties": { diff --git a/run_agent.py b/run_agent.py index 0520b14f..eec9d63c 100644 --- a/run_agent.py +++ b/run_agent.py @@ -26,6 +26,7 @@ import time from typing import List, Dict, Any, Optional from openai import OpenAI import fire +from datetime import datetime # Import our tool system from model_tools import get_tool_definitions, handle_function_call, check_toolset_requirements @@ -49,7 +50,8 @@ class AIAgent: enabled_tools: List[str] = None, disabled_tools: List[str] = None, enabled_toolsets: List[str] = None, - disabled_toolsets: List[str] = None + disabled_toolsets: List[str] = None, + save_trajectories: bool = False ): """ Initialize the AI Agent. @@ -64,10 +66,12 @@ class AIAgent: disabled_tools (List[str]): Disable these specific tools (optional) enabled_toolsets (List[str]): Only enable tools from these toolsets (optional) disabled_toolsets (List[str]): Disable tools from these toolsets (optional) + save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False) """ self.model = model self.max_iterations = max_iterations self.tool_delay = tool_delay + self.save_trajectories = save_trajectories # Store tool filtering options self.enabled_tools = enabled_tools @@ -123,31 +127,184 @@ class AIAgent: missing_reqs = [name for name, available in requirements.items() if not available] if missing_reqs: print(f"âš ī¸ Some tools may not work due to missing requirements: {missing_reqs}") + + # Show trajectory saving status + if self.save_trajectories: + print("📝 Trajectory saving enabled") - def create_system_message(self, custom_system: str = None) -> str: + def _format_tools_for_system_message(self) -> str: """ - Create the system message for the agent. + Format tool definitions for the system message in the trajectory format. 
+ + Returns: + str: JSON string representation of tool definitions + """ + if not self.tools: + return "[]" + + # Convert tool definitions to the format expected in trajectories + formatted_tools = [] + for tool in self.tools: + func = tool["function"] + formatted_tool = { + "name": func["name"], + "description": func.get("description", ""), + "parameters": func.get("parameters", {}), + "required": None # Match the format in the example + } + formatted_tools.append(formatted_tool) + + return json.dumps(formatted_tools) + + def _convert_to_trajectory_format(self, messages: List[Dict[str, Any]], user_query: str, completed: bool) -> List[Dict[str, Any]]: + """ + Convert internal message format to trajectory format for saving. Args: - custom_system (str): Custom system message (optional) + messages (List[Dict]): Internal message history + user_query (str): Original user query + completed (bool): Whether the conversation completed successfully Returns: - str: System message content + List[Dict]: Messages in trajectory format """ - if custom_system: - return custom_system + trajectory = [] - return ( - "You are an AI assistant that provides helpful responses. You may use extremely long chains of thought " - "to deeply consider the problem and deliberate with yourself via systematic reasoning processes to help " - "come to a correct solution prior to answering. You should enclose your thoughts and internal monologue " - "inside tags.\n\n" - "You are equipped with web research tools that allow you to search the web, extract content from web pages, " - "and crawl websites. Use these tools to gather current information and provide accurate, well-researched responses. " - "You can call multiple tools in parallel if they are not reliant on each other's results. You can also use " - "sequential tool calls to build on data you've collected from previous tool calls. Continue using tools until " - "you feel confident you have enough information to provide a comprehensive answer." 
+ # Add system message with tool definitions + system_msg = ( + "You are a function calling AI model. You are provided with function signatures within XML tags. " + "You may call one or more functions to assist with the user query. If available tools are not relevant in assisting " + "with user query, just respond in natural conversational language. Don't make assumptions about what values to plug " + "into functions. After calling & executing the functions, you will be provided with function results within " + " XML tags. Here are the available tools:\n" + f"\n{self._format_tools_for_system_message()}\n\n" + "For each function call return a JSON object, with the following pydantic model json schema for each:\n" + "{'title': 'FunctionCall', 'type': 'object', 'properties': {'name': {'title': 'Name', 'type': 'string'}, " + "'arguments': {'title': 'Arguments', 'type': 'object'}}, 'required': ['name', 'arguments']}\n" + "Each function call should be enclosed within XML tags.\n" + "Example:\n\n{'name': ,'arguments': }\n" ) + + trajectory.append({ + "from": "system", + "value": system_msg + }) + + # Add the initial user message + trajectory.append({ + "from": "human", + "value": user_query + }) + + # Process remaining messages + i = 1 # Skip the first user message as we already added it + while i < len(messages): + msg = messages[i] + + if msg["role"] == "assistant": + # Check if this message has tool calls + if "tool_calls" in msg and msg["tool_calls"]: + # Format assistant message with tool calls + content = "" + if msg.get("content") and msg["content"].strip(): + content = msg["content"] + "\n" + + # Add tool calls wrapped in XML tags + for tool_call in msg["tool_calls"]: + tool_call_json = { + "name": tool_call["function"]["name"], + "arguments": json.loads(tool_call["function"]["arguments"]) if isinstance(tool_call["function"]["arguments"], str) else tool_call["function"]["arguments"] + } + content += f"\n{json.dumps(tool_call_json)}\n\n" + + trajectory.append({ + 
"from": "gpt", + "value": content.rstrip() + }) + + # Collect all subsequent tool responses + tool_responses = [] + j = i + 1 + while j < len(messages) and messages[j]["role"] == "tool": + tool_msg = messages[j] + # Format tool response with XML tags + tool_response = f"\n" + + # Try to parse tool content as JSON if it looks like JSON + tool_content = tool_msg["content"] + try: + if tool_content.strip().startswith(("{", "[")): + tool_content = json.loads(tool_content) + except (json.JSONDecodeError, AttributeError): + pass # Keep as string if not valid JSON + + tool_response += json.dumps({ + "tool_call_id": tool_msg.get("tool_call_id", ""), + "name": msg["tool_calls"][len(tool_responses)]["function"]["name"] if len(tool_responses) < len(msg["tool_calls"]) else "unknown", + "content": tool_content + }) + tool_response += "\n" + tool_responses.append(tool_response) + j += 1 + + # Add all tool responses as a single message + if tool_responses: + trajectory.append({ + "from": "tool", + "value": "\n".join(tool_responses) + }) + i = j - 1 # Skip the tool messages we just processed + + else: + # Regular assistant message without tool calls + trajectory.append({ + "from": "gpt", + "value": msg["content"] or "" + }) + + elif msg["role"] == "user": + trajectory.append({ + "from": "human", + "value": msg["content"] + }) + + i += 1 + + return trajectory + + def _save_trajectory(self, messages: List[Dict[str, Any]], user_query: str, completed: bool): + """ + Save conversation trajectory to JSONL file. 
+ + Args: + messages (List[Dict]): Complete message history + user_query (str): Original user query + completed (bool): Whether the conversation completed successfully + """ + if not self.save_trajectories: + return + + # Convert messages to trajectory format + trajectory = self._convert_to_trajectory_format(messages, user_query, completed) + + # Determine which file to save to + filename = "trajectory_samples.jsonl" if completed else "failed_trajectories.jsonl" + + # Create trajectory entry + entry = { + "conversations": trajectory, + "timestamp": datetime.now().isoformat(), + "model": self.model, + "completed": completed + } + + # Append to JSONL file + try: + with open(filename, "a", encoding="utf-8") as f: + f.write(json.dumps(entry, ensure_ascii=False) + "\n") + print(f"💾 Trajectory saved to {filename}") + except Exception as e: + print(f"âš ī¸ Failed to save trajectory: {e}") def run_conversation( self, @@ -169,13 +326,6 @@ class AIAgent: # Initialize conversation messages = conversation_history or [] - # Add system message if not already present - if not messages or messages[0]["role"] != "system": - messages.insert(0, { - "role": "system", - "content": self.create_system_message(system_message) - }) - # Add user message messages.append({ "role": "user", "content": user_message }) @@ -292,11 +442,17 @@ class AIAgent: if final_response is None: final_response = "I've reached the maximum number of iterations. Here's what I found so far." 
+ # Determine if conversation completed successfully + completed = final_response is not None and api_call_count < self.max_iterations + + # Save trajectory if enabled + self._save_trajectory(messages, user_message, completed) + return { "final_response": final_response, "messages": messages, "api_calls": api_call_count, - "completed": final_response is not None + "completed": completed } def chat(self, message: str) -> str: @@ -323,7 +479,8 @@ def main( disabled_tools: str = None, enabled_toolsets: str = None, disabled_toolsets: str = None, - list_tools: bool = False + list_tools: bool = False, + save_trajectories: bool = False ): """ Main function for running the agent directly. @@ -339,6 +496,7 @@ def main( enabled_toolsets (str): Comma-separated list of toolsets to enable (e.g., "web_tools") disabled_toolsets (str): Comma-separated list of toolsets to disable (e.g., "terminal_tools") list_tools (bool): Just list available tools and exit + save_trajectories (bool): Save conversation trajectories to JSONL files. Defaults to False. 
""" print("🤖 AI Agent with Tool Calling") print("=" * 50) @@ -373,6 +531,8 @@ def main( print(f" python run_agent.py --enabled_tools=web_search,web_extract --query='research topic'") print(f" # Run without terminal tools") print(f" python run_agent.py --disabled_tools=terminal --query='web research only'") + print(f" # Run with trajectory saving enabled") + print(f" python run_agent.py --save_trajectories --query='your question here'") return # Parse tool selection arguments @@ -397,6 +557,11 @@ def main( disabled_toolsets_list = [t.strip() for t in disabled_toolsets.split(",")] print(f"đŸšĢ Disabled toolsets: {disabled_toolsets_list}") + if save_trajectories: + print(f"💾 Trajectory saving: ENABLED") + print(f" - Successful conversations → trajectory_samples.jsonl") + print(f" - Failed conversations → failed_trajectories.jsonl") + # Initialize agent with provided parameters try: agent = AIAgent( @@ -407,7 +572,8 @@ def main( enabled_tools=enabled_tools_list, disabled_tools=disabled_tools_list, enabled_toolsets=enabled_toolsets_list, - disabled_toolsets=disabled_toolsets_list + disabled_toolsets=disabled_toolsets_list, + save_trajectories=save_trajectories ) except RuntimeError as e: print(f"❌ Failed to initialize agent: {e}") diff --git a/test_web_tools.py b/test_web_tools.py new file mode 100644 index 00000000..7c86becb --- /dev/null +++ b/test_web_tools.py @@ -0,0 +1,620 @@ +#!/usr/bin/env python3 +""" +Comprehensive Test Suite for Web Tools Module + +This script tests all web tools functionality to ensure they work correctly. +Run this after any updates to the web_tools.py module or Firecrawl library. 
+ + Usage: + python test_web_tools.py # Run all tests + python test_web_tools.py --no-llm # Skip LLM processing tests + python test_web_tools.py --verbose # Show detailed output + + Requirements: + - FIRECRAWL_API_KEY environment variable must be set + - NOUS_API_KEY environment variable (optional, for LLM tests) + """ + + import json + import asyncio + import sys + import os + import argparse + from datetime import datetime + from typing import List, Dict, Any + + # Import the web tools to test + from web_tools import ( + web_search_tool, + web_extract_tool, + web_crawl_tool, + check_firecrawl_api_key, + check_nous_api_key, + get_debug_session_info + ) + + + class Colors: + """ANSI color codes for terminal output""" + HEADER = '\033[95m' + BLUE = '\033[94m' + CYAN = '\033[96m' + GREEN = '\033[92m' + WARNING = '\033[93m' + FAIL = '\033[91m' + ENDC = '\033[0m' + BOLD = '\033[1m' + UNDERLINE = '\033[4m' + + + def print_header(text: str): + """Print a formatted header""" + print(f"\n{Colors.HEADER}{Colors.BOLD}{'='*60}{Colors.ENDC}") + print(f"{Colors.HEADER}{Colors.BOLD}{text}{Colors.ENDC}") + print(f"{Colors.HEADER}{Colors.BOLD}{'='*60}{Colors.ENDC}") + + + def print_section(text: str): + """Print a formatted section header""" + print(f"\n{Colors.CYAN}{Colors.BOLD}📌 {text}{Colors.ENDC}") + print(f"{Colors.CYAN}{'-'*50}{Colors.ENDC}") + + + def print_success(text: str): + """Print success message""" + print(f"{Colors.GREEN}✅ {text}{Colors.ENDC}") + + + def print_error(text: str): + """Print error message""" + print(f"{Colors.FAIL}❌ {text}{Colors.ENDC}") + + + def print_warning(text: str): + """Print warning message""" + print(f"{Colors.WARNING}âš ī¸ {text}{Colors.ENDC}") + + + def print_info(text: str, indent: int = 0): + """Print info message""" + indent_str = " " * indent + print(f"{indent_str}{Colors.BLUE}â„šī¸ {text}{Colors.ENDC}") + + + class WebToolsTester: + """Test suite for web tools""" + + def __init__(self, verbose: bool = False, test_llm: bool = True): + self.verbose = 
verbose + self.test_llm = test_llm + self.test_results = { + "passed": [], + "failed": [], + "skipped": [] + } + self.start_time = None + self.end_time = None + + def log_result(self, test_name: str, status: str, details: str = ""): + """Log test result""" + result = { + "test": test_name, + "status": status, + "details": details, + "timestamp": datetime.now().isoformat() + } + + if status == "passed": + self.test_results["passed"].append(result) + print_success(f"{test_name}: {details}" if details else test_name) + elif status == "failed": + self.test_results["failed"].append(result) + print_error(f"{test_name}: {details}" if details else test_name) + elif status == "skipped": + self.test_results["skipped"].append(result) + print_warning(f"{test_name} skipped: {details}" if details else f"{test_name} skipped") + + def test_environment(self) -> bool: + """Test environment setup and API keys""" + print_section("Environment Check") + + # Check Firecrawl API key + if not check_firecrawl_api_key(): + self.log_result("Firecrawl API Key", "failed", "FIRECRAWL_API_KEY not set") + return False + else: + self.log_result("Firecrawl API Key", "passed", "Found") + + # Check Nous API key (optional) + if not check_nous_api_key(): + self.log_result("Nous API Key", "skipped", "NOUS_API_KEY not set (LLM tests will be skipped)") + self.test_llm = False + else: + self.log_result("Nous API Key", "passed", "Found") + + # Check debug mode + debug_info = get_debug_session_info() + if debug_info["enabled"]: + print_info(f"Debug mode enabled - Session: {debug_info['session_id']}") + print_info(f"Debug log: {debug_info['log_path']}") + + return True + + def test_web_search(self) -> List[str]: + """Test web search functionality""" + print_section("Test 1: Web Search") + + test_queries = [ + ("Python web scraping tutorial", 5), + ("Firecrawl API documentation", 3), + ("inflammatory arthritis symptoms treatment", 8) # Test medical query from your example + ] + + extracted_urls = [] + + for 
query, limit in test_queries: + try: + print(f"\n Testing search: '{query}' (limit={limit})") + + if self.verbose: + print(f" Calling web_search_tool(query='{query}', limit={limit})") + + # Perform search + result = web_search_tool(query, limit) + + # Parse result + try: + data = json.loads(result) + except json.JSONDecodeError as e: + self.log_result(f"Search: {query[:30]}...", "failed", f"Invalid JSON: {e}") + if self.verbose: + print(f" Raw response (first 500 chars): {result[:500]}...") + continue + + if "error" in data: + self.log_result(f"Search: {query[:30]}...", "failed", f"API error: {data['error']}") + continue + + # Check structure + if "success" not in data or "data" not in data: + self.log_result(f"Search: {query[:30]}...", "failed", "Missing success or data fields") + if self.verbose: + print(f" Response keys: {list(data.keys())}") + continue + + web_results = data.get("data", {}).get("web", []) + + if not web_results: + self.log_result(f"Search: {query[:30]}...", "failed", "Empty web results array") + if self.verbose: + print(f" data.web content: {data.get('data', {}).get('web')}") + continue + + # Validate each result + valid_results = 0 + missing_fields = [] + + for i, result in enumerate(web_results): + required_fields = ["url", "title", "description"] + has_all_fields = all(key in result for key in required_fields) + + if has_all_fields: + valid_results += 1 + # Collect URLs for extraction test + if len(extracted_urls) < 3: + extracted_urls.append(result["url"]) + + if self.verbose: + print(f" Result {i+1}: ✓ {result['title'][:50]}...") + print(f" URL: {result['url'][:60]}...") + else: + missing = [f for f in required_fields if f not in result] + missing_fields.append(f"Result {i+1} missing: {missing}") + if self.verbose: + print(f" Result {i+1}: ✗ Missing fields: {missing}") + + # Log results + if valid_results == len(web_results): + self.log_result( + f"Search: {query[:30]}...", + "passed", + f"All {valid_results} results valid" + ) + else: + 
self.log_result( + f"Search: {query[:30]}...", + "failed", + f"Only {valid_results}/{len(web_results)} valid. Issues: {'; '.join(missing_fields[:3])}" + ) + + except Exception as e: + self.log_result(f"Search: {query[:30]}...", "failed", f"Exception: {type(e).__name__}: {str(e)}") + if self.verbose: + import traceback + print(f" Traceback: {traceback.format_exc()}") + + if self.verbose and extracted_urls: + print(f"\n URLs collected for extraction test: {len(extracted_urls)}") + for url in extracted_urls: + print(f" - {url}") + + return extracted_urls + + async def test_web_extract(self, urls: List[str] = None): + """Test web content extraction""" + print_section("Test 2: Web Extract (without LLM)") + + # Use provided URLs or defaults + if not urls: + urls = [ + "https://docs.firecrawl.dev/introduction", + "https://www.python.org/about/" + ] + print(f" Using default URLs for testing") + else: + print(f" Using {len(urls)} URLs from search results") + + # Test extraction + if urls: + try: + test_urls = urls[:2] # Test with max 2 URLs + print(f"\n Extracting content from {len(test_urls)} URL(s)...") + for url in test_urls: + print(f" - {url}") + + if self.verbose: + print(f" Calling web_extract_tool(urls={test_urls}, format='markdown', use_llm_processing=False)") + + result = await web_extract_tool( + test_urls, + format="markdown", + use_llm_processing=False + ) + + # Parse result + try: + data = json.loads(result) + except json.JSONDecodeError as e: + self.log_result("Extract (no LLM)", "failed", f"Invalid JSON: {e}") + if self.verbose: + print(f" Raw response (first 500 chars): {result[:500]}...") + return + + if "error" in data: + self.log_result("Extract (no LLM)", "failed", f"API error: {data['error']}") + return + + results = data.get("results", []) + + if not results: + self.log_result("Extract (no LLM)", "failed", "No results in response") + if self.verbose: + print(f" Response keys: {list(data.keys())}") + return + + # Validate each result + valid_results = 
0 + failed_results = 0 + total_content_length = 0 + extraction_details = [] + + for i, result in enumerate(results): + title = result.get("title", "No title") + content = result.get("content", "") + error = result.get("error") + + if error: + failed_results += 1 + extraction_details.append(f"Page {i+1}: ERROR - {error}") + if self.verbose: + print(f" Page {i+1}: ✗ Error - {error}") + elif content: + content_len = len(content) + total_content_length += content_len + valid_results += 1 + extraction_details.append(f"Page {i+1}: {title[:40]}... ({content_len} chars)") + if self.verbose: + print(f" Page {i+1}: ✓ {title[:50]}... - {content_len} characters") + print(f" First 100 chars: {content[:100]}...") + else: + extraction_details.append(f"Page {i+1}: {title[:40]}... (EMPTY)") + if self.verbose: + print(f" Page {i+1}: ⚠ {title[:50]}... - Empty content") + + # Log results + if valid_results > 0: + self.log_result( + "Extract (no LLM)", + "passed", + f"{valid_results}/{len(results)} pages extracted, {total_content_length} total chars" + ) + else: + self.log_result( + "Extract (no LLM)", + "failed", + f"No valid content. 
{failed_results} errors, {len(results) - failed_results} empty" + ) + if self.verbose: + print(f"\n Extraction details:") + for detail in extraction_details: + print(f" {detail}") + + except Exception as e: + self.log_result("Extract (no LLM)", "failed", f"Exception: {type(e).__name__}: {str(e)}") + if self.verbose: + import traceback + print(f" Traceback: {traceback.format_exc()}") + + async def test_web_extract_with_llm(self, urls: List[str] = None): + """Test web extraction with LLM processing""" + print_section("Test 3: Web Extract (with Gemini LLM)") + + if not self.test_llm: + self.log_result("Extract (with LLM)", "skipped", "LLM testing disabled") + return + + # Use a URL likely to have substantial content + test_url = urls[0] if urls else "https://docs.firecrawl.dev/features/scrape" + + try: + print(f"\n Extracting and processing: {test_url}") + + result = await web_extract_tool( + [test_url], + format="markdown", + use_llm_processing=True, + min_length=1000 # Lower threshold for testing + ) + + data = json.loads(result) + + if "error" in data: + self.log_result("Extract (with LLM)", "failed", data["error"]) + return + + results = data.get("results", []) + + if not results: + self.log_result("Extract (with LLM)", "failed", "No results returned") + return + + result = results[0] + content = result.get("content", "") + + if content: + content_len = len(content) + + # Check if content was actually processed (should be shorter than typical raw content) + if content_len > 0: + self.log_result( + "Extract (with LLM)", + "passed", + f"Content processed: {content_len} chars" + ) + + if self.verbose: + print(f"\n First 300 chars of processed content:") + print(f" {content[:300]}...") + else: + self.log_result("Extract (with LLM)", "failed", "No content after processing") + else: + self.log_result("Extract (with LLM)", "failed", "No content field in result") + + except json.JSONDecodeError as e: + self.log_result("Extract (with LLM)", "failed", f"Invalid JSON: {e}") 
+ except Exception as e: + self.log_result("Extract (with LLM)", "failed", str(e)) + + async def test_web_crawl(self): + """Test web crawling functionality""" + print_section("Test 4: Web Crawl") + + test_sites = [ + ("https://docs.firecrawl.dev", None, 2), # Test docs site + ("https://firecrawl.dev", None, 3), # Test main site + ] + + for url, instructions, expected_min_pages in test_sites: + try: + print(f"\n Testing crawl of: {url}") + if instructions: + print(f" Instructions: {instructions}") + else: + print(f" No instructions (general crawl)") + print(f" Expected minimum pages: {expected_min_pages}") + + # Show what's being called + if self.verbose: + print(f" Calling web_crawl_tool(url='{url}', instructions={instructions}, use_llm_processing=False)") + + result = await web_crawl_tool( + url, + instructions=instructions, + use_llm_processing=False # Disable LLM for faster testing + ) + + # Check if result is valid JSON + try: + data = json.loads(result) + except json.JSONDecodeError as e: + self.log_result(f"Crawl: {url}", "failed", f"Invalid JSON response: {e}") + if self.verbose: + print(f" Raw response (first 500 chars): {result[:500]}...") + continue + + # Check for errors + if "error" in data: + self.log_result(f"Crawl: {url}", "failed", f"API error: {data['error']}") + continue + + # Get results + results = data.get("results", []) + + if not results: + self.log_result(f"Crawl: {url}", "failed", "No pages in results array") + if self.verbose: + print(f" Full response: {json.dumps(data, indent=2)[:1000]}...") + continue + + # Analyze pages + valid_pages = 0 + empty_pages = 0 + total_content = 0 + page_details = [] + + for i, page in enumerate(results): + content = page.get("content", "") + title = page.get("title", "Untitled") + error = page.get("error") + + if error: + page_details.append(f"Page {i+1}: ERROR - {error}") + elif content: + valid_pages += 1 + content_len = len(content) + total_content += content_len + page_details.append(f"Page {i+1}: 
{title[:40]}... ({content_len} chars)") + else: + empty_pages += 1 + page_details.append(f"Page {i+1}: {title[:40]}... (EMPTY)") + + # Show detailed results if verbose + if self.verbose: + print(f"\n Crawl Results:") + print(f" Total pages returned: {len(results)}") + print(f" Valid pages (with content): {valid_pages}") + print(f" Empty pages: {empty_pages}") + print(f" Total content size: {total_content} characters") + print(f"\n Page Details:") + for detail in page_details[:10]: # Show first 10 pages + print(f" - {detail}") + if len(page_details) > 10: + print(f" ... and {len(page_details) - 10} more pages") + + # Determine pass/fail + if valid_pages >= expected_min_pages: + self.log_result( + f"Crawl: {url}", + "passed", + f"{valid_pages}/{len(results)} valid pages, {total_content} chars total" + ) + else: + self.log_result( + f"Crawl: {url}", + "failed", + f"Only {valid_pages} valid pages (expected >= {expected_min_pages}), {empty_pages} empty, {len(results)} total" + ) + + except Exception as e: + self.log_result(f"Crawl: {url}", "failed", f"Exception: {type(e).__name__}: {str(e)}") + if self.verbose: + import traceback + print(f" Traceback:") + print(" " + "\n ".join(traceback.format_exc().split("\n"))) + + async def run_all_tests(self): + """Run all tests""" + self.start_time = datetime.now() + + print_header("WEB TOOLS TEST SUITE") + print(f"Started at: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}") + + # Test environment + if not self.test_environment(): + print_error("\nCannot proceed without required API keys!") + return False + + # Test search and collect URLs + urls = self.test_web_search() + + # Test extraction + await self.test_web_extract(urls if urls else None) + + # Test extraction with LLM + if self.test_llm: + await self.test_web_extract_with_llm(urls if urls else None) + + # Test crawling + await self.test_web_crawl() + + # Print summary + self.end_time = datetime.now() + duration = (self.end_time - self.start_time).total_seconds() + + 
print_header("TEST SUMMARY") + print(f"Duration: {duration:.2f} seconds") + print(f"\n{Colors.GREEN}Passed: {len(self.test_results['passed'])}{Colors.ENDC}") + print(f"{Colors.FAIL}Failed: {len(self.test_results['failed'])}{Colors.ENDC}") + print(f"{Colors.WARNING}Skipped: {len(self.test_results['skipped'])}{Colors.ENDC}") + + # List failed tests + if self.test_results["failed"]: + print(f"\n{Colors.FAIL}{Colors.BOLD}Failed Tests:{Colors.ENDC}") + for test in self.test_results["failed"]: + print(f" - {test['test']}: {test['details']}") + + # Save results to file + self.save_results() + + return len(self.test_results["failed"]) == 0 + + def save_results(self): + """Save test results to a JSON file""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"test_results_web_tools_{timestamp}.json" + + results = { + "test_suite": "Web Tools", + "start_time": self.start_time.isoformat() if self.start_time else None, + "end_time": self.end_time.isoformat() if self.end_time else None, + "duration_seconds": (self.end_time - self.start_time).total_seconds() if self.start_time and self.end_time else None, + "summary": { + "passed": len(self.test_results["passed"]), + "failed": len(self.test_results["failed"]), + "skipped": len(self.test_results["skipped"]) + }, + "results": self.test_results, + "environment": { + "firecrawl_api_key": check_firecrawl_api_key(), + "nous_api_key": check_nous_api_key(), + "debug_mode": get_debug_session_info()["enabled"] + } + } + + try: + with open(filename, 'w') as f: + json.dump(results, f, indent=2) + print_info(f"Test results saved to: {filename}") + except Exception as e: + print_warning(f"Failed to save results: {e}") + + +async def main(): + """Main entry point""" + parser = argparse.ArgumentParser(description="Test Web Tools Module") + parser.add_argument("--no-llm", action="store_true", help="Skip LLM processing tests") + parser.add_argument("--verbose", "-v", action="store_true", help="Show detailed output") + 
parser.add_argument("--debug", action="store_true", help="Enable debug mode for web tools") + + args = parser.parse_args() + + # Set debug mode if requested + if args.debug: + os.environ["WEB_TOOLS_DEBUG"] = "true" + print_info("Debug mode enabled for web tools") + + # Create tester + tester = WebToolsTester( + verbose=args.verbose, + test_llm=not args.no_llm + ) + + # Run tests + success = await tester.run_all_tests() + + # Exit with appropriate code + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/web_tools.py b/web_tools.py index 25c44f2d..706eb1ff 100644 --- a/web_tools.py +++ b/web_tools.py @@ -48,11 +48,11 @@ import uuid import datetime from pathlib import Path from typing import List, Dict, Any, Optional -from firecrawl import FirecrawlApp, ScrapeOptions +from firecrawl import Firecrawl from openai import AsyncOpenAI # Initialize Firecrawl client once at module level -firecrawl_app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY")) +firecrawl_client = Firecrawl(api_key=os.getenv("FIRECRAWL_API_KEY")) # Initialize Nous Research API client for LLM processing (async) nous_client = AsyncOpenAI( @@ -251,7 +251,8 @@ def web_search_tool(query: str, limit: int = 5) -> str: This function provides a generic interface for web search that can work with multiple backends. Currently uses Firecrawl. - Note: Search results are already concise snippets, so no LLM processing is applied. + Note: This function returns search result metadata only (URLs, titles, descriptions). + Use web_extract_tool to get full content from specific URLs. Args: query (str): The search query to look up @@ -260,16 +261,18 @@ def web_search_tool(query: str, limit: int = 5) -> str: Returns: str: JSON string containing search results with the following structure: { - "query": str, - "results": [ - { - "title": str, - "url": str, - "content": str, - "score": float - }, - ... 
- ] + "success": bool, + "data": { + "web": [ + { + "title": str, + "url": str, + "description": str, + "position": int + }, + ... + ] + } } Raises: @@ -289,46 +292,67 @@ def web_search_tool(query: str, limit: int = 5) -> str: try: print(f"🔍 Searching the web for: '{query}' (limit: {limit})") - # Use Firecrawl's search functionality - # Firecrawl Search: search the web and get full content from results - # Docs: https://docs.firecrawl.dev/introduction - # Note: Firecrawl SDK supports search via app.search(query, limit=...) - response = firecrawl_app.search(query=query, limit=limit) + # Use Firecrawl's v2 search functionality WITHOUT scraping + # We only want search result metadata, not scraped content + # Docs: https://docs.firecrawl.dev/features/search + response = firecrawl_client.search( + query=query, + limit=limit + ) - # Determine results count and trim to minimal structure: { success, data: [{markdown}] } - results_list = [] - success_flag = True - if isinstance(response, dict): - success_flag = bool(response.get("success", True)) - if "data" in response and isinstance(response["data"], list): - results_list = response["data"] - elif "results" in response and isinstance(response["results"], list): - results_list = response["results"] - results_count = len(results_list) - print(f"✅ Found {results_count} results") + # The response is a SearchData object with web, news, and images attributes + # When not scraping, the results are directly in these attributes + web_results = [] + + # Check if response has web attribute (SearchData object) + if hasattr(response, 'web'): + # Response is a SearchData object with web attribute + if response.web: + # Convert each SearchResultWeb object to dict + for result in response.web: + if hasattr(result, 'model_dump'): + # Pydantic model - use model_dump + web_results.append(result.model_dump()) + elif hasattr(result, '__dict__'): + # Regular object - use __dict__ + web_results.append(result.__dict__) + elif isinstance(result, 
dict): + # Already a dict + web_results.append(result) + elif hasattr(response, 'model_dump'): + # Response has model_dump method - use it to get dict + response_dict = response.model_dump() + if 'web' in response_dict and response_dict['web']: + web_results = response_dict['web'] + elif isinstance(response, dict): + # Response is already a dictionary + if 'web' in response and response['web']: + web_results = response['web'] + + results_count = len(web_results) + print(f"✅ Found {results_count} search results") + + # Build response with just search metadata (URLs, titles, descriptions) + response_data = { + "success": True, + "data": { + "web": web_results + } + } # Capture debug information debug_call_data["results_count"] = results_count - debug_call_data["original_response_size"] = len(json.dumps(response)) - # Build minimal response - minimal_data = [] - for item in results_list: - if isinstance(item, dict) and ("markdown" in item): - minimal_data.append({"markdown": item.get("markdown", "")}) - minimal_response = {"success": success_flag, "data": minimal_data} + # Convert to JSON + result_json = json.dumps(response_data, indent=2) - result_json = json.dumps(minimal_response, indent=2) - cleaned_result = clean_base64_images(result_json) - - debug_call_data["final_response_size"] = len(cleaned_result) - debug_call_data["compression_applied"] = "base64_image_removal" + debug_call_data["final_response_size"] = len(result_json) # Log debug information _log_debug_call("web_search_tool", debug_call_data) _save_debug_log() - return cleaned_result + return result_json except Exception as e: error_msg = f"Error searching web: {str(e)}" @@ -388,40 +412,87 @@ async def web_extract_tool( try: print(f"📄 Extracting content from {len(urls)} URL(s)") - # Use Firecrawl's scrape functionality per URL and normalize to a common shape + # Determine requested formats for Firecrawl v2 + formats: List[str] = [] + if format == "markdown": + formats = ["markdown"] + elif format == 
"html": + formats = ["html"] + else: + # Default: request markdown for LLM-readiness and include html as backup + formats = ["markdown", "html"] + + # Always use individual scraping for simplicity and reliability + # Batch scraping adds complexity without much benefit for small numbers of URLs results: List[Dict[str, Any]] = [] + for url in urls: try: - # Determine requested formats for Firecrawl - formats: List[str] = [] - if format == "markdown": - formats = ["markdown"] - elif format == "html": - formats = ["html"] - else: - # Default: request markdown for LLM-readiness and include html as backup - formats = ["markdown", "html"] - - scrape_result = firecrawl_app.scrape_url(url, formats=formats) - - # Firecrawl returns {success, data: {markdown?, html?, metadata}} - data = scrape_result.get("data", {}) if isinstance(scrape_result, dict) else {} - metadata = data.get("metadata", {}) + print(f" 📄 Scraping: {url}") + scrape_result = firecrawl_client.scrape( + url=url, + formats=formats + ) + + # Process the result - properly handle object serialization + metadata = {} + title = "" + content_markdown = None + content_html = None + + # Extract data from the scrape result + if hasattr(scrape_result, 'model_dump'): + # Pydantic model - use model_dump to get dict + result_dict = scrape_result.model_dump() + content_markdown = result_dict.get('markdown') + content_html = result_dict.get('html') + metadata = result_dict.get('metadata', {}) + elif hasattr(scrape_result, '__dict__'): + # Regular object with attributes + content_markdown = getattr(scrape_result, 'markdown', None) + content_html = getattr(scrape_result, 'html', None) + + # Handle metadata - convert to dict if it's an object + metadata_obj = getattr(scrape_result, 'metadata', {}) + if hasattr(metadata_obj, 'model_dump'): + metadata = metadata_obj.model_dump() + elif hasattr(metadata_obj, '__dict__'): + metadata = metadata_obj.__dict__ + elif isinstance(metadata_obj, dict): + metadata = metadata_obj + else: + 
metadata = {} + elif isinstance(scrape_result, dict): + # Already a dictionary + content_markdown = scrape_result.get('markdown') + content_html = scrape_result.get('html') + metadata = scrape_result.get('metadata', {}) + + # Ensure metadata is a dict (not an object) + if not isinstance(metadata, dict): + if hasattr(metadata, 'model_dump'): + metadata = metadata.model_dump() + elif hasattr(metadata, '__dict__'): + metadata = metadata.__dict__ + else: + metadata = {} + + # Get title from metadata title = metadata.get("title", "") - content_markdown = data.get("markdown") - content_html = data.get("html") - + # Choose content based on requested format chosen_content = content_markdown if (format == "markdown" or (format is None and content_markdown)) else content_html or content_markdown or "" - + results.append({ "url": metadata.get("sourceURL", url), "title": title, "content": chosen_content, "raw_content": chosen_content, - "metadata": metadata + "metadata": metadata # Now guaranteed to be a dict }) + except Exception as scrape_err: + print(f" ❌ Error scraping {url}: {str(scrape_err)}") results.append({ "url": url, "title": "", @@ -582,36 +653,126 @@ async def web_crawl_tool( } try: + # Ensure URL has protocol + if not url.startswith(('http://', 'https://')): + url = f'https://{url}' + print(f" 📝 Added https:// prefix to URL: {url}") + instructions_text = f" with instructions: '{instructions}'" if instructions else "" print(f"đŸ•ˇī¸ Crawling {url}{instructions_text}") - # Use Firecrawl's crawl functionality and normalize to a common shape - # Firecrawl SDK returns the crawl results directly for synchronous crawl - scrape_options = ScrapeOptions(formats=["markdown", "html"]) - crawl_result = firecrawl_app.crawl_url( - url, - limit=20, - scrape_options=scrape_options, - ) + # Use Firecrawl's v2 crawl functionality + # Docs: https://docs.firecrawl.dev/features/crawl + # The crawl() method automatically waits for completion and returns all data + + # Build crawl 
parameters - keep it simple + crawl_params = { + "limit": 20, # Limit number of pages to crawl + "scrape_options": { + "formats": ["markdown"] # Just markdown for simplicity + } + } + + # Note: The 'prompt' parameter is not documented for crawl + # Instructions are typically used with the Extract endpoint, not Crawl + if instructions: + print(f" â„šī¸ Note: Instructions parameter ignored (not supported in crawl API)") + + # Use the crawl method which waits for completion automatically + try: + crawl_result = firecrawl_client.crawl( + url=url, + **crawl_params + ) + except Exception as e: + print(f" ❌ Crawl API call failed: {e}") + raise pages: List[Dict[str, Any]] = [] - if isinstance(crawl_result, dict): - # Firecrawl returns {success, data: [ {markdown?, html?, metadata} ]} + + # Process crawl results - the crawl method returns a CrawlJob object with data attribute + data_list = [] + + # The crawl_result is a CrawlJob object with a 'data' attribute containing list of Document objects + if hasattr(crawl_result, 'data'): + data_list = crawl_result.data if crawl_result.data else [] + print(f" 📊 Status: {getattr(crawl_result, 'status', 'unknown')}") + print(f" 📄 Retrieved {len(data_list)} pages") + + # Debug: Check other attributes if no data + if not data_list: + print(f" 🔍 Debug - CrawlJob attributes: {[attr for attr in dir(crawl_result) if not attr.startswith('_')]}") + print(f" 🔍 Debug - Status: {getattr(crawl_result, 'status', 'N/A')}") + print(f" 🔍 Debug - Total: {getattr(crawl_result, 'total', 'N/A')}") + print(f" 🔍 Debug - Completed: {getattr(crawl_result, 'completed', 'N/A')}") + + elif isinstance(crawl_result, dict) and 'data' in crawl_result: data_list = crawl_result.get("data", []) - for item in data_list: - metadata = item.get("metadata", {}) if isinstance(item, dict) else {} - page_url = metadata.get("sourceURL", "Unknown URL") - title = metadata.get("title", "") - content_markdown = item.get("markdown") if isinstance(item, dict) else None - 
content_html = item.get("html") if isinstance(item, dict) else None - content = content_markdown or content_html or "" - pages.append({ - "url": page_url, - "title": title, - "content": content, - "raw_content": content, - "metadata": metadata - }) + else: + print(" âš ī¸ Unexpected crawl result type") + print(f" 🔍 Debug - Result type: {type(crawl_result)}") + if hasattr(crawl_result, '__dict__'): + print(f" 🔍 Debug - Result attributes: {list(crawl_result.__dict__.keys())}") + + for item in data_list: + # Process each crawled page - properly handle object serialization + page_url = "Unknown URL" + title = "" + content_markdown = None + content_html = None + metadata = {} + + # Extract data from the item + if hasattr(item, 'model_dump'): + # Pydantic model - use model_dump to get dict + item_dict = item.model_dump() + content_markdown = item_dict.get('markdown') + content_html = item_dict.get('html') + metadata = item_dict.get('metadata', {}) + elif hasattr(item, '__dict__'): + # Regular object with attributes + content_markdown = getattr(item, 'markdown', None) + content_html = getattr(item, 'html', None) + + # Handle metadata - convert to dict if it's an object + metadata_obj = getattr(item, 'metadata', {}) + if hasattr(metadata_obj, 'model_dump'): + metadata = metadata_obj.model_dump() + elif hasattr(metadata_obj, '__dict__'): + metadata = metadata_obj.__dict__ + elif isinstance(metadata_obj, dict): + metadata = metadata_obj + else: + metadata = {} + elif isinstance(item, dict): + # Already a dictionary + content_markdown = item.get('markdown') + content_html = item.get('html') + metadata = item.get('metadata', {}) + + # Ensure metadata is a dict (not an object) + if not isinstance(metadata, dict): + if hasattr(metadata, 'model_dump'): + metadata = metadata.model_dump() + elif hasattr(metadata, '__dict__'): + metadata = metadata.__dict__ + else: + metadata = {} + + # Extract URL and title from metadata + page_url = metadata.get("sourceURL", metadata.get("url", 
"Unknown URL")) + title = metadata.get("title", "") + + # Choose content (prefer markdown) + content = content_markdown or content_html or "" + + pages.append({ + "url": page_url, + "title": title, + "content": content, + "raw_content": content, + "metadata": metadata # Now guaranteed to be a dict + }) response = {"results": pages}