diff --git a/cli.py b/cli.py index 5ebd46843..a326c93db 100755 --- a/cli.py +++ b/cli.py @@ -1319,32 +1319,68 @@ class HermesCLI: else: _cprint(f" {_DIM}(._.) No image found in clipboard{_RST}") - def _build_multimodal_content(self, text: str, images: list) -> list: - """Convert text + image paths into OpenAI vision multimodal content. + def _preprocess_images_with_vision(self, text: str, images: list) -> str: + """Analyze attached images via the vision tool and return enriched text. - Returns a list of content parts suitable for the ``content`` field - of a ``user`` message. + Instead of embedding raw base64 ``image_url`` content parts in the + conversation (which only works with vision-capable models), this + pre-processes each image through the auxiliary vision model (Gemini + Flash) and prepends the descriptions to the user's message — the + same approach the messaging gateway uses. + + The local file path is included so the agent can re-examine the + image later with ``vision_analyze`` if needed. """ - import base64 as _b64 + import asyncio as _asyncio + import json as _json + from tools.vision_tools import vision_analyze_tool - content_parts = [] - text_part = text if isinstance(text, str) and text else "What do you see in this image?" - content_parts.append({"type": "text", "text": text_part}) + analysis_prompt = ( + "Describe everything visible in this image in thorough detail. " + "Include any text, code, data, objects, people, layout, colors, " + "and any other notable visual information." 
+ ) - _MIME = { - "png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg", - "gif": "image/gif", "webp": "image/webp", - } + enriched_parts = [] for img_path in images: - if img_path.exists(): - data = _b64.b64encode(img_path.read_bytes()).decode() - ext = img_path.suffix.lower().lstrip(".") - mime = _MIME.get(ext, "image/png") - content_parts.append({ - "type": "image_url", - "image_url": {"url": f"data:{mime};base64,{data}"} - }) - return content_parts + if not img_path.exists(): + continue + size_kb = img_path.stat().st_size // 1024 + _cprint(f" {_DIM}👁️ analyzing {img_path.name} ({size_kb}KB)...{_RST}") + try: + result_json = _asyncio.run( + vision_analyze_tool(image_url=str(img_path), user_prompt=analysis_prompt) + ) + result = _json.loads(result_json) + if result.get("success"): + description = result.get("analysis", "") + enriched_parts.append( + f"[The user attached an image. Here's what it contains:\n{description}]\n" + f"[If you need a closer look, use vision_analyze with " + f"image_url: {img_path}]" + ) + _cprint(f" {_DIM}✓ image analyzed{_RST}") + else: + enriched_parts.append( + f"[The user attached an image but it couldn't be analyzed. " + f"You can try examining it with vision_analyze using " + f"image_url: {img_path}]" + ) + _cprint(f" {_DIM}⚠ vision analysis failed — path included for retry{_RST}") + except Exception as e: + enriched_parts.append( + f"[The user attached an image but analysis failed ({e}). " + f"You can try examining it with vision_analyze using " + f"image_url: {img_path}]" + ) + _cprint(f" {_DIM}⚠ vision analysis error — path included for retry{_RST}") + + # Combine: vision descriptions first, then the user's original text + user_text = text if isinstance(text, str) and text else "" + if enriched_parts: + prefix = "\n\n".join(enriched_parts) + return f"{prefix}\n\n{user_text}" if user_text else prefix + return user_text or "What do you see in this image?" 
def _show_tool_availability_warnings(self): """Show warnings about disabled tools due to missing API keys.""" @@ -2627,14 +2663,13 @@ class HermesCLI: if not self._init_agent(): return None - # Convert attached images to OpenAI vision multimodal content + # Pre-process images through the vision tool (Gemini Flash) so the + # main model receives text descriptions instead of raw base64 image + # content — works with any model, not just vision-capable ones. if images: - message = self._build_multimodal_content( + message = self._preprocess_images_with_vision( message if isinstance(message, str) else "", images ) - for img_path in images: - if img_path.exists(): - _cprint(f" {_DIM}📎 attached {img_path.name} ({img_path.stat().st_size // 1024}KB){_RST}") # Add user message to history self.conversation_history.append({"role": "user", "content": message}) diff --git a/tests/tools/test_clipboard.py b/tests/tools/test_clipboard.py index 1fb1a39e4..9b7597417 100644 --- a/tests/tools/test_clipboard.py +++ b/tests/tools/test_clipboard.py @@ -602,11 +602,11 @@ class TestHasClipboardImage: # ═════════════════════════════════════════════════════════════════════════ -# Level 2: _build_multimodal_content — image → OpenAI vision format +# Level 2: _preprocess_images_with_vision — image → text via vision tool # ═════════════════════════════════════════════════════════════════════════ -class TestBuildMultimodalContent: - """Test the extracted _build_multimodal_content method directly.""" +class TestPreprocessImagesWithVision: + """Test vision-based image pre-processing for the CLI.""" @pytest.fixture def cli(self): @@ -637,55 +637,81 @@ class TestBuildMultimodalContent: img.write_bytes(content) return img + def _mock_vision_success(self, description="A test image with colored pixels."): + """Return an async mock that simulates a successful vision_analyze_tool call.""" + import json + async def _fake_vision(**kwargs): + return json.dumps({"success": True, "analysis": description}) + 
return _fake_vision + + def _mock_vision_failure(self): + """Return an async mock that simulates a failed vision_analyze_tool call.""" + import json + async def _fake_vision(**kwargs): + return json.dumps({"success": False, "analysis": "Error"}) + return _fake_vision + def test_single_image_with_text(self, cli, tmp_path): img = self._make_image(tmp_path) - result = cli._build_multimodal_content("Describe this", [img]) + with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_success()): + result = cli._preprocess_images_with_vision("Describe this", [img]) - assert len(result) == 2 - assert result[0] == {"type": "text", "text": "Describe this"} - assert result[1]["type"] == "image_url" - url = result[1]["image_url"]["url"] - assert url.startswith("data:image/png;base64,") - # Verify the base64 actually decodes to our image - b64_data = url.split(",", 1)[1] - assert base64.b64decode(b64_data) == FAKE_PNG + assert isinstance(result, str) + assert "A test image with colored pixels." in result + assert "Describe this" in result + assert str(img) in result + assert "base64," not in result # no raw base64 image content def test_multiple_images(self, cli, tmp_path): imgs = [self._make_image(tmp_path, f"img{i}.png") for i in range(3)] - result = cli._build_multimodal_content("Compare", imgs) - assert len(result) == 4 # 1 text + 3 images - assert all(r["type"] == "image_url" for r in result[1:]) + with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_success()): + result = cli._preprocess_images_with_vision("Compare", imgs) + + assert isinstance(result, str) + assert "Compare" in result + # Each image path should be referenced + for img in imgs: + assert str(img) in result def test_empty_text_gets_default_question(self, cli, tmp_path): img = self._make_image(tmp_path) - result = cli._build_multimodal_content("", [img]) - assert result[0]["text"] == "What do you see in this image?" 
- - def test_jpeg_mime_type(self, cli, tmp_path): - img = self._make_image(tmp_path, "photo.jpg", b"\xff\xd8\xff\x00" * 20) - result = cli._build_multimodal_content("test", [img]) - assert "image/jpeg" in result[1]["image_url"]["url"] - - def test_webp_mime_type(self, cli, tmp_path): - img = self._make_image(tmp_path, "img.webp", b"RIFF\x00\x00" * 10) - result = cli._build_multimodal_content("test", [img]) - assert "image/webp" in result[1]["image_url"]["url"] - - def test_unknown_extension_defaults_to_png(self, cli, tmp_path): - img = self._make_image(tmp_path, "data.bmp", b"\x00" * 50) - result = cli._build_multimodal_content("test", [img]) - assert "image/png" in result[1]["image_url"]["url"] + with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_success()): + result = cli._preprocess_images_with_vision("", [img]) + assert isinstance(result, str) + assert "A test image with colored pixels." in result def test_missing_image_skipped(self, cli, tmp_path): missing = tmp_path / "gone.png" - result = cli._build_multimodal_content("test", [missing]) - assert len(result) == 1 # only text + with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_success()): + result = cli._preprocess_images_with_vision("test", [missing]) + # No images analyzed, so the user's original text is returned unchanged + assert result == "test" def test_mix_of_existing_and_missing(self, cli, tmp_path): real = self._make_image(tmp_path, "real.png") missing = tmp_path / "gone.png" - result = cli._build_multimodal_content("test", [real, missing]) - assert len(result) == 2 # text + 1 real image + with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_success()): + result = cli._preprocess_images_with_vision("test", [real, missing]) + assert str(real) in result + assert str(missing) not in result + assert "test" in result + + def test_vision_failure_includes_path(self, cli, tmp_path): + img = self._make_image(tmp_path) + with 
patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_failure()): + result = cli._preprocess_images_with_vision("check this", [img]) + assert isinstance(result, str) + assert str(img) in result # path still included for retry + assert "check this" in result + + def test_vision_exception_includes_path(self, cli, tmp_path): + img = self._make_image(tmp_path) + async def _explode(**kwargs): + raise RuntimeError("API down") + with patch("tools.vision_tools.vision_analyze_tool", side_effect=_explode): + result = cli._preprocess_images_with_vision("check this", [img]) + assert isinstance(result, str) + assert str(img) in result # path still included for retry # ═════════════════════════════════════════════════════════════════════════