diff --git a/cli.py b/cli.py index ae471d2e5..de21d81e5 100644 --- a/cli.py +++ b/cli.py @@ -3320,6 +3320,117 @@ class HermesCLI: else: _cprint(f" ↻ Resumed session {target_id}{title_part} — no messages, starting fresh.") + def _handle_branch_command(self, cmd_original: str) -> None: + """Handle /branch [name] — fork the current session into a new independent copy. + + Copies the full conversation history to a new session so the user can + explore a different approach without losing the original session state. + Inspired by Claude Code's /branch command. + """ + if not self.conversation_history: + _cprint(" No conversation to branch — send a message first.") + return + + if not self._session_db: + _cprint(" Session database not available.") + return + + parts = cmd_original.split(None, 1) + branch_name = parts[1].strip() if len(parts) > 1 else "" + + # Generate the new session ID + now = datetime.now() + timestamp_str = now.strftime("%Y%m%d_%H%M%S") + short_uuid = uuid.uuid4().hex[:6] + new_session_id = f"{timestamp_str}_{short_uuid}" + + # Determine branch title + if branch_name: + branch_title = branch_name + else: + # Auto-generate from the current session title + current_title = None + if self._session_db: + current_title = self._session_db.get_session_title(self.session_id) + base = current_title or "branch" + branch_title = self._session_db.get_next_title_in_lineage(base) + + # Save the current session's state before branching + parent_session_id = self.session_id + + # End the old session + try: + self._session_db.end_session(self.session_id, "branched") + except Exception: + pass + + # Create the new session with parent link + try: + self._session_db.create_session( + session_id=new_session_id, + source=os.environ.get("HERMES_SESSION_SOURCE", "cli"), + model=self.model, + model_config={ + "max_iterations": self.max_turns, + "reasoning_config": self.reasoning_config, + }, + parent_session_id=parent_session_id, + ) + except Exception as e: + _cprint(f" Failed to create branch session: {e}") + return + + # Copy conversation history to the new session + for msg in self.conversation_history: + try: + self._session_db.append_message( + session_id=new_session_id, + role=msg.get("role", "user"), + content=msg.get("content"), + tool_name=msg.get("tool_name") or msg.get("name"), + tool_calls=msg.get("tool_calls"), + tool_call_id=msg.get("tool_call_id"), + reasoning=msg.get("reasoning"), + ) + except Exception: + pass # Best-effort copy + + # Set title on the branch + try: + self._session_db.set_session_title(new_session_id, branch_title) + except Exception: + pass + + # Switch to the new session + self.session_id = new_session_id + self.session_start = now + self._pending_title = None + self._resumed = True # Prevents auto-title generation + + # Sync the agent + if self.agent: + self.agent.session_id = new_session_id + self.agent.session_start = now + self.agent.reset_session_state() + if hasattr(self.agent, "_last_flushed_db_idx"): + self.agent._last_flushed_db_idx = len(self.conversation_history) + if hasattr(self.agent, "_todo_store"): + try: + from tools.todo_tool import TodoStore + self.agent._todo_store = TodoStore() + except Exception: + pass + if hasattr(self.agent, "_invalidate_system_prompt"): + self.agent._invalidate_system_prompt() + + msg_count = len([m for m in self.conversation_history if m.get("role") == "user"]) + _cprint( + f" ⑂ Branched session \"{branch_title}\"" + f" ({msg_count} user message{'s' if msg_count != 1 else ''})" + ) + _cprint(f" Original session: {parent_session_id}") + _cprint(f" Branch session: {new_session_id}") + def reset_conversation(self): """Reset the conversation by starting a new session.""" # Shut down memory provider before resetting — actual session boundary @@ -4040,6 +4151,8 @@ class HermesCLI: self._pending_input.put(retry_msg) elif canonical == "undo": self.undo_last() + elif canonical == "branch": + self._handle_branch_command(cmd_original) elif canonical == "save": self.save_conversation() elif canonical == "cron": @@ -7659,6 +7772,49 @@ class HermesCLI: ) self._app = app # Store reference for clarify_callback + # ── Fix ghost status-bar lines on terminal resize ────────────── + # When the terminal shrinks (e.g. un-maximize), the emulator reflows + # the previously-rendered full-width rows (status bar, input rules) + # into multiple narrower rows. prompt_toolkit's _on_resize handler + # only cursor_up()s by the stored layout height, missing the extra + # rows created by reflow — leaving ghost duplicates visible. + # + # Fix: before the standard erase, inflate _cursor_pos.y so the + # cursor moves up far enough to cover the reflowed ghost content. + _original_on_resize = app._on_resize + + def _resize_clear_ghosts(): + from prompt_toolkit.data_structures import Point as _Pt + renderer = app.renderer + try: + old_size = renderer._last_size + new_size = renderer.output.get_size() + if ( + old_size + and new_size.columns < old_size.columns + and new_size.columns > 0 + ): + reflow_factor = ( + (old_size.columns + new_size.columns - 1) + // new_size.columns + ) + last_h = ( + renderer._last_screen.height + if renderer._last_screen + else 0 + ) + extra = last_h * (reflow_factor - 1) + if extra > 0: + renderer._cursor_pos = _Pt( + x=renderer._cursor_pos.x, + y=renderer._cursor_pos.y + extra, + ) + except Exception: + pass # never break resize handling + _original_on_resize() + + app._on_resize = _resize_clear_ghosts + def spinner_loop(): import time as _time diff --git a/gateway/run.py b/gateway/run.py index fecce33e3..33bfa1d79 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1990,6 +1990,9 @@ class GatewayRunner: if canonical == "resume": return await self._handle_resume_command(event) + if canonical == "branch": + return await self._handle_branch_command(event) + if canonical == "rollback": return await self._handle_rollback_command(event) @@ -4587,6 +4590,96 @@ class GatewayRunner: return f"↻ Resumed session **{title}**{msg_part}. Conversation restored." + async def _handle_branch_command(self, event: MessageEvent) -> str: + """Handle /branch [name] — fork the current session into a new independent copy. + + Copies conversation history to a new session so the user can explore + a different approach without losing the original. + Inspired by Claude Code's /branch command. + """ + import uuid as _uuid + + if not self._session_db: + return "Session database not available." + + source = event.source + session_key = self._session_key_for_source(source) + + # Load the current session and its transcript + current_entry = self.session_store.get_or_create_session(source) + history = self.session_store.load_transcript(current_entry.session_id) + if not history: + return "No conversation to branch — send a message first." + + branch_name = event.get_command_args().strip() + + # Generate the new session ID + from datetime import datetime as _dt + now = _dt.now() + timestamp_str = now.strftime("%Y%m%d_%H%M%S") + short_uuid = _uuid.uuid4().hex[:6] + new_session_id = f"{timestamp_str}_{short_uuid}" + + # Determine branch title + if branch_name: + branch_title = branch_name + else: + current_title = self._session_db.get_session_title(current_entry.session_id) + base = current_title or "branch" + branch_title = self._session_db.get_next_title_in_lineage(base) + + parent_session_id = current_entry.session_id + + # Create the new session with parent link + try: + self._session_db.create_session( + session_id=new_session_id, + source=source.platform.value if source.platform else "gateway", + model=(self.config.get("model", {}) or {}).get("default") if isinstance(self.config, dict) else None, + parent_session_id=parent_session_id, + ) + except Exception as e: + logger.error("Failed to create branch session: %s", e) + return f"Failed to create branch: {e}" + + # Copy conversation history to the new session + for msg in history: + try: + self._session_db.append_message( + session_id=new_session_id, + role=msg.get("role", "user"), + content=msg.get("content"), + tool_name=msg.get("tool_name") or msg.get("name"), + tool_calls=msg.get("tool_calls"), + tool_call_id=msg.get("tool_call_id"), + reasoning=msg.get("reasoning"), + ) + except Exception: + pass # Best-effort copy + + # Set title + try: + self._session_db.set_session_title(new_session_id, branch_title) + except Exception: + pass + + # Switch the session store entry to the new session + new_entry = self.session_store.switch_session(session_key, new_session_id) + if not new_entry: + return "Branch created but failed to switch to it." + + # Evict any cached agent for this session + self._evict_cached_agent(session_key) + + msg_count = len([m for m in history if m.get("role") == "user"]) + return ( + f"⑂ Branched to **{branch_title}**" + f" ({msg_count} message{'s' if msg_count != 1 else ''} copied)\n" + f"Original: `{parent_session_id}`\n" + f"Branch: `{new_session_id}`\n" + f"Use `/resume` to switch back to the original." + ) + async def _handle_usage_command(self, event: MessageEvent) -> str: """Handle /usage command -- show token usage for the session's last agent run.""" source = event.source diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py index e3b3848e7..07a8f5e1e 100644 --- a/hermes_cli/commands.py +++ b/hermes_cli/commands.py @@ -57,6 +57,8 @@ COMMAND_REGISTRY: list[CommandDef] = [ CommandDef("undo", "Remove the last user/assistant exchange", "Session"), CommandDef("title", "Set a title for the current session", "Session", args_hint="[name]"), + CommandDef("branch", "Branch the current session (explore a different path)", "Session", + aliases=("fork",), args_hint="[name]"), CommandDef("compress", "Manually compress conversation context", "Session"), CommandDef("rollback", "List or restore filesystem checkpoints", "Session", args_hint="[number]"), diff --git a/tests/test_branch_command.py b/tests/test_branch_command.py new file mode 100644 index 000000000..9c3ec61d8 --- /dev/null +++ b/tests/test_branch_command.py @@ -0,0 +1,198 @@ +"""Tests for the /branch (/fork) command — session branching. + +Verifies that: +- Branching creates a new session with copied conversation history +- The original session is preserved (ended with "branched" reason) +- Auto-generated titles use lineage numbering +- Custom branch names are used when provided +- parent_session_id links are set correctly +- Edge cases: empty conversation, missing session DB +""" + +import os +import uuid +from datetime import datetime +from pathlib import Path +from unittest.mock import MagicMock, patch, PropertyMock + +import pytest + + +@pytest.fixture +def session_db(tmp_path): + """Create a real SessionDB for testing.""" + os.environ["HERMES_HOME"] = str(tmp_path / ".hermes") + os.makedirs(tmp_path / ".hermes", exist_ok=True) + from hermes_state import SessionDB + db = SessionDB(db_path=tmp_path / ".hermes" / "test_sessions.db") + yield db + db.close() + + +@pytest.fixture +def cli_instance(tmp_path, session_db): + """Create a minimal HermesCLI-like object for testing _handle_branch_command.""" + # We'll mock the CLI enough to test the branch logic without full init + from unittest.mock import MagicMock + + cli = MagicMock() + cli._session_db = session_db + cli.session_id = "20260403_120000_abc123" + cli.model = "anthropic/claude-sonnet-4.6" + cli.max_turns = 90 + cli.reasoning_config = {"enabled": True, "effort": "medium"} + cli.session_start = datetime.now() + cli._pending_title = None + cli._resumed = False + cli.agent = None + cli.conversation_history = [ + {"role": "user", "content": "Hello, can you help me?"}, + {"role": "assistant", "content": "Of course! How can I help?"}, + {"role": "user", "content": "Write a Python function to sort a list."}, + {"role": "assistant", "content": "def sort_list(lst): return sorted(lst)"}, + ] + + # Create the original session in the DB + session_db.create_session( + session_id=cli.session_id, + source="cli", + model=cli.model, + ) + session_db.set_session_title(cli.session_id, "My Coding Session") + + return cli + + +class TestBranchCommandCLI: + """Test the /branch command logic for the CLI.""" + + def test_branch_creates_new_session(self, cli_instance, session_db): + """Branching should create a new session in the DB.""" + from cli import HermesCLI + + # Call the real method on the mock, using the real implementation + HermesCLI._handle_branch_command(cli_instance, "/branch") + + # Verify a new session was created + assert cli_instance.session_id != "20260403_120000_abc123" + new_session = session_db.get_session(cli_instance.session_id) + assert new_session is not None + + def test_branch_copies_history(self, cli_instance, session_db): + """Branching should copy all messages to the new session.""" + from cli import HermesCLI + + HermesCLI._handle_branch_command(cli_instance, "/branch") + + messages = session_db.get_messages_as_conversation(cli_instance.session_id) + assert len(messages) == 4 # All 4 messages copied + + def test_branch_preserves_parent_link(self, cli_instance, session_db): + """The new session should reference the original as parent.""" + from cli import HermesCLI + original_id = cli_instance.session_id + + HermesCLI._handle_branch_command(cli_instance, "/branch") + + new_session = session_db.get_session(cli_instance.session_id) + assert new_session["parent_session_id"] == original_id + + def test_branch_ends_original_session(self, cli_instance, session_db): + """The original session should be marked as ended with 'branched' reason.""" + from cli import HermesCLI + original_id = cli_instance.session_id + + HermesCLI._handle_branch_command(cli_instance, "/branch") + + original = session_db.get_session(original_id) + assert original["end_reason"] == "branched" + + def test_branch_with_custom_name(self, cli_instance, session_db): + """Custom branch name should be used as the title.""" + from cli import HermesCLI + + HermesCLI._handle_branch_command(cli_instance, "/branch refactor approach") + + title = session_db.get_session_title(cli_instance.session_id) + assert title == "refactor approach" + + def test_branch_auto_title_lineage(self, cli_instance, session_db): + """Without a name, branch should auto-generate a title from the parent's title.""" + from cli import HermesCLI + + HermesCLI._handle_branch_command(cli_instance, "/branch") + + title = session_db.get_session_title(cli_instance.session_id) + assert title == "My Coding Session #2" + + def test_branch_empty_conversation(self, cli_instance, session_db): + """Branching with no history should show an error.""" + from cli import HermesCLI + cli_instance.conversation_history = [] + + HermesCLI._handle_branch_command(cli_instance, "/branch") + + # session_id should not have changed + assert cli_instance.session_id == "20260403_120000_abc123" + + def test_branch_no_session_db(self, cli_instance): + """Branching without a session DB should show an error.""" + from cli import HermesCLI + cli_instance._session_db = None + + HermesCLI._handle_branch_command(cli_instance, "/branch") + + # session_id should not have changed + assert cli_instance.session_id == "20260403_120000_abc123" + + def test_branch_syncs_agent(self, cli_instance, session_db): + """If an agent is active, branch should sync it to the new session.""" + from cli import HermesCLI + + agent = MagicMock() + agent._last_flushed_db_idx = 0 + cli_instance.agent = agent + + HermesCLI._handle_branch_command(cli_instance, "/branch") + + # Agent should have been updated + assert agent.session_id == cli_instance.session_id + assert agent.reset_session_state.called + assert agent._last_flushed_db_idx == 4 # len(conversation_history) + + def test_branch_sets_resumed_flag(self, cli_instance, session_db): + """Branch should set _resumed=True to prevent auto-title generation.""" + from cli import HermesCLI + + HermesCLI._handle_branch_command(cli_instance, "/branch") + + assert cli_instance._resumed is True + + def test_fork_alias(self): + """The /fork alias should resolve to 'branch'.""" + from hermes_cli.commands import resolve_command + result = resolve_command("fork") + assert result is not None + assert result.name == "branch" + + +class TestBranchCommandDef: + """Test the CommandDef registration for /branch.""" + + def test_branch_in_registry(self): + """The branch command should be in the command registry.""" + from hermes_cli.commands import COMMAND_REGISTRY + names = [c.name for c in COMMAND_REGISTRY] + assert "branch" in names + + def test_branch_has_fork_alias(self): + """The branch command should have 'fork' as an alias.""" + from hermes_cli.commands import COMMAND_REGISTRY + branch = next(c for c in COMMAND_REGISTRY if c.name == "branch") + assert "fork" in branch.aliases + + def test_branch_in_session_category(self): + """The branch command should be in the Session category.""" + from hermes_cli.commands import COMMAND_REGISTRY + branch = next(c for c in COMMAND_REGISTRY if c.name == "branch") + assert branch.category == "Session"