Source-backed browser automation: - CDP connection with auto-reconnect - Self-healing on disconnects - Screenshot, DOM inspection, JS evaluation - Click, type, navigate primitives - Session persistence Refs #394
323 lines
9.9 KiB
Python
323 lines
9.9 KiB
Python
"""
|
|
Self-Healing Browser CDP Layer — browser-harness.
|
|
|
|
Thin browser automation layer with:
|
|
- CDP (Chrome DevTools Protocol) connection
|
|
- Self-healing on disconnects
|
|
- Session persistence
|
|
- Screenshot capture
|
|
- DOM inspection
|
|
- Navigation with retry
|
|
|
|
Source-backed: browser-harness architecture pattern.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
import subprocess
|
|
import socket
|
|
from dataclasses import dataclass, field
|
|
from typing import Optional, Dict, Any, List
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class BrowserSession:
    """Mutable record describing the state of one CDP connection.

    Only ``cdp_url`` is required; every other field starts out in its
    "not connected yet" state and is updated by the browser layer.
    """

    # Base HTTP URL of the DevTools endpoint (e.g. "http://localhost:9222").
    cdp_url: str
    # WebSocket debugger URL discovered from the endpoint, once known.
    websocket_url: Optional[str] = None
    # Identifier of the page target in use, when known.
    page_id: Optional[str] = None
    # True while the WebSocket is believed to be usable.
    connected: bool = False
    # ``time.time()`` timestamp of the last successful connect/heartbeat.
    last_heartbeat: float = 0.0
    # Number of reconnect attempts made since the last successful connect.
    reconnect_count: int = 0
|
|
|
|
|
|
class SelfHealingBrowser:
    """
    Self-healing browser CDP layer.

    Maintains a WebSocket connection to Chrome/Chromium via CDP,
    reconnects (with exponential backoff) when a command finds the
    connection down, and provides high-level automation primitives
    (navigate, screenshot, DOM access, JS evaluation, click, type).

    Can be used as a context manager: the connection is opened on entry
    and closed on exit.
    """

    # Seconds to wait for the HTTP discovery endpoints before giving up.
    _HTTP_TIMEOUT = 10

    def __init__(
        self,
        cdp_url: str = "http://localhost:9222",
        max_reconnects: int = 5,
        heartbeat_interval: int = 30,
    ):
        """
        Args:
            cdp_url: Base HTTP URL of the DevTools endpoint.
            max_reconnects: Reconnect attempts allowed before giving up.
            heartbeat_interval: Stored for callers; not used by this class.
        """
        self.cdp_url = cdp_url
        self.max_reconnects = max_reconnects
        self.heartbeat_interval = heartbeat_interval
        self.session = BrowserSession(cdp_url=cdp_url)
        self._ws = None
        # Monotonic CDP message id; avoids the collisions a timestamp can
        # produce when two commands are sent within the same millisecond.
        self._next_id = 0

    def _fetch_json(self, path: str) -> Any:
        """GET ``cdp_url + path`` and return the decoded JSON body."""
        import urllib.request

        url = f"{self.cdp_url.rstrip('/')}{path}"
        # ``with`` closes the HTTP response; the timeout keeps a hung
        # endpoint from blocking forever.
        with urllib.request.urlopen(url, timeout=self._HTTP_TIMEOUT) as resp:
            return json.loads(resp.read())

    def _discover_target(self):
        """Return ``(websocket_url, page_id)`` for the target to attach to.

        The Page/Runtime commands used by this class are served by page
        targets, so the first ``page`` entry from ``/json/list`` is
        preferred. If no page target can be found, fall back to the
        browser-level URL from ``/json/version`` (``page_id`` is None).
        """
        try:
            targets = self._fetch_json("/json/list")
        except Exception as e:
            logger.debug("Could not list CDP targets: %s", e)
            targets = []

        if isinstance(targets, list):
            for target in targets:
                if (
                    isinstance(target, dict)
                    and target.get("type") == "page"
                    and target.get("webSocketDebuggerUrl")
                ):
                    return target["webSocketDebuggerUrl"], target.get("id")

        version = self._fetch_json("/json/version")
        return version.get("webSocketDebuggerUrl"), None

    def connect(self) -> bool:
        """Connect to Chrome CDP.

        Returns:
            True when the WebSocket was opened, False on any failure
            (the failure is logged, never raised).
        """
        try:
            # Third-party ``websocket-client``; imported lazily so the
            # module can be imported without it.
            import websocket

            # Do not leak a previously opened socket.
            if self._ws is not None:
                self.disconnect()

            ws_url, page_id = self._discover_target()
            if not ws_url:
                logger.error("No WebSocket URL from CDP")
                return False

            self.session.websocket_url = ws_url
            self.session.page_id = page_id
            self._ws = websocket.create_connection(ws_url)
            self.session.connected = True
            self.session.last_heartbeat = time.time()

            logger.info("Connected to CDP: %s", ws_url)
            return True

        except Exception as e:
            logger.error("Failed to connect to CDP: %s", e)
            self.session.connected = False
            return False

    def disconnect(self):
        """Disconnect from CDP (best effort; close errors are ignored)."""
        if self._ws:
            try:
                self._ws.close()
            except Exception as e:
                # Closing an already-dead socket is expected to fail.
                logger.debug("Ignoring error while closing CDP socket: %s", e)
            self._ws = None
        self.session.connected = False

    def reconnect(self) -> bool:
        """Make one reconnect attempt, sleeping with exponential backoff first.

        Returns:
            True if the connection was re-established (the attempt counter
            is reset), False if the attempt failed or ``max_reconnects``
            attempts have already been used.
        """
        if self.session.reconnect_count >= self.max_reconnects:
            logger.error("Max reconnects (%d) reached", self.max_reconnects)
            return False

        self.disconnect()

        # Exponential backoff: 1s, 2s, 4s, ...
        wait = 2 ** self.session.reconnect_count
        logger.info(
            "Reconnecting in %ds (attempt %d/%d)",
            wait, self.session.reconnect_count + 1, self.max_reconnects,
        )
        time.sleep(wait)

        self.session.reconnect_count += 1

        if self.connect():
            self.session.reconnect_count = 0
            return True

        return False

    def ensure_connected(self) -> bool:
        """Ensure connection is alive, reconnect if needed."""
        if self.session.connected and self._ws:
            return True
        return self.reconnect()

    def send_cdp(self, method: str, params: Optional[Dict] = None) -> Optional[Dict]:
        """Send a CDP command and return its ``result`` payload.

        Args:
            method: CDP method name, e.g. ``"Page.navigate"``.
            params: Method parameters (defaults to an empty object).

        Returns:
            The ``result`` dict of the matching response, or None when not
            connected, when CDP reports an error, or when the transport
            fails (in which case the session is marked disconnected).
        """
        if not self.ensure_connected():
            return None

        try:
            self._next_id += 1
            msg_id = self._next_id
            msg = {
                "id": msg_id,
                "method": method,
                "params": params or {},
            }
            self._ws.send(json.dumps(msg))

            # CDP interleaves asynchronous events (and possibly stale
            # replies) with responses; skip frames until the one that
            # answers this command arrives.
            while True:
                response = json.loads(self._ws.recv())
                if response.get("id") == msg_id:
                    break

            if "error" in response:
                logger.error("CDP error: %s", response["error"])
                return None

            return response.get("result")

        except Exception as e:
            logger.error("CDP command failed: %s", e)
            self.session.connected = False
            return None

    def navigate(self, url: str, wait_load: bool = True) -> bool:
        """Navigate to URL.

        Args:
            url: Address to load.
            wait_load: When True, sleep a fixed 2 seconds after the
                command is accepted.

        Returns:
            True if the navigation command succeeded, False otherwise.
        """
        result = self.send_cdp("Page.navigate", {"url": url})
        if not result:
            return False

        # Page.navigate reports navigation failures via ``errorText``
        # rather than a protocol error.
        if result.get("errorText"):
            logger.error("Navigation to %s failed: %s", url, result["errorText"])
            return False

        if wait_load:
            time.sleep(2)  # Simple wait; could use Page.loadEventFired

        return True

    def screenshot(self, path: Optional[str] = None) -> Optional[str]:
        """Take a PNG screenshot.

        Args:
            path: Destination file; when omitted, a persistent temporary
                ``.png`` file is created.

        Returns:
            The path of the written file, or None if capture failed.
        """
        result = self.send_cdp("Page.captureScreenshot", {"format": "png"})
        if not result or "data" not in result:
            return None

        import base64
        img_data = base64.b64decode(result["data"])

        if path:
            with open(path, "wb") as f:
                f.write(img_data)
            return path

        # Save to temp; delete=False so the caller can read it afterwards.
        import tempfile
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            tmp.write(img_data)
        return tmp.name

    def get_dom(self) -> Optional[str]:
        """Get page HTML (``document.documentElement.outerHTML``)."""
        result = self.send_cdp("Runtime.evaluate", {
            "expression": "document.documentElement.outerHTML"
        })
        if result and "result" in result:
            return result["result"].get("value")
        return None

    def evaluate_js(self, expression: str) -> Any:
        """Evaluate a JavaScript expression and return its value.

        ``returnByValue`` is requested so arrays and plain objects come
        back as JSON values instead of remote object handles.

        Returns:
            The JSON-serialisable value of the expression, or None when
            the command fails or the script throws.
        """
        result = self.send_cdp(
            "Runtime.evaluate",
            {"expression": expression, "returnByValue": True},
        )
        if not result:
            return None
        if "exceptionDetails" in result:
            logger.error(
                "JavaScript evaluation failed: %s",
                result["exceptionDetails"].get("text"),
            )
            return None
        if "result" in result:
            return result["result"].get("value")
        return None

    def click(self, selector: str) -> bool:
        """Click the first element matching a CSS selector.

        Returns:
            True if an element was found and clicked, False otherwise.
        """
        # json.dumps yields a safely quoted JS string literal, so
        # selectors containing quotes or backslashes cannot break (or
        # inject into) the script.
        js = f"""
        (() => {{
            const el = document.querySelector({json.dumps(selector)});
            if (el) {{ el.click(); return true; }}
            return false;
        }})()
        """
        return self.evaluate_js(js) is True

    def type_text(self, selector: str, text: str) -> bool:
        """Set the value of an input field and fire an ``input`` event.

        Returns:
            True if the element was found and updated, False otherwise.
        """
        # Both values are embedded as JSON string literals so quotes and
        # newlines in the selector or text are escaped correctly.
        js = f"""
        (() => {{
            const el = document.querySelector({json.dumps(selector)});
            if (el) {{
                el.focus();
                el.value = {json.dumps(text)};
                el.dispatchEvent(new Event('input', {{ bubbles: true }}));
                return true;
            }}
            return false;
        }})()
        """
        return self.evaluate_js(js) is True

    def get_elements(self, selector: str) -> List[Dict]:
        """Describe the elements matching a CSS selector.

        Returns:
            A list of dicts with ``tag``, ``text`` (first 100 characters),
            ``id`` and ``classes``; empty when nothing matches or the
            evaluation fails.
        """
        js = f"""
        (() => {{
            const els = document.querySelectorAll({json.dumps(selector)});
            return Array.from(els).map(el => ({{
                tag: el.tagName,
                text: el.textContent?.substring(0, 100),
                id: el.id,
                classes: el.className,
            }}));
        }})()
        """
        result = self.evaluate_js(js)
        return result if isinstance(result, list) else []

    def heartbeat(self) -> bool:
        """Check if the connection is alive by evaluating a trivial script.

        Returns:
            True (and refreshes ``session.last_heartbeat``) when the
            browser answered; False otherwise, marking the session
            disconnected.
        """
        if not self.session.connected:
            return False

        result = self.send_cdp("Runtime.evaluate", {"expression": "1+1"})
        if result:
            self.session.last_heartbeat = time.time()
            return True

        self.session.connected = False
        return False

    def __enter__(self):
        # A failed connect is tolerated here: the first command will go
        # through ensure_connected() and retry.
        self.connect()
        return self

    def __exit__(self, *args):
        self.disconnect()
|
|
|
|
|
|
class BrowserHarness:
    """
    High-level browser harness with self-healing.

    Provides a simple interface for browser automation
    with automatic reconnection and error recovery, by driving a
    ``SelfHealingBrowser`` through a declarative list of actions.
    """

    def __init__(self, cdp_url: str = "http://localhost:9222"):
        """
        Args:
            cdp_url: Base HTTP URL of the DevTools endpoint to attach to.
        """
        self.browser = SelfHealingBrowser(cdp_url)

    def run(self, url: str, actions: List[Dict]) -> Dict:
        """
        Run browser automation sequence.

        Args:
            url: Starting URL
            actions: List of action dicts, each with a ``type`` key:
                - ``navigate``: requires ``url``
                - ``click``: requires ``selector``
                - ``type``: requires ``selector`` and ``text``
                - ``evaluate``: requires ``expression``
                - ``screenshot``: optional ``path``
                - ``wait``: optional ``seconds`` (default 1)

        Returns:
            ``{"success": False, "error": ...}`` if the initial navigation
            fails; otherwise ``{"success": True, "results": [...],
            "session": {...}}`` with one result entry per action.

        Raises:
            KeyError: If an action is missing one of its required keys.
        """
        results = []

        with self.browser as b:
            # Navigate to URL
            if not b.navigate(url):
                return {"success": False, "error": "Navigation failed"}

            for action in actions:
                action_type = action.get("type")

                if action_type == "screenshot":
                    path = b.screenshot(action.get("path"))
                    results.append({"type": "screenshot", "path": path})

                elif action_type == "navigate":
                    success = b.navigate(action["url"])
                    results.append({"type": "navigate", "success": success})

                elif action_type == "click":
                    success = b.click(action["selector"])
                    results.append({"type": "click", "success": success})

                elif action_type == "type":
                    success = b.type_text(action["selector"], action["text"])
                    results.append({"type": "type", "success": success})

                elif action_type == "evaluate":
                    value = b.evaluate_js(action["expression"])
                    results.append({"type": "evaluate", "value": value})

                elif action_type == "wait":
                    # Read the duration once so an omitted "seconds" uses
                    # the default for both the sleep and the report.
                    seconds = action.get("seconds", 1)
                    time.sleep(seconds)
                    results.append({"type": "wait", "seconds": seconds})

                else:
                    # Surface typos instead of silently skipping them.
                    logger.warning("Unknown action type: %r", action_type)
                    results.append({
                        "type": action_type,
                        "success": False,
                        "error": "Unknown action type",
                    })

            # Capture session state while still inside the context manager,
            # i.e. before __exit__ disconnects and clears ``connected``.
            session_state = {
                "connected": self.browser.session.connected,
                "reconnects": self.browser.session.reconnect_count,
            }

        return {
            "success": True,
            "results": results,
            "session": session_state,
        }
|