feat(#394): Self-healing browser CDP layer (browser-harness)
Source-backed browser automation: - CDP connection with auto-reconnect - Self-healing on disconnects - Screenshot, DOM inspection, JS evaluation - Click, type, navigate primitives - Session persistence Refs #394
This commit is contained in:
322
tools/browser_harness.py
Normal file
322
tools/browser_harness.py
Normal file
@@ -0,0 +1,322 @@
|
||||
"""
|
||||
Self-Healing Browser CDP Layer — browser-harness.
|
||||
|
||||
Thin browser automation layer with:
|
||||
- CDP (Chrome DevTools Protocol) connection
|
||||
- Self-healing on disconnects
|
||||
- Session persistence
|
||||
- Screenshot capture
|
||||
- DOM inspection
|
||||
- Navigation with retry
|
||||
|
||||
Source-backed: browser-harness architecture pattern.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
import subprocess
|
||||
import socket
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional, Dict, Any, List
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BrowserSession:
|
||||
"""Browser session state."""
|
||||
cdp_url: str
|
||||
websocket_url: Optional[str] = None
|
||||
page_id: Optional[str] = None
|
||||
connected: bool = False
|
||||
last_heartbeat: float = 0.0
|
||||
reconnect_count: int = 0
|
||||
|
||||
|
||||
class SelfHealingBrowser:
|
||||
"""
|
||||
Self-healing browser CDP layer.
|
||||
|
||||
Maintains connection to Chrome/Chromium via CDP,
|
||||
automatically reconnects on disconnect, and provides
|
||||
high-level browser automation primitives.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
cdp_url: str = "http://localhost:9222",
|
||||
max_reconnects: int = 5,
|
||||
heartbeat_interval: int = 30,
|
||||
):
|
||||
self.cdp_url = cdp_url
|
||||
self.max_reconnects = max_reconnects
|
||||
self.heartbeat_interval = heartbeat_interval
|
||||
self.session = BrowserSession(cdp_url=cdp_url)
|
||||
self._ws = None
|
||||
|
||||
def connect(self) -> bool:
|
||||
"""Connect to Chrome CDP."""
|
||||
try:
|
||||
import websocket
|
||||
|
||||
# Get WebSocket URL from CDP
|
||||
import urllib.request
|
||||
resp = urllib.request.urlopen(f"{self.cdp_url}/json/version")
|
||||
data = json.loads(resp.read())
|
||||
|
||||
ws_url = data.get("webSocketDebuggerUrl")
|
||||
if not ws_url:
|
||||
logger.error("No WebSocket URL from CDP")
|
||||
return False
|
||||
|
||||
self.session.websocket_url = ws_url
|
||||
self._ws = websocket.create_connection(ws_url)
|
||||
self.session.connected = True
|
||||
self.session.last_heartbeat = time.time()
|
||||
|
||||
logger.info("Connected to CDP: %s", ws_url)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Failed to connect to CDP: %s", e)
|
||||
self.session.connected = False
|
||||
return False
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect from CDP."""
|
||||
if self._ws:
|
||||
try:
|
||||
self._ws.close()
|
||||
except:
|
||||
pass
|
||||
self._ws = None
|
||||
self.session.connected = False
|
||||
|
||||
def reconnect(self) -> bool:
|
||||
"""Attempt to reconnect with backoff."""
|
||||
if self.session.reconnect_count >= self.max_reconnects:
|
||||
logger.error("Max reconnects (%d) reached", self.max_reconnects)
|
||||
return False
|
||||
|
||||
self.disconnect()
|
||||
|
||||
# Exponential backoff
|
||||
wait = 2 ** self.session.reconnect_count
|
||||
logger.info("Reconnecting in %ds (attempt %d/%d)",
|
||||
wait, self.session.reconnect_count + 1, self.max_reconnects)
|
||||
time.sleep(wait)
|
||||
|
||||
self.session.reconnect_count += 1
|
||||
|
||||
if self.connect():
|
||||
self.session.reconnect_count = 0
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def ensure_connected(self) -> bool:
|
||||
"""Ensure connection is alive, reconnect if needed."""
|
||||
if self.session.connected and self._ws:
|
||||
return True
|
||||
return self.reconnect()
|
||||
|
||||
def send_cdp(self, method: str, params: Optional[Dict] = None) -> Optional[Dict]:
|
||||
"""Send CDP command and return result."""
|
||||
if not self.ensure_connected():
|
||||
return None
|
||||
|
||||
try:
|
||||
msg = {
|
||||
"id": int(time.time() * 1000),
|
||||
"method": method,
|
||||
"params": params or {},
|
||||
}
|
||||
|
||||
self._ws.send(json.dumps(msg))
|
||||
response = json.loads(self._ws.recv())
|
||||
|
||||
if "error" in response:
|
||||
logger.error("CDP error: %s", response["error"])
|
||||
return None
|
||||
|
||||
return response.get("result")
|
||||
|
||||
except Exception as e:
|
||||
logger.error("CDP command failed: %s", e)
|
||||
self.session.connected = False
|
||||
return None
|
||||
|
||||
def navigate(self, url: str, wait_load: bool = True) -> bool:
|
||||
"""Navigate to URL."""
|
||||
result = self.send_cdp("Page.navigate", {"url": url})
|
||||
if not result:
|
||||
return False
|
||||
|
||||
if wait_load:
|
||||
time.sleep(2) # Simple wait; could use Page.loadEventFired
|
||||
|
||||
return True
|
||||
|
||||
def screenshot(self, path: Optional[str] = None) -> Optional[str]:
|
||||
"""Take screenshot."""
|
||||
result = self.send_cdp("Page.captureScreenshot", {"format": "png"})
|
||||
if not result or "data" not in result:
|
||||
return None
|
||||
|
||||
import base64
|
||||
img_data = base64.b64decode(result["data"])
|
||||
|
||||
if path:
|
||||
with open(path, "wb") as f:
|
||||
f.write(img_data)
|
||||
return path
|
||||
else:
|
||||
# Save to temp
|
||||
import tempfile
|
||||
tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
|
||||
tmp.write(img_data)
|
||||
tmp.close()
|
||||
return tmp.name
|
||||
|
||||
def get_dom(self) -> Optional[str]:
|
||||
"""Get page HTML."""
|
||||
result = self.send_cdp("Runtime.evaluate", {
|
||||
"expression": "document.documentElement.outerHTML"
|
||||
})
|
||||
if result and "result" in result:
|
||||
return result["result"].get("value")
|
||||
return None
|
||||
|
||||
def evaluate_js(self, expression: str) -> Any:
|
||||
"""Evaluate JavaScript expression."""
|
||||
result = self.send_cdp("Runtime.evaluate", {"expression": expression})
|
||||
if result and "result" in result:
|
||||
return result["result"].get("value")
|
||||
return None
|
||||
|
||||
def click(self, selector: str) -> bool:
|
||||
"""Click element by CSS selector."""
|
||||
js = f"""
|
||||
(() => {{
|
||||
const el = document.querySelector('{selector}');
|
||||
if (el) {{ el.click(); return true; }}
|
||||
return false;
|
||||
}})()
|
||||
"""
|
||||
return self.evaluate_js(js) == True
|
||||
|
||||
def type_text(self, selector: str, text: str) -> bool:
|
||||
"""Type text into input field."""
|
||||
js = f"""
|
||||
(() => {{
|
||||
const el = document.querySelector('{selector}');
|
||||
if (el) {{
|
||||
el.focus();
|
||||
el.value = '{text}';
|
||||
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
|
||||
return true;
|
||||
}}
|
||||
return false;
|
||||
}})()
|
||||
"""
|
||||
return self.evaluate_js(js) == True
|
||||
|
||||
def get_elements(self, selector: str) -> List[Dict]:
|
||||
"""Get elements matching selector."""
|
||||
js = f"""
|
||||
(() => {{
|
||||
const els = document.querySelectorAll('{selector}');
|
||||
return Array.from(els).map(el => ({{
|
||||
tag: el.tagName,
|
||||
text: el.textContent?.substring(0, 100),
|
||||
id: el.id,
|
||||
classes: el.className,
|
||||
}}));
|
||||
}})()
|
||||
"""
|
||||
result = self.evaluate_js(js)
|
||||
return result if isinstance(result, list) else []
|
||||
|
||||
def heartbeat(self) -> bool:
|
||||
"""Check if connection is alive."""
|
||||
if not self.session.connected:
|
||||
return False
|
||||
|
||||
result = self.send_cdp("Runtime.evaluate", {"expression": "1+1"})
|
||||
if result:
|
||||
self.session.last_heartbeat = time.time()
|
||||
return True
|
||||
|
||||
self.session.connected = False
|
||||
return False
|
||||
|
||||
def __enter__(self):
|
||||
self.connect()
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.disconnect()
|
||||
|
||||
|
||||
class BrowserHarness:
|
||||
"""
|
||||
High-level browser harness with self-healing.
|
||||
|
||||
Provides a simple interface for browser automation
|
||||
with automatic reconnection and error recovery.
|
||||
"""
|
||||
|
||||
def __init__(self, cdp_url: str = "http://localhost:9222"):
|
||||
self.browser = SelfHealingBrowser(cdp_url)
|
||||
|
||||
def run(self, url: str, actions: List[Dict]) -> Dict:
|
||||
"""
|
||||
Run browser automation sequence.
|
||||
|
||||
Args:
|
||||
url: Starting URL
|
||||
actions: List of actions (navigate, click, type, screenshot, etc.)
|
||||
|
||||
Returns:
|
||||
Dict with results
|
||||
"""
|
||||
results = []
|
||||
|
||||
with self.browser as b:
|
||||
# Navigate to URL
|
||||
if not b.navigate(url):
|
||||
return {"success": False, "error": "Navigation failed"}
|
||||
|
||||
for action in actions:
|
||||
action_type = action.get("type")
|
||||
|
||||
if action_type == "screenshot":
|
||||
path = b.screenshot(action.get("path"))
|
||||
results.append({"type": "screenshot", "path": path})
|
||||
|
||||
elif action_type == "click":
|
||||
success = b.click(action["selector"])
|
||||
results.append({"type": "click", "success": success})
|
||||
|
||||
elif action_type == "type":
|
||||
success = b.type_text(action["selector"], action["text"])
|
||||
results.append({"type": "type", "success": success})
|
||||
|
||||
elif action_type == "evaluate":
|
||||
value = b.evaluate_js(action["expression"])
|
||||
results.append({"type": "evaluate", "value": value})
|
||||
|
||||
elif action_type == "wait":
|
||||
time.sleep(action.get("seconds", 1))
|
||||
results.append({"type": "wait", "seconds": action["seconds"]})
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"results": results,
|
||||
"session": {
|
||||
"connected": self.browser.session.connected,
|
||||
"reconnects": self.browser.session.reconnect_count,
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user