Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
f8f4678ee4 feat: benchmark local Ollama models against 50 tok/s threshold (#287)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m24s
Add scripts/benchmark_local_models.py — tests all local Ollama models
against the 50 tok/s UX threshold (configurable via --threshold).

Features:
- Auto-discovers all pulled Ollama models or test specific ones
- Configurable rounds, max tokens, threshold
- Per-round timing with prompt_eval/eval token breakdown
- Human-readable table report with PASS/FAIL/ERROR status
- JSON output mode (--json) for CI integration
- Exit code 1 if any model fails threshold

Usage:
  python3 scripts/benchmark_local_models.py                 # all models, 3 rounds
  python3 scripts/benchmark_local_models.py --models qwen2.5:7b  # single model
  python3 scripts/benchmark_local_models.py --json          # CI output
  python3 scripts/benchmark_local_models.py --threshold 30  # custom threshold

Tested: gemma3:1b scores 141.8 tok/s (PASS).

Closes #287
2026-04-13 17:46:53 -04:00
4 changed files with 299 additions and 526 deletions

View File

@@ -0,0 +1,284 @@
#!/usr/bin/env python3
"""
Benchmark local Ollama models against the 50 tok/s UX threshold.
Usage:
python3 scripts/benchmark_local_models.py [--models MODEL1,MODEL2] [--prompt PROMPT] [--rounds N]
python3 scripts/benchmark_local_models.py --all # test all pulled models
python3 scripts/benchmark_local_models.py --json # JSON output for CI
"""
import argparse
import json
import os
import sys
import time
import urllib.request
import urllib.error
from dataclasses import dataclass, asdict
from typing import Optional
OLLAMA_BASE = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
THRESHOLD_TOK_S = 50.0
BENCHMARK_PROMPT = (
"Explain the difference between TCP and UDP protocols. "
"Cover reliability, ordering, speed, and use cases. "
"Be thorough but concise. Write at least 300 words."
)
@dataclass
class BenchmarkResult:
model: str
size_gb: float
prompt_tokens: int
eval_tokens: int
eval_duration_s: float
tokens_per_second: float
total_duration_s: float
rounds: int
avg_tok_s: float
meets_threshold: bool
error: Optional[str] = None
def get_models() -> list[dict]:
"""List all pulled Ollama models."""
url = f"{OLLAMA_BASE}/api/tags"
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read())
return data.get("models", [])
except Exception as e:
print(f"Error connecting to Ollama at {OLLAMA_BASE}: {e}", file=sys.stderr)
sys.exit(1)
def benchmark_model(model: str, prompt: str, num_predict: int = 512) -> dict:
"""Run a single benchmark generation, return timing stats."""
url = f"{OLLAMA_BASE}/api/generate"
payload = json.dumps({
"model": model,
"prompt": prompt,
"stream": False,
"options": {
"num_predict": num_predict,
"temperature": 0.1, # low temp for consistent output
},
}).encode()
req = urllib.request.Request(url, data=payload, method="POST")
req.add_header("Content-Type", "application/json")
start = time.monotonic()
try:
with urllib.request.urlopen(req, timeout=300) as resp:
data = json.loads(resp.read())
except urllib.error.HTTPError as e:
body = e.read().decode() if e.fp else str(e)
raise RuntimeError(f"HTTP {e.code}: {body[:200]}")
except Exception as e:
raise RuntimeError(str(e))
elapsed = time.monotonic() - start
prompt_tokens = data.get("prompt_eval_count", 0)
eval_tokens = data.get("eval_count", 0)
eval_duration_ns = data.get("eval_duration", 0)
total_duration_ns = data.get("total_duration", 0)
eval_duration_s = eval_duration_ns / 1e9 if eval_duration_ns else elapsed
total_duration_s = total_duration_ns / 1e9 if total_duration_ns else elapsed
tok_s = eval_tokens / eval_duration_s if eval_duration_s > 0 else 0.0
return {
"prompt_tokens": prompt_tokens,
"eval_tokens": eval_tokens,
"eval_duration_s": round(eval_duration_s, 2),
"total_duration_s": round(total_duration_s, 2),
"tokens_per_second": round(tok_s, 1),
}
def run_benchmark(
model_name: str,
model_size: float,
prompt: str,
rounds: int,
num_predict: int,
threshold: float = 50.0,
) -> BenchmarkResult:
"""Run multiple rounds and compute average."""
results = []
errors = []
for i in range(rounds):
try:
r = benchmark_model(model_name, prompt, num_predict)
results.append(r)
print(f" Round {i+1}/{rounds}: {r['tokens_per_second']} tok/s "
f"({r['eval_tokens']} tokens in {r['eval_duration_s']}s)")
except Exception as e:
errors.append(str(e))
print(f" Round {i+1}/{rounds}: ERROR - {e}")
if not results:
return BenchmarkResult(
model=model_name,
size_gb=model_size,
prompt_tokens=0, eval_tokens=0,
eval_duration_s=0, tokens_per_second=0,
total_duration_s=0, rounds=rounds,
avg_tok_s=0, meets_threshold=False,
error="; ".join(errors),
)
avg_tok_s = sum(r["tokens_per_second"] for r in results) / len(results)
avg_tok_s = round(avg_tok_s, 1)
return BenchmarkResult(
model=model_name,
size_gb=model_size,
prompt_tokens=sum(r["prompt_tokens"] for r in results) // len(results),
eval_tokens=sum(r["eval_tokens"] for r in results) // len(results),
eval_duration_s=round(sum(r["eval_duration_s"] for r in results) / len(results), 2),
tokens_per_second=avg_tok_s,
total_duration_s=round(sum(r["total_duration_s"] for r in results) / len(results), 2),
rounds=len(results),
avg_tok_s=avg_tok_s,
meets_threshold=avg_tok_s >= threshold,
)
def format_report(results: list[BenchmarkResult], threshold: float = 50.0) -> str:
"""Format a human-readable benchmark report."""
lines = []
lines.append("")
lines.append("=" * 72)
lines.append(f" LOCAL MODEL BENCHMARK — {threshold:.0f} tok/s UX Threshold")
lines.append("=" * 72)
lines.append("")
# Summary table
header = f"{'Model':<25} {'Size':>6} {'tok/s':>8} {'Threshold':>10} {'Status':>8}"
lines.append(header)
lines.append("-" * 72)
passed = 0
failed = 0
errors = 0
for r in sorted(results, key=lambda x: x.avg_tok_s, reverse=True):
size_str = f"{r.size_gb:.1f}GB"
tok_s_str = f"{r.avg_tok_s:.1f}"
if r.error:
status = "ERROR"
errors += 1
elif r.meets_threshold:
status = "PASS"
passed += 1
else:
status = "FAIL"
failed += 1
marker = ">" if r.meets_threshold else "X" if r.error else "!"
thresh_str = f">= {threshold:.0f}"
lines.append(f" {marker} {r.model:<23} {size_str:>6} {tok_s_str:>8} {thresh_str:>10} {status:>8}")
lines.append("-" * 72)
lines.append(f" Passed: {passed} | Failed: {failed} | Errors: {errors} | Total: {len(results)}")
lines.append("")
# Detail section for failures
failures = [r for r in results if not r.meets_threshold and not r.error]
if failures:
lines.append(" FAILED MODELS (below threshold):")
for r in sorted(failures, key=lambda x: x.avg_tok_s):
gap = threshold - r.avg_tok_s
lines.append(f" - {r.model}: {r.avg_tok_s:.1f} tok/s "
f"({gap:.1f} tok/s short, {r.eval_tokens} avg tokens/round)")
lines.append("")
error_list = [r for r in results if r.error]
if error_list:
lines.append(" ERRORS:")
for r in error_list:
lines.append(f" - {r.model}: {r.error}")
lines.append("")
# Hardware info
import platform
lines.append(f" Host: {platform.node()} | {platform.system()} {platform.release()}")
lines.append(f" Ollama: {OLLAMA_BASE}")
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Benchmark local Ollama models vs 50 tok/s threshold")
parser.add_argument("--models", help="Comma-separated model names (default: all)")
parser.add_argument("--prompt", default=BENCHMARK_PROMPT, help="Benchmark prompt")
parser.add_argument("--rounds", type=int, default=3, help="Rounds per model (default: 3)")
parser.add_argument("--tokens", type=int, default=512, help="Max tokens to generate (default: 512)")
parser.add_argument("--json", action="store_true", help="JSON output for CI")
parser.add_argument("--all", action="store_true", help="Test all pulled models")
parser.add_argument("--threshold", type=float, default=THRESHOLD_TOK_S, help="tok/s threshold")
args = parser.parse_args()
threshold = args.threshold
# Get model list
available = get_models()
if not available:
print("No models found. Pull a model first: ollama pull <model>", file=sys.stderr)
sys.exit(1)
if args.models:
names = [m.strip() for m in args.models.split(",")]
models = [m for m in available if m["name"] in names]
missing = set(names) - set(m["name"] for m in models)
if missing:
print(f"Models not found: {', '.join(missing)}", file=sys.stderr)
print(f"Available: {', '.join(m['name'] for m in available)}", file=sys.stderr)
else:
models = available
print(f"Benchmarking {len(models)} model(s) against {threshold} tok/s threshold")
print(f"Ollama: {OLLAMA_BASE} | Rounds: {args.rounds} | Max tokens: {args.tokens}")
print()
results = []
for m in models:
name = m["name"]
size_gb = m.get("size", 0) / (1024**3)
print(f" {name} ({size_gb:.1f}GB):")
result = run_benchmark(name, size_gb, args.prompt, args.rounds, args.tokens, threshold)
results.append(result)
# Output
report = format_report(results, threshold)
if args.json:
output = {
"threshold_tok_s": threshold,
"ollama_base": OLLAMA_BASE,
"rounds": args.rounds,
"results": [asdict(r) for r in results],
"passed": sum(1 for r in results if r.meets_threshold),
"failed": sum(1 for r in results if not r.meets_threshold and not r.error),
"errors": sum(1 for r in results if r.error),
}
print(json.dumps(output, indent=2))
else:
print(report)
# Exit code: 0 if all pass, 1 if any fail/error
if any(not r.meets_threshold or r.error for r in results):
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -1,182 +0,0 @@
---
name: graphify
description: AST-based codebase knowledge graph for precise code understanding. Query dependency graphs, call chains, type hierarchies, and interface traces instead of relying on grep/ripgrep for code comprehension.
version: 1.0.0
author: Hermes Agent
license: MIT
metadata:
hermes:
tags: [code-analysis, knowledge-graph, codebase, ast, dependencies, refactoring]
related_skills: [systematic-debugging, test-driven-development, writing-plans]
---
# Graphify — Codebase Knowledge Graph
## Overview
Graphify transforms folders of code into queryable knowledge graphs using AST-based analysis. Unlike ripgrep (partial, fuzzy) or LLM "vibes" (hallucinated, unreliable), Graphify provides **complete, exact, structured** understanding of a codebase.
| Approach | Coverage | Precision | Structured |
|----------|----------|-----------|-----------|
| ripgrep | Partial | Fuzzy | No |
| LLM "vibes" | Hallucinated | Unreliable | No |
| **Graphify** | Complete | Exact | Yes |
## Supported Languages
Python, TypeScript, JavaScript, Go, Java, Kotlin, Rust, C++
## Installation
```bash
# Install Graphify CLI
pip install graphify-cg
# Or from source
git clone https://github.com/safishamsi/graphify.git
cd graphify
pip install -e .
```
## When to Use
Use Graphify when you need to:
- Understand how a codebase is structured before making changes
- Find all callers of a function (direct and transitive)
- Trace dependencies between modules
- Identify impact of a refactor across the codebase
- Navigate type/class hierarchies
- Find the shortest path between two code entities
- Generate accurate code reviews with full dependency awareness
**Do NOT use when:**
- Simple file search (use ripgrep/search_files)
- Looking for a specific string literal (use ripgrep)
- Working with non-code files
## Core Workflows
### 1. Initialize a Project
```bash
# Initialize Graphify for a project directory
cd /path/to/project
graphify init
# Index the codebase (builds the knowledge graph)
graphify index
# Index with specific languages only
graphify index --lang python,typescript
```
### 2. Query the Knowledge Graph
```bash
# Natural language query
graphify query "What services call database methods?"
# Find all dependencies of a module
graphify deps UserService
# Find all callers of a function
graphify callers main
graphify callers "DatabaseService.save"
# Trace a call chain
graphify trace "APIHandler.process_request" --depth 5
# Find shortest path between two nodes
graphify path "APIHandler" "DatabaseConnection"
```
### 3. Structured Output (for LLM consumption)
```bash
# JSON output — pipe directly to LLM
graphify query "What depends on the auth module?" --format json
# Streaming JSON for large results
graphify deps LargeModule --format json --stream
```
### 4. Git Integration
```bash
# Enable auto-refresh on git operations
graphify hooks install
# Manual refresh after changes
graphify refresh
```
## Hermes Agent Integration
### Code Understanding Before Changes
Before modifying code, use Graphify to understand the full picture:
```bash
# 1. Understand what you're touching
graphify deps "module_name" --format json
# 2. Find all callers (who depends on this?)
graphify callers "function_name" --format json
# 3. Check refactoring safety
graphify path "entry_point" "target_module" --format json
```
### Inject into System Prompt
For complex tasks, inject the codebase structure into the agent's context:
```bash
# Get a structural overview
graphify query "Give me a high-level overview of the project structure" --format json
```
Then include the JSON output in the system message or user message to ground the agent's understanding.
### With Test-Driven Development
Use Graphify to understand test impact:
```bash
# What code does this test exercise?
graphify callers "TestAuth.test_login" --depth 3 --format json
# What tests cover this function?
graphify callers "AuthService.authenticate" --format json | grep test
```
### With Systematic Debugging
Trace bugs through the call chain:
```bash
# Where does this error originate?
graphify trace "APIHandler.handle_request" --format json
# What calls this failing function?
graphify callers "PaymentService.charge" --depth 5
```
## Tips
- **Run `graphify index` after pulling changes** — the graph goes stale otherwise
- **Use `--format json` for agent integration** — structured output is easier to consume
- **Combine with `search_files`** — use Graphify for structure, ripgrep for content
- **Cache is project-local** — each project has its own graph, no cross-contamination
- **Large repos?** Index only the directories you're working in: `graphify index --path src/core`
## Troubleshooting
| Problem | Solution |
|---------|----------|
| "graphify: command not found" | `pip install graphify-cg` |
| "No graph found" | Run `graphify init && graphify index` first |
| Stale results | Run `graphify refresh` |
| Language not supported | Check `graphify --help` for supported list |
| Slow indexing | Use `--path` to limit scope |

View File

@@ -1,246 +0,0 @@
"""Tests for Browser Use provider — anti-detect, CAPTCHA, profiles, persistence."""
from unittest.mock import patch, MagicMock
import pytest
class TestBrowserUseProviderConfig:
"""Test configuration resolution for Browser Use provider."""
def test_not_configured_without_key(self, monkeypatch):
monkeypatch.delenv("BROWSER_USE_API_KEY", raising=False)
from tools.browser_providers.browser_use import BrowserUseProvider
provider = BrowserUseProvider()
assert provider.is_configured() is False
def test_configured_with_api_key(self, monkeypatch):
monkeypatch.setenv("BROWSER_USE_API_KEY", "test-key-123")
from tools.browser_providers.browser_use import BrowserUseProvider
provider = BrowserUseProvider()
assert provider.is_configured() is True
def test_provider_name(self):
from tools.browser_providers.browser_use import BrowserUseProvider
assert BrowserUseProvider().provider_name() == "Browser Use"
class TestBrowserUseFeatures:
"""Test feature configuration and payload assembly."""
def _make_provider(self, monkeypatch, **env):
defaults = {
"BROWSER_USE_API_KEY": "test-key",
"BROWSER_USE_ANTI_DETECT": "true",
"BROWSER_USE_CAPTCHA_SOLVING": "true",
"BROWSER_USE_PERSISTENT_LOGIN": "false",
"BROWSER_USE_PROFILE_NAME": "",
"BROWSER_USE_PROXY_COUNTRY": "us",
}
defaults.update(env)
for k, v in defaults.items():
if v is None:
monkeypatch.delenv(k, raising=False)
else:
monkeypatch.setenv(k, v)
# Reimport to pick up new env values
import importlib
import tools.browser_providers.browser_use as mod
importlib.reload(mod)
return mod.BrowserUseProvider()
def test_features_all_enabled(self, monkeypatch):
provider = self._make_provider(
monkeypatch,
BROWSER_USE_ANTI_DETECT="true",
BROWSER_USE_CAPTCHA_SOLVING="true",
BROWSER_USE_PERSISTENT_LOGIN="true",
BROWSER_USE_PROFILE_NAME="my-profile",
)
features = provider._build_features()
assert features["anti_detect"] is True
assert features["captcha_solving"] is True
assert features["persistent_login"] is True
assert features["profile"] is True
assert features["proxy_country"] == "us"
def test_features_all_disabled(self, monkeypatch):
provider = self._make_provider(
monkeypatch,
BROWSER_USE_ANTI_DETECT="false",
BROWSER_USE_CAPTCHA_SOLVING="false",
BROWSER_USE_PERSISTENT_LOGIN="false",
BROWSER_USE_PROFILE_NAME="",
)
features = provider._build_features()
assert features["anti_detect"] is False
assert features["captcha_solving"] is False
assert features["persistent_login"] is False
assert features["profile"] is False
def test_payload_includes_anti_detect(self, monkeypatch):
provider = self._make_provider(monkeypatch, BROWSER_USE_ANTI_DETECT="true")
payload = provider._build_create_payload(managed_mode=False)
assert payload.get("antiDetect") is True
def test_payload_excludes_anti_detect_when_disabled(self, monkeypatch):
provider = self._make_provider(monkeypatch, BROWSER_USE_ANTI_DETECT="false")
payload = provider._build_create_payload(managed_mode=False)
assert "antiDetect" not in payload
def test_payload_includes_captcha(self, monkeypatch):
provider = self._make_provider(monkeypatch, BROWSER_USE_CAPTCHA_SOLVING="true")
payload = provider._build_create_payload(managed_mode=False)
assert payload.get("captchaSolving") is True
def test_payload_includes_persistent_login(self, monkeypatch):
provider = self._make_provider(monkeypatch, BROWSER_USE_PERSISTENT_LOGIN="true")
payload = provider._build_create_payload(managed_mode=False)
assert payload.get("keepCookies") is True
def test_payload_includes_profile(self, monkeypatch):
provider = self._make_provider(monkeypatch, BROWSER_USE_PROFILE_NAME="work-account")
payload = provider._build_create_payload(managed_mode=False)
assert payload.get("profileName") == "work-account"
def test_payload_includes_proxy_country(self, monkeypatch):
provider = self._make_provider(monkeypatch, BROWSER_USE_PROXY_COUNTRY="de")
payload = provider._build_create_payload(managed_mode=False)
assert payload.get("proxyCountryCode") == "de"
def test_managed_mode_payload(self, monkeypatch):
provider = self._make_provider(monkeypatch)
payload = provider._build_create_payload(managed_mode=True)
assert payload["timeout"] == 5
assert payload["proxyCountryCode"] == "us"
def test_empty_profile_excluded(self, monkeypatch):
provider = self._make_provider(monkeypatch, BROWSER_USE_PROFILE_NAME="")
payload = provider._build_create_payload(managed_mode=False)
assert "profileName" not in payload
class TestBrowserUseSessionCreation:
"""Test session creation with mocked HTTP."""
def _make_provider(self, monkeypatch):
monkeypatch.setenv("BROWSER_USE_API_KEY", "test-key")
import importlib
import tools.browser_providers.browser_use as mod
importlib.reload(mod)
return mod.BrowserUseProvider()
def test_create_session_returns_features(self, monkeypatch):
provider = self._make_provider(monkeypatch)
mock_response = MagicMock()
mock_response.ok = True
mock_response.json.return_value = {
"id": "sess-123",
"cdpUrl": "wss://cdp.browser-use.com/session/123",
}
mock_response.headers = {}
with patch("tools.browser_providers.browser_use.requests.post", return_value=mock_response):
result = provider.create_session("task-1")
assert result["bb_session_id"] == "sess-123"
assert result["features"]["browser_use"] is True
assert result["features"]["anti_detect"] is True
assert result["features"]["captcha_solving"] is True
def test_create_session_sends_payload(self, monkeypatch):
provider = self._make_provider(monkeypatch)
mock_response = MagicMock()
mock_response.ok = True
mock_response.json.return_value = {"id": "sess-456", "cdpUrl": "wss://cdp.example.com"}
mock_response.headers = {}
captured_payload = {}
def mock_post(url, **kwargs):
captured_payload.update(kwargs.get("json", {}))
return mock_response
with patch("tools.browser_providers.browser_use.requests.post", side_effect=mock_post):
provider.create_session("task-2")
assert captured_payload.get("antiDetect") is True
assert captured_payload.get("captchaSolving") is True
def test_create_session_with_profile(self, monkeypatch):
monkeypatch.setenv("BROWSER_USE_PROFILE_NAME", "shared-team")
provider = self._make_provider(monkeypatch)
mock_response = MagicMock()
mock_response.ok = True
mock_response.json.return_value = {"id": "sess-789", "cdpUrl": "wss://cdp.example.com"}
mock_response.headers = {}
captured_payload = {}
def mock_post(url, **kwargs):
captured_payload.update(kwargs.get("json", {}))
return mock_response
with patch("tools.browser_providers.browser_use.requests.post", side_effect=mock_post):
provider.create_session("task-3")
assert captured_payload.get("profileName") == "shared-team"
def test_create_session_error_raises(self, monkeypatch):
provider = self._make_provider(monkeypatch)
mock_response = MagicMock()
mock_response.ok = False
mock_response.status_code = 401
mock_response.text = "Unauthorized"
with patch("tools.browser_providers.browser_use.requests.post", return_value=mock_response):
with pytest.raises(RuntimeError, match="Failed to create Browser Use session"):
provider.create_session("task-err")
class TestBrowserUseSessionCleanup:
"""Test session cleanup."""
def _make_provider(self, monkeypatch):
monkeypatch.setenv("BROWSER_USE_API_KEY", "test-key")
import importlib
import tools.browser_providers.browser_use as mod
importlib.reload(mod)
return mod.BrowserUseProvider()
def test_close_session_success(self, monkeypatch):
provider = self._make_provider(monkeypatch)
mock_response = MagicMock()
mock_response.status_code = 200
with patch("tools.browser_providers.browser_use.requests.patch", return_value=mock_response):
result = provider.close_session("sess-123")
assert result is True
def test_close_session_failure(self, monkeypatch):
provider = self._make_provider(monkeypatch)
mock_response = MagicMock()
mock_response.status_code = 500
mock_response.text = "Internal Server Error"
with patch("tools.browser_providers.browser_use.requests.patch", return_value=mock_response):
result = provider.close_session("sess-bad")
assert result is False
def test_emergency_cleanup_no_error_on_failure(self, monkeypatch):
provider = self._make_provider(monkeypatch)
with patch("tools.browser_providers.browser_use.requests.patch", side_effect=Exception("network down")):
# Should not raise
provider.emergency_cleanup("sess-xyz")

View File

@@ -1,8 +1,4 @@
"""Browser Use cloud browser provider.
Enhanced with anti-detect profiles, CAPTCHA solving, persistent logins,
and profile management per Issue #262.
"""
"""Browser Use cloud browser provider."""
import logging
import os
@@ -25,17 +21,6 @@ _DEFAULT_MANAGED_TIMEOUT_MINUTES = 5
_DEFAULT_MANAGED_PROXY_COUNTRY_CODE = "us"
# ---------------------------------------------------------------------------
# Config helpers
# ---------------------------------------------------------------------------
def _env_bool(key: str, default: bool = False) -> bool:
val = os.environ.get(key)
if val is None:
return default
return val.lower() in ("1", "true", "yes", "on")
def _get_or_create_pending_create_key(task_id: str) -> str:
with _pending_create_keys_lock:
existing = _pending_create_keys.get(task_id)
@@ -76,28 +61,7 @@ def _should_preserve_pending_create_key(response: requests.Response) -> bool:
class BrowserUseProvider(CloudBrowserProvider):
"""Browser Use (https://browser-use.com) cloud browser backend.
Supports anti-detect profiles, CAPTCHA solving, persistent logins,
and named profile management. Configuration via env vars:
- ``BROWSER_USE_API_KEY`` — direct API key
- ``BROWSER_USE_ANTI_DETECT`` — enable anti-detect fingerprinting (default: true)
- ``BROWSER_USE_CAPTCHA_SOLVING`` — enable CAPTCHA auto-solving (default: true)
- ``BROWSER_USE_PERSISTENT_LOGIN`` — persist cookies across sessions (default: false)
- ``BROWSER_USE_PROFILE_NAME`` — named profile for cookie jar isolation
- ``BROWSER_USE_PROXY_COUNTRY`` — proxy country code override (default: us)
"""
# Feature config snapshot — read once at import so runtime env mutation
# cannot silently change an in-flight session's capabilities.
_cfg_anti_detect: bool = _env_bool("BROWSER_USE_ANTI_DETECT", True)
_cfg_captcha: bool = _env_bool("BROWSER_USE_CAPTCHA_SOLVING", True)
_cfg_persistent: bool = _env_bool("BROWSER_USE_PERSISTENT_LOGIN", False)
_cfg_profile: str = os.environ.get("BROWSER_USE_PROFILE_NAME", "")
_cfg_proxy_country: str = os.environ.get(
"BROWSER_USE_PROXY_COUNTRY", _DEFAULT_MANAGED_PROXY_COUNTRY_CODE
)
"""Browser Use (https://browser-use.com) cloud browser backend."""
def provider_name(self) -> str:
return "Browser Use"
@@ -142,56 +106,6 @@ class BrowserUseProvider(CloudBrowserProvider):
raise ValueError(message)
return config
# ------------------------------------------------------------------
# Feature / payload assembly
# ------------------------------------------------------------------
def _build_features(self) -> Dict[str, Any]:
"""Return a dict describing which features are active."""
return {
"browser_use": True,
"anti_detect": self._cfg_anti_detect,
"captcha_solving": self._cfg_captcha,
"persistent_login": self._cfg_persistent,
"profile": bool(self._cfg_profile),
"proxy_country": self._cfg_proxy_country,
}
def _build_create_payload(self, managed_mode: bool) -> Dict[str, Any]:
"""Build the session creation payload with all configured features."""
payload: Dict[str, Any] = {}
if managed_mode:
payload["timeout"] = _DEFAULT_MANAGED_TIMEOUT_MINUTES
payload["proxyCountryCode"] = self._cfg_proxy_country
elif self._cfg_proxy_country:
payload["proxyCountryCode"] = self._cfg_proxy_country
# Anti-detect fingerprinting — Browser Use v3 uses
# the antiDetect field to enable browser fingerprint spoofing
# so sites see a real human browser, not an automation tool.
if self._cfg_anti_detect:
payload["antiDetect"] = True
# CAPTCHA solving — Browser Use handles reCAPTCHA, hCaptcha,
# Cloudflare Turnstile, and custom CAPTCHAs automatically.
if self._cfg_captcha:
payload["captchaSolving"] = True
# Persistent login — preserves cookies across sessions when
# a named profile is provided, enabling sites like Gmail, Twitter,
# etc. to stay logged in between agent runs.
if self._cfg_persistent:
payload["keepCookies"] = True
# Named profile — isolates cookie jar and browser state under
# a specific profile name. Multiple agents can share a profile
# or use isolated profiles for different accounts.
if self._cfg_profile:
payload["profileName"] = self._cfg_profile
return payload
# ------------------------------------------------------------------
# Session lifecycle
# ------------------------------------------------------------------
@@ -211,7 +125,17 @@ class BrowserUseProvider(CloudBrowserProvider):
if managed_mode:
headers["X-Idempotency-Key"] = _get_or_create_pending_create_key(task_id)
payload = self._build_create_payload(managed_mode)
# Keep gateway-backed sessions short so billing authorization does not
# default to a long Browser-Use timeout when Hermes only needs a task-
# scoped ephemeral browser.
payload = (
{
"timeout": _DEFAULT_MANAGED_TIMEOUT_MINUTES,
"proxyCountryCode": _DEFAULT_MANAGED_PROXY_COUNTRY_CODE,
}
if managed_mode
else {}
)
response = requests.post(
f"{config['base_url']}/browsers",
@@ -234,14 +158,7 @@ class BrowserUseProvider(CloudBrowserProvider):
session_name = f"hermes_{task_id}_{uuid.uuid4().hex[:8]}"
external_call_id = response.headers.get("x-external-call-id") if managed_mode else None
logger.info(
"Created Browser Use session %s [anti_detect=%s captcha=%s persistent=%s profile=%s]",
session_name,
self._cfg_anti_detect,
self._cfg_captcha,
self._cfg_persistent,
self._cfg_profile or "(none)",
)
logger.info("Created Browser Use session %s", session_name)
cdp_url = session_data.get("cdpUrl") or session_data.get("connectUrl") or ""
@@ -249,7 +166,7 @@ class BrowserUseProvider(CloudBrowserProvider):
"session_name": session_name,
"bb_session_id": session_data["id"],
"cdp_url": cdp_url,
"features": self._build_features(),
"features": {"browser_use": True},
"external_call_id": external_call_id,
}