From 1755a9e38a77bf5e5f0d280635ddb61970fb9d0a Mon Sep 17 00:00:00 2001 From: unmodeled-tyler Date: Fri, 6 Mar 2026 15:12:45 -0800 Subject: [PATCH] Design agent migration skill for Hermes Agent from OpenClaw | Run successful dry tests with reports --- optional-skills/migration/DESCRIPTION.md | 2 + .../migration/openclaw-migration/SKILL.md | 83 ++ .../scripts/openclaw_to_hermes.py | 838 ++++++++++++++++++ tests/skills/test_openclaw_migration.py | 137 +++ 4 files changed, 1060 insertions(+) create mode 100644 optional-skills/migration/DESCRIPTION.md create mode 100644 optional-skills/migration/openclaw-migration/SKILL.md create mode 100644 optional-skills/migration/openclaw-migration/scripts/openclaw_to_hermes.py create mode 100644 tests/skills/test_openclaw_migration.py diff --git a/optional-skills/migration/DESCRIPTION.md b/optional-skills/migration/DESCRIPTION.md new file mode 100644 index 00000000..b1357339 --- /dev/null +++ b/optional-skills/migration/DESCRIPTION.md @@ -0,0 +1,2 @@ +Optional migration workflows for importing user state and customizations from +other agent systems into Hermes Agent. diff --git a/optional-skills/migration/openclaw-migration/SKILL.md b/optional-skills/migration/openclaw-migration/SKILL.md new file mode 100644 index 00000000..f965ca16 --- /dev/null +++ b/optional-skills/migration/openclaw-migration/SKILL.md @@ -0,0 +1,83 @@ +--- +name: openclaw-migration +description: Migrate a user's OpenClaw customization footprint into Hermes Agent. Imports Hermes-compatible memories, SOUL.md, command allowlists, user skills, and selected workspace assets from ~/.openclaw, then reports exactly what could not be migrated and why. +version: 1.0.0 +author: Hermes Agent (Nous Research) +license: MIT +metadata: + hermes: + tags: [Migration, OpenClaw, Hermes, Memory, Persona, Import] + related_skills: [hermes-agent] +--- + +# OpenClaw -> Hermes Migration + +Use this skill when a user wants to move their OpenClaw setup into Hermes Agent with minimal manual cleanup. + +## What this skill does + +It uses `scripts/openclaw_to_hermes.py` to: + +- import `SOUL.md` into `~/.hermes/SOUL.md` +- transform OpenClaw `MEMORY.md` and `USER.md` into Hermes memory entries +- merge OpenClaw command approval patterns into Hermes `command_allowlist` +- migrate Hermes-compatible messaging settings such as `TELEGRAM_ALLOWED_USERS` and `MESSAGING_CWD` +- copy OpenClaw skills into `~/.hermes/skills/openclaw-imports/` +- optionally copy the OpenClaw workspace `AGENTS.md` into a chosen Hermes workspace +- mirror compatible workspace assets such as `workspace/tts/` into `~/.hermes/tts/` +- archive non-secret docs that do not have a direct Hermes destination +- produce a structured report listing migrated items, conflicts, skipped items, and reasons + +With `--migrate-secrets`, it will also import a small allowlisted set of Hermes-compatible secrets, currently: + +- `TELEGRAM_BOT_TOKEN` + +## Default workflow + +1. Inspect first with a dry run. +2. Ask for a target workspace path if `AGENTS.md` should be brought over. +3. Execute the migration. +4. Summarize the results, especially: + - what was migrated + - what was archived for manual review + - what was skipped and why + +## Commands + +Dry run: + +```bash +python3 SKILL_DIR/scripts/openclaw_to_hermes.py --workspace-target "$PWD" +``` + +Execute: + +```bash +python3 SKILL_DIR/scripts/openclaw_to_hermes.py --execute --workspace-target "$PWD" +``` + +Execute with Hermes-compatible secret migration enabled: + +```bash +python3 SKILL_DIR/scripts/openclaw_to_hermes.py --execute --migrate-secrets --workspace-target "$PWD" +``` + +If the user does not want to import workspace instructions into the current directory, omit `--workspace-target`. + +## Important rules + +1. Run a dry run before writing unless the user explicitly says to proceed immediately. +2. Do not migrate secrets by default. Tokens, auth blobs, device credentials, and raw gateway config should stay out of Hermes unless the user explicitly asks for secret migration. +3. Do not silently overwrite non-empty Hermes targets unless the user explicitly wants that. The helper script will preserve backups when overwriting is enabled. +4. Always give the user the skipped-items report. That report is part of the migration, not an optional extra. +5. Prefer the primary OpenClaw workspace (`~/.openclaw/workspace/`) over `workspace.default/`. Only use the default workspace as fallback when the primary files are missing. +6. Even in secret-migration mode, only migrate secrets with a clean Hermes destination. Unsupported auth blobs must still be reported as skipped. + +## Expected result + +After a successful run, the user should have: + +- Hermes persona state imported +- Hermes memory files populated with converted OpenClaw knowledge +- OpenClaw skills available under `~/.hermes/skills/openclaw-imports/` +- a migration report showing any conflicts, omissions, or unsupported data diff --git a/optional-skills/migration/openclaw-migration/scripts/openclaw_to_hermes.py b/optional-skills/migration/openclaw-migration/scripts/openclaw_to_hermes.py new file mode 100644 index 00000000..6cb7d95c --- /dev/null +++ b/optional-skills/migration/openclaw-migration/scripts/openclaw_to_hermes.py @@ -0,0 +1,838 @@ +#!/usr/bin/env python3 +"""OpenClaw -> Hermes migration helper. + +This script migrates the parts of an OpenClaw user footprint that map cleanly +into Hermes Agent, archives selected unmapped docs for manual review, and +reports exactly what was skipped and why. +""" + +from __future__ import annotations + +import argparse +import hashlib +import json +import os +import re +import shutil +from dataclasses import asdict, dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Sequence, Tuple + +try: + import yaml +except Exception: # pragma: no cover - handled at runtime + yaml = None + + +ENTRY_DELIMITER = "\n§\n" +DEFAULT_MEMORY_CHAR_LIMIT = 2200 +DEFAULT_USER_CHAR_LIMIT = 1375 +SKILL_CATEGORY_DIRNAME = "openclaw-imports" +SKILL_CATEGORY_DESCRIPTION = ( + "Skills migrated from an OpenClaw workspace." +) +SUPPORTED_SECRET_TARGETS = { + "TELEGRAM_BOT_TOKEN", +} + + +@dataclass +class ItemResult: + kind: str + source: Optional[str] + destination: Optional[str] + status: str + reason: str = "" + details: Dict[str, Any] = field(default_factory=dict) + + +def sha256_file(path: Path) -> str: + h = hashlib.sha256() + with path.open("rb") as fh: + for chunk in iter(lambda: fh.read(65536), b""): + h.update(chunk) + return h.hexdigest() + + +def read_text(path: Path) -> str: + return path.read_text(encoding="utf-8") + + +def normalize_text(text: str) -> str: + return re.sub(r"\s+", " ", text.strip()) + + +def ensure_parent(path: Path) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + + +def load_yaml_file(path: Path) -> Dict[str, Any]: + if yaml is None or not path.exists(): + return {} + data = yaml.safe_load(path.read_text(encoding="utf-8")) + return data if isinstance(data, dict) else {} + + +def dump_yaml_file(path: Path, data: Dict[str, Any]) -> None: + if yaml is None: + raise RuntimeError("PyYAML is required to update Hermes config.yaml") + ensure_parent(path) + path.write_text( + yaml.safe_dump(data, sort_keys=False, allow_unicode=False), + encoding="utf-8", + ) + + +def parse_env_file(path: Path) -> Dict[str, str]: + if not path.exists(): + return {} + data: Dict[str, str] = {} + for raw_line in path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, value = line.partition("=") + data[key.strip()] = value.strip() + return data + + +def save_env_file(path: Path, data: Dict[str, str]) -> None: + ensure_parent(path) + lines = [f"{key}={value}" for key, value in data.items()] + path.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8") + + +def backup_existing(path: Path, backup_root: Path) -> Optional[Path]: + if not path.exists(): + return None + rel = Path(*path.parts[1:]) if path.is_absolute() and len(path.parts) > 1 else path + dest = backup_root / rel + ensure_parent(dest) + if path.is_dir(): + shutil.copytree(path, dest, dirs_exist_ok=True) + else: + shutil.copy2(path, dest) + return dest + + +def parse_existing_memory_entries(path: Path) -> List[str]: + if not path.exists(): + return [] + raw = read_text(path) + if not raw.strip(): + return [] + if ENTRY_DELIMITER in raw: + return [e.strip() for e in raw.split(ENTRY_DELIMITER) if e.strip()] + return extract_markdown_entries(raw) + + +def extract_markdown_entries(text: str) -> List[str]: + entries: List[str] = [] + headings: List[str] = [] + paragraph_lines: List[str] = [] + + def context_prefix() -> str: + filtered = [h for h in headings if h and not re.search(r"\b(MEMORY|USER|SOUL|AGENTS|TOOLS|IDENTITY)\.md\b", h, re.I)] + return " > ".join(filtered) + + def flush_paragraph() -> None: + nonlocal paragraph_lines + if not paragraph_lines: + return + text_block = " ".join(line.strip() for line in paragraph_lines).strip() + paragraph_lines = [] + if not text_block: + return + prefix = context_prefix() + if prefix: + entries.append(f"{prefix}: {text_block}") + else: + entries.append(text_block) + + in_code_block = False + for raw_line in text.splitlines(): + line = raw_line.rstrip() + stripped = line.strip() + + if stripped.startswith("```"): + in_code_block = not in_code_block + flush_paragraph() + continue + if in_code_block: + continue + + heading_match = re.match(r"^(#{1,6})\s+(.*\S)\s*$", stripped) + if heading_match: + flush_paragraph() + level = len(heading_match.group(1)) + text_value = heading_match.group(2).strip() + while len(headings) >= level: + headings.pop() + headings.append(text_value) + continue + + bullet_match = re.match(r"^\s*(?:[-*]|\d+\.)\s+(.*\S)\s*$", line) + if bullet_match: + flush_paragraph() + content = bullet_match.group(1).strip() + prefix = context_prefix() + entries.append(f"{prefix}: {content}" if prefix else content) + continue + + if not stripped: + flush_paragraph() + continue + + if stripped.startswith("|") and stripped.endswith("|"): + flush_paragraph() + continue + + paragraph_lines.append(stripped) + + flush_paragraph() + + deduped: List[str] = [] + seen = set() + for entry in entries: + normalized = normalize_text(entry) + if not normalized or normalized in seen: + continue + seen.add(normalized) + deduped.append(entry.strip()) + return deduped + + +def merge_entries( + existing: Sequence[str], + incoming: Sequence[str], + limit: int, +) -> Tuple[List[str], Dict[str, int], List[str]]: + merged = list(existing) + seen = {normalize_text(entry) for entry in existing if entry.strip()} + stats = {"existing": len(existing), "added": 0, "duplicates": 0, "overflowed": 0} + overflowed: List[str] = [] + + current_len = len(ENTRY_DELIMITER.join(merged)) if merged else 0 + + for entry in incoming: + normalized = normalize_text(entry) + if not normalized: + continue + if normalized in seen: + stats["duplicates"] += 1 + continue + + candidate_len = len(entry) if not merged else current_len + len(ENTRY_DELIMITER) + len(entry) + if candidate_len > limit: + stats["overflowed"] += 1 + overflowed.append(entry) + continue + + merged.append(entry) + seen.add(normalized) + current_len = candidate_len + stats["added"] += 1 + + return merged, stats, overflowed + + +def relative_label(path: Path, root: Path) -> str: + try: + return str(path.relative_to(root)) + except ValueError: + return str(path) + + +def write_report(output_dir: Path, report: Dict[str, Any]) -> None: + output_dir.mkdir(parents=True, exist_ok=True) + (output_dir / "report.json").write_text( + json.dumps(report, indent=2, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + + grouped: Dict[str, List[Dict[str, Any]]] = {} + for item in report["items"]: + grouped.setdefault(item["status"], []).append(item) + + lines = [ + "# OpenClaw -> Hermes Migration Report", + "", + f"- Timestamp: {report['timestamp']}", + f"- Mode: {report['mode']}", + f"- Source: `{report['source_root']}`", + f"- Target: `{report['target_root']}`", + "", + "## Summary", + "", + ] + + for key, value in report["summary"].items(): + lines.append(f"- {key}: {value}") + + lines.extend(["", "## What Was Not Fully Brought Over", ""]) + skipped = grouped.get("skipped", []) + grouped.get("conflict", []) + grouped.get("error", []) + if not skipped: + lines.append("- Nothing. All discovered items were either migrated or archived.") + else: + for item in skipped: + source = item["source"] or "(n/a)" + dest = item["destination"] or "(n/a)" + reason = item["reason"] or item["status"] + lines.append(f"- `{source}` -> `{dest}`: {reason}") + + (output_dir / "summary.md").write_text("\n".join(lines) + "\n", encoding="utf-8") + + +class Migrator: + def __init__( + self, + source_root: Path, + target_root: Path, + execute: bool, + workspace_target: Optional[Path], + overwrite: bool, + migrate_secrets: bool, + output_dir: Optional[Path], + ): + self.source_root = source_root + self.target_root = target_root + self.execute = execute + self.workspace_target = workspace_target + self.overwrite = overwrite + self.migrate_secrets = migrate_secrets + self.timestamp = datetime.now().strftime("%Y%m%dT%H%M%S") + self.output_dir = output_dir or ( + target_root / "migration" / "openclaw" / self.timestamp if execute else None + ) + self.archive_dir = self.output_dir / "archive" if self.output_dir else None + self.backup_dir = self.output_dir / "backups" if self.output_dir else None + self.items: List[ItemResult] = [] + + config = load_yaml_file(self.target_root / "config.yaml") + mem_cfg = config.get("memory", {}) if isinstance(config.get("memory"), dict) else {} + self.memory_limit = int(mem_cfg.get("memory_char_limit", DEFAULT_MEMORY_CHAR_LIMIT)) + self.user_limit = int(mem_cfg.get("user_char_limit", DEFAULT_USER_CHAR_LIMIT)) + + def record( + self, + kind: str, + source: Optional[Path], + destination: Optional[Path], + status: str, + reason: str = "", + **details: Any, + ) -> None: + self.items.append( + ItemResult( + kind=kind, + source=str(source) if source else None, + destination=str(destination) if destination else None, + status=status, + reason=reason, + details=details, + ) + ) + + def source_candidate(self, *relative_paths: str) -> Optional[Path]: + for rel in relative_paths: + candidate = self.source_root / rel + if candidate.exists(): + return candidate + return None + + def migrate(self) -> Dict[str, Any]: + if not self.source_root.exists(): + self.record("source", self.source_root, None, "error", "OpenClaw directory does not exist") + return self.build_report() + + self.migrate_soul() + self.migrate_workspace_agents() + self.migrate_memory( + self.source_candidate("workspace/MEMORY.md", "workspace.default/MEMORY.md"), + self.target_root / "memories" / "MEMORY.md", + self.memory_limit, + kind="memory", + ) + self.migrate_memory( + self.source_candidate("workspace/USER.md", "workspace.default/USER.md"), + self.target_root / "memories" / "USER.md", + self.user_limit, + kind="user-profile", + ) + self.migrate_messaging_settings() + self.migrate_command_allowlist() + self.migrate_skills() + self.copy_tree_non_destructive( + self.source_candidate("workspace/tts"), + self.target_root / "tts", + kind="tts-assets", + ignore_dir_names={".venv", "generated", "__pycache__"}, + ) + self.archive_docs() + return self.build_report() + + def build_report(self) -> Dict[str, Any]: + summary: Dict[str, int] = { + "migrated": 0, + "archived": 0, + "skipped": 0, + "conflict": 0, + "error": 0, + } + for item in self.items: + summary[item.status] = summary.get(item.status, 0) + 1 + + report = { + "timestamp": self.timestamp, + "mode": "execute" if self.execute else "dry-run", + "source_root": str(self.source_root), + "target_root": str(self.target_root), + "workspace_target": str(self.workspace_target) if self.workspace_target else None, + "output_dir": str(self.output_dir) if self.output_dir else None, + "migrate_secrets": self.migrate_secrets, + "summary": summary, + "items": [asdict(item) for item in self.items], + } + + if self.output_dir: + write_report(self.output_dir, report) + + return report + + def maybe_backup(self, path: Path) -> Optional[Path]: + if not self.execute or not self.backup_dir or not path.exists(): + return None + return backup_existing(path, self.backup_dir) + + def copy_file(self, source: Path, destination: Path, kind: str) -> None: + if not source or not source.exists(): + return + + if destination.exists(): + if sha256_file(source) == sha256_file(destination): + self.record(kind, source, destination, "skipped", "Target already matches source") + return + if not self.overwrite: + self.record(kind, source, destination, "conflict", "Target exists and overwrite is disabled") + return + + if self.execute: + backup_path = self.maybe_backup(destination) + ensure_parent(destination) + shutil.copy2(source, destination) + self.record(kind, source, destination, "migrated", backup=str(backup_path) if backup_path else None) + else: + self.record(kind, source, destination, "migrated", "Would copy") + + def migrate_soul(self) -> None: + source = self.source_candidate("workspace/SOUL.md", "workspace.default/SOUL.md") + if not source: + self.record("soul", None, self.target_root / "SOUL.md", "skipped", "No OpenClaw SOUL.md found") + return + self.copy_file(source, self.target_root / "SOUL.md", kind="soul") + + def migrate_workspace_agents(self) -> None: + source = self.source_candidate("workspace/AGENTS.md", "workspace.default/AGENTS.md") + if not source: + return + if not self.workspace_target: + self.record("workspace-agents", source, None, "skipped", "No workspace target was provided") + return + destination = self.workspace_target / "AGENTS.md" + self.copy_file(source, destination, kind="workspace-agents") + + def migrate_memory(self, source: Optional[Path], destination: Path, limit: int, kind: str) -> None: + if not source or not source.exists(): + self.record(kind, None, destination, "skipped", "Source file not found") + return + + incoming = extract_markdown_entries(read_text(source)) + if not incoming: + self.record(kind, source, destination, "skipped", "No importable entries found") + return + + existing = parse_existing_memory_entries(destination) + merged, stats, overflowed = merge_entries(existing, incoming, limit) + details = { + "existing_entries": stats["existing"], + "added_entries": stats["added"], + "duplicate_entries": stats["duplicates"], + "overflowed_entries": stats["overflowed"], + "char_limit": limit, + "final_char_count": len(ENTRY_DELIMITER.join(merged)) if merged else 0, + } + + if self.execute: + if stats["added"] == 0 and not overflowed: + self.record(kind, source, destination, "skipped", "No new entries to import", **details) + return + backup_path = self.maybe_backup(destination) + ensure_parent(destination) + destination.write_text(ENTRY_DELIMITER.join(merged) + ("\n" if merged else ""), encoding="utf-8") + self.record( + kind, + source, + destination, + "migrated", + backup=str(backup_path) if backup_path else "", + overflow_preview=overflowed[:5], + **details, + ) + else: + self.record(kind, source, destination, "migrated", "Would merge entries", overflow_preview=overflowed[:5], **details) + + def migrate_command_allowlist(self) -> None: + source = self.source_root / "exec-approvals.json" + destination = self.target_root / "config.yaml" + if not source.exists(): + self.record("command-allowlist", None, destination, "skipped", "No OpenClaw exec approvals file found") + return + if yaml is None: + self.record("command-allowlist", source, destination, "error", "PyYAML is not available") + return + + try: + data = json.loads(source.read_text(encoding="utf-8")) + except json.JSONDecodeError as exc: + self.record("command-allowlist", source, destination, "error", f"Invalid JSON: {exc}") + return + + patterns: List[str] = [] + agents = data.get("agents", {}) + if isinstance(agents, dict): + for agent_data in agents.values(): + allowlist = agent_data.get("allowlist", []) if isinstance(agent_data, dict) else [] + for entry in allowlist: + pattern = entry.get("pattern") if isinstance(entry, dict) else None + if pattern: + patterns.append(pattern) + + patterns = sorted(dict.fromkeys(patterns)) + if not patterns: + self.record("command-allowlist", source, destination, "skipped", "No allowlist patterns found") + return + if not destination.exists(): + self.record("command-allowlist", source, destination, "skipped", "Hermes config.yaml does not exist yet") + return + + config = load_yaml_file(destination) + current = config.get("command_allowlist", []) + if not isinstance(current, list): + current = [] + merged = sorted(dict.fromkeys(list(current) + patterns)) + added = [pattern for pattern in merged if pattern not in current] + if not added: + self.record("command-allowlist", source, destination, "skipped", "All patterns already present") + return + + if self.execute: + backup_path = self.maybe_backup(destination) + config["command_allowlist"] = merged + dump_yaml_file(destination, config) + self.record( + "command-allowlist", + source, + destination, + "migrated", + backup=str(backup_path) if backup_path else "", + added_patterns=added, + ) + else: + self.record("command-allowlist", source, destination, "migrated", "Would merge patterns", added_patterns=added) + + def load_openclaw_config(self) -> Dict[str, Any]: + config_path = self.source_root / "openclaw.json" + if not config_path.exists(): + return {} + try: + data = json.loads(config_path.read_text(encoding="utf-8")) + return data if isinstance(data, dict) else {} + except json.JSONDecodeError: + return {} + + def merge_env_values(self, additions: Dict[str, str], kind: str, source: Path) -> None: + destination = self.target_root / ".env" + env_data = parse_env_file(destination) + added: Dict[str, str] = {} + conflicts: List[str] = [] + + for key, value in additions.items(): + current = env_data.get(key) + if current == value: + continue + if current and not self.overwrite: + conflicts.append(key) + continue + env_data[key] = value + added[key] = value + + if conflicts and not added: + self.record(kind, source, destination, "conflict", "Destination .env already has different values", conflicting_keys=conflicts) + return + if not conflicts and not added: + self.record(kind, source, destination, "skipped", "All env values already present") + return + + if self.execute: + backup_path = self.maybe_backup(destination) + save_env_file(destination, env_data) + self.record( + kind, + source, + destination, + "migrated", + backup=str(backup_path) if backup_path else "", + added_keys=sorted(added.keys()), + conflicting_keys=conflicts, + ) + else: + self.record( + kind, + source, + destination, + "migrated", + "Would merge env values", + added_keys=sorted(added.keys()), + conflicting_keys=conflicts, + ) + + def migrate_messaging_settings(self) -> None: + config = self.load_openclaw_config() + additions: Dict[str, str] = {} + sources: List[str] = [] + + workspace = ( + config.get("agents", {}) + .get("defaults", {}) + .get("workspace") + ) + if isinstance(workspace, str) and workspace.strip(): + additions["MESSAGING_CWD"] = workspace.strip() + sources.append("openclaw.json:agents.defaults.workspace") + + allowlist_path = self.source_root / "credentials" / "telegram-default-allowFrom.json" + if allowlist_path.exists(): + try: + allow_data = json.loads(allowlist_path.read_text(encoding="utf-8")) + except json.JSONDecodeError: + self.record("messaging-settings", allowlist_path, self.target_root / ".env", "error", "Invalid JSON in Telegram allowlist file") + else: + allow_from = allow_data.get("allowFrom", []) + if isinstance(allow_from, list): + users = [str(user).strip() for user in allow_from if str(user).strip()] + if users: + additions["TELEGRAM_ALLOWED_USERS"] = ",".join(users) + sources.append("credentials/telegram-default-allowFrom.json") + + if additions: + self.merge_env_values(additions, "messaging-settings", self.source_root / "openclaw.json") + else: + self.record("messaging-settings", self.source_root / "openclaw.json", self.target_root / ".env", "skipped", "No Hermes-compatible messaging settings found") + + if self.migrate_secrets: + self.migrate_secret_settings(config) + else: + config_path = self.source_root / "openclaw.json" + if config_path.exists(): + self.record( + "secret-settings", + config_path, + self.target_root / ".env", + "skipped", + "Secret migration disabled. Re-run with --migrate-secrets to import allowlisted secrets.", + supported_targets=sorted(SUPPORTED_SECRET_TARGETS), + ) + + def migrate_secret_settings(self, config: Dict[str, Any]) -> None: + secret_additions: Dict[str, str] = {} + sources: List[str] = [] + + telegram_token = ( + config.get("channels", {}) + .get("telegram", {}) + .get("botToken") + ) + if isinstance(telegram_token, str) and telegram_token.strip(): + secret_additions["TELEGRAM_BOT_TOKEN"] = telegram_token.strip() + sources.append("openclaw.json:channels.telegram.botToken") + + if secret_additions: + self.merge_env_values(secret_additions, "secret-settings", self.source_root / "openclaw.json") + else: + self.record( + "secret-settings", + self.source_root / "openclaw.json", + self.target_root / ".env", + "skipped", + "No allowlisted Hermes-compatible secrets found", + supported_targets=sorted(SUPPORTED_SECRET_TARGETS), + ) + + def migrate_skills(self) -> None: + source_root = self.source_candidate("workspace/skills") + destination_root = self.target_root / "skills" / SKILL_CATEGORY_DIRNAME + if not source_root or not source_root.exists(): + self.record("skills", None, destination_root, "skipped", "No OpenClaw skills directory found") + return + + skill_dirs = [p for p in sorted(source_root.iterdir()) if p.is_dir() and (p / "SKILL.md").exists()] + if not skill_dirs: + self.record("skills", source_root, destination_root, "skipped", "No skills with SKILL.md found") + return + + for skill_dir in skill_dirs: + destination = destination_root / skill_dir.name + if destination.exists() and not self.overwrite: + self.record("skill", skill_dir, destination, "conflict", "Destination skill already exists") + continue + if self.execute: + backup_path = self.maybe_backup(destination) + destination.parent.mkdir(parents=True, exist_ok=True) + if destination.exists(): + shutil.rmtree(destination) + shutil.copytree(skill_dir, destination) + self.record("skill", skill_dir, destination, "migrated", backup=str(backup_path) if backup_path else "") + else: + self.record("skill", skill_dir, destination, "migrated", "Would copy skill directory") + + desc_path = destination_root / "DESCRIPTION.md" + if self.execute: + desc_path.parent.mkdir(parents=True, exist_ok=True) + if not desc_path.exists(): + desc_path.write_text(SKILL_CATEGORY_DESCRIPTION + "\n", encoding="utf-8") + elif not desc_path.exists(): + self.record("skill-category", None, desc_path, "migrated", "Would create category description") + + def copy_tree_non_destructive( + self, + source_root: Optional[Path], + destination_root: Path, + kind: str, + ignore_dir_names: Optional[set[str]] = None, + ) -> None: + if not source_root or not source_root.exists(): + self.record(kind, None, destination_root, "skipped", "Source directory not found") + return + + ignore_dir_names = ignore_dir_names or set() + files = [ + p + for p in source_root.rglob("*") + if p.is_file() and not any(part in ignore_dir_names for part in p.relative_to(source_root).parts[:-1]) + ] + if not files: + self.record(kind, source_root, destination_root, "skipped", "No files found") + return + + copied = 0 + skipped = 0 + conflicts = 0 + + for source in files: + rel = source.relative_to(source_root) + destination = destination_root / rel + if destination.exists(): + if sha256_file(source) == sha256_file(destination): + skipped += 1 + continue + if not self.overwrite: + conflicts += 1 + self.record(kind, source, destination, "conflict", "Destination file already exists") + continue + + if self.execute: + self.maybe_backup(destination) + ensure_parent(destination) + shutil.copy2(source, destination) + copied += 1 + + status = "migrated" if copied else "skipped" + reason = "" + if not copied and conflicts: + status = "conflict" + reason = "All candidate files conflicted with existing destination files" + elif not copied: + reason = "No new files to copy" + + self.record(kind, source_root, destination_root, status, reason, copied_files=copied, unchanged_files=skipped, conflicts=conflicts) + + def archive_docs(self) -> None: + candidates = [ + self.source_candidate("workspace/IDENTITY.md", "workspace.default/IDENTITY.md"), + self.source_candidate("workspace/TOOLS.md", "workspace.default/TOOLS.md"), + self.source_candidate("workspace/HEARTBEAT.md", "workspace.default/HEARTBEAT.md"), + ] + for candidate in candidates: + if candidate: + self.archive_path(candidate, reason="No direct Hermes destination; archived for manual review") + + for rel in ("workspace/.learnings", "workspace/memory"): + candidate = self.source_root / rel + if candidate.exists(): + self.archive_path(candidate, reason="No direct Hermes destination; archived for manual review") + + partially_extracted = [ + ("openclaw.json", "Selected Hermes-compatible values were extracted; raw OpenClaw config was not copied."), + ("credentials/telegram-default-allowFrom.json", "Selected Hermes-compatible values were extracted; raw credentials file was not copied."), + ] + for rel, reason in partially_extracted: + candidate = self.source_root / rel + if candidate.exists(): + self.record("raw-config-skip", candidate, None, "skipped", reason) + + skipped_sensitive = [ + "memory/main.sqlite", + "credentials", + "devices", + "identity", + "workspace.zip", + ] + for rel in skipped_sensitive: + candidate = self.source_root / rel + if candidate.exists(): + self.record("sensitive-skip", candidate, None, "skipped", "Contains secrets, binary state, or product-specific runtime data") + + def archive_path(self, source: Path, reason: str) -> None: + destination = self.archive_dir / relative_label(source, self.source_root) if self.archive_dir else None + if self.execute and destination is not None: + ensure_parent(destination) + if source.is_dir(): + shutil.copytree(source, destination, dirs_exist_ok=True) + else: + shutil.copy2(source, destination) + self.record("archive", source, destination, "archived", reason) + else: + self.record("archive", source, destination, "archived", reason) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Migrate OpenClaw user state into Hermes Agent.") + parser.add_argument("--source", default=str(Path.home() / ".openclaw"), help="OpenClaw home directory") + parser.add_argument("--target", default=str(Path.home() / ".hermes"), help="Hermes home directory") + parser.add_argument("--workspace-target", help="Optional workspace root where AGENTS.md should be copied") + parser.add_argument("--execute", action="store_true", help="Apply changes instead of reporting a dry run") + parser.add_argument("--overwrite", action="store_true", help="Overwrite existing Hermes targets after backing them up") + parser.add_argument("--migrate-secrets", action="store_true", help="Import a narrow allowlist of Hermes-compatible secrets into ~/.hermes/.env") + parser.add_argument("--output-dir", help="Where to write report, backups, and archived docs") + return parser.parse_args() + + +def main() -> int: + args = parse_args() + migrator = Migrator( + source_root=Path(os.path.expanduser(args.source)).resolve(), + target_root=Path(os.path.expanduser(args.target)).resolve(), + execute=bool(args.execute), + workspace_target=Path(os.path.expanduser(args.workspace_target)).resolve() if args.workspace_target else None, + overwrite=bool(args.overwrite), + migrate_secrets=bool(args.migrate_secrets), + output_dir=Path(os.path.expanduser(args.output_dir)).resolve() if args.output_dir else None, + ) + report = migrator.migrate() + print(json.dumps(report, indent=2, ensure_ascii=False)) + return 0 if report["summary"].get("error", 0) == 0 else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/skills/test_openclaw_migration.py b/tests/skills/test_openclaw_migration.py new file mode 100644 index 00000000..eb513afb --- /dev/null +++ b/tests/skills/test_openclaw_migration.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +import importlib.util +import json +import sys +from pathlib import Path + + +SCRIPT_PATH = ( + Path(__file__).resolve().parents[2] + / "optional-skills" + / "migration" + / "openclaw-migration" + / "scripts" + / "openclaw_to_hermes.py" +) + + +def load_module(): + spec = importlib.util.spec_from_file_location("openclaw_to_hermes", SCRIPT_PATH) + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + + +def test_extract_markdown_entries_promotes_heading_context(): + mod = load_module() + text = """# MEMORY.md - Long-Term Memory + +## Tyler Williams + +- Founder of VANTA Research +- Timezone: America/Los_Angeles + +### Active Projects + +- Hermes Agent +""" + entries = mod.extract_markdown_entries(text) + assert "Tyler Williams: Founder of VANTA Research" in entries + assert "Tyler Williams: Timezone: America/Los_Angeles" in entries + assert "Tyler Williams > Active Projects: Hermes Agent" in entries + + +def test_merge_entries_respects_limit_and_reports_overflow(): + mod = load_module() + existing = ["alpha"] + incoming = ["beta", "gamma is too long"] + merged, stats, overflowed = mod.merge_entries(existing, incoming, limit=12) + assert merged == ["alpha", "beta"] + assert stats["added"] == 1 + assert stats["overflowed"] == 1 + assert overflowed == ["gamma is too long"] + + +def test_migrator_copies_skill_and_merges_allowlist(tmp_path: Path): + mod = load_module() + source = tmp_path / ".openclaw" + target = tmp_path / ".hermes" + + (source / "workspace" / "skills" / "demo-skill").mkdir(parents=True) + (source / "workspace" / "skills" / "demo-skill" / "SKILL.md").write_text( + "---\nname: demo-skill\ndescription: demo\n---\n\nbody\n", + encoding="utf-8", + ) + (source / "exec-approvals.json").write_text( + json.dumps( + { + "agents": { + "*": { + "allowlist": [ + {"pattern": "/usr/bin/*"}, + {"pattern": "/home/test/**"}, + ] + } + } + } + ), + encoding="utf-8", + ) + (target / "config.yaml").write_text("command_allowlist:\n - /usr/bin/*\n", encoding="utf-8") + + migrator = mod.Migrator( + source_root=source, + target_root=target, + execute=True, + workspace_target=None, + overwrite=False, + migrate_secrets=False, + output_dir=target / "migration-report", + ) + report = migrator.migrate() + + imported_skill = target / "skills" / mod.SKILL_CATEGORY_DIRNAME / "demo-skill" / "SKILL.md" + assert imported_skill.exists() + assert "/home/test/**" in (target / "config.yaml").read_text(encoding="utf-8") + assert report["summary"]["migrated"] >= 2 + + +def test_migrator_optionally_imports_supported_secrets_and_messaging_settings(tmp_path: Path): + mod = load_module() + source = tmp_path / ".openclaw" + target = tmp_path / ".hermes" + + (source / "credentials").mkdir(parents=True) + (source / "openclaw.json").write_text( + json.dumps( + { + "agents": {"defaults": {"workspace": "/tmp/openclaw-workspace"}}, + "channels": {"telegram": {"botToken": "123:abc"}}, + } + ), + encoding="utf-8", + ) + (source / "credentials" / "telegram-default-allowFrom.json").write_text( + json.dumps({"allowFrom": ["111", "222"]}), + encoding="utf-8", + ) + target.mkdir() + + migrator = mod.Migrator( + source_root=source, + target_root=target, + execute=True, + workspace_target=None, + overwrite=False, + migrate_secrets=True, + output_dir=target / "migration-report", + ) + migrator.migrate() + + env_text = (target / ".env").read_text(encoding="utf-8") + assert "MESSAGING_CWD=/tmp/openclaw-workspace" in env_text + assert "TELEGRAM_ALLOWED_USERS=111,222" in env_text + assert "TELEGRAM_BOT_TOKEN=123:abc" in env_text