Design agent migration skill for Hermes Agent from OpenClaw | Run

successful dry tests with reports
This commit is contained in:
unmodeled-tyler
2026-03-06 15:12:45 -08:00
parent 68fbae5692
commit 1755a9e38a
4 changed files with 1060 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
Optional migration workflows for importing user state and customizations from
other agent systems into Hermes Agent.

View File

@@ -0,0 +1,83 @@
---
name: openclaw-migration
description: Migrate a user's OpenClaw customization footprint into Hermes Agent. Imports Hermes-compatible memories, SOUL.md, command allowlists, user skills, and selected workspace assets from ~/.openclaw, then reports exactly what could not be migrated and why.
version: 1.0.0
author: Hermes Agent (Nous Research)
license: MIT
metadata:
hermes:
tags: [Migration, OpenClaw, Hermes, Memory, Persona, Import]
related_skills: [hermes-agent]
---
# OpenClaw -> Hermes Migration
Use this skill when a user wants to move their OpenClaw setup into Hermes Agent with minimal manual cleanup.
## What this skill does
It uses `scripts/openclaw_to_hermes.py` to:
- import `SOUL.md` into `~/.hermes/SOUL.md`
- transform OpenClaw `MEMORY.md` and `USER.md` into Hermes memory entries
- merge OpenClaw command approval patterns into Hermes `command_allowlist`
- migrate Hermes-compatible messaging settings such as `TELEGRAM_ALLOWED_USERS` and `MESSAGING_CWD`
- copy OpenClaw skills into `~/.hermes/skills/openclaw-imports/`
- optionally copy the OpenClaw workspace `AGENTS.md` into a chosen Hermes workspace
- mirror compatible workspace assets such as `workspace/tts/` into `~/.hermes/tts/`
- archive non-secret docs that do not have a direct Hermes destination
- produce a structured report listing migrated items, conflicts, skipped items, and reasons
With `--migrate-secrets`, it will also import a small allowlisted set of Hermes-compatible secrets, currently:
- `TELEGRAM_BOT_TOKEN`
## Default workflow
1. Inspect first with a dry run.
2. Ask for a target workspace path if `AGENTS.md` should be brought over.
3. Execute the migration.
4. Summarize the results, especially:
- what was migrated
- what was archived for manual review
- what was skipped and why
## Commands
Dry run:
```bash
python3 SKILL_DIR/scripts/openclaw_to_hermes.py --workspace-target "$PWD"
```
Execute:
```bash
python3 SKILL_DIR/scripts/openclaw_to_hermes.py --execute --workspace-target "$PWD"
```
Execute with Hermes-compatible secret migration enabled:
```bash
python3 SKILL_DIR/scripts/openclaw_to_hermes.py --execute --migrate-secrets --workspace-target "$PWD"
```
If the user does not want to import workspace instructions into the current directory, omit `--workspace-target`.
## Important rules
1. Run a dry run before writing unless the user explicitly says to proceed immediately.
2. Do not migrate secrets by default. Tokens, auth blobs, device credentials, and raw gateway config should stay out of Hermes unless the user explicitly asks for secret migration.
3. Do not silently overwrite non-empty Hermes targets unless the user explicitly wants that. The helper script will preserve backups when overwriting is enabled.
4. Always give the user the skipped-items report. That report is part of the migration, not an optional extra.
5. Prefer the primary OpenClaw workspace (`~/.openclaw/workspace/`) over `workspace.default/`. Only use the default workspace as fallback when the primary files are missing.
6. Even in secret-migration mode, only migrate secrets with a clean Hermes destination. Unsupported auth blobs must still be reported as skipped.
## Expected result
After a successful run, the user should have:
- Hermes persona state imported
- Hermes memory files populated with converted OpenClaw knowledge
- OpenClaw skills available under `~/.hermes/skills/openclaw-imports/`
- a migration report showing any conflicts, omissions, or unsupported data

View File

@@ -0,0 +1,838 @@
#!/usr/bin/env python3
"""OpenClaw -> Hermes migration helper.
This script migrates the parts of an OpenClaw user footprint that map cleanly
into Hermes Agent, archives selected unmapped docs for manual review, and
reports exactly what was skipped and why.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import shutil
from dataclasses import asdict, dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
try:
import yaml
except Exception: # pragma: no cover - handled at runtime
yaml = None
ENTRY_DELIMITER = "\n§\n"
DEFAULT_MEMORY_CHAR_LIMIT = 2200
DEFAULT_USER_CHAR_LIMIT = 1375
SKILL_CATEGORY_DIRNAME = "openclaw-imports"
SKILL_CATEGORY_DESCRIPTION = (
"Skills migrated from an OpenClaw workspace."
)
SUPPORTED_SECRET_TARGETS = {
"TELEGRAM_BOT_TOKEN",
}
@dataclass
class ItemResult:
kind: str
source: Optional[str]
destination: Optional[str]
status: str
reason: str = ""
details: Dict[str, Any] = field(default_factory=dict)
def sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as fh:
for chunk in iter(lambda: fh.read(65536), b""):
h.update(chunk)
return h.hexdigest()
def read_text(path: Path) -> str:
return path.read_text(encoding="utf-8")
def normalize_text(text: str) -> str:
return re.sub(r"\s+", " ", text.strip())
def ensure_parent(path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
def load_yaml_file(path: Path) -> Dict[str, Any]:
if yaml is None or not path.exists():
return {}
data = yaml.safe_load(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
def dump_yaml_file(path: Path, data: Dict[str, Any]) -> None:
if yaml is None:
raise RuntimeError("PyYAML is required to update Hermes config.yaml")
ensure_parent(path)
path.write_text(
yaml.safe_dump(data, sort_keys=False, allow_unicode=False),
encoding="utf-8",
)
def parse_env_file(path: Path) -> Dict[str, str]:
if not path.exists():
return {}
data: Dict[str, str] = {}
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
data[key.strip()] = value.strip()
return data
def save_env_file(path: Path, data: Dict[str, str]) -> None:
ensure_parent(path)
lines = [f"{key}={value}" for key, value in data.items()]
path.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8")
def backup_existing(path: Path, backup_root: Path) -> Optional[Path]:
if not path.exists():
return None
rel = Path(*path.parts[1:]) if path.is_absolute() and len(path.parts) > 1 else path
dest = backup_root / rel
ensure_parent(dest)
if path.is_dir():
shutil.copytree(path, dest, dirs_exist_ok=True)
else:
shutil.copy2(path, dest)
return dest
def parse_existing_memory_entries(path: Path) -> List[str]:
if not path.exists():
return []
raw = read_text(path)
if not raw.strip():
return []
if ENTRY_DELIMITER in raw:
return [e.strip() for e in raw.split(ENTRY_DELIMITER) if e.strip()]
return extract_markdown_entries(raw)
def extract_markdown_entries(text: str) -> List[str]:
entries: List[str] = []
headings: List[str] = []
paragraph_lines: List[str] = []
def context_prefix() -> str:
filtered = [h for h in headings if h and not re.search(r"\b(MEMORY|USER|SOUL|AGENTS|TOOLS|IDENTITY)\.md\b", h, re.I)]
return " > ".join(filtered)
def flush_paragraph() -> None:
nonlocal paragraph_lines
if not paragraph_lines:
return
text_block = " ".join(line.strip() for line in paragraph_lines).strip()
paragraph_lines = []
if not text_block:
return
prefix = context_prefix()
if prefix:
entries.append(f"{prefix}: {text_block}")
else:
entries.append(text_block)
in_code_block = False
for raw_line in text.splitlines():
line = raw_line.rstrip()
stripped = line.strip()
if stripped.startswith("```"):
in_code_block = not in_code_block
flush_paragraph()
continue
if in_code_block:
continue
heading_match = re.match(r"^(#{1,6})\s+(.*\S)\s*$", stripped)
if heading_match:
flush_paragraph()
level = len(heading_match.group(1))
text_value = heading_match.group(2).strip()
while len(headings) >= level:
headings.pop()
headings.append(text_value)
continue
bullet_match = re.match(r"^\s*(?:[-*]|\d+\.)\s+(.*\S)\s*$", line)
if bullet_match:
flush_paragraph()
content = bullet_match.group(1).strip()
prefix = context_prefix()
entries.append(f"{prefix}: {content}" if prefix else content)
continue
if not stripped:
flush_paragraph()
continue
if stripped.startswith("|") and stripped.endswith("|"):
flush_paragraph()
continue
paragraph_lines.append(stripped)
flush_paragraph()
deduped: List[str] = []
seen = set()
for entry in entries:
normalized = normalize_text(entry)
if not normalized or normalized in seen:
continue
seen.add(normalized)
deduped.append(entry.strip())
return deduped
def merge_entries(
existing: Sequence[str],
incoming: Sequence[str],
limit: int,
) -> Tuple[List[str], Dict[str, int], List[str]]:
merged = list(existing)
seen = {normalize_text(entry) for entry in existing if entry.strip()}
stats = {"existing": len(existing), "added": 0, "duplicates": 0, "overflowed": 0}
overflowed: List[str] = []
current_len = len(ENTRY_DELIMITER.join(merged)) if merged else 0
for entry in incoming:
normalized = normalize_text(entry)
if not normalized:
continue
if normalized in seen:
stats["duplicates"] += 1
continue
candidate_len = len(entry) if not merged else current_len + len(ENTRY_DELIMITER) + len(entry)
if candidate_len > limit:
stats["overflowed"] += 1
overflowed.append(entry)
continue
merged.append(entry)
seen.add(normalized)
current_len = candidate_len
stats["added"] += 1
return merged, stats, overflowed
def relative_label(path: Path, root: Path) -> str:
try:
return str(path.relative_to(root))
except ValueError:
return str(path)
def write_report(output_dir: Path, report: Dict[str, Any]) -> None:
output_dir.mkdir(parents=True, exist_ok=True)
(output_dir / "report.json").write_text(
json.dumps(report, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
grouped: Dict[str, List[Dict[str, Any]]] = {}
for item in report["items"]:
grouped.setdefault(item["status"], []).append(item)
lines = [
"# OpenClaw -> Hermes Migration Report",
"",
f"- Timestamp: {report['timestamp']}",
f"- Mode: {report['mode']}",
f"- Source: `{report['source_root']}`",
f"- Target: `{report['target_root']}`",
"",
"## Summary",
"",
]
for key, value in report["summary"].items():
lines.append(f"- {key}: {value}")
lines.extend(["", "## What Was Not Fully Brought Over", ""])
skipped = grouped.get("skipped", []) + grouped.get("conflict", []) + grouped.get("error", [])
if not skipped:
lines.append("- Nothing. All discovered items were either migrated or archived.")
else:
for item in skipped:
source = item["source"] or "(n/a)"
dest = item["destination"] or "(n/a)"
reason = item["reason"] or item["status"]
lines.append(f"- `{source}` -> `{dest}`: {reason}")
(output_dir / "summary.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
class Migrator:
def __init__(
self,
source_root: Path,
target_root: Path,
execute: bool,
workspace_target: Optional[Path],
overwrite: bool,
migrate_secrets: bool,
output_dir: Optional[Path],
):
self.source_root = source_root
self.target_root = target_root
self.execute = execute
self.workspace_target = workspace_target
self.overwrite = overwrite
self.migrate_secrets = migrate_secrets
self.timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
self.output_dir = output_dir or (
target_root / "migration" / "openclaw" / self.timestamp if execute else None
)
self.archive_dir = self.output_dir / "archive" if self.output_dir else None
self.backup_dir = self.output_dir / "backups" if self.output_dir else None
self.items: List[ItemResult] = []
config = load_yaml_file(self.target_root / "config.yaml")
mem_cfg = config.get("memory", {}) if isinstance(config.get("memory"), dict) else {}
self.memory_limit = int(mem_cfg.get("memory_char_limit", DEFAULT_MEMORY_CHAR_LIMIT))
self.user_limit = int(mem_cfg.get("user_char_limit", DEFAULT_USER_CHAR_LIMIT))
def record(
self,
kind: str,
source: Optional[Path],
destination: Optional[Path],
status: str,
reason: str = "",
**details: Any,
) -> None:
self.items.append(
ItemResult(
kind=kind,
source=str(source) if source else None,
destination=str(destination) if destination else None,
status=status,
reason=reason,
details=details,
)
)
def source_candidate(self, *relative_paths: str) -> Optional[Path]:
for rel in relative_paths:
candidate = self.source_root / rel
if candidate.exists():
return candidate
return None
def migrate(self) -> Dict[str, Any]:
if not self.source_root.exists():
self.record("source", self.source_root, None, "error", "OpenClaw directory does not exist")
return self.build_report()
self.migrate_soul()
self.migrate_workspace_agents()
self.migrate_memory(
self.source_candidate("workspace/MEMORY.md", "workspace.default/MEMORY.md"),
self.target_root / "memories" / "MEMORY.md",
self.memory_limit,
kind="memory",
)
self.migrate_memory(
self.source_candidate("workspace/USER.md", "workspace.default/USER.md"),
self.target_root / "memories" / "USER.md",
self.user_limit,
kind="user-profile",
)
self.migrate_messaging_settings()
self.migrate_command_allowlist()
self.migrate_skills()
self.copy_tree_non_destructive(
self.source_candidate("workspace/tts"),
self.target_root / "tts",
kind="tts-assets",
ignore_dir_names={".venv", "generated", "__pycache__"},
)
self.archive_docs()
return self.build_report()
def build_report(self) -> Dict[str, Any]:
summary: Dict[str, int] = {
"migrated": 0,
"archived": 0,
"skipped": 0,
"conflict": 0,
"error": 0,
}
for item in self.items:
summary[item.status] = summary.get(item.status, 0) + 1
report = {
"timestamp": self.timestamp,
"mode": "execute" if self.execute else "dry-run",
"source_root": str(self.source_root),
"target_root": str(self.target_root),
"workspace_target": str(self.workspace_target) if self.workspace_target else None,
"output_dir": str(self.output_dir) if self.output_dir else None,
"migrate_secrets": self.migrate_secrets,
"summary": summary,
"items": [asdict(item) for item in self.items],
}
if self.output_dir:
write_report(self.output_dir, report)
return report
def maybe_backup(self, path: Path) -> Optional[Path]:
if not self.execute or not self.backup_dir or not path.exists():
return None
return backup_existing(path, self.backup_dir)
def copy_file(self, source: Path, destination: Path, kind: str) -> None:
if not source or not source.exists():
return
if destination.exists():
if sha256_file(source) == sha256_file(destination):
self.record(kind, source, destination, "skipped", "Target already matches source")
return
if not self.overwrite:
self.record(kind, source, destination, "conflict", "Target exists and overwrite is disabled")
return
if self.execute:
backup_path = self.maybe_backup(destination)
ensure_parent(destination)
shutil.copy2(source, destination)
self.record(kind, source, destination, "migrated", backup=str(backup_path) if backup_path else None)
else:
self.record(kind, source, destination, "migrated", "Would copy")
def migrate_soul(self) -> None:
source = self.source_candidate("workspace/SOUL.md", "workspace.default/SOUL.md")
if not source:
self.record("soul", None, self.target_root / "SOUL.md", "skipped", "No OpenClaw SOUL.md found")
return
self.copy_file(source, self.target_root / "SOUL.md", kind="soul")
def migrate_workspace_agents(self) -> None:
source = self.source_candidate("workspace/AGENTS.md", "workspace.default/AGENTS.md")
if not source:
return
if not self.workspace_target:
self.record("workspace-agents", source, None, "skipped", "No workspace target was provided")
return
destination = self.workspace_target / "AGENTS.md"
self.copy_file(source, destination, kind="workspace-agents")
def migrate_memory(self, source: Optional[Path], destination: Path, limit: int, kind: str) -> None:
if not source or not source.exists():
self.record(kind, None, destination, "skipped", "Source file not found")
return
incoming = extract_markdown_entries(read_text(source))
if not incoming:
self.record(kind, source, destination, "skipped", "No importable entries found")
return
existing = parse_existing_memory_entries(destination)
merged, stats, overflowed = merge_entries(existing, incoming, limit)
details = {
"existing_entries": stats["existing"],
"added_entries": stats["added"],
"duplicate_entries": stats["duplicates"],
"overflowed_entries": stats["overflowed"],
"char_limit": limit,
"final_char_count": len(ENTRY_DELIMITER.join(merged)) if merged else 0,
}
if self.execute:
if stats["added"] == 0 and not overflowed:
self.record(kind, source, destination, "skipped", "No new entries to import", **details)
return
backup_path = self.maybe_backup(destination)
ensure_parent(destination)
destination.write_text(ENTRY_DELIMITER.join(merged) + ("\n" if merged else ""), encoding="utf-8")
self.record(
kind,
source,
destination,
"migrated",
backup=str(backup_path) if backup_path else "",
overflow_preview=overflowed[:5],
**details,
)
else:
self.record(kind, source, destination, "migrated", "Would merge entries", overflow_preview=overflowed[:5], **details)
def migrate_command_allowlist(self) -> None:
source = self.source_root / "exec-approvals.json"
destination = self.target_root / "config.yaml"
if not source.exists():
self.record("command-allowlist", None, destination, "skipped", "No OpenClaw exec approvals file found")
return
if yaml is None:
self.record("command-allowlist", source, destination, "error", "PyYAML is not available")
return
try:
data = json.loads(source.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
self.record("command-allowlist", source, destination, "error", f"Invalid JSON: {exc}")
return
patterns: List[str] = []
agents = data.get("agents", {})
if isinstance(agents, dict):
for agent_data in agents.values():
allowlist = agent_data.get("allowlist", []) if isinstance(agent_data, dict) else []
for entry in allowlist:
pattern = entry.get("pattern") if isinstance(entry, dict) else None
if pattern:
patterns.append(pattern)
patterns = sorted(dict.fromkeys(patterns))
if not patterns:
self.record("command-allowlist", source, destination, "skipped", "No allowlist patterns found")
return
if not destination.exists():
self.record("command-allowlist", source, destination, "skipped", "Hermes config.yaml does not exist yet")
return
config = load_yaml_file(destination)
current = config.get("command_allowlist", [])
if not isinstance(current, list):
current = []
merged = sorted(dict.fromkeys(list(current) + patterns))
added = [pattern for pattern in merged if pattern not in current]
if not added:
self.record("command-allowlist", source, destination, "skipped", "All patterns already present")
return
if self.execute:
backup_path = self.maybe_backup(destination)
config["command_allowlist"] = merged
dump_yaml_file(destination, config)
self.record(
"command-allowlist",
source,
destination,
"migrated",
backup=str(backup_path) if backup_path else "",
added_patterns=added,
)
else:
self.record("command-allowlist", source, destination, "migrated", "Would merge patterns", added_patterns=added)
def load_openclaw_config(self) -> Dict[str, Any]:
config_path = self.source_root / "openclaw.json"
if not config_path.exists():
return {}
try:
data = json.loads(config_path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except json.JSONDecodeError:
return {}
def merge_env_values(self, additions: Dict[str, str], kind: str, source: Path) -> None:
destination = self.target_root / ".env"
env_data = parse_env_file(destination)
added: Dict[str, str] = {}
conflicts: List[str] = []
for key, value in additions.items():
current = env_data.get(key)
if current == value:
continue
if current and not self.overwrite:
conflicts.append(key)
continue
env_data[key] = value
added[key] = value
if conflicts and not added:
self.record(kind, source, destination, "conflict", "Destination .env already has different values", conflicting_keys=conflicts)
return
if not conflicts and not added:
self.record(kind, source, destination, "skipped", "All env values already present")
return
if self.execute:
backup_path = self.maybe_backup(destination)
save_env_file(destination, env_data)
self.record(
kind,
source,
destination,
"migrated",
backup=str(backup_path) if backup_path else "",
added_keys=sorted(added.keys()),
conflicting_keys=conflicts,
)
else:
self.record(
kind,
source,
destination,
"migrated",
"Would merge env values",
added_keys=sorted(added.keys()),
conflicting_keys=conflicts,
)
def migrate_messaging_settings(self) -> None:
config = self.load_openclaw_config()
additions: Dict[str, str] = {}
sources: List[str] = []
workspace = (
config.get("agents", {})
.get("defaults", {})
.get("workspace")
)
if isinstance(workspace, str) and workspace.strip():
additions["MESSAGING_CWD"] = workspace.strip()
sources.append("openclaw.json:agents.defaults.workspace")
allowlist_path = self.source_root / "credentials" / "telegram-default-allowFrom.json"
if allowlist_path.exists():
try:
allow_data = json.loads(allowlist_path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
self.record("messaging-settings", allowlist_path, self.target_root / ".env", "error", "Invalid JSON in Telegram allowlist file")
else:
allow_from = allow_data.get("allowFrom", [])
if isinstance(allow_from, list):
users = [str(user).strip() for user in allow_from if str(user).strip()]
if users:
additions["TELEGRAM_ALLOWED_USERS"] = ",".join(users)
sources.append("credentials/telegram-default-allowFrom.json")
if additions:
self.merge_env_values(additions, "messaging-settings", self.source_root / "openclaw.json")
else:
self.record("messaging-settings", self.source_root / "openclaw.json", self.target_root / ".env", "skipped", "No Hermes-compatible messaging settings found")
if self.migrate_secrets:
self.migrate_secret_settings(config)
else:
config_path = self.source_root / "openclaw.json"
if config_path.exists():
self.record(
"secret-settings",
config_path,
self.target_root / ".env",
"skipped",
"Secret migration disabled. Re-run with --migrate-secrets to import allowlisted secrets.",
supported_targets=sorted(SUPPORTED_SECRET_TARGETS),
)
def migrate_secret_settings(self, config: Dict[str, Any]) -> None:
secret_additions: Dict[str, str] = {}
sources: List[str] = []
telegram_token = (
config.get("channels", {})
.get("telegram", {})
.get("botToken")
)
if isinstance(telegram_token, str) and telegram_token.strip():
secret_additions["TELEGRAM_BOT_TOKEN"] = telegram_token.strip()
sources.append("openclaw.json:channels.telegram.botToken")
if secret_additions:
self.merge_env_values(secret_additions, "secret-settings", self.source_root / "openclaw.json")
else:
self.record(
"secret-settings",
self.source_root / "openclaw.json",
self.target_root / ".env",
"skipped",
"No allowlisted Hermes-compatible secrets found",
supported_targets=sorted(SUPPORTED_SECRET_TARGETS),
)
def migrate_skills(self) -> None:
source_root = self.source_candidate("workspace/skills")
destination_root = self.target_root / "skills" / SKILL_CATEGORY_DIRNAME
if not source_root or not source_root.exists():
self.record("skills", None, destination_root, "skipped", "No OpenClaw skills directory found")
return
skill_dirs = [p for p in sorted(source_root.iterdir()) if p.is_dir() and (p / "SKILL.md").exists()]
if not skill_dirs:
self.record("skills", source_root, destination_root, "skipped", "No skills with SKILL.md found")
return
for skill_dir in skill_dirs:
destination = destination_root / skill_dir.name
if destination.exists() and not self.overwrite:
self.record("skill", skill_dir, destination, "conflict", "Destination skill already exists")
continue
if self.execute:
backup_path = self.maybe_backup(destination)
destination.parent.mkdir(parents=True, exist_ok=True)
if destination.exists():
shutil.rmtree(destination)
shutil.copytree(skill_dir, destination)
self.record("skill", skill_dir, destination, "migrated", backup=str(backup_path) if backup_path else "")
else:
self.record("skill", skill_dir, destination, "migrated", "Would copy skill directory")
desc_path = destination_root / "DESCRIPTION.md"
if self.execute:
desc_path.parent.mkdir(parents=True, exist_ok=True)
if not desc_path.exists():
desc_path.write_text(SKILL_CATEGORY_DESCRIPTION + "\n", encoding="utf-8")
elif not desc_path.exists():
self.record("skill-category", None, desc_path, "migrated", "Would create category description")
def copy_tree_non_destructive(
self,
source_root: Optional[Path],
destination_root: Path,
kind: str,
ignore_dir_names: Optional[set[str]] = None,
) -> None:
if not source_root or not source_root.exists():
self.record(kind, None, destination_root, "skipped", "Source directory not found")
return
ignore_dir_names = ignore_dir_names or set()
files = [
p
for p in source_root.rglob("*")
if p.is_file() and not any(part in ignore_dir_names for part in p.relative_to(source_root).parts[:-1])
]
if not files:
self.record(kind, source_root, destination_root, "skipped", "No files found")
return
copied = 0
skipped = 0
conflicts = 0
for source in files:
rel = source.relative_to(source_root)
destination = destination_root / rel
if destination.exists():
if sha256_file(source) == sha256_file(destination):
skipped += 1
continue
if not self.overwrite:
conflicts += 1
self.record(kind, source, destination, "conflict", "Destination file already exists")
continue
if self.execute:
self.maybe_backup(destination)
ensure_parent(destination)
shutil.copy2(source, destination)
copied += 1
status = "migrated" if copied else "skipped"
reason = ""
if not copied and conflicts:
status = "conflict"
reason = "All candidate files conflicted with existing destination files"
elif not copied:
reason = "No new files to copy"
self.record(kind, source_root, destination_root, status, reason, copied_files=copied, unchanged_files=skipped, conflicts=conflicts)
def archive_docs(self) -> None:
candidates = [
self.source_candidate("workspace/IDENTITY.md", "workspace.default/IDENTITY.md"),
self.source_candidate("workspace/TOOLS.md", "workspace.default/TOOLS.md"),
self.source_candidate("workspace/HEARTBEAT.md", "workspace.default/HEARTBEAT.md"),
]
for candidate in candidates:
if candidate:
self.archive_path(candidate, reason="No direct Hermes destination; archived for manual review")
for rel in ("workspace/.learnings", "workspace/memory"):
candidate = self.source_root / rel
if candidate.exists():
self.archive_path(candidate, reason="No direct Hermes destination; archived for manual review")
partially_extracted = [
("openclaw.json", "Selected Hermes-compatible values were extracted; raw OpenClaw config was not copied."),
("credentials/telegram-default-allowFrom.json", "Selected Hermes-compatible values were extracted; raw credentials file was not copied."),
]
for rel, reason in partially_extracted:
candidate = self.source_root / rel
if candidate.exists():
self.record("raw-config-skip", candidate, None, "skipped", reason)
skipped_sensitive = [
"memory/main.sqlite",
"credentials",
"devices",
"identity",
"workspace.zip",
]
for rel in skipped_sensitive:
candidate = self.source_root / rel
if candidate.exists():
self.record("sensitive-skip", candidate, None, "skipped", "Contains secrets, binary state, or product-specific runtime data")
def archive_path(self, source: Path, reason: str) -> None:
destination = self.archive_dir / relative_label(source, self.source_root) if self.archive_dir else None
if self.execute and destination is not None:
ensure_parent(destination)
if source.is_dir():
shutil.copytree(source, destination, dirs_exist_ok=True)
else:
shutil.copy2(source, destination)
self.record("archive", source, destination, "archived", reason)
else:
self.record("archive", source, destination, "archived", reason)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Migrate OpenClaw user state into Hermes Agent.")
parser.add_argument("--source", default=str(Path.home() / ".openclaw"), help="OpenClaw home directory")
parser.add_argument("--target", default=str(Path.home() / ".hermes"), help="Hermes home directory")
parser.add_argument("--workspace-target", help="Optional workspace root where AGENTS.md should be copied")
parser.add_argument("--execute", action="store_true", help="Apply changes instead of reporting a dry run")
parser.add_argument("--overwrite", action="store_true", help="Overwrite existing Hermes targets after backing them up")
parser.add_argument("--migrate-secrets", action="store_true", help="Import a narrow allowlist of Hermes-compatible secrets into ~/.hermes/.env")
parser.add_argument("--output-dir", help="Where to write report, backups, and archived docs")
return parser.parse_args()
def main() -> int:
args = parse_args()
migrator = Migrator(
source_root=Path(os.path.expanduser(args.source)).resolve(),
target_root=Path(os.path.expanduser(args.target)).resolve(),
execute=bool(args.execute),
workspace_target=Path(os.path.expanduser(args.workspace_target)).resolve() if args.workspace_target else None,
overwrite=bool(args.overwrite),
migrate_secrets=bool(args.migrate_secrets),
output_dir=Path(os.path.expanduser(args.output_dir)).resolve() if args.output_dir else None,
)
report = migrator.migrate()
print(json.dumps(report, indent=2, ensure_ascii=False))
return 0 if report["summary"].get("error", 0) == 0 else 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,137 @@
from __future__ import annotations
import importlib.util
import json
import sys
from pathlib import Path
SCRIPT_PATH = (
Path(__file__).resolve().parents[2]
/ "optional-skills"
/ "migration"
/ "openclaw-migration"
/ "scripts"
/ "openclaw_to_hermes.py"
)
def load_module():
spec = importlib.util.spec_from_file_location("openclaw_to_hermes", SCRIPT_PATH)
module = importlib.util.module_from_spec(spec)
assert spec.loader is not None
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def test_extract_markdown_entries_promotes_heading_context():
mod = load_module()
text = """# MEMORY.md - Long-Term Memory
## Tyler Williams
- Founder of VANTA Research
- Timezone: America/Los_Angeles
### Active Projects
- Hermes Agent
"""
entries = mod.extract_markdown_entries(text)
assert "Tyler Williams: Founder of VANTA Research" in entries
assert "Tyler Williams: Timezone: America/Los_Angeles" in entries
assert "Tyler Williams > Active Projects: Hermes Agent" in entries
def test_merge_entries_respects_limit_and_reports_overflow():
mod = load_module()
existing = ["alpha"]
incoming = ["beta", "gamma is too long"]
merged, stats, overflowed = mod.merge_entries(existing, incoming, limit=12)
assert merged == ["alpha", "beta"]
assert stats["added"] == 1
assert stats["overflowed"] == 1
assert overflowed == ["gamma is too long"]
def test_migrator_copies_skill_and_merges_allowlist(tmp_path: Path):
mod = load_module()
source = tmp_path / ".openclaw"
target = tmp_path / ".hermes"
(source / "workspace" / "skills" / "demo-skill").mkdir(parents=True)
(source / "workspace" / "skills" / "demo-skill" / "SKILL.md").write_text(
"---\nname: demo-skill\ndescription: demo\n---\n\nbody\n",
encoding="utf-8",
)
(source / "exec-approvals.json").write_text(
json.dumps(
{
"agents": {
"*": {
"allowlist": [
{"pattern": "/usr/bin/*"},
{"pattern": "/home/test/**"},
]
}
}
}
),
encoding="utf-8",
)
(target / "config.yaml").write_text("command_allowlist:\n - /usr/bin/*\n", encoding="utf-8")
migrator = mod.Migrator(
source_root=source,
target_root=target,
execute=True,
workspace_target=None,
overwrite=False,
migrate_secrets=False,
output_dir=target / "migration-report",
)
report = migrator.migrate()
imported_skill = target / "skills" / mod.SKILL_CATEGORY_DIRNAME / "demo-skill" / "SKILL.md"
assert imported_skill.exists()
assert "/home/test/**" in (target / "config.yaml").read_text(encoding="utf-8")
assert report["summary"]["migrated"] >= 2
def test_migrator_optionally_imports_supported_secrets_and_messaging_settings(tmp_path: Path):
mod = load_module()
source = tmp_path / ".openclaw"
target = tmp_path / ".hermes"
(source / "credentials").mkdir(parents=True)
(source / "openclaw.json").write_text(
json.dumps(
{
"agents": {"defaults": {"workspace": "/tmp/openclaw-workspace"}},
"channels": {"telegram": {"botToken": "123:abc"}},
}
),
encoding="utf-8",
)
(source / "credentials" / "telegram-default-allowFrom.json").write_text(
json.dumps({"allowFrom": ["111", "222"]}),
encoding="utf-8",
)
target.mkdir()
migrator = mod.Migrator(
source_root=source,
target_root=target,
execute=True,
workspace_target=None,
overwrite=False,
migrate_secrets=True,
output_dir=target / "migration-report",
)
migrator.migrate()
env_text = (target / ".env").read_text(encoding="utf-8")
assert "MESSAGING_CWD=/tmp/openclaw-workspace" in env_text
assert "TELEGRAM_ALLOWED_USERS=111,222" in env_text
assert "TELEGRAM_BOT_TOKEN=123:abc" in env_text