diff --git a/batch_runner.py b/batch_runner.py index 9d21aebc..8943b4e0 100644 --- a/batch_runner.py +++ b/batch_runner.py @@ -155,7 +155,8 @@ def _process_single_prompt( if config.get("verbose"): print(f" Prompt {prompt_index}: Using toolsets {selected_toolsets}") - # Initialize agent with sampled toolsets + # Initialize agent with sampled toolsets and log prefix for identification + log_prefix = f"[B{batch_num}:P{prompt_index}]" agent = AIAgent( base_url=config.get("base_url"), api_key=config.get("api_key"), @@ -165,7 +166,12 @@ def _process_single_prompt( save_trajectories=False, # We handle saving ourselves verbose_logging=config.get("verbose", False), ephemeral_system_prompt=config.get("ephemeral_system_prompt"), - log_prefix_chars=config.get("log_prefix_chars", 100) + log_prefix_chars=config.get("log_prefix_chars", 100), + log_prefix=log_prefix, + providers_allowed=config.get("providers_allowed"), + providers_ignored=config.get("providers_ignored"), + providers_order=config.get("providers_order"), + provider_sort=config.get("provider_sort"), ) # Run the agent with task_id to ensure each task gets its own isolated VM @@ -326,6 +332,10 @@ class BatchRunner: verbose: bool = False, ephemeral_system_prompt: str = None, log_prefix_chars: int = 100, + providers_allowed: List[str] = None, + providers_ignored: List[str] = None, + providers_order: List[str] = None, + provider_sort: str = None, ): """ Initialize the batch runner. @@ -343,6 +353,10 @@ class BatchRunner: verbose (bool): Enable verbose logging ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional) log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 20) + providers_allowed (List[str]): OpenRouter providers to allow (optional) + providers_ignored (List[str]): OpenRouter providers to ignore (optional) + providers_order (List[str]): OpenRouter providers to try in order (optional) + provider_sort (str): Sort providers by price/throughput/latency (optional) """ self.dataset_file = Path(dataset_file) self.batch_size = batch_size @@ -356,6 +370,10 @@ class BatchRunner: self.verbose = verbose self.ephemeral_system_prompt = ephemeral_system_prompt self.log_prefix_chars = log_prefix_chars + self.providers_allowed = providers_allowed + self.providers_ignored = providers_ignored + self.providers_order = providers_order + self.provider_sort = provider_sort # Validate distribution if not validate_distribution(distribution): @@ -512,7 +530,11 @@ class BatchRunner: "api_key": self.api_key, "verbose": self.verbose, "ephemeral_system_prompt": self.ephemeral_system_prompt, - "log_prefix_chars": self.log_prefix_chars + "log_prefix_chars": self.log_prefix_chars, + "providers_allowed": self.providers_allowed, + "providers_ignored": self.providers_ignored, + "providers_order": self.providers_order, + "provider_sort": self.provider_sort, } # Get completed prompts set @@ -523,6 +545,8 @@ class BatchRunner: start_time = time.time() + print(f"\nšŸ”§ Initializing {self.num_workers} worker processes...") + # Process batches in parallel with Pool(processes=self.num_workers) as pool: # Create tasks for each batch @@ -537,6 +561,9 @@ class BatchRunner: for batch_num, batch_data in enumerate(self.batches) ] + print(f"āœ… Created {len(tasks)} batch tasks") + print(f"šŸš€ Starting parallel batch processing...\n") + # Use map to process batches in parallel results = pool.map(_process_batch_worker, tasks) @@ -657,6 +684,10 @@ def main( list_distributions: bool = False, ephemeral_system_prompt: str = None, log_prefix_chars: int = 100, + providers_allowed: str = None, + providers_ignored: str = None, + providers_order: str = None, + provider_sort: str = None, ): """ Run batch processing of agent prompts from a dataset. @@ -676,6 +707,10 @@ def main( list_distributions (bool): List available toolset distributions and exit ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional) log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 20) + providers_allowed (str): Comma-separated list of OpenRouter providers to allow (e.g. "anthropic,openai") + providers_ignored (str): Comma-separated list of OpenRouter providers to ignore (e.g. "together,deepinfra") + providers_order (str): Comma-separated list of OpenRouter providers to try in order (e.g. "anthropic,openai,google") + provider_sort (str): Sort providers by "price", "throughput", or "latency" (OpenRouter only) Examples: # Basic usage @@ -723,6 +758,11 @@ def main( print("āŒ Error: --run_name is required") return + # Parse provider preferences (comma-separated strings to lists) + providers_allowed_list = [p.strip() for p in providers_allowed.split(",")] if providers_allowed else None + providers_ignored_list = [p.strip() for p in providers_ignored.split(",")] if providers_ignored else None + providers_order_list = [p.strip() for p in providers_order.split(",")] if providers_order else None + # Initialize and run batch runner try: runner = BatchRunner( @@ -737,7 +777,11 @@ def main( num_workers=num_workers, verbose=verbose, ephemeral_system_prompt=ephemeral_system_prompt, - log_prefix_chars=log_prefix_chars + log_prefix_chars=log_prefix_chars, + providers_allowed=providers_allowed_list, + providers_ignored=providers_ignored_list, + providers_order=providers_order_list, + provider_sort=provider_sort, ) runner.run(resume=resume) diff --git a/run_agent.py b/run_agent.py index e7abde80..7e6f901e 100644 --- a/run_agent.py +++ b/run_agent.py @@ -67,6 +67,11 @@ class AIAgent: verbose_logging: bool = False, ephemeral_system_prompt: str = None, log_prefix_chars: int = 100, + log_prefix: str = "", + providers_allowed: List[str] = None, + providers_ignored: List[str] = None, + providers_order: List[str] = None, + provider_sort: str = None, ): """ Initialize the AI Agent. @@ -83,6 +88,11 @@ class AIAgent: verbose_logging (bool): Enable verbose logging for debugging (default: False) ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional) log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 20) + log_prefix (str): Prefix to add to all log messages for identification in parallel processing (default: "") + providers_allowed (List[str]): OpenRouter providers to allow (optional) + providers_ignored (List[str]): OpenRouter providers to ignore (optional) + providers_order (List[str]): OpenRouter providers to try in order (optional) + provider_sort (str): Sort providers by price/throughput/latency (optional) """ self.model = model self.max_iterations = max_iterations @@ -91,6 +101,13 @@ class AIAgent: self.verbose_logging = verbose_logging self.ephemeral_system_prompt = ephemeral_system_prompt self.log_prefix_chars = log_prefix_chars + self.log_prefix = f"{log_prefix} " if log_prefix else "" + + # Store OpenRouter provider preferences + self.providers_allowed = providers_allowed + self.providers_ignored = providers_ignored + self.providers_order = providers_order + self.provider_sort = provider_sort # Store toolset filtering options self.enabled_toolsets = enabled_toolsets @@ -103,11 +120,13 @@ class AIAgent: format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S' ) - # Keep OpenAI and httpx at INFO level to avoid massive base64 logs - # Even in verbose mode, we don't want to see full request/response bodies - logging.getLogger('openai').setLevel(logging.INFO) + # Keep OpenAI and httpx at WARNING level to reduce noise + # We have our own retry and error logging that's more informative + logging.getLogger('openai').setLevel(logging.WARNING) + logging.getLogger('openai._base_client').setLevel(logging.WARNING) logging.getLogger('httpx').setLevel(logging.WARNING) - print("šŸ” Verbose logging enabled (OpenAI/httpx request bodies suppressed)") + logging.getLogger('httpcore').setLevel(logging.WARNING) + print("šŸ” Verbose logging enabled (OpenAI/httpx internal logs suppressed)") else: # Set logging to INFO level for important messages only logging.basicConfig( @@ -115,24 +134,40 @@ class AIAgent: format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S' ) - # Reduce OpenAI client logging - logging.getLogger('openai').setLevel(logging.WARNING) - logging.getLogger('httpx').setLevel(logging.WARNING) + # Suppress noisy library logging + logging.getLogger('openai').setLevel(logging.ERROR) + logging.getLogger('openai._base_client').setLevel(logging.ERROR) + logging.getLogger('httpx').setLevel(logging.ERROR) + logging.getLogger('httpcore').setLevel(logging.ERROR) # Initialize OpenAI client client_kwargs = {} if base_url: client_kwargs["base_url"] = base_url + + # Handle API key with multiple fallbacks if api_key: client_kwargs["api_key"] = api_key else: - client_kwargs["api_key"] = os.getenv("ANTHROPIC_API_KEY", "dummy-key") + # Try multiple common API key environment variables based on base_url + if base_url and "openrouter" in base_url.lower(): + client_kwargs["api_key"] = os.getenv("OPENROUTER_API_KEY", os.getenv("ANTHROPIC_API_KEY", "dummy-key")) + elif base_url and "anthropic" in base_url.lower(): + client_kwargs["api_key"] = os.getenv("ANTHROPIC_API_KEY", os.getenv("OPENAI_API_KEY", "dummy-key")) + else: + client_kwargs["api_key"] = os.getenv("ANTHROPIC_API_KEY", os.getenv("OPENAI_API_KEY", "dummy-key")) try: self.client = OpenAI(**client_kwargs) print(f"šŸ¤– AI Agent initialized with model: {self.model}") if base_url: print(f"šŸ”— Using custom base URL: {base_url}") + # Always show API key info (masked) for debugging auth issues + key_used = client_kwargs.get("api_key", "none") + if key_used and key_used != "dummy-key" and len(key_used) > 12: + print(f"šŸ”‘ Using API key: {key_used[:8]}...{key_used[-4:]}") + else: + print(f"āš ļø Warning: API key appears invalid or missing (got: '{key_used[:20] if key_used else 'none'}...')") except Exception as e: raise RuntimeError(f"Failed to initialize OpenAI client: {e}") @@ -245,8 +280,13 @@ class AIAgent: if "tool_calls" in msg and msg["tool_calls"]: # Format assistant message with tool calls content = "" + + # Prepend reasoning in tags if available + if msg.get("reasoning") and msg["reasoning"].strip(): + content = f"{msg['reasoning']}" + if msg.get("content") and msg["content"].strip(): - content = msg["content"] + "\n" + content += msg["content"] + "\n" # Add tool calls wrapped in XML tags for tool_call in msg["tool_calls"]: @@ -296,9 +336,17 @@ class AIAgent: else: # Regular assistant message without tool calls + content = "" + + # Prepend reasoning in tags if available + if msg.get("reasoning") and msg["reasoning"].strip(): + content = f"{msg['reasoning']}" + + content += msg["content"] or "" + trajectory.append({ "from": "gpt", - "value": msg["content"] or "" + "value": content }) elif msg["role"] == "user": @@ -388,12 +436,27 @@ class AIAgent: while api_call_count < self.max_iterations: api_call_count += 1 - print(f"\nšŸ”„ Making API call #{api_call_count}...") + + # Prepare messages for API call + # If we have an ephemeral system prompt, prepend it to the messages + api_messages = messages.copy() + if active_system_prompt: + # Insert system message at the beginning + api_messages = [{"role": "system", "content": active_system_prompt}] + api_messages + + # Calculate approximate request size for logging + total_chars = sum(len(str(msg)) for msg in api_messages) + approx_tokens = total_chars // 4 # Rough estimate: 4 chars per token + + print(f"\n{self.log_prefix}šŸ”„ Making API call #{api_call_count}/{self.max_iterations}...") + print(f"{self.log_prefix} šŸ“Š Request size: {len(api_messages)} messages, ~{approx_tokens:,} tokens (~{total_chars:,} chars)") + print(f"{self.log_prefix} šŸ”§ Available tools: {len(self.tools) if self.tools else 0}") # Log request details if verbose if self.verbose_logging: logging.debug(f"API Request - Model: {self.model}, Messages: {len(messages)}, Tools: {len(self.tools) if self.tools else 0}") logging.debug(f"Last message role: {messages[-1]['role'] if messages else 'none'}") + logging.debug(f"Total message size: ~{approx_tokens:,} tokens") api_start_time = time.time() retry_count = 0 @@ -401,23 +464,33 @@ class AIAgent: while retry_count <= max_retries: try: - # Prepare messages for API call - # If we have an ephemeral system prompt, prepend it to the messages - api_messages = messages.copy() - if active_system_prompt: - # Insert system message at the beginning - api_messages = [{"role": "system", "content": active_system_prompt}] + api_messages + # Build OpenRouter provider preferences if specified + provider_preferences = {} + if self.providers_allowed: + provider_preferences["only"] = self.providers_allowed + if self.providers_ignored: + provider_preferences["ignore"] = self.providers_ignored + if self.providers_order: + provider_preferences["order"] = self.providers_order + if self.provider_sort: + provider_preferences["sort"] = self.provider_sort - # Make API call with tools - response = self.client.chat.completions.create( - model=self.model, - messages=api_messages, - tools=self.tools if self.tools else None, - timeout=60.0 # Add explicit timeout - ) + # Make API call with tools - increased timeout for long responses + api_kwargs = { + "model": self.model, + "messages": api_messages, + "tools": self.tools if self.tools else None, + "timeout": 600.0 # 10 minute timeout for very long responses + } + + # Add provider preferences for OpenRouter via extra_body + if provider_preferences: + api_kwargs["extra_body"] = {"provider": provider_preferences} + + response = self.client.chat.completions.create(**api_kwargs) api_duration = time.time() - api_start_time - print(f"ā±ļø API call completed in {api_duration:.2f}s") + print(f"{self.log_prefix}ā±ļø API call completed in {api_duration:.2f}s") if self.verbose_logging: logging.debug(f"API Response received - Usage: {response.usage if hasattr(response, 'usage') else 'N/A'}") @@ -426,13 +499,26 @@ class AIAgent: except Exception as api_error: retry_count += 1 + elapsed_time = time.time() - api_start_time + + # Enhanced error logging + error_type = type(api_error).__name__ + error_msg = str(api_error) + + print(f"{self.log_prefix}āš ļø API call failed (attempt {retry_count}/{max_retries}): {error_type}") + print(f"{self.log_prefix} ā±ļø Time elapsed before failure: {elapsed_time:.2f}s") + print(f"{self.log_prefix} šŸ“ Error: {error_msg[:200]}") + print(f"{self.log_prefix} šŸ“Š Request context: {len(api_messages)} messages, ~{approx_tokens:,} tokens, {len(self.tools) if self.tools else 0} tools") + if retry_count > max_retries: + print(f"{self.log_prefix}āŒ Max retries ({max_retries}) exceeded. Giving up.") + logging.error(f"{self.log_prefix}API call failed after {max_retries} retries. Last error: {api_error}") + logging.error(f"{self.log_prefix}Request details - Messages: {len(api_messages)}, Approx tokens: {approx_tokens:,}") raise api_error wait_time = min(2 ** retry_count, 10) # Exponential backoff, max 10s - print(f"āš ļø API call failed (attempt {retry_count}/{max_retries}): {str(api_error)[:100]}") - print(f"ā³ Retrying in {wait_time}s...") - logging.warning(f"API retry {retry_count}/{max_retries} after error: {api_error}") + print(f"{self.log_prefix}ā³ Retrying in {wait_time}s...") + logging.warning(f"API retry {retry_count}/{max_retries} after {error_type}: {error_msg[:200]}") time.sleep(wait_time) try: @@ -440,20 +526,28 @@ class AIAgent: # Handle assistant response if assistant_message.content: - print(f"šŸ¤– Assistant: {assistant_message.content[:100]}{'...' if len(assistant_message.content) > 100 else ''}") + print(f"{self.log_prefix}šŸ¤– Assistant: {assistant_message.content[:100]}{'...' if len(assistant_message.content) > 100 else ''}") # Check for tool calls if assistant_message.tool_calls: - print(f"šŸ”§ Processing {len(assistant_message.tool_calls)} tool call(s)...") + print(f"{self.log_prefix}šŸ”§ Processing {len(assistant_message.tool_calls)} tool call(s)...") if self.verbose_logging: for tc in assistant_message.tool_calls: logging.debug(f"Tool call: {tc.function.name} with args: {tc.function.arguments[:200]}...") + # Extract reasoning from response if available (for reasoning models like minimax, kimi, etc.) + reasoning_content = None + if hasattr(assistant_message, 'reasoning') and assistant_message.reasoning: + reasoning_content = assistant_message.reasoning + elif hasattr(assistant_message, 'reasoning_content') and assistant_message.reasoning_content: + reasoning_content = assistant_message.reasoning_content + # Add assistant message with tool calls to conversation messages.append({ "role": "assistant", "content": assistant_message.content, + "reasoning": reasoning_content, # Store reasoning for trajectory "tool_calls": [ { "id": tool_call.id, @@ -516,10 +610,18 @@ class AIAgent: # No tool calls - this is the final response final_response = assistant_message.content or "" + # Extract reasoning from response if available + reasoning_content = None + if hasattr(assistant_message, 'reasoning') and assistant_message.reasoning: + reasoning_content = assistant_message.reasoning + elif hasattr(assistant_message, 'reasoning_content') and assistant_message.reasoning_content: + reasoning_content = assistant_message.reasoning_content + # Add final assistant message messages.append({ "role": "assistant", - "content": final_response + "content": final_response, + "reasoning": reasoning_content # Store reasoning for trajectory }) print(f"šŸŽ‰ Conversation completed after {api_call_count} API call(s)") diff --git a/run_datagen_minimax-3.1.sh b/run_datagen_minimax-3.1.sh new file mode 100755 index 00000000..39f203af --- /dev/null +++ b/run_datagen_minimax-3.1.sh @@ -0,0 +1,12 @@ +python batch_runner.py \ + --dataset_file="source-data/hermes-agent-agent-tasks-1/agent_tasks_eval.jsonl" \ + --batch_size=50 \ + --run_name="megascience_sft_minimax-m2.1-thinking-2-eval" \ + --distribution="science" \ + --model="minimax/minimax-m2.1" \ + --base_url="https://openrouter.ai/api/v1" \ + --providers_allowed="minimax" \ + --num_workers=1 \ + --max_turns=40 \ + --verbose \ + --ephemeral_system_prompt="You have access to a variety of tools to help you solve scientific, math, and technology problems presented to you. You can use them in sequence and build off of the results of prior tools you've used results. Always use the terminal or search tool if it can provide additional context, verify formulas, double check concepts and recent studies and understanding, doing all calculations, etc. You should only be confident in your own reasoning, knowledge, or calculations if you've exhaustively used all tools available to you to that can help you verify or validate your work. Always pip install any packages you need to use the python scripts you want to run. If you need to use a tool that isn't available, you can use the terminal tool to install or create it in many cases as well. Do not use the terminal tool to communicate with the user, as they cannot see your commands, only your final response after completing the task. Search for at least 3 sources, but not more than 12." \ No newline at end of file diff --git a/test_api_call_reasoning.py b/test_api_call_reasoning.py new file mode 100644 index 00000000..3c8883ef --- /dev/null +++ b/test_api_call_reasoning.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +""" +Test script to see how minimax-m2.1 responds to a tool-calling request via OpenRouter. +""" +import os +import json +from pathlib import Path +from openai import OpenAI +from dotenv import load_dotenv + +# Load environment variables +env_path = Path(__file__).parent / '.env' +if env_path.exists(): + load_dotenv(dotenv_path=env_path) + print(f"āœ… Loaded .env from {env_path}") + +# Get API key +api_key = os.getenv("OPENROUTER_API_KEY") +if not api_key: + print("āŒ OPENROUTER_API_KEY not found in environment") + exit(1) +print(f"šŸ”‘ Using API key: {api_key[:12]}...{api_key[-4:]}") + +# Initialize client +client = OpenAI( + base_url="https://openrouter.ai/api/v1", + api_key=api_key +) + +# Define a single simple tool +tools = [ + { + "type": "function", + "function": { + "name": "web_search", + "description": "Search the web for information on any topic. Returns relevant results with titles and URLs.", + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "The search query to look up on the web" + } + }, + "required": ["query"] + } + } + } +] + +# Messages +messages = [ + { + "role": "system", + "content": "You are a helpful assistant with access to tools. Use the web_search tool when you need to find information." + }, + { + "role": "user", + "content": "What is the current price of Bitcoin?" + } +] + +print("\n" + "="*60) +print("šŸ“¤ SENDING REQUEST") +print("="*60) +print(f"Model: minimax/minimax-m2.1") +print(f"Messages: {len(messages)}") +print(f"Tools: {len(tools)}") +print(f"User query: {messages[-1]['content']}") + +# Make the request +try: + response = client.chat.completions.create( + model="minimax/minimax-m2.1", + messages=messages, + tools=tools, + extra_body={ + "provider": { + "only": ["minimax"] + } + }, + timeout=120.0 + ) + + print("\n" + "="*60) + print("šŸ“„ RESPONSE RECEIVED") + print("="*60) + + # Print raw response info + print(f"\nModel: {response.model}") + print(f"ID: {response.id}") + print(f"Created: {response.created}") + + if response.usage: + print(f"\nšŸ“Š Usage:") + print(f" Prompt tokens: {response.usage.prompt_tokens}") + print(f" Completion tokens: {response.usage.completion_tokens}") + print(f" Total tokens: {response.usage.total_tokens}") + + # Print the message + msg = response.choices[0].message + print(f"\nšŸ¤– Assistant Response:") + print(f" Role: {msg.role}") + print(f" Content: {msg.content}") + print(f" Tool calls: {msg.tool_calls}") + + if msg.tool_calls: + print(f"\nšŸ”§ Tool Calls Detail:") + for i, tc in enumerate(msg.tool_calls): + print(f" [{i}] ID: {tc.id}") + print(f" Function: {tc.function.name}") + print(f" Arguments: {tc.function.arguments}") + + # Print full raw response as JSON + print("\n" + "="*60) + print("šŸ“ RAW RESPONSE (JSON)") + print("="*60) + print(json.dumps(response.model_dump(), indent=2, default=str)) + +except Exception as e: + print(f"\nāŒ Error: {type(e).__name__}: {e}") + import traceback + traceback.print_exc() + diff --git a/tools/web_tools.py b/tools/web_tools.py index e3a65f71..edafc282 100644 --- a/tools/web_tools.py +++ b/tools/web_tools.py @@ -15,7 +15,7 @@ Backend compatibility: - Firecrawl: https://docs.firecrawl.dev/introduction LLM Processing: -- Uses Nous Research API with Gemini 2.5 Flash for intelligent content extraction +- Uses OpenRouter API with Gemini 3 Flash Preview for intelligent content extraction - Extracts key excerpts and creates markdown summaries to reduce token usage Debug Mode: @@ -54,14 +54,14 @@ from openai import AsyncOpenAI # Initialize Firecrawl client once at module level firecrawl_client = Firecrawl(api_key=os.getenv("FIRECRAWL_API_KEY")) -# Initialize Nous Research API client for LLM processing (async) -nous_client = AsyncOpenAI( - api_key=os.getenv("NOUS_API_KEY"), - base_url="https://inference-api.nousresearch.com/v1" +# Initialize OpenRouter API client for LLM processing (async) +summarizer_client = AsyncOpenAI( + api_key=os.getenv("OPENROUTER_API_KEY"), + base_url="https://openrouter.ai/api/v1" ) # Configuration for LLM processing -DEFAULT_SUMMARIZER_MODEL = "gemini-2.5-flash" +DEFAULT_SUMMARIZER_MODEL = "google/gemini-3-flash-preview" DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION = 5000 # Debug mode configuration @@ -135,7 +135,7 @@ async def process_content_with_llm( """ Process web content using LLM to create intelligent summaries with key excerpts. - This function uses Gemini 2.5 Flash (or specified model) via Nous Research API + This function uses Gemini 3 Flash Preview (or specified model) via OpenRouter API to intelligently extract key information and create markdown summaries, significantly reducing token usage while preserving all important information. @@ -143,7 +143,7 @@ async def process_content_with_llm( content (str): The raw content to process url (str): The source URL (for context, optional) title (str): The page title (for context, optional) - model (str): The model to use for processing (default: gemini-2.5-flash) + model (str): The model to use for processing (default: google/gemini-3-flash-preview) min_length (int): Minimum content length to trigger processing (default: 5000) Returns: @@ -190,7 +190,7 @@ Create a markdown summary that captures all key information in a well-organized, for attempt in range(max_retries): try: - response = await nous_client.chat.completions.create( + response = await summarizer_client.chat.completions.create( model=model, messages=[ {"role": "system", "content": system_prompt}, @@ -399,7 +399,7 @@ async def web_extract_tool( urls (List[str]): List of URLs to extract content from format (str): Desired output format ("markdown" or "html", optional) use_llm_processing (bool): Whether to process content with LLM for summarization (default: True) - model (str): The model to use for LLM processing (default: gemini-2.5-flash) + model (str): The model to use for LLM processing (default: google/gemini-3-flash-preview) min_length (int): Minimum content length to trigger LLM processing (default: 5000) Returns: @@ -527,57 +527,74 @@ async def web_extract_tool( debug_call_data["original_response_size"] = len(json.dumps(response)) # Process each result with LLM if enabled - if use_llm_processing and os.getenv("NOUS_API_KEY"): - print("🧠 Processing extracted content with LLM...") + if use_llm_processing and os.getenv("OPENROUTER_API_KEY"): + print("🧠 Processing extracted content with LLM (parallel)...") debug_call_data["processing_applied"].append("llm_processing") - for result in response.get('results', []): + # Prepare tasks for parallel processing + async def process_single_result(result): + """Process a single result with LLM and return updated result with metrics.""" url = result.get('url', 'Unknown URL') title = result.get('title', '') raw_content = result.get('raw_content', '') or result.get('content', '') - if raw_content: - original_size = len(raw_content) + if not raw_content: + return result, None, "no_content" + + original_size = len(raw_content) + + # Process content with LLM + processed = await process_content_with_llm( + raw_content, url, title, model, min_length + ) + + if processed: + processed_size = len(processed) + compression_ratio = processed_size / original_size if original_size > 0 else 1.0 - # Process content with LLM - processed = await process_content_with_llm( - raw_content, url, title, model, min_length - ) + # Update result with processed content + result['content'] = processed + result['raw_content'] = raw_content - if processed: - processed_size = len(processed) - compression_ratio = processed_size / original_size if original_size > 0 else 1.0 - - # Capture compression metrics - debug_call_data["compression_metrics"].append({ - "url": url, - "original_size": original_size, - "processed_size": processed_size, - "compression_ratio": compression_ratio, - "model_used": model - }) - - # Replace content with processed version - result['content'] = processed - # Keep raw content in separate field for reference - result['raw_content'] = raw_content - debug_call_data["pages_processed_with_llm"] += 1 - print(f" šŸ“ {url} (processed)") - else: - debug_call_data["compression_metrics"].append({ - "url": url, - "original_size": original_size, - "processed_size": original_size, - "compression_ratio": 1.0, - "model_used": None, - "reason": "content_too_short" - }) - print(f" šŸ“ {url} (no processing - content too short)") + metrics = { + "url": url, + "original_size": original_size, + "processed_size": processed_size, + "compression_ratio": compression_ratio, + "model_used": model + } + return result, metrics, "processed" + else: + metrics = { + "url": url, + "original_size": original_size, + "processed_size": original_size, + "compression_ratio": 1.0, + "model_used": None, + "reason": "content_too_short" + } + return result, metrics, "too_short" + + # Run all LLM processing in parallel + results_list = response.get('results', []) + tasks = [process_single_result(result) for result in results_list] + processed_results = await asyncio.gather(*tasks) + + # Collect metrics and print results + for result, metrics, status in processed_results: + url = result.get('url', 'Unknown URL') + if status == "processed": + debug_call_data["compression_metrics"].append(metrics) + debug_call_data["pages_processed_with_llm"] += 1 + print(f" šŸ“ {url} (processed)") + elif status == "too_short": + debug_call_data["compression_metrics"].append(metrics) + print(f" šŸ“ {url} (no processing - content too short)") else: print(f" āš ļø {url} (no content to process)") else: - if use_llm_processing and not os.getenv("NOUS_API_KEY"): - print("āš ļø LLM processing requested but NOUS_API_KEY not set, returning raw content") + if use_llm_processing and not os.getenv("OPENROUTER_API_KEY"): + print("āš ļø LLM processing requested but OPENROUTER_API_KEY not set, returning raw content") debug_call_data["processing_applied"].append("llm_processing_unavailable") # Print summary of extracted pages for debugging (original behavior) @@ -646,7 +663,7 @@ async def web_crawl_tool( instructions (str): Instructions for what to crawl/extract using LLM intelligence (optional) depth (str): Depth of extraction ("basic" or "advanced", default: "basic") use_llm_processing (bool): Whether to process content with LLM for summarization (default: True) - model (str): The model to use for LLM processing (default: gemini-2.5-flash) + model (str): The model to use for LLM processing (default: google/gemini-3-flash-preview) min_length (int): Minimum content length to trigger LLM processing (default: 5000) Returns: @@ -806,57 +823,74 @@ async def web_crawl_tool( debug_call_data["original_response_size"] = len(json.dumps(response)) # Process each result with LLM if enabled - if use_llm_processing and os.getenv("NOUS_API_KEY"): - print("🧠 Processing crawled content with LLM...") + if use_llm_processing and os.getenv("OPENROUTER_API_KEY"): + print("🧠 Processing crawled content with LLM (parallel)...") debug_call_data["processing_applied"].append("llm_processing") - for result in response.get('results', []): + # Prepare tasks for parallel processing + async def process_single_crawl_result(result): + """Process a single crawl result with LLM and return updated result with metrics.""" page_url = result.get('url', 'Unknown URL') title = result.get('title', '') content = result.get('content', '') - if content: - original_size = len(content) + if not content: + return result, None, "no_content" + + original_size = len(content) + + # Process content with LLM + processed = await process_content_with_llm( + content, page_url, title, model, min_length + ) + + if processed: + processed_size = len(processed) + compression_ratio = processed_size / original_size if original_size > 0 else 1.0 - # Process content with LLM - processed = await process_content_with_llm( - content, page_url, title, model, min_length - ) + # Update result with processed content + result['raw_content'] = content + result['content'] = processed - if processed: - processed_size = len(processed) - compression_ratio = processed_size / original_size if original_size > 0 else 1.0 - - # Capture compression metrics - debug_call_data["compression_metrics"].append({ - "url": page_url, - "original_size": original_size, - "processed_size": processed_size, - "compression_ratio": compression_ratio, - "model_used": model - }) - - # Keep original content in raw_content field - result['raw_content'] = content - # Replace content with processed version - result['content'] = processed - debug_call_data["pages_processed_with_llm"] += 1 - print(f" 🌐 {page_url} (processed)") - else: - debug_call_data["compression_metrics"].append({ - "url": page_url, - "original_size": original_size, - "processed_size": original_size, - "compression_ratio": 1.0, - "model_used": None, - "reason": "content_too_short" - }) - print(f" 🌐 {page_url} (no processing - content too short)") + metrics = { + "url": page_url, + "original_size": original_size, + "processed_size": processed_size, + "compression_ratio": compression_ratio, + "model_used": model + } + return result, metrics, "processed" + else: + metrics = { + "url": page_url, + "original_size": original_size, + "processed_size": original_size, + "compression_ratio": 1.0, + "model_used": None, + "reason": "content_too_short" + } + return result, metrics, "too_short" + + # Run all LLM processing in parallel + results_list = response.get('results', []) + tasks = [process_single_crawl_result(result) for result in results_list] + processed_results = await asyncio.gather(*tasks) + + # Collect metrics and print results + for result, metrics, status in processed_results: + page_url = result.get('url', 'Unknown URL') + if status == "processed": + debug_call_data["compression_metrics"].append(metrics) + debug_call_data["pages_processed_with_llm"] += 1 + print(f" 🌐 {page_url} (processed)") + elif status == "too_short": + debug_call_data["compression_metrics"].append(metrics) + print(f" 🌐 {page_url} (no processing - content too short)") else: print(f" āš ļø {page_url} (no content to process)") else: - if use_llm_processing and not os.getenv("NOUS_API_KEY"): - print("āš ļø LLM processing requested but NOUS_API_KEY not set, returning raw content") + if use_llm_processing and not os.getenv("OPENROUTER_API_KEY"): + print("āš ļø LLM processing requested but OPENROUTER_API_KEY not set, returning raw content") debug_call_data["processing_applied"].append("llm_processing_unavailable") # Print summary of crawled pages for debugging (original behavior) @@ -918,7 +952,7 @@ def check_nous_api_key() -> bool: Returns: bool: True if API key is set, False otherwise """ - return bool(os.getenv("NOUS_API_KEY")) + return bool(os.getenv("OPENROUTER_API_KEY")) def get_debug_session_info() -> Dict[str, Any]: @@ -967,8 +1001,8 @@ if __name__ == "__main__": print("āœ… Firecrawl API key found") if not nous_available: - print("āŒ NOUS_API_KEY environment variable not set") - print("Please set your API key: export NOUS_API_KEY='your-key-here'") + print("āŒ OPENROUTER_API_KEY environment variable not set") + print("Please set your API key: export OPENROUTER_API_KEY='your-key-here'") print("Get API key at: https://inference-api.nousresearch.com/") print("āš ļø Without Nous API key, LLM content processing will be disabled") else: @@ -980,7 +1014,7 @@ if __name__ == "__main__": print("šŸ› ļø Web tools ready for use!") if nous_available: - print("🧠 LLM content processing available with Gemini 2.5 Flash") + print("🧠 LLM content processing available with Gemini 3 Flash Preview via OpenRouter") print(f" Default min length for processing: {DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION} chars") # Show debug mode status @@ -1012,7 +1046,7 @@ if __name__ == "__main__": print(" crawl_data = await web_crawl_tool(") print(" 'docs.python.org',") print(" 'Find key concepts',") - print(" model='gemini-2.5-flash',") + print(" model='google/gemini-3-flash-preview',") print(" min_length=3000") print(" )") print("")