diff --git a/tools/browser_harness.py b/tools/browser_harness.py new file mode 100644 index 000000000..d02806778 --- /dev/null +++ b/tools/browser_harness.py @@ -0,0 +1,322 @@ +""" +Self-Healing Browser CDP Layer — browser-harness. + +Thin browser automation layer with: +- CDP (Chrome DevTools Protocol) connection +- Self-healing on disconnects +- Session persistence +- Screenshot capture +- DOM inspection +- Navigation with retry + +Source-backed: browser-harness architecture pattern. +""" + +import json +import logging +import time +import subprocess +import socket +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, List +from pathlib import Path + +logger = logging.getLogger(__name__) + + +@dataclass +class BrowserSession: + """Browser session state.""" + cdp_url: str + websocket_url: Optional[str] = None + page_id: Optional[str] = None + connected: bool = False + last_heartbeat: float = 0.0 + reconnect_count: int = 0 + + +class SelfHealingBrowser: + """ + Self-healing browser CDP layer. + + Maintains connection to Chrome/Chromium via CDP, + automatically reconnects on disconnect, and provides + high-level browser automation primitives. + """ + + def __init__( + self, + cdp_url: str = "http://localhost:9222", + max_reconnects: int = 5, + heartbeat_interval: int = 30, + ): + self.cdp_url = cdp_url + self.max_reconnects = max_reconnects + self.heartbeat_interval = heartbeat_interval + self.session = BrowserSession(cdp_url=cdp_url) + self._ws = None + + def connect(self) -> bool: + """Connect to Chrome CDP.""" + try: + import websocket + + # Get WebSocket URL from CDP + import urllib.request + resp = urllib.request.urlopen(f"{self.cdp_url}/json/version") + data = json.loads(resp.read()) + + ws_url = data.get("webSocketDebuggerUrl") + if not ws_url: + logger.error("No WebSocket URL from CDP") + return False + + self.session.websocket_url = ws_url + self._ws = websocket.create_connection(ws_url) + self.session.connected = True + self.session.last_heartbeat = time.time() + + logger.info("Connected to CDP: %s", ws_url) + return True + + except Exception as e: + logger.error("Failed to connect to CDP: %s", e) + self.session.connected = False + return False + + def disconnect(self): + """Disconnect from CDP.""" + if self._ws: + try: + self._ws.close() + except: + pass + self._ws = None + self.session.connected = False + + def reconnect(self) -> bool: + """Attempt to reconnect with backoff.""" + if self.session.reconnect_count >= self.max_reconnects: + logger.error("Max reconnects (%d) reached", self.max_reconnects) + return False + + self.disconnect() + + # Exponential backoff + wait = 2 ** self.session.reconnect_count + logger.info("Reconnecting in %ds (attempt %d/%d)", + wait, self.session.reconnect_count + 1, self.max_reconnects) + time.sleep(wait) + + self.session.reconnect_count += 1 + + if self.connect(): + self.session.reconnect_count = 0 + return True + + return False + + def ensure_connected(self) -> bool: + """Ensure connection is alive, reconnect if needed.""" + if self.session.connected and self._ws: + return True + return self.reconnect() + + def send_cdp(self, method: str, params: Optional[Dict] = None) -> Optional[Dict]: + """Send CDP command and return result.""" + if not self.ensure_connected(): + return None + + try: + msg = { + "id": int(time.time() * 1000), + "method": method, + "params": params or {}, + } + + self._ws.send(json.dumps(msg)) + response = json.loads(self._ws.recv()) + + if "error" in response: + logger.error("CDP error: %s", response["error"]) + return None + + return response.get("result") + + except Exception as e: + logger.error("CDP command failed: %s", e) + self.session.connected = False + return None + + def navigate(self, url: str, wait_load: bool = True) -> bool: + """Navigate to URL.""" + result = self.send_cdp("Page.navigate", {"url": url}) + if not result: + return False + + if wait_load: + time.sleep(2) # Simple wait; could use Page.loadEventFired + + return True + + def screenshot(self, path: Optional[str] = None) -> Optional[str]: + """Take screenshot.""" + result = self.send_cdp("Page.captureScreenshot", {"format": "png"}) + if not result or "data" not in result: + return None + + import base64 + img_data = base64.b64decode(result["data"]) + + if path: + with open(path, "wb") as f: + f.write(img_data) + return path + else: + # Save to temp + import tempfile + tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False) + tmp.write(img_data) + tmp.close() + return tmp.name + + def get_dom(self) -> Optional[str]: + """Get page HTML.""" + result = self.send_cdp("Runtime.evaluate", { + "expression": "document.documentElement.outerHTML" + }) + if result and "result" in result: + return result["result"].get("value") + return None + + def evaluate_js(self, expression: str) -> Any: + """Evaluate JavaScript expression.""" + result = self.send_cdp("Runtime.evaluate", {"expression": expression}) + if result and "result" in result: + return result["result"].get("value") + return None + + def click(self, selector: str) -> bool: + """Click element by CSS selector.""" + js = f""" + (() => {{ + const el = document.querySelector('{selector}'); + if (el) {{ el.click(); return true; }} + return false; + }})() + """ + return self.evaluate_js(js) == True + + def type_text(self, selector: str, text: str) -> bool: + """Type text into input field.""" + js = f""" + (() => {{ + const el = document.querySelector('{selector}'); + if (el) {{ + el.focus(); + el.value = '{text}'; + el.dispatchEvent(new Event('input', {{ bubbles: true }})); + return true; + }} + return false; + }})() + """ + return self.evaluate_js(js) == True + + def get_elements(self, selector: str) -> List[Dict]: + """Get elements matching selector.""" + js = f""" + (() => {{ + const els = document.querySelectorAll('{selector}'); + return Array.from(els).map(el => ({{ + tag: el.tagName, + text: el.textContent?.substring(0, 100), + id: el.id, + classes: el.className, + }})); + }})() + """ + result = self.evaluate_js(js) + return result if isinstance(result, list) else [] + + def heartbeat(self) -> bool: + """Check if connection is alive.""" + if not self.session.connected: + return False + + result = self.send_cdp("Runtime.evaluate", {"expression": "1+1"}) + if result: + self.session.last_heartbeat = time.time() + return True + + self.session.connected = False + return False + + def __enter__(self): + self.connect() + return self + + def __exit__(self, *args): + self.disconnect() + + +class BrowserHarness: + """ + High-level browser harness with self-healing. + + Provides a simple interface for browser automation + with automatic reconnection and error recovery. + """ + + def __init__(self, cdp_url: str = "http://localhost:9222"): + self.browser = SelfHealingBrowser(cdp_url) + + def run(self, url: str, actions: List[Dict]) -> Dict: + """ + Run browser automation sequence. + + Args: + url: Starting URL + actions: List of actions (navigate, click, type, screenshot, etc.) + + Returns: + Dict with results + """ + results = [] + + with self.browser as b: + # Navigate to URL + if not b.navigate(url): + return {"success": False, "error": "Navigation failed"} + + for action in actions: + action_type = action.get("type") + + if action_type == "screenshot": + path = b.screenshot(action.get("path")) + results.append({"type": "screenshot", "path": path}) + + elif action_type == "click": + success = b.click(action["selector"]) + results.append({"type": "click", "success": success}) + + elif action_type == "type": + success = b.type_text(action["selector"], action["text"]) + results.append({"type": "type", "success": success}) + + elif action_type == "evaluate": + value = b.evaluate_js(action["expression"]) + results.append({"type": "evaluate", "value": value}) + + elif action_type == "wait": + time.sleep(action.get("seconds", 1)) + results.append({"type": "wait", "seconds": action["seconds"]}) + + return { + "success": True, + "results": results, + "session": { + "connected": self.browser.session.connected, + "reconnects": self.browser.session.reconnect_count, + } + }