updates for stability and speed

Author: teknium
Date: 2026-01-08 08:57:51 +00:00
Commit: 6af6ff2a0a (parent: f957ec2267)
5 changed files with 447 additions and 131 deletions

View File

@@ -155,7 +155,8 @@ def _process_single_prompt(
if config.get("verbose"):
print(f" Prompt {prompt_index}: Using toolsets {selected_toolsets}")
# Initialize agent with sampled toolsets
# Initialize agent with sampled toolsets and log prefix for identification
log_prefix = f"[B{batch_num}:P{prompt_index}]"
agent = AIAgent(
base_url=config.get("base_url"),
api_key=config.get("api_key"),
@@ -165,7 +166,12 @@ def _process_single_prompt(
save_trajectories=False, # We handle saving ourselves
verbose_logging=config.get("verbose", False),
ephemeral_system_prompt=config.get("ephemeral_system_prompt"),
log_prefix_chars=config.get("log_prefix_chars", 100)
log_prefix_chars=config.get("log_prefix_chars", 100),
log_prefix=log_prefix,
providers_allowed=config.get("providers_allowed"),
providers_ignored=config.get("providers_ignored"),
providers_order=config.get("providers_order"),
provider_sort=config.get("provider_sort"),
)
# Run the agent with task_id to ensure each task gets its own isolated VM
@@ -326,6 +332,10 @@ class BatchRunner:
verbose: bool = False,
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
providers_allowed: List[str] = None,
providers_ignored: List[str] = None,
providers_order: List[str] = None,
provider_sort: str = None,
):
"""
Initialize the batch runner.
@@ -343,6 +353,10 @@ class BatchRunner:
verbose (bool): Enable verbose logging
ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 100)
providers_allowed (List[str]): OpenRouter providers to allow (optional)
providers_ignored (List[str]): OpenRouter providers to ignore (optional)
providers_order (List[str]): OpenRouter providers to try in order (optional)
provider_sort (str): Sort providers by price/throughput/latency (optional)
"""
self.dataset_file = Path(dataset_file)
self.batch_size = batch_size
@@ -356,6 +370,10 @@ class BatchRunner:
self.verbose = verbose
self.ephemeral_system_prompt = ephemeral_system_prompt
self.log_prefix_chars = log_prefix_chars
self.providers_allowed = providers_allowed
self.providers_ignored = providers_ignored
self.providers_order = providers_order
self.provider_sort = provider_sort
# Validate distribution
if not validate_distribution(distribution):
@@ -512,7 +530,11 @@ class BatchRunner:
"api_key": self.api_key,
"verbose": self.verbose,
"ephemeral_system_prompt": self.ephemeral_system_prompt,
"log_prefix_chars": self.log_prefix_chars
"log_prefix_chars": self.log_prefix_chars,
"providers_allowed": self.providers_allowed,
"providers_ignored": self.providers_ignored,
"providers_order": self.providers_order,
"provider_sort": self.provider_sort,
}
# Get completed prompts set
@@ -523,6 +545,8 @@ class BatchRunner:
start_time = time.time()
print(f"\n🔧 Initializing {self.num_workers} worker processes...")
# Process batches in parallel
with Pool(processes=self.num_workers) as pool:
# Create tasks for each batch
@@ -537,6 +561,9 @@ class BatchRunner:
for batch_num, batch_data in enumerate(self.batches)
]
print(f"✅ Created {len(tasks)} batch tasks")
print(f"🚀 Starting parallel batch processing...\n")
# Use map to process batches in parallel
results = pool.map(_process_batch_worker, tasks)
@@ -657,6 +684,10 @@ def main(
list_distributions: bool = False,
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
providers_allowed: str = None,
providers_ignored: str = None,
providers_order: str = None,
provider_sort: str = None,
):
"""
Run batch processing of agent prompts from a dataset.
@@ -676,6 +707,10 @@ def main(
list_distributions (bool): List available toolset distributions and exit
ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 100)
providers_allowed (str): Comma-separated list of OpenRouter providers to allow (e.g. "anthropic,openai")
providers_ignored (str): Comma-separated list of OpenRouter providers to ignore (e.g. "together,deepinfra")
providers_order (str): Comma-separated list of OpenRouter providers to try in order (e.g. "anthropic,openai,google")
provider_sort (str): Sort providers by "price", "throughput", or "latency" (OpenRouter only)
Examples:
# Basic usage
@@ -723,6 +758,11 @@ def main(
print("❌ Error: --run_name is required")
return
# Parse provider preferences (comma-separated strings to lists)
providers_allowed_list = [p.strip() for p in providers_allowed.split(",")] if providers_allowed else None
providers_ignored_list = [p.strip() for p in providers_ignored.split(",")] if providers_ignored else None
providers_order_list = [p.strip() for p in providers_order.split(",")] if providers_order else None
# Initialize and run batch runner
try:
runner = BatchRunner(
@@ -737,7 +777,11 @@ def main(
num_workers=num_workers,
verbose=verbose,
ephemeral_system_prompt=ephemeral_system_prompt,
log_prefix_chars=log_prefix_chars
log_prefix_chars=log_prefix_chars,
providers_allowed=providers_allowed_list,
providers_ignored=providers_ignored_list,
providers_order=providers_order_list,
provider_sort=provider_sort,
)
runner.run(resume=resume)
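For reference, the comma-separated provider flags a few lines up parse into plain lists, with None passed through when a flag is unset. A quick sketch of that behavior (parse_providers is a hypothetical helper mirroring the inline expressions):

def parse_providers(raw):
    # Hypothetical helper mirroring the inline parsing above:
    # split on commas, strip whitespace, return None when unset.
    return [p.strip() for p in raw.split(",")] if raw else None

assert parse_providers("anthropic, openai") == ["anthropic", "openai"]
assert parse_providers(None) is None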

View File

@@ -67,6 +67,11 @@ class AIAgent:
verbose_logging: bool = False,
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
log_prefix: str = "",
providers_allowed: List[str] = None,
providers_ignored: List[str] = None,
providers_order: List[str] = None,
provider_sort: str = None,
):
"""
Initialize the AI Agent.
@@ -83,6 +88,11 @@ class AIAgent:
verbose_logging (bool): Enable verbose logging for debugging (default: False)
ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 100)
log_prefix (str): Prefix to add to all log messages for identification in parallel processing (default: "")
providers_allowed (List[str]): OpenRouter providers to allow (optional)
providers_ignored (List[str]): OpenRouter providers to ignore (optional)
providers_order (List[str]): OpenRouter providers to try in order (optional)
provider_sort (str): Sort providers by price/throughput/latency (optional)
"""
self.model = model
self.max_iterations = max_iterations
@@ -91,6 +101,13 @@ class AIAgent:
self.verbose_logging = verbose_logging
self.ephemeral_system_prompt = ephemeral_system_prompt
self.log_prefix_chars = log_prefix_chars
self.log_prefix = f"{log_prefix} " if log_prefix else ""
# Store OpenRouter provider preferences
self.providers_allowed = providers_allowed
self.providers_ignored = providers_ignored
self.providers_order = providers_order
self.provider_sort = provider_sort
# Store toolset filtering options
self.enabled_toolsets = enabled_toolsets
@@ -103,11 +120,13 @@ class AIAgent:
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
# Keep OpenAI and httpx at INFO level to avoid massive base64 logs
# Even in verbose mode, we don't want to see full request/response bodies
logging.getLogger('openai').setLevel(logging.INFO)
# Keep OpenAI and httpx at WARNING level to reduce noise
# We have our own retry and error logging that's more informative
logging.getLogger('openai').setLevel(logging.WARNING)
logging.getLogger('openai._base_client').setLevel(logging.WARNING)
logging.getLogger('httpx').setLevel(logging.WARNING)
print("🔍 Verbose logging enabled (OpenAI/httpx request bodies suppressed)")
logging.getLogger('httpcore').setLevel(logging.WARNING)
print("🔍 Verbose logging enabled (OpenAI/httpx internal logs suppressed)")
else:
# Set logging to INFO level for important messages only
logging.basicConfig(
@@ -115,24 +134,40 @@ class AIAgent:
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
# Reduce OpenAI client logging
logging.getLogger('openai').setLevel(logging.WARNING)
logging.getLogger('httpx').setLevel(logging.WARNING)
# Suppress noisy library logging
logging.getLogger('openai').setLevel(logging.ERROR)
logging.getLogger('openai._base_client').setLevel(logging.ERROR)
logging.getLogger('httpx').setLevel(logging.ERROR)
logging.getLogger('httpcore').setLevel(logging.ERROR)
# Initialize OpenAI client
client_kwargs = {}
if base_url:
client_kwargs["base_url"] = base_url
# Handle API key with multiple fallbacks
if api_key:
client_kwargs["api_key"] = api_key
else:
client_kwargs["api_key"] = os.getenv("ANTHROPIC_API_KEY", "dummy-key")
# Try multiple common API key environment variables based on base_url
if base_url and "openrouter" in base_url.lower():
client_kwargs["api_key"] = os.getenv("OPENROUTER_API_KEY", os.getenv("ANTHROPIC_API_KEY", "dummy-key"))
elif base_url and "anthropic" in base_url.lower():
client_kwargs["api_key"] = os.getenv("ANTHROPIC_API_KEY", os.getenv("OPENAI_API_KEY", "dummy-key"))
else:
client_kwargs["api_key"] = os.getenv("ANTHROPIC_API_KEY", os.getenv("OPENAI_API_KEY", "dummy-key"))
try:
self.client = OpenAI(**client_kwargs)
print(f"🤖 AI Agent initialized with model: {self.model}")
if base_url:
print(f"🔗 Using custom base URL: {base_url}")
# Always show API key info (masked) for debugging auth issues
key_used = client_kwargs.get("api_key", "none")
if key_used and key_used != "dummy-key" and len(key_used) > 12:
print(f"🔑 Using API key: {key_used[:8]}...{key_used[-4:]}")
else:
print(f"⚠️ Warning: API key appears invalid or missing (got: '{key_used[:20] if key_used else 'none'}...')")
except Exception as e:
raise RuntimeError(f"Failed to initialize OpenAI client: {e}")
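Condensed, the key selection above is a small fallback chain: an explicit key wins, then environment variables chosen by base_url, then a "dummy-key" sentinel. A sketch under those same assumptions (resolve_api_key is a hypothetical helper):

import os

def resolve_api_key(base_url, explicit_key):
    # Sketch of the fallback chain above: explicit key first, then env
    # vars chosen by base_url, then a "dummy-key" sentinel.
    if explicit_key:
        return explicit_key
    if base_url and "openrouter" in base_url.lower():
        return os.getenv("OPENROUTER_API_KEY", os.getenv("ANTHROPIC_API_KEY", "dummy-key"))
    if base_url and "anthropic" in base_url.lower():
        return os.getenv("ANTHROPIC_API_KEY", os.getenv("OPENAI_API_KEY", "dummy-key"))
    return os.getenv("ANTHROPIC_API_KEY", os.getenv("OPENAI_API_KEY", "dummy-key"))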
@@ -245,8 +280,13 @@ class AIAgent:
if "tool_calls" in msg and msg["tool_calls"]:
# Format assistant message with tool calls
content = ""
# Prepend reasoning in <think> tags if available
if msg.get("reasoning") and msg["reasoning"].strip():
content = f"<think>{msg['reasoning']}</think>"
if msg.get("content") and msg["content"].strip():
content = msg["content"] + "\n"
content += msg["content"] + "\n"
# Add tool calls wrapped in XML tags
for tool_call in msg["tool_calls"]:
@@ -296,9 +336,17 @@ class AIAgent:
else:
# Regular assistant message without tool calls
content = ""
# Prepend reasoning in <think> tags if available
if msg.get("reasoning") and msg["reasoning"].strip():
content = f"<think>{msg['reasoning']}</think>"
content += msg["content"] or ""
trajectory.append({
"from": "gpt",
"value": msg["content"] or ""
"value": content
})
elif msg["role"] == "user":
@@ -388,12 +436,27 @@ class AIAgent:
while api_call_count < self.max_iterations:
api_call_count += 1
print(f"\n🔄 Making API call #{api_call_count}...")
# Prepare messages for API call
# If we have an ephemeral system prompt, prepend it to the messages
api_messages = messages.copy()
if active_system_prompt:
# Insert system message at the beginning
api_messages = [{"role": "system", "content": active_system_prompt}] + api_messages
# Calculate approximate request size for logging
total_chars = sum(len(str(msg)) for msg in api_messages)
approx_tokens = total_chars // 4 # Rough estimate: 4 chars per token
print(f"\n{self.log_prefix}🔄 Making API call #{api_call_count}/{self.max_iterations}...")
print(f"{self.log_prefix} 📊 Request size: {len(api_messages)} messages, ~{approx_tokens:,} tokens (~{total_chars:,} chars)")
print(f"{self.log_prefix} 🔧 Available tools: {len(self.tools) if self.tools else 0}")
# Log request details if verbose
if self.verbose_logging:
logging.debug(f"API Request - Model: {self.model}, Messages: {len(messages)}, Tools: {len(self.tools) if self.tools else 0}")
logging.debug(f"Last message role: {messages[-1]['role'] if messages else 'none'}")
logging.debug(f"Total message size: ~{approx_tokens:,} tokens")
api_start_time = time.time()
retry_count = 0
@@ -401,23 +464,33 @@ class AIAgent:
while retry_count <= max_retries:
try:
# Prepare messages for API call
# If we have an ephemeral system prompt, prepend it to the messages
api_messages = messages.copy()
if active_system_prompt:
# Insert system message at the beginning
api_messages = [{"role": "system", "content": active_system_prompt}] + api_messages
# Build OpenRouter provider preferences if specified
provider_preferences = {}
if self.providers_allowed:
provider_preferences["only"] = self.providers_allowed
if self.providers_ignored:
provider_preferences["ignore"] = self.providers_ignored
if self.providers_order:
provider_preferences["order"] = self.providers_order
if self.provider_sort:
provider_preferences["sort"] = self.provider_sort
# Make API call with tools
response = self.client.chat.completions.create(
model=self.model,
messages=api_messages,
tools=self.tools if self.tools else None,
timeout=60.0 # Add explicit timeout
)
# Make API call with tools - increased timeout for long responses
api_kwargs = {
"model": self.model,
"messages": api_messages,
"tools": self.tools if self.tools else None,
"timeout": 600.0 # 10 minute timeout for very long responses
}
# Add provider preferences for OpenRouter via extra_body
if provider_preferences:
api_kwargs["extra_body"] = {"provider": provider_preferences}
response = self.client.chat.completions.create(**api_kwargs)
api_duration = time.time() - api_start_time
print(f"⏱️ API call completed in {api_duration:.2f}s")
print(f"{self.log_prefix}⏱️ API call completed in {api_duration:.2f}s")
if self.verbose_logging:
logging.debug(f"API Response received - Usage: {response.usage if hasattr(response, 'usage') else 'N/A'}")
@@ -426,13 +499,26 @@ class AIAgent:
except Exception as api_error:
retry_count += 1
elapsed_time = time.time() - api_start_time
# Enhanced error logging
error_type = type(api_error).__name__
error_msg = str(api_error)
print(f"{self.log_prefix}⚠️ API call failed (attempt {retry_count}/{max_retries}): {error_type}")
print(f"{self.log_prefix} ⏱️ Time elapsed before failure: {elapsed_time:.2f}s")
print(f"{self.log_prefix} 📝 Error: {error_msg[:200]}")
print(f"{self.log_prefix} 📊 Request context: {len(api_messages)} messages, ~{approx_tokens:,} tokens, {len(self.tools) if self.tools else 0} tools")
if retry_count > max_retries:
print(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded. Giving up.")
logging.error(f"{self.log_prefix}API call failed after {max_retries} retries. Last error: {api_error}")
logging.error(f"{self.log_prefix}Request details - Messages: {len(api_messages)}, Approx tokens: {approx_tokens:,}")
raise api_error
wait_time = min(2 ** retry_count, 10) # Exponential backoff, max 10s
print(f"⚠️ API call failed (attempt {retry_count}/{max_retries}): {str(api_error)[:100]}")
print(f"⏳ Retrying in {wait_time}s...")
logging.warning(f"API retry {retry_count}/{max_retries} after error: {api_error}")
print(f"{self.log_prefix}⏳ Retrying in {wait_time}s...")
logging.warning(f"API retry {retry_count}/{max_retries} after {error_type}: {error_msg[:200]}")
time.sleep(wait_time)
try:
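The retry policy above is capped exponential backoff: waits of 2s, 4s, 8s, never more than 10s. Stripped to the pattern (call_api is a placeholder for the actual request):

import time

def call_with_retries(call_api, max_retries=3):
    # Sketch of the capped exponential backoff used above.
    retry_count = 0
    while True:
        try:
            return call_api()
        except Exception as err:
            retry_count += 1
            if retry_count > max_retries:
                raise
            wait_time = min(2 ** retry_count, 10)  # 2s, 4s, 8s, capped at 10s
            print(f"Retry {retry_count}/{max_retries} in {wait_time}s after {type(err).__name__}")
            time.sleep(wait_time)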
@@ -440,20 +526,28 @@ class AIAgent:
# Handle assistant response
if assistant_message.content:
print(f"🤖 Assistant: {assistant_message.content[:100]}{'...' if len(assistant_message.content) > 100 else ''}")
print(f"{self.log_prefix}🤖 Assistant: {assistant_message.content[:100]}{'...' if len(assistant_message.content) > 100 else ''}")
# Check for tool calls
if assistant_message.tool_calls:
print(f"🔧 Processing {len(assistant_message.tool_calls)} tool call(s)...")
print(f"{self.log_prefix}🔧 Processing {len(assistant_message.tool_calls)} tool call(s)...")
if self.verbose_logging:
for tc in assistant_message.tool_calls:
logging.debug(f"Tool call: {tc.function.name} with args: {tc.function.arguments[:200]}...")
# Extract reasoning from response if available (for reasoning models like minimax, kimi, etc.)
reasoning_content = None
if hasattr(assistant_message, 'reasoning') and assistant_message.reasoning:
reasoning_content = assistant_message.reasoning
elif hasattr(assistant_message, 'reasoning_content') and assistant_message.reasoning_content:
reasoning_content = assistant_message.reasoning_content
# Add assistant message with tool calls to conversation
messages.append({
"role": "assistant",
"content": assistant_message.content,
"reasoning": reasoning_content, # Store reasoning for trajectory
"tool_calls": [
{
"id": tool_call.id,
@@ -516,10 +610,18 @@ class AIAgent:
# No tool calls - this is the final response
final_response = assistant_message.content or ""
# Extract reasoning from response if available
reasoning_content = None
if hasattr(assistant_message, 'reasoning') and assistant_message.reasoning:
reasoning_content = assistant_message.reasoning
elif hasattr(assistant_message, 'reasoning_content') and assistant_message.reasoning_content:
reasoning_content = assistant_message.reasoning_content
# Add final assistant message
messages.append({
"role": "assistant",
"content": final_response
"content": final_response,
"reasoning": reasoning_content # Store reasoning for trajectory
})
print(f"🎉 Conversation completed after {api_call_count} API call(s)")

run_datagen_minimax-3.1.sh (new executable file, 12 lines)
View File

@@ -0,0 +1,12 @@
python batch_runner.py \
--dataset_file="source-data/hermes-agent-agent-tasks-1/agent_tasks_eval.jsonl" \
--batch_size=50 \
--run_name="megascience_sft_minimax-m2.1-thinking-2-eval" \
--distribution="science" \
--model="minimax/minimax-m2.1" \
--base_url="https://openrouter.ai/api/v1" \
--providers_allowed="minimax" \
--num_workers=1 \
--max_turns=40 \
--verbose \
--ephemeral_system_prompt="You have access to a variety of tools to help you solve scientific, math, and technology problems presented to you. You can use them in sequence and build on the results of prior tools you've used. Always use the terminal or search tool if it can provide additional context, verify formulas, double-check concepts, recent studies, and understanding, and perform all calculations. You should only be confident in your own reasoning, knowledge, or calculations if you've exhaustively used all available tools that can help you verify or validate your work. Always pip install any packages you need for the Python scripts you want to run. If you need a tool that isn't available, you can often use the terminal tool to install or create it. Do not use the terminal tool to communicate with the user, as they cannot see your commands, only your final response after completing the task. Search for at least 3 sources, but not more than 12."

test_api_call_reasoning.py (new file, 124 lines)
View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""
Test script to see how minimax-m2.1 responds to a tool-calling request via OpenRouter.
"""
import os
import json
from pathlib import Path
from openai import OpenAI
from dotenv import load_dotenv
# Load environment variables
env_path = Path(__file__).parent / '.env'
if env_path.exists():
load_dotenv(dotenv_path=env_path)
print(f"✅ Loaded .env from {env_path}")
# Get API key
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
print("❌ OPENROUTER_API_KEY not found in environment")
exit(1)
print(f"🔑 Using API key: {api_key[:12]}...{api_key[-4:]}")
# Initialize client
client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=api_key
)
# Define a single simple tool
tools = [
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web for information on any topic. Returns relevant results with titles and URLs.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query to look up on the web"
}
},
"required": ["query"]
}
}
}
]
# Messages
messages = [
{
"role": "system",
"content": "You are a helpful assistant with access to tools. Use the web_search tool when you need to find information."
},
{
"role": "user",
"content": "What is the current price of Bitcoin?"
}
]
print("\n" + "="*60)
print("📤 SENDING REQUEST")
print("="*60)
print(f"Model: minimax/minimax-m2.1")
print(f"Messages: {len(messages)}")
print(f"Tools: {len(tools)}")
print(f"User query: {messages[-1]['content']}")
# Make the request
try:
response = client.chat.completions.create(
model="minimax/minimax-m2.1",
messages=messages,
tools=tools,
extra_body={
"provider": {
"only": ["minimax"]
}
},
timeout=120.0
)
print("\n" + "="*60)
print("📥 RESPONSE RECEIVED")
print("="*60)
# Print raw response info
print(f"\nModel: {response.model}")
print(f"ID: {response.id}")
print(f"Created: {response.created}")
if response.usage:
print(f"\n📊 Usage:")
print(f" Prompt tokens: {response.usage.prompt_tokens}")
print(f" Completion tokens: {response.usage.completion_tokens}")
print(f" Total tokens: {response.usage.total_tokens}")
# Print the message
msg = response.choices[0].message
print(f"\n🤖 Assistant Response:")
print(f" Role: {msg.role}")
print(f" Content: {msg.content}")
print(f" Tool calls: {msg.tool_calls}")
if msg.tool_calls:
print(f"\n🔧 Tool Calls Detail:")
for i, tc in enumerate(msg.tool_calls):
print(f" [{i}] ID: {tc.id}")
print(f" Function: {tc.function.name}")
print(f" Arguments: {tc.function.arguments}")
# Print full raw response as JSON
print("\n" + "="*60)
print("📝 RAW RESPONSE (JSON)")
print("="*60)
print(json.dumps(response.model_dump(), indent=2, default=str))
except Exception as e:
print(f"\n❌ Error: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()

View File

@@ -15,7 +15,7 @@ Backend compatibility:
- Firecrawl: https://docs.firecrawl.dev/introduction
LLM Processing:
- Uses Nous Research API with Gemini 2.5 Flash for intelligent content extraction
- Uses OpenRouter API with Gemini 3 Flash Preview for intelligent content extraction
- Extracts key excerpts and creates markdown summaries to reduce token usage
Debug Mode:
@@ -54,14 +54,14 @@ from openai import AsyncOpenAI
# Initialize Firecrawl client once at module level
firecrawl_client = Firecrawl(api_key=os.getenv("FIRECRAWL_API_KEY"))
# Initialize Nous Research API client for LLM processing (async)
nous_client = AsyncOpenAI(
api_key=os.getenv("NOUS_API_KEY"),
base_url="https://inference-api.nousresearch.com/v1"
# Initialize OpenRouter API client for LLM processing (async)
summarizer_client = AsyncOpenAI(
api_key=os.getenv("OPENROUTER_API_KEY"),
base_url="https://openrouter.ai/api/v1"
)
# Configuration for LLM processing
DEFAULT_SUMMARIZER_MODEL = "gemini-2.5-flash"
DEFAULT_SUMMARIZER_MODEL = "google/gemini-3-flash-preview"
DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION = 5000
# Debug mode configuration
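A minimal sketch of calling the new summarizer client, assuming OPENROUTER_API_KEY is set; the prompt here is illustrative, and the production path adds retries and a minimum-length gate (see process_content_with_llm):

import os
from openai import AsyncOpenAI

# Sketch: the OpenRouter summarizer client as configured above.
summarizer_client = AsyncOpenAI(
    api_key=os.getenv("OPENROUTER_API_KEY"),
    base_url="https://openrouter.ai/api/v1",
)

async def summarize(text):
    # Illustrative one-shot summarization call.
    response = await summarizer_client.chat.completions.create(
        model="google/gemini-3-flash-preview",
        messages=[{"role": "user", "content": f"Summarize as markdown:\n\n{text}"}],
    )
    return response.choices[0].message.content

# asyncio.run(summarize("..."))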
@@ -135,7 +135,7 @@ async def process_content_with_llm(
"""
Process web content using LLM to create intelligent summaries with key excerpts.
This function uses Gemini 2.5 Flash (or specified model) via Nous Research API
This function uses Gemini 3 Flash Preview (or specified model) via OpenRouter API
to intelligently extract key information and create markdown summaries,
significantly reducing token usage while preserving all important information.
@@ -143,7 +143,7 @@ async def process_content_with_llm(
content (str): The raw content to process
url (str): The source URL (for context, optional)
title (str): The page title (for context, optional)
model (str): The model to use for processing (default: gemini-2.5-flash)
model (str): The model to use for processing (default: google/gemini-3-flash-preview)
min_length (int): Minimum content length to trigger processing (default: 5000)
Returns:
@@ -190,7 +190,7 @@ Create a markdown summary that captures all key information in a well-organized,
for attempt in range(max_retries):
try:
response = await nous_client.chat.completions.create(
response = await summarizer_client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
@@ -399,7 +399,7 @@ async def web_extract_tool(
urls (List[str]): List of URLs to extract content from
format (str): Desired output format ("markdown" or "html", optional)
use_llm_processing (bool): Whether to process content with LLM for summarization (default: True)
model (str): The model to use for LLM processing (default: gemini-2.5-flash)
model (str): The model to use for LLM processing (default: google/gemini-3-flash-preview)
min_length (int): Minimum content length to trigger LLM processing (default: 5000)
Returns:
@@ -527,57 +527,74 @@ async def web_extract_tool(
debug_call_data["original_response_size"] = len(json.dumps(response))
# Process each result with LLM if enabled
if use_llm_processing and os.getenv("NOUS_API_KEY"):
print("🧠 Processing extracted content with LLM...")
if use_llm_processing and os.getenv("OPENROUTER_API_KEY"):
print("🧠 Processing extracted content with LLM (parallel)...")
debug_call_data["processing_applied"].append("llm_processing")
for result in response.get('results', []):
# Prepare tasks for parallel processing
async def process_single_result(result):
"""Process a single result with LLM and return updated result with metrics."""
url = result.get('url', 'Unknown URL')
title = result.get('title', '')
raw_content = result.get('raw_content', '') or result.get('content', '')
if raw_content:
original_size = len(raw_content)
if not raw_content:
return result, None, "no_content"
original_size = len(raw_content)
# Process content with LLM
processed = await process_content_with_llm(
raw_content, url, title, model, min_length
)
if processed:
processed_size = len(processed)
compression_ratio = processed_size / original_size if original_size > 0 else 1.0
# Process content with LLM
processed = await process_content_with_llm(
raw_content, url, title, model, min_length
)
# Update result with processed content
result['content'] = processed
result['raw_content'] = raw_content
if processed:
processed_size = len(processed)
compression_ratio = processed_size / original_size if original_size > 0 else 1.0
# Capture compression metrics
debug_call_data["compression_metrics"].append({
"url": url,
"original_size": original_size,
"processed_size": processed_size,
"compression_ratio": compression_ratio,
"model_used": model
})
# Replace content with processed version
result['content'] = processed
# Keep raw content in separate field for reference
result['raw_content'] = raw_content
debug_call_data["pages_processed_with_llm"] += 1
print(f" 📝 {url} (processed)")
else:
debug_call_data["compression_metrics"].append({
"url": url,
"original_size": original_size,
"processed_size": original_size,
"compression_ratio": 1.0,
"model_used": None,
"reason": "content_too_short"
})
print(f" 📝 {url} (no processing - content too short)")
metrics = {
"url": url,
"original_size": original_size,
"processed_size": processed_size,
"compression_ratio": compression_ratio,
"model_used": model
}
return result, metrics, "processed"
else:
metrics = {
"url": url,
"original_size": original_size,
"processed_size": original_size,
"compression_ratio": 1.0,
"model_used": None,
"reason": "content_too_short"
}
return result, metrics, "too_short"
# Run all LLM processing in parallel
results_list = response.get('results', [])
tasks = [process_single_result(result) for result in results_list]
processed_results = await asyncio.gather(*tasks)
# Collect metrics and print results
for result, metrics, status in processed_results:
url = result.get('url', 'Unknown URL')
if status == "processed":
debug_call_data["compression_metrics"].append(metrics)
debug_call_data["pages_processed_with_llm"] += 1
print(f" 📝 {url} (processed)")
elif status == "too_short":
debug_call_data["compression_metrics"].append(metrics)
print(f" 📝 {url} (no processing - content too short)")
else:
print(f" ⚠️ {url} (no content to process)")
else:
if use_llm_processing and not os.getenv("NOUS_API_KEY"):
print("⚠️ LLM processing requested but NOUS_API_KEY not set, returning raw content")
if use_llm_processing and not os.getenv("OPENROUTER_API_KEY"):
print("⚠️ LLM processing requested but OPENROUTER_API_KEY not set, returning raw content")
debug_call_data["processing_applied"].append("llm_processing_unavailable")
# Print summary of extracted pages for debugging (original behavior)
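The core of this refactor is fan-out with asyncio.gather over per-result coroutines that each return a (result, metrics, status) tuple, so printing and metric collection stay sequential while the LLM calls run concurrently. Stripped to the pattern (process_one is a stand-in for process_single_result):

import asyncio

async def process_one(result):
    # Stand-in for process_single_result: return the (possibly updated)
    # result, optional metrics, and a status tag.
    if not result.get("raw_content"):
        return result, None, "no_content"
    await asyncio.sleep(0)  # placeholder for the LLM call
    return result, {"url": result.get("url")}, "processed"

async def process_all(results):
    # Fan out all per-result work concurrently, then collect in order.
    outcomes = await asyncio.gather(*(process_one(r) for r in results))
    for result, metrics, status in outcomes:
        print(result.get("url", "Unknown URL"), status)

# asyncio.run(process_all([{"url": "https://example.com", "raw_content": "..."}]))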
@@ -646,7 +663,7 @@ async def web_crawl_tool(
instructions (str): Instructions for what to crawl/extract using LLM intelligence (optional)
depth (str): Depth of extraction ("basic" or "advanced", default: "basic")
use_llm_processing (bool): Whether to process content with LLM for summarization (default: True)
model (str): The model to use for LLM processing (default: gemini-2.5-flash)
model (str): The model to use for LLM processing (default: google/gemini-3-flash-preview)
min_length (int): Minimum content length to trigger LLM processing (default: 5000)
Returns:
@@ -806,57 +823,74 @@ async def web_crawl_tool(
debug_call_data["original_response_size"] = len(json.dumps(response))
# Process each result with LLM if enabled
if use_llm_processing and os.getenv("NOUS_API_KEY"):
print("🧠 Processing crawled content with LLM...")
if use_llm_processing and os.getenv("OPENROUTER_API_KEY"):
print("🧠 Processing crawled content with LLM (parallel)...")
debug_call_data["processing_applied"].append("llm_processing")
for result in response.get('results', []):
# Prepare tasks for parallel processing
async def process_single_crawl_result(result):
"""Process a single crawl result with LLM and return updated result with metrics."""
page_url = result.get('url', 'Unknown URL')
title = result.get('title', '')
content = result.get('content', '')
if content:
original_size = len(content)
if not content:
return result, None, "no_content"
original_size = len(content)
# Process content with LLM
processed = await process_content_with_llm(
content, page_url, title, model, min_length
)
if processed:
processed_size = len(processed)
compression_ratio = processed_size / original_size if original_size > 0 else 1.0
# Process content with LLM
processed = await process_content_with_llm(
content, page_url, title, model, min_length
)
# Update result with processed content
result['raw_content'] = content
result['content'] = processed
if processed:
processed_size = len(processed)
compression_ratio = processed_size / original_size if original_size > 0 else 1.0
# Capture compression metrics
debug_call_data["compression_metrics"].append({
"url": page_url,
"original_size": original_size,
"processed_size": processed_size,
"compression_ratio": compression_ratio,
"model_used": model
})
# Keep original content in raw_content field
result['raw_content'] = content
# Replace content with processed version
result['content'] = processed
debug_call_data["pages_processed_with_llm"] += 1
print(f" 🌐 {page_url} (processed)")
else:
debug_call_data["compression_metrics"].append({
"url": page_url,
"original_size": original_size,
"processed_size": original_size,
"compression_ratio": 1.0,
"model_used": None,
"reason": "content_too_short"
})
print(f" 🌐 {page_url} (no processing - content too short)")
metrics = {
"url": page_url,
"original_size": original_size,
"processed_size": processed_size,
"compression_ratio": compression_ratio,
"model_used": model
}
return result, metrics, "processed"
else:
metrics = {
"url": page_url,
"original_size": original_size,
"processed_size": original_size,
"compression_ratio": 1.0,
"model_used": None,
"reason": "content_too_short"
}
return result, metrics, "too_short"
# Run all LLM processing in parallel
results_list = response.get('results', [])
tasks = [process_single_crawl_result(result) for result in results_list]
processed_results = await asyncio.gather(*tasks)
# Collect metrics and print results
for result, metrics, status in processed_results:
page_url = result.get('url', 'Unknown URL')
if status == "processed":
debug_call_data["compression_metrics"].append(metrics)
debug_call_data["pages_processed_with_llm"] += 1
print(f" 🌐 {page_url} (processed)")
elif status == "too_short":
debug_call_data["compression_metrics"].append(metrics)
print(f" 🌐 {page_url} (no processing - content too short)")
else:
print(f" ⚠️ {page_url} (no content to process)")
else:
if use_llm_processing and not os.getenv("NOUS_API_KEY"):
print("⚠️ LLM processing requested but NOUS_API_KEY not set, returning raw content")
if use_llm_processing and not os.getenv("OPENROUTER_API_KEY"):
print("⚠️ LLM processing requested but OPENROUTER_API_KEY not set, returning raw content")
debug_call_data["processing_applied"].append("llm_processing_unavailable")
# Print summary of crawled pages for debugging (original behavior)
@@ -918,7 +952,7 @@ def check_nous_api_key() -> bool:
Returns:
bool: True if API key is set, False otherwise
"""
return bool(os.getenv("NOUS_API_KEY"))
return bool(os.getenv("OPENROUTER_API_KEY"))
def get_debug_session_info() -> Dict[str, Any]:
@@ -967,8 +1001,8 @@ if __name__ == "__main__":
print("✅ Firecrawl API key found")
if not nous_available:
print("NOUS_API_KEY environment variable not set")
print("Please set your API key: export NOUS_API_KEY='your-key-here'")
print("OPENROUTER_API_KEY environment variable not set")
print("Please set your API key: export OPENROUTER_API_KEY='your-key-here'")
print("Get API key at: https://inference-api.nousresearch.com/")
print("⚠️ Without Nous API key, LLM content processing will be disabled")
else:
@@ -980,7 +1014,7 @@ if __name__ == "__main__":
print("🛠️ Web tools ready for use!")
if nous_available:
print("🧠 LLM content processing available with Gemini 2.5 Flash")
print("🧠 LLM content processing available with Gemini 3 Flash Preview via OpenRouter")
print(f" Default min length for processing: {DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION} chars")
# Show debug mode status
@@ -1012,7 +1046,7 @@ if __name__ == "__main__":
print(" crawl_data = await web_crawl_tool(")
print(" 'docs.python.org',")
print(" 'Find key concepts',")
print(" model='gemini-2.5-flash',")
print(" model='google/gemini-3-flash-preview',")
print(" min_length=3000")
print(" )")
print("")