After migrating from OpenClaw, leftover workspace directories contain state files (todo.json, sessions, logs) that confuse the agent — it discovers them and reads/writes to stale locations instead of the Hermes state directory, causing issues like cron jobs reading a different todo list than interactive sessions. Changes: - hermes claw migrate now offers to archive the source directory after successful migration (rename to .pre-migration, not delete) - New `hermes claw cleanup` subcommand for users who already migrated and need to archive leftover OpenClaw directories - Migration notes updated with explicit cleanup guidance - 42 tests covering all new functionality Reported by SteveSkedasticity — multiple todo.json files across ~/.hermes/, ~/.openclaw/workspace/, and ~/.openclaw/workspace-assistant/ caused cron jobs to read from wrong locations.
569 lines
20 KiB
Python
569 lines
20 KiB
Python
"""hermes claw — OpenClaw migration commands.
|
|
|
|
Usage:
|
|
hermes claw migrate # Interactive migration from ~/.openclaw
|
|
hermes claw migrate --dry-run # Preview what would be migrated
|
|
hermes claw migrate --preset full --overwrite # Full migration, overwrite conflicts
|
|
hermes claw cleanup # Archive leftover OpenClaw directories
|
|
hermes claw cleanup --dry-run # Preview what would be archived
|
|
"""
|
|
|
|
import importlib.util
|
|
import logging
|
|
import shutil
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from hermes_cli.config import get_hermes_home, get_config_path, load_config, save_config
|
|
from hermes_constants import get_optional_skills_dir
|
|
from hermes_cli.setup import (
|
|
Colors,
|
|
color,
|
|
print_header,
|
|
print_info,
|
|
print_success,
|
|
print_error,
|
|
print_warning,
|
|
prompt_yes_no,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
|
|
|
|
_OPENCLAW_SCRIPT = (
|
|
get_optional_skills_dir(PROJECT_ROOT / "optional-skills")
|
|
/ "migration"
|
|
/ "openclaw-migration"
|
|
/ "scripts"
|
|
/ "openclaw_to_hermes.py"
|
|
)
|
|
|
|
# Fallback: user may have installed the skill from the Hub
|
|
_OPENCLAW_SCRIPT_INSTALLED = (
|
|
get_hermes_home()
|
|
/ "skills"
|
|
/ "migration"
|
|
/ "openclaw-migration"
|
|
/ "scripts"
|
|
/ "openclaw_to_hermes.py"
|
|
)
|
|
|
|
# Known OpenClaw directory names (current + legacy)
|
|
_OPENCLAW_DIR_NAMES = (".openclaw", ".clawdbot", ".moldbot")
|
|
|
|
# State files commonly found in OpenClaw workspace directories that cause
|
|
# confusion after migration (the agent discovers them and writes to them)
|
|
_WORKSPACE_STATE_GLOBS = (
|
|
"*/todo.json",
|
|
"*/sessions/*",
|
|
"*/memory/*.json",
|
|
"*/logs/*",
|
|
)
|
|
|
|
|
|
def _find_migration_script() -> Path | None:
|
|
"""Find the openclaw_to_hermes.py script in known locations."""
|
|
for candidate in [_OPENCLAW_SCRIPT, _OPENCLAW_SCRIPT_INSTALLED]:
|
|
if candidate.exists():
|
|
return candidate
|
|
return None
|
|
|
|
|
|
def _load_migration_module(script_path: Path):
|
|
"""Dynamically load the migration script as a module."""
|
|
spec = importlib.util.spec_from_file_location("openclaw_to_hermes", script_path)
|
|
if spec is None or spec.loader is None:
|
|
return None
|
|
mod = importlib.util.module_from_spec(spec)
|
|
# Register in sys.modules so @dataclass can resolve the module
|
|
# (Python 3.11+ requires this for dynamically loaded modules)
|
|
sys.modules[spec.name] = mod
|
|
try:
|
|
spec.loader.exec_module(mod)
|
|
except Exception:
|
|
sys.modules.pop(spec.name, None)
|
|
raise
|
|
return mod
|
|
|
|
|
|
def _find_openclaw_dirs() -> list[Path]:
|
|
"""Find all OpenClaw directories on disk."""
|
|
found = []
|
|
for name in _OPENCLAW_DIR_NAMES:
|
|
candidate = Path.home() / name
|
|
if candidate.is_dir():
|
|
found.append(candidate)
|
|
return found
|
|
|
|
|
|
def _scan_workspace_state(source_dir: Path) -> list[tuple[Path, str]]:
|
|
"""Scan an OpenClaw directory for workspace state files that cause confusion.
|
|
|
|
Returns a list of (path, description) tuples.
|
|
"""
|
|
findings: list[tuple[Path, str]] = []
|
|
|
|
# Direct state files in the root
|
|
for name in ("todo.json", "sessions", "logs"):
|
|
candidate = source_dir / name
|
|
if candidate.exists():
|
|
kind = "directory" if candidate.is_dir() else "file"
|
|
findings.append((candidate, f"Root {kind}: {name}"))
|
|
|
|
# State files inside workspace directories
|
|
for child in sorted(source_dir.iterdir()):
|
|
if not child.is_dir() or child.name.startswith("."):
|
|
continue
|
|
# Check for workspace-like subdirectories
|
|
for state_name in ("todo.json", "sessions", "logs", "memory"):
|
|
state_path = child / state_name
|
|
if state_path.exists():
|
|
kind = "directory" if state_path.is_dir() else "file"
|
|
rel = state_path.relative_to(source_dir)
|
|
findings.append((state_path, f"Workspace {kind}: {rel}"))
|
|
|
|
return findings
|
|
|
|
|
|
def _archive_directory(source_dir: Path, dry_run: bool = False) -> Path:
|
|
"""Rename an OpenClaw directory to .pre-migration.
|
|
|
|
Returns the archive path.
|
|
"""
|
|
timestamp = datetime.now().strftime("%Y%m%d")
|
|
archive_name = f"{source_dir.name}.pre-migration"
|
|
archive_path = source_dir.parent / archive_name
|
|
|
|
# If archive already exists, add timestamp
|
|
if archive_path.exists():
|
|
archive_name = f"{source_dir.name}.pre-migration-{timestamp}"
|
|
archive_path = source_dir.parent / archive_name
|
|
|
|
# If still exists (multiple runs same day), add counter
|
|
counter = 2
|
|
while archive_path.exists():
|
|
archive_name = f"{source_dir.name}.pre-migration-{timestamp}-{counter}"
|
|
archive_path = source_dir.parent / archive_name
|
|
counter += 1
|
|
|
|
if not dry_run:
|
|
source_dir.rename(archive_path)
|
|
|
|
return archive_path
|
|
|
|
|
|
def claw_command(args):
|
|
"""Route hermes claw subcommands."""
|
|
action = getattr(args, "claw_action", None)
|
|
|
|
if action == "migrate":
|
|
_cmd_migrate(args)
|
|
elif action in ("cleanup", "clean"):
|
|
_cmd_cleanup(args)
|
|
else:
|
|
print("Usage: hermes claw <command> [options]")
|
|
print()
|
|
print("Commands:")
|
|
print(" migrate Migrate settings from OpenClaw to Hermes")
|
|
print(" cleanup Archive leftover OpenClaw directories after migration")
|
|
print()
|
|
print("Run 'hermes claw <command> --help' for options.")
|
|
|
|
|
|
def _cmd_migrate(args):
|
|
"""Run the OpenClaw → Hermes migration."""
|
|
# Check current and legacy OpenClaw directories
|
|
explicit_source = getattr(args, "source", None)
|
|
if explicit_source:
|
|
source_dir = Path(explicit_source)
|
|
else:
|
|
source_dir = Path.home() / ".openclaw"
|
|
if not source_dir.is_dir():
|
|
# Try legacy directory names
|
|
for legacy in (".clawdbot", ".moldbot"):
|
|
candidate = Path.home() / legacy
|
|
if candidate.is_dir():
|
|
source_dir = candidate
|
|
break
|
|
dry_run = getattr(args, "dry_run", False)
|
|
preset = getattr(args, "preset", "full")
|
|
overwrite = getattr(args, "overwrite", False)
|
|
migrate_secrets = getattr(args, "migrate_secrets", False)
|
|
workspace_target = getattr(args, "workspace_target", None)
|
|
skill_conflict = getattr(args, "skill_conflict", "skip")
|
|
|
|
# If using the "full" preset, secrets are included by default
|
|
if preset == "full":
|
|
migrate_secrets = True
|
|
|
|
print()
|
|
print(
|
|
color(
|
|
"┌─────────────────────────────────────────────────────────┐",
|
|
Colors.MAGENTA,
|
|
)
|
|
)
|
|
print(
|
|
color(
|
|
"│ ⚕ Hermes — OpenClaw Migration │",
|
|
Colors.MAGENTA,
|
|
)
|
|
)
|
|
print(
|
|
color(
|
|
"└─────────────────────────────────────────────────────────┘",
|
|
Colors.MAGENTA,
|
|
)
|
|
)
|
|
|
|
# Check source directory
|
|
if not source_dir.is_dir():
|
|
print()
|
|
print_error(f"OpenClaw directory not found: {source_dir}")
|
|
print_info("Make sure your OpenClaw installation is at the expected path.")
|
|
print_info("You can specify a custom path: hermes claw migrate --source /path/to/.openclaw")
|
|
return
|
|
|
|
# Find the migration script
|
|
script_path = _find_migration_script()
|
|
if not script_path:
|
|
print()
|
|
print_error("Migration script not found.")
|
|
print_info("Expected at one of:")
|
|
print_info(f" {_OPENCLAW_SCRIPT}")
|
|
print_info(f" {_OPENCLAW_SCRIPT_INSTALLED}")
|
|
print_info("Make sure the openclaw-migration skill is installed.")
|
|
return
|
|
|
|
# Show what we're doing
|
|
hermes_home = get_hermes_home()
|
|
print()
|
|
print_header("Migration Settings")
|
|
print_info(f"Source: {source_dir}")
|
|
print_info(f"Target: {hermes_home}")
|
|
print_info(f"Preset: {preset}")
|
|
print_info(f"Mode: {'dry run (preview only)' if dry_run else 'execute'}")
|
|
print_info(f"Overwrite: {'yes' if overwrite else 'no (skip conflicts)'}")
|
|
print_info(f"Secrets: {'yes (allowlisted only)' if migrate_secrets else 'no'}")
|
|
if skill_conflict != "skip":
|
|
print_info(f"Skill conflicts: {skill_conflict}")
|
|
if workspace_target:
|
|
print_info(f"Workspace: {workspace_target}")
|
|
print()
|
|
|
|
# For execute mode (non-dry-run), confirm unless --yes was passed
|
|
if not dry_run and not getattr(args, "yes", False):
|
|
if not prompt_yes_no("Proceed with migration?", default=True):
|
|
print_info("Migration cancelled.")
|
|
return
|
|
|
|
# Ensure config.yaml exists before migration tries to read it
|
|
config_path = get_config_path()
|
|
if not config_path.exists():
|
|
save_config(load_config())
|
|
|
|
# Load and run the migration
|
|
try:
|
|
mod = _load_migration_module(script_path)
|
|
if mod is None:
|
|
print_error("Could not load migration script.")
|
|
return
|
|
|
|
selected = mod.resolve_selected_options(None, None, preset=preset)
|
|
ws_target = Path(workspace_target).resolve() if workspace_target else None
|
|
|
|
migrator = mod.Migrator(
|
|
source_root=source_dir.resolve(),
|
|
target_root=hermes_home.resolve(),
|
|
execute=not dry_run,
|
|
workspace_target=ws_target,
|
|
overwrite=overwrite,
|
|
migrate_secrets=migrate_secrets,
|
|
output_dir=None,
|
|
selected_options=selected,
|
|
preset_name=preset,
|
|
skill_conflict_mode=skill_conflict,
|
|
)
|
|
report = migrator.migrate()
|
|
except Exception as e:
|
|
print()
|
|
print_error(f"Migration failed: {e}")
|
|
logger.debug("OpenClaw migration error", exc_info=True)
|
|
return
|
|
|
|
# Print results
|
|
_print_migration_report(report, dry_run)
|
|
|
|
# After successful non-dry-run migration, offer to archive the source directory
|
|
if not dry_run and report.get("summary", {}).get("migrated", 0) > 0:
|
|
_offer_source_archival(source_dir, getattr(args, "yes", False))
|
|
|
|
|
|
def _offer_source_archival(source_dir: Path, auto_yes: bool = False):
|
|
"""After migration, offer to rename the source directory to prevent state fragmentation.
|
|
|
|
OpenClaw workspace directories contain state files (todo.json, sessions, etc.)
|
|
that the agent may discover and write to, causing confusion. Renaming the
|
|
directory prevents this.
|
|
"""
|
|
if not source_dir.is_dir():
|
|
return
|
|
|
|
# Scan for state files that could cause problems
|
|
state_files = _scan_workspace_state(source_dir)
|
|
|
|
print()
|
|
print_header("Post-Migration Cleanup")
|
|
print_info("The OpenClaw directory still exists and contains workspace state files")
|
|
print_info("that can confuse the agent (todo lists, sessions, logs).")
|
|
if state_files:
|
|
print()
|
|
print(color(" Found state files:", Colors.YELLOW))
|
|
# Show up to 10 most relevant findings
|
|
for path, desc in state_files[:10]:
|
|
print(f" {desc}")
|
|
if len(state_files) > 10:
|
|
print(f" ... and {len(state_files) - 10} more")
|
|
print()
|
|
print_info(f"Recommend: rename {source_dir.name}/ to {source_dir.name}.pre-migration/")
|
|
print_info("This prevents the agent from discovering old workspace directories.")
|
|
print_info("You can always rename it back if needed.")
|
|
print()
|
|
|
|
if auto_yes or prompt_yes_no(f"Archive {source_dir} now?", default=True):
|
|
try:
|
|
archive_path = _archive_directory(source_dir)
|
|
print_success(f"Archived: {source_dir} → {archive_path}")
|
|
print_info("The original directory has been renamed, not deleted.")
|
|
print_info(f"To undo: mv {archive_path} {source_dir}")
|
|
except OSError as e:
|
|
print_error(f"Could not archive: {e}")
|
|
print_info(f"You can do it manually: mv {source_dir} {source_dir}.pre-migration")
|
|
else:
|
|
print_info("Skipped. You can archive later with: hermes claw cleanup")
|
|
|
|
|
|
def _cmd_cleanup(args):
|
|
"""Archive leftover OpenClaw directories after migration.
|
|
|
|
Scans for OpenClaw directories that still exist after migration and offers
|
|
to rename them to .pre-migration to prevent state fragmentation.
|
|
"""
|
|
dry_run = getattr(args, "dry_run", False)
|
|
auto_yes = getattr(args, "yes", False)
|
|
explicit_source = getattr(args, "source", None)
|
|
|
|
print()
|
|
print(
|
|
color(
|
|
"┌─────────────────────────────────────────────────────────┐",
|
|
Colors.MAGENTA,
|
|
)
|
|
)
|
|
print(
|
|
color(
|
|
"│ ⚕ Hermes — OpenClaw Cleanup │",
|
|
Colors.MAGENTA,
|
|
)
|
|
)
|
|
print(
|
|
color(
|
|
"└─────────────────────────────────────────────────────────┘",
|
|
Colors.MAGENTA,
|
|
)
|
|
)
|
|
|
|
# Find OpenClaw directories
|
|
if explicit_source:
|
|
dirs_to_check = [Path(explicit_source)]
|
|
else:
|
|
dirs_to_check = _find_openclaw_dirs()
|
|
|
|
if not dirs_to_check:
|
|
print()
|
|
print_success("No OpenClaw directories found. Nothing to clean up.")
|
|
return
|
|
|
|
total_archived = 0
|
|
|
|
for source_dir in dirs_to_check:
|
|
print()
|
|
print_header(f"Found: {source_dir}")
|
|
|
|
# Scan for state files
|
|
state_files = _scan_workspace_state(source_dir)
|
|
|
|
# Show directory stats
|
|
try:
|
|
workspace_dirs = [
|
|
d for d in source_dir.iterdir()
|
|
if d.is_dir() and not d.name.startswith(".")
|
|
and any((d / name).exists() for name in ("todo.json", "SOUL.md", "MEMORY.md", "USER.md"))
|
|
]
|
|
except OSError:
|
|
workspace_dirs = []
|
|
|
|
if workspace_dirs:
|
|
print_info(f"Workspace directories: {len(workspace_dirs)}")
|
|
for ws in workspace_dirs[:5]:
|
|
items = []
|
|
if (ws / "todo.json").exists():
|
|
items.append("todo.json")
|
|
if (ws / "sessions").is_dir():
|
|
items.append("sessions/")
|
|
if (ws / "SOUL.md").exists():
|
|
items.append("SOUL.md")
|
|
if (ws / "MEMORY.md").exists():
|
|
items.append("MEMORY.md")
|
|
detail = ", ".join(items) if items else "empty"
|
|
print(f" {ws.name}/ ({detail})")
|
|
if len(workspace_dirs) > 5:
|
|
print(f" ... and {len(workspace_dirs) - 5} more")
|
|
|
|
if state_files:
|
|
print()
|
|
print(color(f" {len(state_files)} state file(s) that could cause confusion:", Colors.YELLOW))
|
|
for path, desc in state_files[:8]:
|
|
print(f" {desc}")
|
|
if len(state_files) > 8:
|
|
print(f" ... and {len(state_files) - 8} more")
|
|
|
|
print()
|
|
|
|
if dry_run:
|
|
archive_path = _archive_directory(source_dir, dry_run=True)
|
|
print_info(f"Would archive: {source_dir} → {archive_path}")
|
|
else:
|
|
if auto_yes or prompt_yes_no(f"Archive {source_dir}?", default=True):
|
|
try:
|
|
archive_path = _archive_directory(source_dir)
|
|
print_success(f"Archived: {source_dir} → {archive_path}")
|
|
total_archived += 1
|
|
except OSError as e:
|
|
print_error(f"Could not archive: {e}")
|
|
print_info(f"Try manually: mv {source_dir} {source_dir}.pre-migration")
|
|
else:
|
|
print_info("Skipped.")
|
|
|
|
# Summary
|
|
print()
|
|
if dry_run:
|
|
print_info(f"Dry run complete. {len(dirs_to_check)} directory(ies) would be archived.")
|
|
print_info("Run without --dry-run to archive them.")
|
|
elif total_archived:
|
|
print_success(f"Cleaned up {total_archived} OpenClaw directory(ies).")
|
|
print_info("Directories were renamed, not deleted. You can undo by renaming them back.")
|
|
else:
|
|
print_info("No directories were archived.")
|
|
|
|
|
|
def _print_migration_report(report: dict, dry_run: bool):
|
|
"""Print a formatted migration report."""
|
|
summary = report.get("summary", {})
|
|
migrated = summary.get("migrated", 0)
|
|
skipped = summary.get("skipped", 0)
|
|
conflicts = summary.get("conflict", 0)
|
|
errors = summary.get("error", 0)
|
|
|
|
print()
|
|
if dry_run:
|
|
print_header("Dry Run Results")
|
|
print_info("No files were modified. This is a preview of what would happen.")
|
|
else:
|
|
print_header("Migration Results")
|
|
|
|
print()
|
|
|
|
# Detailed items
|
|
items = report.get("items", [])
|
|
if items:
|
|
# Group by status
|
|
migrated_items = [i for i in items if i.get("status") == "migrated"]
|
|
skipped_items = [i for i in items if i.get("status") == "skipped"]
|
|
conflict_items = [i for i in items if i.get("status") == "conflict"]
|
|
error_items = [i for i in items if i.get("status") == "error"]
|
|
|
|
if migrated_items:
|
|
label = "Would migrate" if dry_run else "Migrated"
|
|
print(color(f" ✓ {label}:", Colors.GREEN))
|
|
for item in migrated_items:
|
|
kind = item.get("kind", "unknown")
|
|
dest = item.get("destination", "")
|
|
if dest:
|
|
dest_short = str(dest).replace(str(Path.home()), "~")
|
|
print(f" {kind:<22s} → {dest_short}")
|
|
else:
|
|
print(f" {kind}")
|
|
print()
|
|
|
|
if conflict_items:
|
|
print(color(" ⚠ Conflicts (skipped — use --overwrite to force):", Colors.YELLOW))
|
|
for item in conflict_items:
|
|
kind = item.get("kind", "unknown")
|
|
reason = item.get("reason", "already exists")
|
|
print(f" {kind:<22s} {reason}")
|
|
print()
|
|
|
|
if skipped_items:
|
|
print(color(" ─ Skipped:", Colors.DIM))
|
|
for item in skipped_items:
|
|
kind = item.get("kind", "unknown")
|
|
reason = item.get("reason", "")
|
|
print(f" {kind:<22s} {reason}")
|
|
print()
|
|
|
|
if error_items:
|
|
print(color(" ✗ Errors:", Colors.RED))
|
|
for item in error_items:
|
|
kind = item.get("kind", "unknown")
|
|
reason = item.get("reason", "unknown error")
|
|
print(f" {kind:<22s} {reason}")
|
|
print()
|
|
|
|
# Summary line
|
|
parts = []
|
|
if migrated:
|
|
action = "would migrate" if dry_run else "migrated"
|
|
parts.append(f"{migrated} {action}")
|
|
if conflicts:
|
|
parts.append(f"{conflicts} conflict(s)")
|
|
if skipped:
|
|
parts.append(f"{skipped} skipped")
|
|
if errors:
|
|
parts.append(f"{errors} error(s)")
|
|
|
|
if parts:
|
|
print_info(f"Summary: {', '.join(parts)}")
|
|
else:
|
|
print_info("Nothing to migrate.")
|
|
|
|
# Output directory
|
|
output_dir = report.get("output_dir")
|
|
if output_dir:
|
|
print_info(f"Full report saved to: {output_dir}")
|
|
|
|
if dry_run:
|
|
print()
|
|
print_info("To execute the migration, run without --dry-run:")
|
|
print_info(f" hermes claw migrate --preset {report.get('preset', 'full')}")
|
|
elif migrated:
|
|
print()
|
|
print_success("Migration complete!")
|
|
# Warn if API keys were skipped (migrate_secrets not enabled)
|
|
skipped_keys = [
|
|
i for i in report.get("items", [])
|
|
if i.get("kind") == "provider-keys" and i.get("status") == "skipped"
|
|
]
|
|
if skipped_keys:
|
|
print()
|
|
print(color(" ⚠ API keys were NOT migrated (secrets migration is disabled by default).", Colors.YELLOW))
|
|
print(color(" Your OPENROUTER_API_KEY and other provider keys must be added manually.", Colors.YELLOW))
|
|
print()
|
|
print_info("To migrate API keys, re-run with:")
|
|
print_info(" hermes claw migrate --migrate-secrets")
|
|
print()
|
|
print_info("Or add your key manually:")
|
|
print_info(" hermes config set OPENROUTER_API_KEY sk-or-v1-...")
|