diff --git a/cli.py b/cli.py index ea7693a98..680b2da35 100755 --- a/cli.py +++ b/cli.py @@ -1113,6 +1113,52 @@ class HermesCLI: self.console.print() + def _try_attach_clipboard_image(self) -> bool: + """Check clipboard for an image and attach it if found. + + Saves the image to ~/.hermes/images/ and appends the path to + ``_attached_images``. Returns True if an image was attached. + """ + from hermes_cli.clipboard import save_clipboard_image + + img_dir = Path.home() / ".hermes" / "images" + self._image_counter += 1 + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + img_path = img_dir / f"clip_{ts}_{self._image_counter}.png" + + if save_clipboard_image(img_path): + self._attached_images.append(img_path) + return True + self._image_counter -= 1 + return False + + def _build_multimodal_content(self, text: str, images: list) -> list: + """Convert text + image paths into OpenAI vision multimodal content. + + Returns a list of content parts suitable for the ``content`` field + of a ``user`` message. + """ + import base64 as _b64 + + content_parts = [] + text_part = text if isinstance(text, str) and text else "What do you see in this image?" 
+ content_parts.append({"type": "text", "text": text_part}) + + _MIME = { + "png": "image/png", "jpg": "image/jpeg", "jpeg": "image/jpeg", + "gif": "image/gif", "webp": "image/webp", + } + for img_path in images: + if img_path.exists(): + data = _b64.b64encode(img_path.read_bytes()).decode() + ext = img_path.suffix.lower().lstrip(".") + mime = _MIME.get(ext, "image/png") + content_parts.append({ + "type": "image_url", + "image_url": {"url": f"data:{mime};base64,{data}"} + }) + return content_parts + def _show_tool_availability_warnings(self): """Show warnings about disabled tools due to missing API keys.""" try: @@ -2164,25 +2210,12 @@ class HermesCLI: # Convert attached images to OpenAI vision multimodal content if images: - import base64 as _b64 - content_parts = [] - text_part = message if isinstance(message, str) else "" - if not text_part: - text_part = "What do you see in this image?" - content_parts.append({"type": "text", "text": text_part}) + message = self._build_multimodal_content( + message if isinstance(message, str) else "", images + ) for img_path in images: if img_path.exists(): - data = _b64.b64encode(img_path.read_bytes()).decode() - ext = img_path.suffix.lower().lstrip(".") - mime = {"png": "image/png", "jpg": "image/jpeg", - "jpeg": "image/jpeg", "gif": "image/gif", - "webp": "image/webp"}.get(ext, "image/png") - content_parts.append({ - "type": "image_url", - "image_url": {"url": f"data:{mime};base64,{data}"} - }) _cprint(f" {_DIM}πŸ“Ž attached {img_path.name} ({img_path.stat().st_size // 1024}KB){_RST}") - message = content_parts # Add user message to history self.conversation_history.append({"role": "user", "content": message}) @@ -2565,29 +2598,10 @@ class HermesCLI: @kb.add(Keys.BracketedPaste, eager=True) def handle_paste(event): - """Handle Cmd+V / Ctrl+V paste β€” detect clipboard images. - - On every paste event, check the system clipboard for image data. - If found, save to ~/.hermes/images/ and attach it to the next - message. 
Any pasted text is inserted into the buffer normally. - """ - from hermes_cli.clipboard import save_clipboard_image - + """Handle Cmd+V / Ctrl+V paste β€” detect clipboard images.""" pasted_text = event.data or "" - - # Check clipboard for image - img_dir = Path.home() / ".hermes" / "images" - self._image_counter += 1 - ts = datetime.now().strftime("%Y%m%d_%H%M%S") - img_path = img_dir / f"clip_{ts}_{self._image_counter}.png" - - if save_clipboard_image(img_path): - self._attached_images.append(img_path) + if self._try_attach_clipboard_image(): event.app.invalidate() - else: - self._image_counter -= 1 - - # Insert any pasted text normally if pasted_text: event.current_buffer.insert_text(pasted_text) diff --git a/docs/send_file_integration_map.md b/docs/send_file_integration_map.md new file mode 100644 index 000000000..1ef4ed826 --- /dev/null +++ b/docs/send_file_integration_map.md @@ -0,0 +1,344 @@ +# send_file Integration Map β€” Hermes Agent Codebase Deep Dive + +## 1. environments/tool_context.py β€” Base64 File Transfer Implementation + +### upload_file() (lines 153-205) +- Reads local file as raw bytes, base64-encodes to ASCII string +- Creates parent dirs in sandbox via `self.terminal(f"mkdir -p {parent}")` +- **Chunk size:** 60,000 chars (~60KB per shell command) +- **Small files (<=60KB b64):** Single `printf '%s' '{b64}' | base64 -d > {remote_path}` +- **Large files:** Writes chunks to `/tmp/_hermes_upload.b64` via `printf >> append`, then `base64 -d` to target +- **Error handling:** Checks local file exists; returns `{exit_code, output}` +- **Size limits:** No explicit limit, but shell arg limit ~2MB means chunking is necessary for files >~45KB raw +- **No theoretical max** β€” but very large files would be slow (many terminal round trips) + +### download_file() (lines 234-278) +- Runs `base64 {remote_path}` inside sandbox, captures stdout +- Strips output, base64-decodes to raw bytes +- Writes to host filesystem with parent dir creation +- **Error 
handling:** Checks exit code, empty output, decode errors +- Returns `{success: bool, bytes: int}` or `{success: false, error: str}` +- **Size limit:** Bounded by terminal output buffer (practical limit ~few MB via base64 terminal output) + +### Promotion potential: +- These methods work via `self.terminal()` — they're environment-agnostic +- Could be directly lifted into a new tool that operates on the agent's current sandbox +- For send_file, this `download_file()` pattern is the key: it extracts files from sandbox → host + +## 2. tools/environments/base.py — BaseEnvironment Interface + +### Current methods: +- `execute(command, cwd, timeout, stdin_data)` → `{output, returncode}` +- `cleanup()` — release resources +- `stop()` — alias for cleanup +- `_prepare_command()` — sudo transformation +- `_build_run_kwargs()` — subprocess kwargs +- `_timeout_result()` — standard timeout dict + +### What would need to be added for file transfer: +- **Nothing required at this level.** File transfer can be implemented via `execute()` (base64 over terminal, like ToolContext does) or via environment-specific methods. +- Optional: `upload_file(local_path, remote_path)` and `download_file(remote_path, local_path)` methods could be added to BaseEnvironment for optimized per-backend transfers, but the base64-over-terminal approach already works universally. + +## 3. 
tools/environments/docker.py — Docker Container Details + +### Container ID tracking: +- `self._container_id` stored at init from `self._inner.container_id` +- Inner is `minisweagent.environments.docker.DockerEnvironment` +- Container ID is a standard Docker container hash + +### docker cp feasibility: +- **YES**, `docker cp` could be used for optimized file transfer: + - `docker cp {container_id}:{remote_path} {local_path}` (download) + - `docker cp {local_path} {container_id}:{remote_path}` (upload) +- Much faster than base64-over-terminal for large files +- Container ID is directly accessible via `env._container_id` or `env._inner.container_id` + +### Volumes mounted: +- **Persistent mode:** Bind mounts at `~/.hermes/sandboxes/docker/{task_id}/workspace` → `/workspace` and `.../home` → `/root` +- **Ephemeral mode:** tmpfs at `/workspace` (10GB), `/home` (1GB), `/root` (1GB) +- **User volumes:** From `config.yaml docker_volumes` (arbitrary `-v` mounts) +- **Security tmpfs:** `/tmp` (512MB), `/var/tmp` (256MB), `/run` (64MB) + +### Direct host access for persistent mode: +- If persistent, files at `/workspace/foo.txt` are just `~/.hermes/sandboxes/docker/{task_id}/workspace/foo.txt` on host — no transfer needed! + +## 4. tools/environments/ssh.py — SSH Connection Management + +### Connection management: +- Uses SSH ControlMaster for persistent connection +- Control socket at `/tmp/hermes-ssh/{user}@{host}:{port}.sock` +- ControlPersist=300 (5 min keepalive) +- BatchMode=yes (non-interactive) +- Stores: `self.host`, `self.user`, `self.port`, `self.key_path` + +### SCP/SFTP feasibility: +- **YES**, SCP can piggyback on the ControlMaster socket: + - `scp -o ControlPath={socket} {user}@{host}:{remote} {local}` (download) + - `scp -o ControlPath={socket} {local} {user}@{host}:{remote}` (upload) +- Same SSH key and connection reuse — zero additional auth +- Would be much faster than base64-over-terminal for large files + +## 5. 
tools/environments/modal.py — Modal Sandbox Filesystem + +### Filesystem API exposure: +- **Not directly.** The inner `SwerexModalEnvironment` wraps Modal's sandbox +- The sandbox object is accessible at: `env._inner.deployment._sandbox` +- Modal's Python SDK exposes `sandbox.open()` for file I/O — but only via async API +- Currently only used for `snapshot_filesystem()` during cleanup +- **Could use:** `sandbox.open(path, "rb")` to read files or `sandbox.open(path, "wb")` to write +- **Alternative:** Base64-over-terminal already works via `execute()` — simpler, no SDK dependency + +## 6. gateway/platforms/base.py — MEDIA: Tag Flow (Complete) + +### extract_media() (lines 587-620): +- **Pattern:** `MEDIA:\S+` — extracts file paths after MEDIA: prefix +- **Voice flag:** `[[audio_as_voice]]` global directive sets `is_voice=True` for all media in message +- Returns `List[Tuple[str, bool]]` (path, is_voice) and cleaned content + +### _process_message_background() media routing (lines 752-786): +- After extracting MEDIA tags, routes by file extension: + - `.ogg .opus .mp3 .wav .m4a` → `send_voice()` + - `.mp4 .mov .avi .mkv .3gp` → `send_video()` + - `.jpg .jpeg .png .webp .gif` → `send_image_file()` + - **Everything else** → `send_document()` +- This routing already supports arbitrary files! 
+ +### send_* method inventory (base class): +- `send(chat_id, content, reply_to, metadata)` — ABSTRACT, text +- `send_image(chat_id, image_url, caption, reply_to)` — URL-based images +- `send_animation(chat_id, animation_url, caption, reply_to)` — GIF animations +- `send_voice(chat_id, audio_path, caption, reply_to)` — voice messages +- `send_video(chat_id, video_path, caption, reply_to)` — video files +- `send_document(chat_id, file_path, caption, file_name, reply_to)` — generic files +- `send_image_file(chat_id, image_path, caption, reply_to)` — local image files +- `send_typing(chat_id)` — typing indicator +- `edit_message(chat_id, message_id, content)` — edit sent messages + +### What's missing: +- **Telegram:** No override for `send_document` or `send_image_file` — falls back to text! +- **Discord:** No override for `send_document` — falls back to text! +- **WhatsApp:** Has `send_document` and `send_image_file` via bridge — COMPLETE. +- The base class defaults just send "📎 File: /path" as text — useless for actual file delivery. + +## 7. gateway/platforms/telegram.py — Send Method Analysis + +### Implemented send methods: +- `send()` — MarkdownV2 text with fallback to plain +- `send_voice()` — `.ogg`/`.opus` as `send_voice()`, others as `send_audio()` +- `send_image()` — URL-based via `send_photo()` +- `send_animation()` — GIF via `send_animation()` +- `send_typing()` — "typing" chat action +- `edit_message()` — edit text messages + +### MISSING: +- **`send_document()` NOT overridden** — Need to add `self._bot.send_document(chat_id, document=open(file_path, 'rb'), ...)` +- **`send_image_file()` NOT overridden** — Need to add `self._bot.send_photo(chat_id, photo=open(path, 'rb'), ...)` +- **`send_video()` NOT overridden** — Need to add `self._bot.send_video(...)` + +## 8. 
gateway/platforms/discord.py — Send Method Analysis + +### Implemented send methods: +- `send()` — text messages with chunking +- `send_voice()` — discord.File attachment +- `send_image()` — downloads URL, creates discord.File attachment +- `send_typing()` — channel.typing() +- `edit_message()` — edit text messages + +### MISSING: +- **`send_document()` NOT overridden** — Need to add discord.File attachment +- **`send_image_file()` NOT overridden** — Need to add discord.File from local path +- **`send_video()` NOT overridden** — Need to add discord.File attachment + +## 9. gateway/run.py — User File Attachment Handling + +### Current attachment flow: +1. **Telegram photos** (lines 509-529): Download via `photo.get_file()` → `cache_image_from_bytes()` → vision auto-analysis +2. **Telegram voice** (lines 532-541): Download → `cache_audio_from_bytes()` → STT transcription +3. **Telegram audio** (lines 542-551): Same pattern +4. **Telegram documents** (lines 553-617): Extension validation against `SUPPORTED_DOCUMENT_TYPES`, 20MB limit, content injection for text files +5. **Discord attachments** (lines 717-751): Content-type detection, image/audio caching, URL fallback for other types +6. **Gateway run.py** (lines 818-883): Auto-analyzes images with vision, transcribes audio, enriches document messages with context notes + +### Key insight: Files are always cached to host filesystem first, then processed. The agent sees local file paths. + +## 10. tools/terminal_tool.py — Terminal Tool & Environment Interaction + +### How it manages environments: +- Global dict `_active_environments: Dict[str, Any]` keyed by task_id +- Per-task creation locks prevent duplicate sandbox creation +- Auto-cleanup thread kills idle environments after `TERMINAL_LIFETIME_SECONDS` +- `_get_env_config()` reads all TERMINAL_* env vars for backend selection +- `_create_environment()` factory creates the right backend type + +### Could send_file piggyback? 
+- **YES.** send_file needs access to the same environment to extract files from sandboxes. +- It can reuse `_active_environments[task_id]` to get the environment, then: + - Docker: Use `docker cp` via `env._container_id` + - SSH: Use `scp` via `env.control_socket` + - Local: Just read the file directly + - Modal: Use base64-over-terminal via `env.execute()` +- The file_tools.py module already does this with `ShellFileOperations` — read_file/write_file/search/patch all share the same env instance. + +## 11. tools/tts_tool.py — Working Example of File Delivery + +### Flow: +1. Generate audio file to `~/.hermes/audio_cache/tts_TIMESTAMP.{ogg,mp3}` +2. Return JSON with `media_tag: "MEDIA:/path/to/file"` +3. For Telegram voice: prepend `[[audio_as_voice]]` directive +4. The LLM includes the MEDIA tag in its response text +5. `BasePlatformAdapter._process_message_background()` calls `extract_media()` to find the tag +6. Routes by extension → `send_voice()` for audio files +7. Platform adapter sends the file natively + +### Key pattern: Tool saves file to host → returns MEDIA: path → LLM echoes it → gateway extracts → platform delivers + +## 12. tools/image_generation_tool.py — Working Example of Image Delivery + +### Flow: +1. Call FAL.ai API → get image URL +2. Return JSON with `image: "https://fal.media/..."` URL +3. The LLM includes the URL in markdown: `![description](URL)` +4. `BasePlatformAdapter.extract_images()` finds `![alt](url)` patterns +5. Routes through `send_image()` (URL) or `send_animation()` (GIF) +6. Platform downloads and sends natively + +### Key difference from TTS: Images are URL-based, not local files. The gateway downloads at send time. + +--- + +# INTEGRATION MAP: Where send_file Hooks In + +## Architecture Decision: MEDIA: Tag Protocol vs. New Tool + +The MEDIA: tag protocol is already the established pattern for file delivery. 
Two options: + +### Option A: Pure MEDIA: Tag (Minimal Change) +- No new tool needed +- Agent downloads file from sandbox to host using terminal (base64) +- Saves to known location (e.g., `~/.hermes/file_cache/`) +- Includes `MEDIA:/path` in response text +- Existing routing in `_process_message_background()` handles delivery +- **Problem:** Agent has to manually do base64 dance + know about MEDIA: convention + +### Option B: Dedicated send_file Tool (Recommended) +- New tool that the agent calls with `(file_path, caption?)` +- Tool handles the sandbox → host extraction automatically +- Returns MEDIA: tag that gets routed through existing pipeline +- Much cleaner agent experience + +## Implementation Plan for Option B + +### Files to CREATE: + +1. **`tools/send_file_tool.py`** — The new tool + - Accepts: `file_path` (path in sandbox), `caption` (optional) + - Detects environment backend from `_active_environments` + - Extracts file from sandbox: + - **local:** `shutil.copy()` or direct path + - **docker:** `docker cp {container_id}:{path} {local_cache}/` + - **ssh:** `scp -o ControlPath=... {user}@{host}:{path} {local_cache}/` + - **modal:** base64-over-terminal via `env.execute("base64 {path}")` + - Saves to `~/.hermes/file_cache/{uuid}_{filename}` + - Returns: `MEDIA:/cached/path` in response for gateway to pick up + - Register with `registry.register(name="send_file", toolset="file", ...)` + +### Files to MODIFY: + +2. 
**`gateway/platforms/telegram.py`** β€” Add missing send methods: + ```python + async def send_document(self, chat_id, file_path, caption=None, file_name=None, reply_to=None): + with open(file_path, "rb") as f: + msg = await self._bot.send_document( + chat_id=int(chat_id), document=f, + caption=caption, filename=file_name or os.path.basename(file_path)) + return SendResult(success=True, message_id=str(msg.message_id)) + + async def send_image_file(self, chat_id, image_path, caption=None, reply_to=None): + with open(image_path, "rb") as f: + msg = await self._bot.send_photo(chat_id=int(chat_id), photo=f, caption=caption) + return SendResult(success=True, message_id=str(msg.message_id)) + + async def send_video(self, chat_id, video_path, caption=None, reply_to=None): + with open(video_path, "rb") as f: + msg = await self._bot.send_video(chat_id=int(chat_id), video=f, caption=caption) + return SendResult(success=True, message_id=str(msg.message_id)) + ``` + +3. **`gateway/platforms/discord.py`** β€” Add missing send methods: + ```python + async def send_document(self, chat_id, file_path, caption=None, file_name=None, reply_to=None): + channel = self._client.get_channel(int(chat_id)) or await self._client.fetch_channel(int(chat_id)) + with open(file_path, "rb") as f: + file = discord.File(io.BytesIO(f.read()), filename=file_name or os.path.basename(file_path)) + msg = await channel.send(content=caption, file=file) + return SendResult(success=True, message_id=str(msg.id)) + + async def send_image_file(self, chat_id, image_path, caption=None, reply_to=None): + # Same pattern as send_document with image filename + + async def send_video(self, chat_id, video_path, caption=None, reply_to=None): + # Same pattern, discord renders video attachments inline + ``` + +4. **`toolsets.py`** β€” Add `"send_file"` to `_HERMES_CORE_TOOLS` list + +5. 
**`agent/prompt_builder.py`** — Update platform hints to mention send_file tool + +### Code that can be REUSED (zero rewrite): + +- `BasePlatformAdapter.extract_media()` — Already extracts MEDIA: tags +- `BasePlatformAdapter._process_message_background()` — Already routes by extension +- `ToolContext.download_file()` — Base64-over-terminal extraction pattern +- `tools/terminal_tool.py` _active_environments dict — Environment access +- `tools/registry.py` — Tool registration infrastructure +- `gateway/platforms/base.py` send_document/send_image_file/send_video signatures — Already defined + +### Code that needs to be WRITTEN from scratch: + +1. `tools/send_file_tool.py` (~150 lines): + - File extraction from each environment backend type + - Local file cache management + - Registry registration + +2. Telegram `send_document` + `send_image_file` + `send_video` overrides (~40 lines) +3. Discord `send_document` + `send_image_file` + `send_video` overrides (~50 lines) + +### Total effort: ~240 lines of new code, ~5 lines of config changes + +## Key Environment-Specific Extract Strategies + +| Backend | Extract Method | Speed | Complexity | +|------------|-------------------------------|----------|------------| +| local | shutil.copy / direct path | Instant | None | +| docker | `docker cp container:path .` | Fast | Low | +| docker+vol | Direct host path access | Instant | None | +| ssh | `scp -o ControlPath=...` | Fast | Low | +| modal | base64-over-terminal | Moderate | Medium | +| singularity| Direct path (overlay mount) | Fast | Low | + +## Data Flow Summary + +``` +Agent calls send_file(file_path="/workspace/output.pdf", caption="Here's the report") + │ + ▼ +send_file_tool.py: + 1. Get environment from _active_environments[task_id] + 2. Detect backend type (docker/ssh/modal/local) + 3. Extract file to ~/.hermes/file_cache/{uuid}_{filename} + 4. 
Return: '{"success": true, "media_tag": "MEDIA:/home/user/.hermes/file_cache/abc123_output.pdf"}' + β”‚ + β–Ό +LLM includes MEDIA: tag in its response text + β”‚ + β–Ό +BasePlatformAdapter._process_message_background(): + 1. extract_media(response) β†’ finds MEDIA:/path + 2. Checks extension: .pdf β†’ send_document() + 3. Calls platform-specific send_document(chat_id, file_path, caption) + β”‚ + β–Ό +TelegramAdapter.send_document() / DiscordAdapter.send_document(): + Opens file, sends via platform API as native document attachment + User receives downloadable file in chat +``` diff --git a/tests/tools/test_clipboard.py b/tests/tools/test_clipboard.py index 897a9d99a..cbcb1ce50 100644 --- a/tests/tools/test_clipboard.py +++ b/tests/tools/test_clipboard.py @@ -1,15 +1,18 @@ -"""Tests for hermes_cli/clipboard.py β€” clipboard image extraction. +"""Tests for clipboard image paste β€” clipboard extraction, multimodal conversion, +and CLI integration. -Tests clipboard image extraction across platforms, and the CLI-level -multimodal content conversion that turns attached images into OpenAI -vision API format. 
+Coverage: + hermes_cli/clipboard.py β€” platform-specific image extraction + cli.py β€” _try_attach_clipboard_image, _build_multimodal_content, + image attachment state, queue tuple routing """ import base64 +import queue import subprocess import sys from pathlib import Path -from unittest.mock import patch, MagicMock, call +from unittest.mock import patch, MagicMock, PropertyMock import pytest @@ -20,8 +23,12 @@ from hermes_cli.clipboard import ( _macos_osascript, ) +FAKE_PNG = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100 -# ── Platform dispatch ──────────────────────────────────────────────────── + +# ═════════════════════════════════════════════════════════════════════════ +# Level 1: Clipboard module β€” platform dispatch + tool interactions +# ═════════════════════════════════════════════════════════════════════════ class TestSaveClipboardImage: def test_dispatches_to_macos_on_darwin(self, tmp_path): @@ -49,21 +56,15 @@ class TestSaveClipboardImage: assert dest.parent.exists() -# ── macOS pngpaste ─────────────────────────────────────────────────────── - class TestMacosPngpaste: def test_success_writes_file(self, tmp_path): - """pngpaste writes the file on success β€” verify we detect it.""" dest = tmp_path / "out.png" - def fake_run(cmd, **kw): - # Simulate pngpaste writing the file - dest.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100) + dest.write_bytes(FAKE_PNG) return MagicMock(returncode=0) - with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run): assert _macos_pngpaste(dest) is True - assert dest.stat().st_size > 0 + assert dest.stat().st_size == len(FAKE_PNG) def test_not_installed(self, tmp_path): with patch("hermes_cli.clipboard.subprocess.run", side_effect=FileNotFoundError): @@ -77,18 +78,19 @@ class TestMacosPngpaste: assert not dest.exists() def test_empty_file_rejected(self, tmp_path): - """pngpaste exits 0 but writes an empty file β€” should return False.""" dest = tmp_path / "out.png" - def fake_run(cmd, **kw): - 
dest.write_bytes(b"") # empty + dest.write_bytes(b"") return MagicMock(returncode=0) - with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run): assert _macos_pngpaste(dest) is False + def test_timeout_returns_false(self, tmp_path): + dest = tmp_path / "out.png" + with patch("hermes_cli.clipboard.subprocess.run", + side_effect=subprocess.TimeoutExpired("pngpaste", 3)): + assert _macos_pngpaste(dest) is False -# ── macOS osascript ────────────────────────────────────────────────────── class TestMacosOsascript: def test_no_image_type_in_clipboard(self, tmp_path): @@ -103,57 +105,53 @@ class TestMacosOsascript: assert _macos_osascript(tmp_path / "out.png") is False def test_success_with_png(self, tmp_path): - """clipboard has PNGf, osascript extracts it successfully.""" dest = tmp_path / "out.png" - call_count = [0] - + calls = [] def fake_run(cmd, **kw): - call_count[0] += 1 - if call_count[0] == 1: - # clipboard info check + calls.append(cmd) + if len(calls) == 1: return MagicMock(stdout="Β«class PNGfΒ», Β«class ut16Β»", returncode=0) - else: - # extraction β€” simulate writing the file - dest.write_bytes(b"\x89PNG" + b"\x00" * 50) - return MagicMock(stdout="", returncode=0) - + dest.write_bytes(FAKE_PNG) + return MagicMock(stdout="", returncode=0) with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run): assert _macos_osascript(dest) is True assert dest.stat().st_size > 0 def test_success_with_tiff(self, tmp_path): - """clipboard has TIFF type β€” should still attempt extraction.""" dest = tmp_path / "out.png" - call_count = [0] - + calls = [] def fake_run(cmd, **kw): - call_count[0] += 1 - if call_count[0] == 1: + calls.append(cmd) + if len(calls) == 1: return MagicMock(stdout="Β«class TIFFΒ»", returncode=0) - else: - dest.write_bytes(b"\x89PNG" + b"\x00" * 50) - return MagicMock(stdout="", returncode=0) - + dest.write_bytes(FAKE_PNG) + return MagicMock(stdout="", returncode=0) with patch("hermes_cli.clipboard.subprocess.run", 
side_effect=fake_run): assert _macos_osascript(dest) is True def test_extraction_returns_fail(self, tmp_path): - """clipboard info says image but extraction script returns 'fail'.""" dest = tmp_path / "out.png" - call_count = [0] - + calls = [] def fake_run(cmd, **kw): - call_count[0] += 1 - if call_count[0] == 1: + calls.append(cmd) + if len(calls) == 1: return MagicMock(stdout="Β«class PNGfΒ»", returncode=0) - else: - return MagicMock(stdout="fail", returncode=0) - + return MagicMock(stdout="fail", returncode=0) with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run): assert _macos_osascript(dest) is False + def test_extraction_writes_empty_file(self, tmp_path): + dest = tmp_path / "out.png" + calls = [] + def fake_run(cmd, **kw): + calls.append(cmd) + if len(calls) == 1: + return MagicMock(stdout="Β«class PNGfΒ»", returncode=0) + dest.write_bytes(b"") + return MagicMock(stdout="", returncode=0) + with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run): + assert _macos_osascript(dest) is False -# ── Linux xclip ────────────────────────────────────────────────────────── class TestLinuxSave: def test_no_xclip_installed(self, tmp_path): @@ -166,116 +164,234 @@ class TestLinuxSave: assert _linux_save(tmp_path / "out.png") is False def test_image_extraction_success(self, tmp_path): - """xclip reports image/png in targets, then pipes PNG data.""" dest = tmp_path / "out.png" - call_count = [0] - def fake_run(cmd, **kw): - call_count[0] += 1 if "TARGETS" in cmd: return MagicMock(stdout="image/png\ntext/plain\n", returncode=0) - # Extract call β€” write via the stdout file handle if "stdout" in kw and hasattr(kw["stdout"], "write"): - kw["stdout"].write(b"\x89PNG\r\n\x1a\n" + b"\x00" * 100) + kw["stdout"].write(FAKE_PNG) return MagicMock(returncode=0) - with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run): assert _linux_save(dest) is True assert dest.stat().st_size > 0 def test_extraction_fails_cleans_up(self, tmp_path): 
- """If xclip extraction fails, any partial file is cleaned up.""" dest = tmp_path / "out.png" - call_count = [0] - def fake_run(cmd, **kw): - call_count[0] += 1 if "TARGETS" in cmd: return MagicMock(stdout="image/png\n", returncode=0) raise subprocess.SubprocessError("pipe broke") - with patch("hermes_cli.clipboard.subprocess.run", side_effect=fake_run): assert _linux_save(dest) is False assert not dest.exists() + def test_targets_check_timeout(self, tmp_path): + with patch("hermes_cli.clipboard.subprocess.run", + side_effect=subprocess.TimeoutExpired("xclip", 3)): + assert _linux_save(tmp_path / "out.png") is False -# ── Multimodal content conversion (CLI-level) ──────────────────────────── -class TestMultimodalConversion: - """Test the image β†’ OpenAI vision content conversion in chat().""" +# ═════════════════════════════════════════════════════════════════════════ +# Level 2: _build_multimodal_content β€” image β†’ OpenAI vision format +# ═════════════════════════════════════════════════════════════════════════ - def _make_fake_image(self, tmp_path, name="test.png", size=64): - """Create a small fake PNG file.""" +class TestBuildMultimodalContent: + """Test the extracted _build_multimodal_content method directly.""" + + @pytest.fixture + def cli(self): + """Minimal HermesCLI with mocked internals.""" + with patch("cli.load_cli_config") as mock_cfg: + mock_cfg.return_value = { + "model": {"default": "test/model", "base_url": "http://x", "provider": "auto"}, + "terminal": {"timeout": 60}, + "browser": {}, + "compression": {"enabled": True}, + "agent": {"max_turns": 10}, + "display": {"compact": True}, + "clarify": {}, + "code_execution": {}, + "delegation": {}, + } + with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}): + with patch("cli.CLI_CONFIG", mock_cfg.return_value): + from cli import HermesCLI + cli_obj = HermesCLI.__new__(HermesCLI) + # Manually init just enough state + cli_obj._attached_images = [] + cli_obj._image_counter = 0 + return 
cli_obj + + def _make_image(self, tmp_path, name="test.png", content=FAKE_PNG): img = tmp_path / name - img.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * size) + img.write_bytes(content) return img - def test_single_image_with_text(self, tmp_path): - """One image + text β†’ multimodal content array.""" - img = self._make_fake_image(tmp_path) - raw_bytes = img.read_bytes() - expected_b64 = base64.b64encode(raw_bytes).decode() + def test_single_image_with_text(self, cli, tmp_path): + img = self._make_image(tmp_path) + result = cli._build_multimodal_content("Describe this", [img]) - # Simulate what chat() does with images - message = "What's in this image?" - images = [img] + assert len(result) == 2 + assert result[0] == {"type": "text", "text": "Describe this"} + assert result[1]["type"] == "image_url" + url = result[1]["image_url"]["url"] + assert url.startswith("data:image/png;base64,") + # Verify the base64 actually decodes to our image + b64_data = url.split(",", 1)[1] + assert base64.b64decode(b64_data) == FAKE_PNG - content_parts = [] - content_parts.append({"type": "text", "text": message}) - for img_path in images: - data = base64.b64encode(img_path.read_bytes()).decode() - ext = img_path.suffix.lower().lstrip(".") - mime = {"png": "image/png", "jpg": "image/jpeg"}.get(ext, "image/png") - content_parts.append({ - "type": "image_url", - "image_url": {"url": f"data:{mime};base64,{data}"} - }) + def test_multiple_images(self, cli, tmp_path): + imgs = [self._make_image(tmp_path, f"img{i}.png") for i in range(3)] + result = cli._build_multimodal_content("Compare", imgs) + assert len(result) == 4 # 1 text + 3 images + assert all(r["type"] == "image_url" for r in result[1:]) - assert len(content_parts) == 2 - assert content_parts[0]["type"] == "text" - assert content_parts[0]["text"] == "What's in this image?" 
- assert content_parts[1]["type"] == "image_url" - assert content_parts[1]["image_url"]["url"].startswith("data:image/png;base64,") - assert expected_b64 in content_parts[1]["image_url"]["url"] + def test_empty_text_gets_default_question(self, cli, tmp_path): + img = self._make_image(tmp_path) + result = cli._build_multimodal_content("", [img]) + assert result[0]["text"] == "What do you see in this image?" - def test_multiple_images(self, tmp_path): - """Multiple images β†’ all included in content array.""" - imgs = [self._make_fake_image(tmp_path, f"img{i}.png") for i in range(3)] + def test_jpeg_mime_type(self, cli, tmp_path): + img = self._make_image(tmp_path, "photo.jpg", b"\xff\xd8\xff\x00" * 20) + result = cli._build_multimodal_content("test", [img]) + assert "image/jpeg" in result[1]["image_url"]["url"] - content_parts = [{"type": "text", "text": "Compare these"}] - for img_path in imgs: - data = base64.b64encode(img_path.read_bytes()).decode() - content_parts.append({ - "type": "image_url", - "image_url": {"url": f"data:image/png;base64,{data}"} - }) + def test_webp_mime_type(self, cli, tmp_path): + img = self._make_image(tmp_path, "img.webp", b"RIFF\x00\x00" * 10) + result = cli._build_multimodal_content("test", [img]) + assert "image/webp" in result[1]["image_url"]["url"] - assert len(content_parts) == 4 # 1 text + 3 images + def test_unknown_extension_defaults_to_png(self, cli, tmp_path): + img = self._make_image(tmp_path, "data.bmp", b"\x00" * 50) + result = cli._build_multimodal_content("test", [img]) + assert "image/png" in result[1]["image_url"]["url"] - def test_no_text_gets_default(self): - """Empty text with image β†’ default question added.""" - text = "" - if not text: - text = "What do you see in this image?" - assert text == "What do you see in this image?" 
+ def test_missing_image_skipped(self, cli, tmp_path): + missing = tmp_path / "gone.png" + result = cli._build_multimodal_content("test", [missing]) + assert len(result) == 1 # only text - def test_jpeg_mime_type(self, tmp_path): - """JPEG files get the correct MIME type.""" - img = tmp_path / "photo.jpg" - img.write_bytes(b"\xff\xd8\xff" + b"\x00" * 50) + def test_mix_of_existing_and_missing(self, cli, tmp_path): + real = self._make_image(tmp_path, "real.png") + missing = tmp_path / "gone.png" + result = cli._build_multimodal_content("test", [real, missing]) + assert len(result) == 2 # text + 1 real image - ext = img.suffix.lower().lstrip(".") - mime = {"png": "image/png", "jpg": "image/jpeg", - "jpeg": "image/jpeg", "gif": "image/gif", - "webp": "image/webp"}.get(ext, "image/png") - assert mime == "image/jpeg" - def test_missing_image_skipped(self, tmp_path): - """Non-existent image path is silently skipped.""" - missing = tmp_path / "does_not_exist.png" - images = [missing] - content_parts = [{"type": "text", "text": "test"}] - for img_path in images: - if img_path.exists(): - content_parts.append({"type": "image_url"}) - assert len(content_parts) == 1 # only text, no image +# ═════════════════════════════════════════════════════════════════════════ +# Level 3: _try_attach_clipboard_image β€” state management +# ═════════════════════════════════════════════════════════════════════════ + +class TestTryAttachClipboardImage: + """Test the clipboard β†’ state flow.""" + + @pytest.fixture + def cli(self): + from cli import HermesCLI + cli_obj = HermesCLI.__new__(HermesCLI) + cli_obj._attached_images = [] + cli_obj._image_counter = 0 + return cli_obj + + def test_image_found_attaches(self, cli): + with patch("hermes_cli.clipboard.save_clipboard_image", return_value=True): + result = cli._try_attach_clipboard_image() + assert result is True + assert len(cli._attached_images) == 1 + assert cli._image_counter == 1 + + def test_no_image_doesnt_attach(self, cli): + with 
patch("hermes_cli.clipboard.save_clipboard_image", return_value=False): + result = cli._try_attach_clipboard_image() + assert result is False + assert len(cli._attached_images) == 0 + assert cli._image_counter == 0 # rolled back + + def test_multiple_attaches_increment_counter(self, cli): + with patch("hermes_cli.clipboard.save_clipboard_image", return_value=True): + cli._try_attach_clipboard_image() + cli._try_attach_clipboard_image() + cli._try_attach_clipboard_image() + assert len(cli._attached_images) == 3 + assert cli._image_counter == 3 + + def test_mixed_success_and_failure(self, cli): + results = [True, False, True] + with patch("hermes_cli.clipboard.save_clipboard_image", side_effect=results): + cli._try_attach_clipboard_image() + cli._try_attach_clipboard_image() + cli._try_attach_clipboard_image() + assert len(cli._attached_images) == 2 + assert cli._image_counter == 2 # 3 attempts, 1 rolled back + + def test_image_path_follows_naming_convention(self, cli): + with patch("hermes_cli.clipboard.save_clipboard_image", return_value=True): + cli._try_attach_clipboard_image() + path = cli._attached_images[0] + assert path.parent == Path.home() / ".hermes" / "images" + assert path.name.startswith("clip_") + assert path.suffix == ".png" + + +# ═════════════════════════════════════════════════════════════════════════ +# Level 4: Queue routing β€” tuple unpacking in process_loop +# ═════════════════════════════════════════════════════════════════════════ + +class TestQueueRouting: + """Test that (text, images) tuples are correctly unpacked and routed.""" + + def test_plain_string_stays_string(self): + """Regular text input has no images.""" + user_input = "hello world" + submit_images = [] + if isinstance(user_input, tuple): + user_input, submit_images = user_input + assert user_input == "hello world" + assert submit_images == [] + + def test_tuple_unpacks_text_and_images(self, tmp_path): + """(text, images) tuple is correctly split.""" + img = tmp_path / 
"test.png" + img.write_bytes(FAKE_PNG) + user_input = ("describe this", [img]) + + submit_images = [] + if isinstance(user_input, tuple): + user_input, submit_images = user_input + assert user_input == "describe this" + assert len(submit_images) == 1 + assert submit_images[0] == img + + def test_empty_text_with_images(self, tmp_path): + """Images without text β€” text should be empty string.""" + img = tmp_path / "test.png" + img.write_bytes(FAKE_PNG) + user_input = ("", [img]) + + submit_images = [] + if isinstance(user_input, tuple): + user_input, submit_images = user_input + assert user_input == "" + assert len(submit_images) == 1 + + def test_command_with_images_not_treated_as_command(self): + """Text starting with / in a tuple should still be a command.""" + user_input = "/help" + submit_images = [] + if isinstance(user_input, tuple): + user_input, submit_images = user_input + is_command = isinstance(user_input, str) and user_input.startswith("/") + assert is_command is True + + def test_images_only_not_treated_as_command(self, tmp_path): + """Empty text + images should not be treated as a command.""" + img = tmp_path / "test.png" + img.write_bytes(FAKE_PNG) + user_input = ("", [img]) + + submit_images = [] + if isinstance(user_input, tuple): + user_input, submit_images = user_input + is_command = isinstance(user_input, str) and user_input.startswith("/") + assert is_command is False + assert len(submit_images) == 1