fix: pre-process CLI clipboard images through vision tool instead of raw embedding
Images pasted in the CLI were embedded as raw base64 image_url content parts in the conversation history, which only works with vision-capable models. If the main model (e.g. Nous API) doesn't support vision, this breaks the request and poisons all subsequent messages. Now the CLI uses the same approach as the messaging gateway: images are pre-processed through the auxiliary vision model (Gemini Flash via OpenRouter or Nous Portal) and converted to text descriptions. The local file path is included so the agent can re-examine via vision_analyze if needed. Works with any model. Fixes #638.
This commit is contained in:
87
cli.py
87
cli.py
@@ -1319,32 +1319,68 @@ class HermesCLI:
|
||||
else:
|
||||
_cprint(f" {_DIM}(._.) No image found in clipboard{_RST}")
|
||||
|
||||
def _build_multimodal_content(self, text: str, images: list) -> list:
|
||||
"""Convert text + image paths into OpenAI vision multimodal content.
|
||||
def _preprocess_images_with_vision(self, text: str, images: list) -> str:
|
||||
"""Analyze attached images via the vision tool and return enriched text.
|
||||
|
||||
Returns a list of content parts suitable for the ``content`` field
|
||||
of a ``user`` message.
|
||||
Instead of embedding raw base64 ``image_url`` content parts in the
|
||||
conversation (which only works with vision-capable models), this
|
||||
pre-processes each image through the auxiliary vision model (Gemini
|
||||
Flash) and prepends the descriptions to the user's message — the
|
||||
same approach the messaging gateway uses.
|
||||
|
||||
The local file path is included so the agent can re-examine the
|
||||
image later with ``vision_analyze`` if needed.
|
||||
"""
|
||||
import base64 as _b64
|
||||
import asyncio as _asyncio
|
||||
import json as _json
|
||||
from tools.vision_tools import vision_analyze_tool
|
||||
|
||||
content_parts = []
|
||||
text_part = text if isinstance(text, str) and text else "What do you see in this image?"
|
||||
content_parts.append({"type": "text", "text": text_part})
|
||||
analysis_prompt = (
|
||||
"Describe everything visible in this image in thorough detail. "
|
||||
"Include any text, code, data, objects, people, layout, colors, "
|
||||
"and any other notable visual information."
|
||||
)
|
||||
|
||||
_MIME = {
|
||||
"png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg",
|
||||
"gif": "image/gif", "webp": "image/webp",
|
||||
}
|
||||
enriched_parts = []
|
||||
for img_path in images:
|
||||
if img_path.exists():
|
||||
data = _b64.b64encode(img_path.read_bytes()).decode()
|
||||
ext = img_path.suffix.lower().lstrip(".")
|
||||
mime = _MIME.get(ext, "image/png")
|
||||
content_parts.append({
|
||||
"type": "image_url",
|
||||
"image_url": {"url": f"data:{mime};base64,{data}"}
|
||||
})
|
||||
return content_parts
|
||||
if not img_path.exists():
|
||||
continue
|
||||
size_kb = img_path.stat().st_size // 1024
|
||||
_cprint(f" {_DIM}👁️ analyzing {img_path.name} ({size_kb}KB)...{_RST}")
|
||||
try:
|
||||
result_json = _asyncio.run(
|
||||
vision_analyze_tool(image_url=str(img_path), user_prompt=analysis_prompt)
|
||||
)
|
||||
result = _json.loads(result_json)
|
||||
if result.get("success"):
|
||||
description = result.get("analysis", "")
|
||||
enriched_parts.append(
|
||||
f"[The user attached an image. Here's what it contains:\n{description}]\n"
|
||||
f"[If you need a closer look, use vision_analyze with "
|
||||
f"image_url: {img_path}]"
|
||||
)
|
||||
_cprint(f" {_DIM}✓ image analyzed{_RST}")
|
||||
else:
|
||||
enriched_parts.append(
|
||||
f"[The user attached an image but it couldn't be analyzed. "
|
||||
f"You can try examining it with vision_analyze using "
|
||||
f"image_url: {img_path}]"
|
||||
)
|
||||
_cprint(f" {_DIM}⚠ vision analysis failed — path included for retry{_RST}")
|
||||
except Exception as e:
|
||||
enriched_parts.append(
|
||||
f"[The user attached an image but analysis failed ({e}). "
|
||||
f"You can try examining it with vision_analyze using "
|
||||
f"image_url: {img_path}]"
|
||||
)
|
||||
_cprint(f" {_DIM}⚠ vision analysis error — path included for retry{_RST}")
|
||||
|
||||
# Combine: vision descriptions first, then the user's original text
|
||||
user_text = text if isinstance(text, str) and text else ""
|
||||
if enriched_parts:
|
||||
prefix = "\n\n".join(enriched_parts)
|
||||
return f"{prefix}\n\n{user_text}" if user_text else prefix
|
||||
return user_text or "What do you see in this image?"
|
||||
|
||||
def _show_tool_availability_warnings(self):
|
||||
"""Show warnings about disabled tools due to missing API keys."""
|
||||
@@ -2627,14 +2663,13 @@ class HermesCLI:
|
||||
if not self._init_agent():
|
||||
return None
|
||||
|
||||
# Convert attached images to OpenAI vision multimodal content
|
||||
# Pre-process images through the vision tool (Gemini Flash) so the
|
||||
# main model receives text descriptions instead of raw base64 image
|
||||
# content — works with any model, not just vision-capable ones.
|
||||
if images:
|
||||
message = self._build_multimodal_content(
|
||||
message = self._preprocess_images_with_vision(
|
||||
message if isinstance(message, str) else "", images
|
||||
)
|
||||
for img_path in images:
|
||||
if img_path.exists():
|
||||
_cprint(f" {_DIM}📎 attached {img_path.name} ({img_path.stat().st_size // 1024}KB){_RST}")
|
||||
|
||||
# Add user message to history
|
||||
self.conversation_history.append({"role": "user", "content": message})
|
||||
|
||||
@@ -602,11 +602,11 @@ class TestHasClipboardImage:
|
||||
|
||||
|
||||
# ═════════════════════════════════════════════════════════════════════════
|
||||
# Level 2: _build_multimodal_content — image → OpenAI vision format
|
||||
# Level 2: _preprocess_images_with_vision — image → text via vision tool
|
||||
# ═════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestBuildMultimodalContent:
|
||||
"""Test the extracted _build_multimodal_content method directly."""
|
||||
class TestPreprocessImagesWithVision:
|
||||
"""Test vision-based image pre-processing for the CLI."""
|
||||
|
||||
@pytest.fixture
|
||||
def cli(self):
|
||||
@@ -637,55 +637,81 @@ class TestBuildMultimodalContent:
|
||||
img.write_bytes(content)
|
||||
return img
|
||||
|
||||
def _mock_vision_success(self, description="A test image with colored pixels."):
|
||||
"""Return an async mock that simulates a successful vision_analyze_tool call."""
|
||||
import json
|
||||
async def _fake_vision(**kwargs):
|
||||
return json.dumps({"success": True, "analysis": description})
|
||||
return _fake_vision
|
||||
|
||||
def _mock_vision_failure(self):
|
||||
"""Return an async mock that simulates a failed vision_analyze_tool call."""
|
||||
import json
|
||||
async def _fake_vision(**kwargs):
|
||||
return json.dumps({"success": False, "analysis": "Error"})
|
||||
return _fake_vision
|
||||
|
||||
def test_single_image_with_text(self, cli, tmp_path):
|
||||
img = self._make_image(tmp_path)
|
||||
result = cli._build_multimodal_content("Describe this", [img])
|
||||
with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_success()):
|
||||
result = cli._preprocess_images_with_vision("Describe this", [img])
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0] == {"type": "text", "text": "Describe this"}
|
||||
assert result[1]["type"] == "image_url"
|
||||
url = result[1]["image_url"]["url"]
|
||||
assert url.startswith("data:image/png;base64,")
|
||||
# Verify the base64 actually decodes to our image
|
||||
b64_data = url.split(",", 1)[1]
|
||||
assert base64.b64decode(b64_data) == FAKE_PNG
|
||||
assert isinstance(result, str)
|
||||
assert "A test image with colored pixels." in result
|
||||
assert "Describe this" in result
|
||||
assert str(img) in result
|
||||
assert "base64," not in result # no raw base64 image content
|
||||
|
||||
def test_multiple_images(self, cli, tmp_path):
|
||||
imgs = [self._make_image(tmp_path, f"img{i}.png") for i in range(3)]
|
||||
result = cli._build_multimodal_content("Compare", imgs)
|
||||
assert len(result) == 4 # 1 text + 3 images
|
||||
assert all(r["type"] == "image_url" for r in result[1:])
|
||||
with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_success()):
|
||||
result = cli._preprocess_images_with_vision("Compare", imgs)
|
||||
|
||||
assert isinstance(result, str)
|
||||
assert "Compare" in result
|
||||
# Each image path should be referenced
|
||||
for img in imgs:
|
||||
assert str(img) in result
|
||||
|
||||
def test_empty_text_gets_default_question(self, cli, tmp_path):
|
||||
img = self._make_image(tmp_path)
|
||||
result = cli._build_multimodal_content("", [img])
|
||||
assert result[0]["text"] == "What do you see in this image?"
|
||||
|
||||
def test_jpeg_mime_type(self, cli, tmp_path):
|
||||
img = self._make_image(tmp_path, "photo.jpg", b"\xff\xd8\xff\x00" * 20)
|
||||
result = cli._build_multimodal_content("test", [img])
|
||||
assert "image/jpeg" in result[1]["image_url"]["url"]
|
||||
|
||||
def test_webp_mime_type(self, cli, tmp_path):
|
||||
img = self._make_image(tmp_path, "img.webp", b"RIFF\x00\x00" * 10)
|
||||
result = cli._build_multimodal_content("test", [img])
|
||||
assert "image/webp" in result[1]["image_url"]["url"]
|
||||
|
||||
def test_unknown_extension_defaults_to_png(self, cli, tmp_path):
|
||||
img = self._make_image(tmp_path, "data.bmp", b"\x00" * 50)
|
||||
result = cli._build_multimodal_content("test", [img])
|
||||
assert "image/png" in result[1]["image_url"]["url"]
|
||||
with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_success()):
|
||||
result = cli._preprocess_images_with_vision("", [img])
|
||||
assert isinstance(result, str)
|
||||
assert "A test image with colored pixels." in result
|
||||
|
||||
def test_missing_image_skipped(self, cli, tmp_path):
|
||||
missing = tmp_path / "gone.png"
|
||||
result = cli._build_multimodal_content("test", [missing])
|
||||
assert len(result) == 1 # only text
|
||||
with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_success()):
|
||||
result = cli._preprocess_images_with_vision("test", [missing])
|
||||
# No images analyzed, falls back to default
|
||||
assert result == "test"
|
||||
|
||||
def test_mix_of_existing_and_missing(self, cli, tmp_path):
|
||||
real = self._make_image(tmp_path, "real.png")
|
||||
missing = tmp_path / "gone.png"
|
||||
result = cli._build_multimodal_content("test", [real, missing])
|
||||
assert len(result) == 2 # text + 1 real image
|
||||
with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_success()):
|
||||
result = cli._preprocess_images_with_vision("test", [real, missing])
|
||||
assert str(real) in result
|
||||
assert str(missing) not in result
|
||||
assert "test" in result
|
||||
|
||||
def test_vision_failure_includes_path(self, cli, tmp_path):
|
||||
img = self._make_image(tmp_path)
|
||||
with patch("tools.vision_tools.vision_analyze_tool", side_effect=self._mock_vision_failure()):
|
||||
result = cli._preprocess_images_with_vision("check this", [img])
|
||||
assert isinstance(result, str)
|
||||
assert str(img) in result # path still included for retry
|
||||
assert "check this" in result
|
||||
|
||||
def test_vision_exception_includes_path(self, cli, tmp_path):
|
||||
img = self._make_image(tmp_path)
|
||||
async def _explode(**kwargs):
|
||||
raise RuntimeError("API down")
|
||||
with patch("tools.vision_tools.vision_analyze_tool", side_effect=_explode):
|
||||
result = cli._preprocess_images_with_vision("check this", [img])
|
||||
assert isinstance(result, str)
|
||||
assert str(img) in result # path still included for retry
|
||||
|
||||
|
||||
# ═════════════════════════════════════════════════════════════════════════
|
||||
|
||||
Reference in New Issue
Block a user