diff --git a/cli.py b/cli.py index 2e17060af..470186572 100755 --- a/cli.py +++ b/cli.py @@ -1879,7 +1879,14 @@ class HermesCLI: return False def _handle_rollback_command(self, command: str): - """Handle /rollback — list or restore filesystem checkpoints.""" + """Handle /rollback — list, diff, or restore filesystem checkpoints. + + Syntax: + /rollback — list checkpoints + /rollback — restore checkpoint N (also undoes last chat turn) + /rollback diff — preview changes since checkpoint N + /rollback — restore a single file from checkpoint N + """ from tools.checkpoint_manager import CheckpointManager, format_checkpoint_list if not hasattr(self, 'agent') or not self.agent: @@ -1894,38 +1901,89 @@ class HermesCLI: return cwd = os.getenv("TERMINAL_CWD", os.getcwd()) - parts = command.split(maxsplit=1) - arg = parts[1].strip() if len(parts) > 1 else "" + parts = command.split() + args = parts[1:] if len(parts) > 1 else [] - if not arg: + if not args: # List checkpoints checkpoints = mgr.list_checkpoints(cwd) print(format_checkpoint_list(checkpoints, cwd)) - else: - # Restore by number or hash + return + + # Handle /rollback diff + if args[0].lower() == "diff": + if len(args) < 2: + print(" Usage: /rollback diff ") + return checkpoints = mgr.list_checkpoints(cwd) if not checkpoints: print(f" No checkpoints found for {cwd}") return - - target_hash = None - try: - idx = int(arg) - 1 # 1-indexed for user - if 0 <= idx < len(checkpoints): - target_hash = checkpoints[idx]["hash"] - else: - print(f" Invalid checkpoint number. Use 1-{len(checkpoints)}.") - return - except ValueError: - # Try as a git hash - target_hash = arg - - result = mgr.restore(cwd, target_hash) + target_hash = self._resolve_checkpoint_ref(args[1], checkpoints) + if not target_hash: + return + result = mgr.diff(cwd, target_hash) if result["success"]: - print(f" ✅ Restored to checkpoint {result['restored_to']}: {result['reason']}") - print(f" A pre-rollback snapshot was saved automatically.") + stat = result.get("stat", "") + diff = result.get("diff", "") + if not stat and not diff: + print(" No changes since this checkpoint.") + else: + if stat: + print(f"\n{stat}") + if diff: + # Limit diff output to avoid terminal flood + diff_lines = diff.splitlines() + if len(diff_lines) > 80: + print("\n".join(diff_lines[:80])) + print(f"\n ... ({len(diff_lines) - 80} more lines, showing first 80)") + else: + print(f"\n{diff}") else: print(f" ❌ {result['error']}") + return + + # Resolve checkpoint reference (number or hash) + checkpoints = mgr.list_checkpoints(cwd) + if not checkpoints: + print(f" No checkpoints found for {cwd}") + return + + target_hash = self._resolve_checkpoint_ref(args[0], checkpoints) + if not target_hash: + return + + # Check for file-level restore: /rollback + file_path = args[1] if len(args) > 1 else None + + result = mgr.restore(cwd, target_hash, file_path=file_path) + if result["success"]: + if file_path: + print(f" ✅ Restored {file_path} from checkpoint {result['restored_to']}: {result['reason']}") + else: + print(f" ✅ Restored to checkpoint {result['restored_to']}: {result['reason']}") + print(f" A pre-rollback snapshot was saved automatically.") + + # Also undo the last conversation turn so the agent's context + # matches the restored filesystem state + if self.conversation_history: + self.undo_last() + print(f" Chat turn undone to match restored file state.") + else: + print(f" ❌ {result['error']}") + + def _resolve_checkpoint_ref(self, ref: str, checkpoints: list) -> str | None: + """Resolve a checkpoint number or hash to a full commit hash.""" + try: + idx = int(ref) - 1 # 1-indexed for user + if 0 <= idx < len(checkpoints): + return checkpoints[idx]["hash"] + else: + print(f" Invalid checkpoint number. Use 1-{len(checkpoints)}.") + return None + except ValueError: + # Treat as a git hash + return ref def _handle_paste_command(self): """Handle /paste — explicitly check clipboard for an image. diff --git a/hermes_cli/config.py b/hermes_cli/config.py index df30d566a..f78131308 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -134,7 +134,7 @@ DEFAULT_CONFIG = { # When enabled, the agent takes a snapshot of the working directory once per # conversation turn (on first write_file/patch call). Use /rollback to restore. "checkpoints": { - "enabled": False, + "enabled": True, "max_snapshots": 50, # Max checkpoints to keep per directory }, diff --git a/run_agent.py b/run_agent.py index 394e31adf..8a4147a8b 100644 --- a/run_agent.py +++ b/run_agent.py @@ -205,6 +205,33 @@ _NEVER_PARALLEL_TOOLS = frozenset({"clarify"}) # Maximum number of concurrent worker threads for parallel tool execution. _MAX_TOOL_WORKERS = 8 +# Patterns that indicate a terminal command may modify/delete files. +_DESTRUCTIVE_PATTERNS = re.compile( + r"""(?:^|\s|&&|\|\||;|`)(?: + rm\s|rmdir\s| + mv\s| + sed\s+-i| + truncate\s| + dd\s| + shred\s| + git\s+(?:reset|clean|checkout)\s + )""", + re.VERBOSE, +) +# Output redirects that overwrite files (> but not >>) +_REDIRECT_OVERWRITE = re.compile(r'[^>]>[^>]|^>[^>]') + + +def _is_destructive_command(cmd: str) -> bool: + """Heuristic: does this terminal command look like it modifies/deletes files?""" + if not cmd: + return False + if _DESTRUCTIVE_PATTERNS.search(cmd): + return True + if _REDIRECT_OVERWRITE.search(cmd): + return True + return False + def _inject_honcho_turn_context(content, turn_context: str): """Append Honcho recall to the current-turn user message without mutating history. @@ -3842,6 +3869,18 @@ class AIAgent: except Exception: pass + # Checkpoint before destructive terminal commands + if function_name == "terminal" and self._checkpoint_mgr.enabled: + try: + cmd = function_args.get("command", "") + if _is_destructive_command(cmd): + cwd = function_args.get("workdir") or os.getenv("TERMINAL_CWD", os.getcwd()) + self._checkpoint_mgr.ensure_checkpoint( + cwd, f"before terminal: {cmd[:60]}" + ) + except Exception: + pass + parsed_calls.append((tool_call, function_name, function_args)) # ── Logging / callbacks ────────────────────────────────────────── @@ -4035,6 +4074,18 @@ class AIAgent: except Exception: pass # never block tool execution + # Checkpoint before destructive terminal commands + if function_name == "terminal" and self._checkpoint_mgr.enabled: + try: + cmd = function_args.get("command", "") + if _is_destructive_command(cmd): + cwd = function_args.get("workdir") or os.getenv("TERMINAL_CWD", os.getcwd()) + self._checkpoint_mgr.ensure_checkpoint( + cwd, f"before terminal: {cmd[:60]}" + ) + except Exception: + pass # never block tool execution + tool_start_time = time.time() if function_name == "todo": diff --git a/tools/checkpoint_manager.py b/tools/checkpoint_manager.py index 5315e37b1..0227c9ee1 100644 --- a/tools/checkpoint_manager.py +++ b/tools/checkpoint_manager.py @@ -251,8 +251,8 @@ class CheckpointManager: def list_checkpoints(self, working_dir: str) -> List[Dict]: """List available checkpoints for a directory. - Returns a list of dicts with keys: hash, short_hash, timestamp, reason. - Most recent first. + Returns a list of dicts with keys: hash, short_hash, timestamp, reason, + files_changed, insertions, deletions. Most recent first. """ abs_dir = str(Path(working_dir).resolve()) shadow = _shadow_repo_path(abs_dir) @@ -260,14 +260,6 @@ class CheckpointManager: if not (shadow / "HEAD").exists(): return [] - ok, stdout, _ = _run_git( - ["log", "--format=%H|%h|%aI|%s", "--no-walk=unsorted", - "--all" if False else "HEAD", # just HEAD lineage - "-n", str(self.max_snapshots)], - shadow, abs_dir, - ) - - # Simpler: just use regular log ok, stdout, _ = _run_git( ["log", "--format=%H|%h|%aI|%s", "-n", str(self.max_snapshots)], shadow, abs_dir, @@ -280,19 +272,95 @@ class CheckpointManager: for line in stdout.splitlines(): parts = line.split("|", 3) if len(parts) == 4: - results.append({ + entry = { "hash": parts[0], "short_hash": parts[1], "timestamp": parts[2], "reason": parts[3], - }) + "files_changed": 0, + "insertions": 0, + "deletions": 0, + } + # Get diffstat for this commit + stat_ok, stat_out, _ = _run_git( + ["diff", "--shortstat", f"{parts[0]}~1", parts[0]], + shadow, abs_dir, + allowed_returncodes={128, 129}, # first commit has no parent + ) + if stat_ok and stat_out: + self._parse_shortstat(stat_out, entry) + results.append(entry) return results - def restore(self, working_dir: str, commit_hash: str) -> Dict: + @staticmethod + def _parse_shortstat(stat_line: str, entry: Dict) -> None: + """Parse git --shortstat output into entry dict.""" + import re + m = re.search(r'(\d+) file', stat_line) + if m: + entry["files_changed"] = int(m.group(1)) + m = re.search(r'(\d+) insertion', stat_line) + if m: + entry["insertions"] = int(m.group(1)) + m = re.search(r'(\d+) deletion', stat_line) + if m: + entry["deletions"] = int(m.group(1)) + + def diff(self, working_dir: str, commit_hash: str) -> Dict: + """Show diff between a checkpoint and the current working tree. + + Returns dict with success, diff text, and stat summary. + """ + abs_dir = str(Path(working_dir).resolve()) + shadow = _shadow_repo_path(abs_dir) + + if not (shadow / "HEAD").exists(): + return {"success": False, "error": "No checkpoints exist for this directory"} + + # Verify the commit exists + ok, _, err = _run_git( + ["cat-file", "-t", commit_hash], shadow, abs_dir, + ) + if not ok: + return {"success": False, "error": f"Checkpoint '{commit_hash}' not found"} + + # Stage current state to compare against checkpoint + _run_git(["add", "-A"], shadow, abs_dir, timeout=_GIT_TIMEOUT * 2) + + # Get stat summary: checkpoint vs current working tree + ok_stat, stat_out, _ = _run_git( + ["diff", "--stat", commit_hash, "--cached"], + shadow, abs_dir, + ) + + # Get actual diff (limited to avoid terminal flood) + ok_diff, diff_out, _ = _run_git( + ["diff", commit_hash, "--cached", "--no-color"], + shadow, abs_dir, + ) + + # Unstage to avoid polluting the shadow repo index + _run_git(["reset", "HEAD", "--quiet"], shadow, abs_dir) + + if not ok_stat and not ok_diff: + return {"success": False, "error": "Could not generate diff"} + + return { + "success": True, + "stat": stat_out if ok_stat else "", + "diff": diff_out if ok_diff else "", + } + + def restore(self, working_dir: str, commit_hash: str, file_path: str = None) -> Dict: """Restore files to a checkpoint state. - Uses ``git checkout -- .`` which restores tracked files - without moving HEAD — safe and reversible. + Uses ``git checkout -- .`` (or a specific file) which restores + tracked files without moving HEAD — safe and reversible. + + Parameters + ---------- + file_path : str, optional + If provided, restore only this file instead of the entire directory. Returns dict with success/error info. """ @@ -312,14 +380,15 @@ class CheckpointManager: # Take a checkpoint of current state before restoring (so you can undo the undo) self._take(abs_dir, f"pre-rollback snapshot (restoring to {commit_hash[:8]})") - # Restore + # Restore — full directory or single file + restore_target = file_path if file_path else "." ok, stdout, err = _run_git( - ["checkout", commit_hash, "--", "."], + ["checkout", commit_hash, "--", restore_target], shadow, abs_dir, timeout=_GIT_TIMEOUT * 2, ) if not ok: - return {"success": False, "error": "Restore failed", "debug": err or None} + return {"success": False, "error": f"Restore failed: {err}", "debug": err or None} # Get info about what was restored ok2, reason_out, _ = _run_git( @@ -327,12 +396,15 @@ class CheckpointManager: ) reason = reason_out if ok2 else "unknown" - return { + result = { "success": True, "restored_to": commit_hash[:8], "reason": reason, "directory": abs_dir, } + if file_path: + result["file"] = file_path + return result def get_working_dir_for_path(self, file_path: str) -> str: """Resolve a file path to its working directory for checkpointing. @@ -458,7 +530,19 @@ def format_checkpoint_list(checkpoints: List[Dict], directory: str) -> str: ts = ts.split("T")[1].split("+")[0].split("-")[0][:5] # HH:MM date = cp["timestamp"].split("T")[0] ts = f"{date} {ts}" - lines.append(f" {i}. {cp['short_hash']} {ts} {cp['reason']}") - lines.append(f"\nUse /rollback to restore, e.g. /rollback 1") + # Build change summary + files = cp.get("files_changed", 0) + ins = cp.get("insertions", 0) + dele = cp.get("deletions", 0) + if files: + stat = f" ({files} file{'s' if files != 1 else ''}, +{ins}/-{dele})" + else: + stat = "" + + lines.append(f" {i}. {cp['short_hash']} {ts} {cp['reason']}{stat}") + + lines.append(f"\n /rollback restore to checkpoint N") + lines.append(f" /rollback diff preview changes since checkpoint N") + lines.append(f" /rollback restore a single file from checkpoint N") return "\n".join(lines)