"""CLI presentation -- spinner, kawaii faces, tool preview formatting. Pure display functions and classes with no AIAgent dependency. Used by AIAgent._execute_tool_calls for CLI feedback. """ import json import logging import os import sys import threading import time # ANSI escape codes for coloring tool failure indicators _RED = "\033[31m" _RESET = "\033[0m" logger = logging.getLogger(__name__) # ========================================================================= # Configurable tool preview length (0 = no limit) # Set once at startup by CLI or gateway from display.tool_preview_length config. # ========================================================================= _tool_preview_max_len: int = 0 # 0 = unlimited def set_tool_preview_max_len(n: int) -> None: """Set the global max length for tool call previews. 0 = no limit.""" global _tool_preview_max_len _tool_preview_max_len = max(int(n), 0) if n else 0 def get_tool_preview_max_len() -> int: """Return the configured max preview length (0 = unlimited).""" return _tool_preview_max_len # ========================================================================= # Skin-aware helpers (lazy import to avoid circular deps) # ========================================================================= def _get_skin(): """Get the active skin config, or None if not available.""" try: from hermes_cli.skin_engine import get_active_skin return get_active_skin() except Exception: return None def get_skin_faces(key: str, default: list) -> list: """Get spinner face list from active skin, falling back to default.""" skin = _get_skin() if skin: faces = skin.get_spinner_list(key) if faces: return faces return default def get_skin_verbs() -> list: """Get thinking verbs from active skin.""" skin = _get_skin() if skin: verbs = skin.get_spinner_list("thinking_verbs") if verbs: return verbs return KawaiiSpinner.THINKING_VERBS def get_skin_tool_prefix() -> str: """Get tool output prefix character from active skin.""" skin = _get_skin() if skin: return skin.tool_prefix return "┊" def get_tool_emoji(tool_name: str, default: str = "⚡") -> str: """Get the display emoji for a tool. Resolution order: 1. Active skin's ``tool_emojis`` overrides (if a skin is loaded) 2. Tool registry's per-tool ``emoji`` field 3. *default* fallback """ # 1. Skin override skin = _get_skin() if skin and skin.tool_emojis: override = skin.tool_emojis.get(tool_name) if override: return override # 2. Registry default try: from tools.registry import registry emoji = registry.get_emoji(tool_name, default="") if emoji: return emoji except Exception: pass # 3. Hardcoded fallback return default # ========================================================================= # Tool preview (one-line summary of a tool call's primary argument) # ========================================================================= def _oneline(text: str) -> str: """Collapse whitespace (including newlines) to single spaces.""" return " ".join(text.split()) def build_tool_preview(tool_name: str, args: dict, max_len: int | None = None) -> str | None: """Build a short preview of a tool call's primary argument for display. *max_len* controls truncation. ``None`` (default) defers to the global ``_tool_preview_max_len`` set via config; ``0`` means unlimited. """ if max_len is None: max_len = _tool_preview_max_len if not args: return None primary_args = { "terminal": "command", "web_search": "query", "web_extract": "urls", "read_file": "path", "write_file": "path", "patch": "path", "search_files": "pattern", "browser_navigate": "url", "browser_click": "ref", "browser_type": "text", "image_generate": "prompt", "text_to_speech": "text", "vision_analyze": "question", "mixture_of_agents": "user_prompt", "skill_view": "name", "skills_list": "category", "cronjob": "action", "execute_code": "code", "delegate_task": "goal", "clarify": "question", "skill_manage": "name", } if tool_name == "process": action = args.get("action", "") sid = args.get("session_id", "") data = args.get("data", "") timeout_val = args.get("timeout") parts = [action] if sid: parts.append(sid[:16]) if data: parts.append(f'"{_oneline(data[:20])}"') if timeout_val and action == "wait": parts.append(f"{timeout_val}s") return " ".join(parts) if parts else None if tool_name == "todo": todos_arg = args.get("todos") merge = args.get("merge", False) if todos_arg is None: return "reading task list" elif merge: return f"updating {len(todos_arg)} task(s)" else: return f"planning {len(todos_arg)} task(s)" if tool_name == "session_search": query = _oneline(args.get("query", "")) return f"recall: \"{query[:25]}{'...' if len(query) > 25 else ''}\"" if tool_name == "memory": action = args.get("action", "") target = args.get("target", "") if action == "add": content = _oneline(args.get("content", "")) return f"+{target}: \"{content[:25]}{'...' if len(content) > 25 else ''}\"" elif action == "replace": return f"~{target}: \"{_oneline(args.get('old_text', '')[:20])}\"" elif action == "remove": return f"-{target}: \"{_oneline(args.get('old_text', '')[:20])}\"" return action if tool_name == "send_message": target = args.get("target", "?") msg = _oneline(args.get("message", "")) if len(msg) > 20: msg = msg[:17] + "..." return f"to {target}: \"{msg}\"" if tool_name.startswith("rl_"): rl_previews = { "rl_list_environments": "listing envs", "rl_select_environment": args.get("name", ""), "rl_get_current_config": "reading config", "rl_edit_config": f"{args.get('field', '')}={args.get('value', '')}", "rl_start_training": "starting", "rl_check_status": args.get("run_id", "")[:16], "rl_stop_training": f"stopping {args.get('run_id', '')[:16]}", "rl_get_results": args.get("run_id", "")[:16], "rl_list_runs": "listing runs", "rl_test_inference": f"{args.get('num_steps', 3)} steps", } return rl_previews.get(tool_name) key = primary_args.get(tool_name) if not key: for fallback_key in ("query", "text", "command", "path", "name", "prompt", "code", "goal"): if fallback_key in args: key = fallback_key break if not key or key not in args: return None value = args[key] if isinstance(value, list): value = value[0] if value else "" preview = _oneline(str(value)) if not preview: return None if max_len > 0 and len(preview) > max_len: preview = preview[:max_len - 3] + "..." return preview # ========================================================================= # KawaiiSpinner # ========================================================================= class KawaiiSpinner: """Animated spinner with kawaii faces for CLI feedback during tool execution.""" SPINNERS = { 'dots': ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'], 'bounce': ['⠁', '⠂', '⠄', '⡀', '⢀', '⠠', '⠐', '⠈'], 'grow': ['▁', '▂', '▃', '▄', '▅', '▆', '▇', '█', '▇', '▆', '▅', '▄', '▃', '▂'], 'arrows': ['←', '↖', '↑', '↗', '→', '↘', '↓', '↙'], 'star': ['✶', '✷', '✸', '✹', '✺', '✹', '✸', '✷'], 'moon': ['🌑', '🌒', '🌓', '🌔', '🌕', '🌖', '🌗', '🌘'], 'pulse': ['◜', '◠', '◝', '◞', '◡', '◟'], 'brain': ['🧠', '💭', '💡', '✨', '💫', '🌟', '💡', '💭'], 'sparkle': ['⁺', '˚', '*', '✧', '✦', '✧', '*', '˚'], } KAWAII_WAITING = [ "(。◕‿◕。)", "(◕‿◕✿)", "٩(◕‿◕。)۶", "(✿◠‿◠)", "( ˘▽˘)っ", "♪(´ε` )", "(◕ᴗ◕✿)", "ヾ(^∇^)", "(≧◡≦)", "(★ω★)", ] KAWAII_THINKING = [ "(。•́︿•̀。)", "(◔_◔)", "(¬‿¬)", "( •_•)>⌐■-■", "(⌐■_■)", "(´・_・`)", "◉_◉", "(°ロ°)", "( ˘⌣˘)♡", "ヽ(>∀<☆)☆", "٩(๑❛ᴗ❛๑)۶", "(⊙_⊙)", "(¬_¬)", "( ͡° ͜ʖ ͡°)", "ಠ_ಠ", ] THINKING_VERBS = [ "pondering", "contemplating", "musing", "cogitating", "ruminating", "deliberating", "mulling", "reflecting", "processing", "reasoning", "analyzing", "computing", "synthesizing", "formulating", "brainstorming", ] def __init__(self, message: str = "", spinner_type: str = 'dots', print_fn=None): self.message = message self.spinner_frames = self.SPINNERS.get(spinner_type, self.SPINNERS['dots']) self.running = False self.thread = None self.frame_idx = 0 self.start_time = None self.last_line_len = 0 # Optional callable to route all output through (e.g. a no-op for silent # background agents). When set, bypasses self._out entirely so that # agents with _print_fn overridden remain fully silent. self._print_fn = print_fn # Capture stdout NOW, before any redirect_stdout(devnull) from # child agents can replace sys.stdout with a black hole. self._out = sys.stdout def _write(self, text: str, end: str = '\n', flush: bool = False): """Write to the stdout captured at spinner creation time. If a print_fn was supplied at construction, all output is routed through it instead — allowing callers to silence the spinner with a no-op lambda. """ if self._print_fn is not None: try: self._print_fn(text) except Exception: pass return try: self._out.write(text + end) if flush: self._out.flush() except (ValueError, OSError): pass @property def _is_tty(self) -> bool: """Check if output is a real terminal, safe against closed streams.""" try: return hasattr(self._out, 'isatty') and self._out.isatty() except (ValueError, OSError): return False def _is_patch_stdout_proxy(self) -> bool: """Return True when stdout is prompt_toolkit's StdoutProxy. patch_stdout wraps sys.stdout in a StdoutProxy that queues writes and injects newlines around each flush(). The \\r overwrite never lands on the correct line — each spinner frame ends up on its own line. The CLI already drives a TUI widget (_spinner_text) for spinner display, so KawaiiSpinner's \\r-based animation is redundant under StdoutProxy. """ try: from prompt_toolkit.patch_stdout import StdoutProxy return isinstance(self._out, StdoutProxy) except ImportError: return False def _animate(self): # When stdout is not a real terminal (e.g. Docker, systemd, pipe), # skip the animation entirely — it creates massive log bloat. # Just log the start once and let stop() log the completion. if not self._is_tty: self._write(f" [tool] {self.message}", flush=True) while self.running: time.sleep(0.5) return # When running inside prompt_toolkit's patch_stdout context the CLI # renders spinner state via a dedicated TUI widget (_spinner_text). # Driving a \r-based animation here too causes visual overdraw: the # StdoutProxy injects newlines around each flush, so every frame lands # on a new line and overwrites the status bar. if self._is_patch_stdout_proxy(): while self.running: time.sleep(0.1) return # Cache skin wings at start (avoid per-frame imports) skin = _get_skin() wings = skin.get_spinner_wings() if skin else [] while self.running: if os.getenv("HERMES_SPINNER_PAUSE"): time.sleep(0.1) continue frame = self.spinner_frames[self.frame_idx % len(self.spinner_frames)] elapsed = time.time() - self.start_time if wings: left, right = wings[self.frame_idx % len(wings)] line = f" {left} {frame} {self.message} {right} ({elapsed:.1f}s)" else: line = f" {frame} {self.message} ({elapsed:.1f}s)" pad = max(self.last_line_len - len(line), 0) self._write(f"\r{line}{' ' * pad}", end='', flush=True) self.last_line_len = len(line) self.frame_idx += 1 time.sleep(0.12) def start(self): if self.running: return self.running = True self.start_time = time.time() self.thread = threading.Thread(target=self._animate, daemon=True) self.thread.start() def update_text(self, new_message: str): self.message = new_message def print_above(self, text: str): """Print a line above the spinner without disrupting animation. Clears the current spinner line, prints the text, and lets the next animation tick redraw the spinner on the line below. Thread-safe: uses the captured stdout reference (self._out). Works inside redirect_stdout(devnull) because _write bypasses sys.stdout and writes to the stdout captured at spinner creation. """ if not self.running: self._write(f" {text}", flush=True) return # Clear spinner line with spaces (not \033[K) to avoid garbled escape # codes when prompt_toolkit's patch_stdout is active — same approach # as stop(). Then print text; spinner redraws on next tick. blanks = ' ' * max(self.last_line_len + 5, 40) self._write(f"\r{blanks}\r {text}", flush=True) def stop(self, final_message: str = None): self.running = False if self.thread: self.thread.join(timeout=0.5) is_tty = self._is_tty if is_tty: # Clear the spinner line with spaces instead of \033[K to avoid # garbled escape codes when prompt_toolkit's patch_stdout is active. blanks = ' ' * max(self.last_line_len + 5, 40) self._write(f"\r{blanks}\r", end='', flush=True) if final_message: elapsed = f" ({time.time() - self.start_time:.1f}s)" if self.start_time else "" if is_tty: self._write(f" {final_message}", flush=True) else: self._write(f" [done] {final_message}{elapsed}", flush=True) def __enter__(self): self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): self.stop() return False # ========================================================================= # Kawaii face arrays (used by AIAgent._execute_tool_calls for spinner text) # ========================================================================= KAWAII_SEARCH = [ "♪(´ε` )", "(。◕‿◕。)", "ヾ(^∇^)", "(◕ᴗ◕✿)", "( ˘▽˘)っ", "٩(◕‿◕。)۶", "(✿◠‿◠)", "♪~(´ε` )", "(ノ´ヮ`)ノ*:・゚✧", "\(◎o◎)/", ] KAWAII_READ = [ "φ(゜▽゜*)♪", "( ˘▽˘)っ", "(⌐■_■)", "٩(。•́‿•̀。)۶", "(◕‿◕✿)", "ヾ(@⌒ー⌒@)ノ", "(✧ω✧)", "♪(๑ᴖ◡ᴖ๑)♪", "(≧◡≦)", "( ´ ▽ ` )ノ", ] KAWAII_TERMINAL = [ "ヽ(>∀<☆)ノ", "(ノ°∀°)ノ", "٩(^ᴗ^)۶", "ヾ(⌐■_■)ノ♪", "(•̀ᴗ•́)و", "┗(^0^)┓", "(`・ω・´)", "\( ̄▽ ̄)/", "(ง •̀_•́)ง", "ヽ(´▽`)/", ] KAWAII_BROWSER = [ "(ノ°∀°)ノ", "(☞゚ヮ゚)☞", "( ͡° ͜ʖ ͡°)", "┌( ಠ_ಠ)┘", "(⊙_⊙)?", "ヾ(•ω•`)o", "( ̄ω ̄)", "( ˇωˇ )", "(ᵔᴥᵔ)", "\(◎o◎)/", ] KAWAII_CREATE = [ "✧*。٩(ˊᗜˋ*)و✧", "(ノ◕ヮ◕)ノ*:・゚✧", "ヽ(>∀<☆)ノ", "٩(♡ε♡)۶", "(◕‿◕)♡", "✿◕ ‿ ◕✿", "(*≧▽≦)", "ヾ(^-^)ノ", "(☆▽☆)", "°˖✧◝(⁰▿⁰)◜✧˖°", ] KAWAII_SKILL = [ "ヾ(@⌒ー⌒@)ノ", "(๑˃ᴗ˂)ﻭ", "٩(◕‿◕。)۶", "(✿╹◡╹)", "ヽ(・∀・)ノ", "(ノ´ヮ`)ノ*:・゚✧", "♪(๑ᴖ◡ᴖ๑)♪", "(◠‿◠)", "٩(ˊᗜˋ*)و", "(^▽^)", "ヾ(^∇^)", "(★ω★)/", "٩(。•́‿•̀。)۶", "(◕ᴗ◕✿)", "\(◎o◎)/", "(✧ω✧)", "ヽ(>∀<☆)ノ", "( ˘▽˘)っ", "(≧◡≦) ♡", "ヾ( ̄▽ ̄)", ] KAWAII_THINK = [ "(っ°Д°;)っ", "(;′⌒`)", "(・_・ヾ", "( ´_ゝ`)", "( ̄ヘ ̄)", "(。-`ω´-)", "( ˘︹˘ )", "(¬_¬)", "ヽ(ー_ー )ノ", "(;一_一)", ] KAWAII_GENERIC = [ "♪(´ε` )", "(◕‿◕✿)", "ヾ(^∇^)", "٩(◕‿◕。)۶", "(✿◠‿◠)", "(ノ´ヮ`)ノ*:・゚✧", "ヽ(>∀<☆)ノ", "(☆▽☆)", "( ˘▽˘)っ", "(≧◡≦)", ] # ========================================================================= # Cute tool message (completion line that replaces the spinner) # ========================================================================= def _detect_tool_failure(tool_name: str, result: str | None) -> tuple[bool, str]: """Inspect a tool result string for signs of failure. Returns ``(is_failure, suffix)`` where *suffix* is an informational tag like ``" [exit 1]"`` for terminal failures, or ``" [error]"`` for generic failures. On success, returns ``(False, "")``. """ if result is None: return False, "" if tool_name == "terminal": try: data = json.loads(result) exit_code = data.get("exit_code") if exit_code is not None and exit_code != 0: return True, f" [exit {exit_code}]" except (json.JSONDecodeError, TypeError, AttributeError): logger.debug("Could not parse terminal result as JSON for exit code check") return False, "" # Memory-specific: distinguish "full" from real errors if tool_name == "memory": try: data = json.loads(result) if data.get("success") is False and "exceed the limit" in data.get("error", ""): return True, " [full]" except (json.JSONDecodeError, TypeError, AttributeError): logger.debug("Could not parse memory result as JSON for capacity check") # Generic heuristic for non-terminal tools lower = result[:500].lower() if '"error"' in lower or '"failed"' in lower or result.startswith("Error"): return True, " [error]" return False, "" def get_cute_tool_message( tool_name: str, args: dict, duration: float, result: str | None = None, ) -> str: """Generate a formatted tool completion line for CLI quiet mode. Format: ``| {emoji} {verb:9} {detail} {duration}`` When *result* is provided the line is checked for failure indicators. Failed tool calls get a red prefix and an informational suffix. """ dur = f"{duration:.1f}s" is_failure, failure_suffix = _detect_tool_failure(tool_name, result) skin_prefix = get_skin_tool_prefix() def _trunc(s, n=40): s = str(s) if _tool_preview_max_len == 0: return s # no limit return (s[:n-3] + "...") if len(s) > n else s def _path(p, n=35): p = str(p) if _tool_preview_max_len == 0: return p # no limit return ("..." + p[-(n-3):]) if len(p) > n else p def _wrap(line: str) -> str: """Apply skin tool prefix and failure suffix.""" if skin_prefix != "┊": line = line.replace("┊", skin_prefix, 1) if not is_failure: return line return f"{line}{failure_suffix}" if tool_name == "web_search": return _wrap(f"┊ 🔍 search {_trunc(args.get('query', ''), 42)} {dur}") if tool_name == "web_extract": urls = args.get("urls", []) if urls: url = urls[0] if isinstance(urls, list) else str(urls) domain = url.replace("https://", "").replace("http://", "").split("/")[0] extra = f" +{len(urls)-1}" if len(urls) > 1 else "" return _wrap(f"┊ 📄 fetch {_trunc(domain, 35)}{extra} {dur}") return _wrap(f"┊ 📄 fetch pages {dur}") if tool_name == "web_crawl": url = args.get("url", "") domain = url.replace("https://", "").replace("http://", "").split("/")[0] return _wrap(f"┊ 🕸️ crawl {_trunc(domain, 35)} {dur}") if tool_name == "terminal": return _wrap(f"┊ 💻 $ {_trunc(args.get('command', ''), 42)} {dur}") if tool_name == "process": action = args.get("action", "?") sid = args.get("session_id", "")[:12] labels = {"list": "ls processes", "poll": f"poll {sid}", "log": f"log {sid}", "wait": f"wait {sid}", "kill": f"kill {sid}", "write": f"write {sid}", "submit": f"submit {sid}"} return _wrap(f"┊ ⚙️ proc {labels.get(action, f'{action} {sid}')} {dur}") if tool_name == "read_file": return _wrap(f"┊ 📖 read {_path(args.get('path', ''))} {dur}") if tool_name == "write_file": return _wrap(f"┊ ✍️ write {_path(args.get('path', ''))} {dur}") if tool_name == "patch": return _wrap(f"┊ 🔧 patch {_path(args.get('path', ''))} {dur}") if tool_name == "search_files": pattern = _trunc(args.get("pattern", ""), 35) target = args.get("target", "content") verb = "find" if target == "files" else "grep" return _wrap(f"┊ 🔎 {verb:9} {pattern} {dur}") if tool_name == "browser_navigate": url = args.get("url", "") domain = url.replace("https://", "").replace("http://", "").split("/")[0] return _wrap(f"┊ 🌐 navigate {_trunc(domain, 35)} {dur}") if tool_name == "browser_snapshot": mode = "full" if args.get("full") else "compact" return _wrap(f"┊ 📸 snapshot {mode} {dur}") if tool_name == "browser_click": return _wrap(f"┊ 👆 click {args.get('ref', '?')} {dur}") if tool_name == "browser_type": return _wrap(f"┊ ⌨️ type \"{_trunc(args.get('text', ''), 30)}\" {dur}") if tool_name == "browser_scroll": d = args.get("direction", "down") arrow = {"down": "↓", "up": "↑", "right": "→", "left": "←"}.get(d, "↓") return _wrap(f"┊ {arrow} scroll {d} {dur}") if tool_name == "browser_back": return _wrap(f"┊ ◀️ back {dur}") if tool_name == "browser_press": return _wrap(f"┊ ⌨️ press {args.get('key', '?')} {dur}") if tool_name == "browser_close": return _wrap(f"┊ 🚪 close browser {dur}") if tool_name == "browser_get_images": return _wrap(f"┊ 🖼️ images extracting {dur}") if tool_name == "browser_vision": return _wrap(f"┊ 👁️ vision analyzing page {dur}") if tool_name == "todo": todos_arg = args.get("todos") merge = args.get("merge", False) if todos_arg is None: return _wrap(f"┊ 📋 plan reading tasks {dur}") elif merge: return _wrap(f"┊ 📋 plan update {len(todos_arg)} task(s) {dur}") else: return _wrap(f"┊ 📋 plan {len(todos_arg)} task(s) {dur}") if tool_name == "session_search": return _wrap(f"┊ 🔍 recall \"{_trunc(args.get('query', ''), 35)}\" {dur}") if tool_name == "memory": action = args.get("action", "?") target = args.get("target", "") if action == "add": return _wrap(f"┊ 🧠 memory +{target}: \"{_trunc(args.get('content', ''), 30)}\" {dur}") elif action == "replace": return _wrap(f"┊ 🧠 memory ~{target}: \"{_trunc(args.get('old_text', ''), 20)}\" {dur}") elif action == "remove": return _wrap(f"┊ 🧠 memory -{target}: \"{_trunc(args.get('old_text', ''), 20)}\" {dur}") return _wrap(f"┊ 🧠 memory {action} {dur}") if tool_name == "skills_list": return _wrap(f"┊ 📚 skills list {args.get('category', 'all')} {dur}") if tool_name == "skill_view": return _wrap(f"┊ 📚 skill {_trunc(args.get('name', ''), 30)} {dur}") if tool_name == "image_generate": return _wrap(f"┊ 🎨 create {_trunc(args.get('prompt', ''), 35)} {dur}") if tool_name == "text_to_speech": return _wrap(f"┊ 🔊 speak {_trunc(args.get('text', ''), 30)} {dur}") if tool_name == "vision_analyze": return _wrap(f"┊ 👁️ vision {_trunc(args.get('question', ''), 30)} {dur}") if tool_name == "mixture_of_agents": return _wrap(f"┊ 🧠 reason {_trunc(args.get('user_prompt', ''), 30)} {dur}") if tool_name == "send_message": return _wrap(f"┊ 📨 send {args.get('target', '?')}: \"{_trunc(args.get('message', ''), 25)}\" {dur}") if tool_name == "cronjob": action = args.get("action", "?") if action == "create": skills = args.get("skills") or ([] if not args.get("skill") else [args.get("skill")]) label = args.get("name") or (skills[0] if skills else None) or args.get("prompt", "task") return _wrap(f"┊ ⏰ cron create {_trunc(label, 24)} {dur}") if action == "list": return _wrap(f"┊ ⏰ cron listing {dur}") return _wrap(f"┊ ⏰ cron {action} {args.get('job_id', '')} {dur}") if tool_name.startswith("rl_"): rl = { "rl_list_environments": "list envs", "rl_select_environment": f"select {args.get('name', '')}", "rl_get_current_config": "get config", "rl_edit_config": f"set {args.get('field', '?')}", "rl_start_training": "start training", "rl_check_status": f"status {args.get('run_id', '?')[:12]}", "rl_stop_training": f"stop {args.get('run_id', '?')[:12]}", "rl_get_results": f"results {args.get('run_id', '?')[:12]}", "rl_list_runs": "list runs", "rl_test_inference": "test inference", } return _wrap(f"┊ 🧪 rl {rl.get(tool_name, tool_name.replace('rl_', ''))} {dur}") if tool_name == "execute_code": code = args.get("code", "") first_line = code.strip().split("\n")[0] if code.strip() else "" return _wrap(f"┊ 🐍 exec {_trunc(first_line, 35)} {dur}") if tool_name == "delegate_task": tasks = args.get("tasks") if tasks and isinstance(tasks, list): return _wrap(f"┊ 🔀 delegate {len(tasks)} parallel tasks {dur}") return _wrap(f"┊ 🔀 delegate {_trunc(args.get('goal', ''), 35)} {dur}") preview = build_tool_preview(tool_name, args) or "" return _wrap(f"┊ ⚡ {tool_name[:9]:9} {_trunc(preview, 35)} {dur}") # ========================================================================= # Honcho session line (one-liner with clickable OSC 8 hyperlink) # ========================================================================= _DIM = "\033[2m" _SKY_BLUE = "\033[38;5;117m" _ANSI_RESET = "\033[0m" def honcho_session_url(workspace: str, session_name: str) -> str: """Build a Honcho app URL for a session.""" from urllib.parse import quote return ( f"https://app.honcho.dev/explore" f"?workspace={quote(workspace, safe='')}" f"&view=sessions" f"&session={quote(session_name, safe='')}" ) def _osc8_link(url: str, text: str) -> str: """OSC 8 terminal hyperlink (clickable in iTerm2, Ghostty, WezTerm, etc.).""" return f"\033]8;;{url}\033\\{text}\033]8;;\033\\" def honcho_session_line(workspace: str, session_name: str) -> str: """One-line session indicator: `Honcho session: `.""" url = honcho_session_url(workspace, session_name) linked_name = _osc8_link(url, f"{_SKY_BLUE}{session_name}{_ANSI_RESET}") return f"{_DIM}Honcho session:{_ANSI_RESET} {linked_name}" def write_tty(text: str) -> None: """Write directly to /dev/tty, bypassing stdout capture.""" try: fd = os.open("/dev/tty", os.O_WRONLY) os.write(fd, text.encode("utf-8")) os.close(fd) except OSError: sys.stdout.write(text) sys.stdout.flush() # ========================================================================= # Context pressure display (CLI user-facing warnings) # ========================================================================= # ANSI color codes for context pressure tiers _CYAN = "\033[36m" _YELLOW = "\033[33m" _BOLD = "\033[1m" _DIM_ANSI = "\033[2m" # Bar characters _BAR_FILLED = "▰" _BAR_EMPTY = "▱" _BAR_WIDTH = 20 def format_context_pressure( compaction_progress: float, threshold_tokens: int, threshold_percent: float, compression_enabled: bool = True, ) -> str: """Build a formatted context pressure line for CLI display. The bar and percentage show progress toward the compaction threshold, NOT the raw context window. 100% = compaction fires. Args: compaction_progress: How close to compaction (0.0–1.0, 1.0 = fires). threshold_tokens: Compaction threshold in tokens. threshold_percent: Compaction threshold as a fraction of context window. compression_enabled: Whether auto-compression is active. """ pct_int = min(int(compaction_progress * 100), 100) filled = min(int(compaction_progress * _BAR_WIDTH), _BAR_WIDTH) bar = _BAR_FILLED * filled + _BAR_EMPTY * (_BAR_WIDTH - filled) threshold_k = f"{threshold_tokens // 1000}k" if threshold_tokens >= 1000 else str(threshold_tokens) threshold_pct_int = int(threshold_percent * 100) color = f"{_BOLD}{_YELLOW}" icon = "⚠" if compression_enabled: hint = "compaction approaching" else: hint = "no auto-compaction" return ( f" {color}{icon} context {bar} {pct_int}% to compaction{_ANSI_RESET}" f" {_DIM_ANSI}{threshold_k} threshold ({threshold_pct_int}%) · {hint}{_ANSI_RESET}" ) def format_context_pressure_gateway( compaction_progress: float, threshold_percent: float, compression_enabled: bool = True, ) -> str: """Build a plain-text context pressure notification for messaging platforms. No ANSI — just Unicode and plain text suitable for Telegram/Discord/etc. The percentage shows progress toward the compaction threshold. """ pct_int = min(int(compaction_progress * 100), 100) filled = min(int(compaction_progress * _BAR_WIDTH), _BAR_WIDTH) bar = _BAR_FILLED * filled + _BAR_EMPTY * (_BAR_WIDTH - filled) threshold_pct_int = int(threshold_percent * 100) icon = "⚠️" if compression_enabled: hint = f"Context compaction approaching (threshold: {threshold_pct_int}% of window)." else: hint = "Auto-compaction is disabled — context may be truncated." return f"{icon} Context: {bar} {pct_int}% to compaction\n{hint}"