fix: sanitize corrupted .env files on read and during migration

Fixes two corruption patterns that break API keys during updates:

1. Concatenated KEY=VALUE pairs on a single line due to missing newlines
   (e.g. ANTHROPIC_API_KEY=sk-...OPENAI_BASE_URL=https://...). Uses a
   known-keys set to safely detect and split concatenated entries without
   false-splitting values that contain uppercase text.

2. Stale KEY=*** placeholder entries left by incomplete setup runs that
   never get updated and shadow real credentials.

Changes:
- Add _sanitize_env_lines() that splits concatenated known keys and drops
  *** placeholders
- Add sanitize_env_file() public API for explicit repair
- Call sanitization in save_env_value() on every read (self-healing)
- Call sanitize_env_file() at the start of migrate_config() so existing
  corrupted files are repaired on update
- 12 new tests covering splits, placeholders, edge cases, and integration
This commit is contained in:
teknium1
2026-03-17 01:13:34 -07:00
parent 37862f74fa
commit 634c1f6752
2 changed files with 256 additions and 0 deletions

View File

@@ -25,6 +25,18 @@ from typing import Dict, Any, Optional, List, Tuple
_IS_WINDOWS = platform.system() == "Windows"
_ENV_VAR_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# Env var names written to .env that aren't in OPTIONAL_ENV_VARS
# (managed by setup/provider flows directly).
_EXTRA_ENV_KEYS = frozenset({
"OPENAI_API_KEY", "OPENAI_BASE_URL",
"ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN",
"AUXILIARY_VISION_MODEL",
"DISCORD_HOME_CHANNEL", "TELEGRAM_HOME_CHANNEL",
"SIGNAL_ACCOUNT", "SIGNAL_HTTP_URL",
"SIGNAL_ALLOWED_USERS", "SIGNAL_GROUP_ALLOWED_USERS",
"TERMINAL_ENV", "TERMINAL_SSH_KEY", "TERMINAL_SSH_PORT",
"WHATSAPP_MODE", "WHATSAPP_ENABLED",
})
import yaml
@@ -765,6 +777,14 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
Dict with migration results: {"env_added": [...], "config_added": [...], "warnings": [...]}
"""
results = {"env_added": [], "config_added": [], "warnings": []}
# ── Always: sanitize .env (split concatenated keys, drop *** placeholders) ──
try:
fixes = sanitize_env_file()
if fixes and not quiet:
print(f" ✓ Repaired .env file ({fixes} corrupted entries fixed)")
except Exception:
pass # best-effort; don't block migration on sanitize failure
# Check config version
current_ver, latest_ver = check_config_version()
@@ -1121,6 +1141,108 @@ def load_env() -> Dict[str, str]:
return env_vars
def _sanitize_env_lines(lines: list) -> list:
"""Fix corrupted .env lines before writing.
Handles two known corruption patterns:
1. Concatenated KEY=VALUE pairs on a single line (missing newline between
entries, e.g. ``ANTHROPIC_API_KEY=sk-...OPENAI_BASE_URL=https://...``).
2. Stale ``KEY=***`` placeholder entries left by incomplete setup runs.
Uses a known-keys set (OPTIONAL_ENV_VARS + _EXTRA_ENV_KEYS) so we only
split on real Hermes env var names, avoiding false positives from values
that happen to contain uppercase text with ``=``.
"""
# Build the known keys set lazily from OPTIONAL_ENV_VARS + extras.
# Done inside the function so OPTIONAL_ENV_VARS is guaranteed to be defined.
known_keys = set(OPTIONAL_ENV_VARS.keys()) | _EXTRA_ENV_KEYS
sanitized: list[str] = []
for line in lines:
raw = line.rstrip("\r\n")
stripped = raw.strip()
# Preserve blank lines and comments
if not stripped or stripped.startswith("#"):
sanitized.append(raw + "\n")
continue
# Drop stale *** placeholder entries
if "=" in stripped:
_k, _, _v = stripped.partition("=")
if _v.strip().strip("'\"") == "***":
continue
# Detect concatenated KEY=VALUE pairs on one line.
# Search for known KEY= patterns at any position in the line.
split_positions = []
for key_name in known_keys:
needle = key_name + "="
idx = stripped.find(needle)
while idx >= 0:
split_positions.append(idx)
idx = stripped.find(needle, idx + len(needle))
if len(split_positions) > 1:
split_positions.sort()
# Deduplicate (shouldn't happen, but be safe)
split_positions = sorted(set(split_positions))
for i, pos in enumerate(split_positions):
end = split_positions[i + 1] if i + 1 < len(split_positions) else len(stripped)
part = stripped[pos:end].strip()
if part:
sanitized.append(part + "\n")
else:
sanitized.append(stripped + "\n")
return sanitized
def sanitize_env_file() -> int:
"""Read, sanitize, and rewrite ~/.hermes/.env in place.
Returns the number of lines that were fixed (concatenation splits +
placeholder removals). Returns 0 when no changes are needed.
"""
env_path = get_env_path()
if not env_path.exists():
return 0
read_kw = {"encoding": "utf-8", "errors": "replace"} if _IS_WINDOWS else {}
write_kw = {"encoding": "utf-8"} if _IS_WINDOWS else {}
with open(env_path, **read_kw) as f:
original_lines = f.readlines()
sanitized = _sanitize_env_lines(original_lines)
if sanitized == original_lines:
return 0
# Count fixes: difference in line count (from splits) + removed lines
fixes = abs(len(sanitized) - len(original_lines))
if fixes == 0:
# Lines changed content (e.g. *** removal) even if count is same
fixes = sum(1 for a, b in zip(original_lines, sanitized) if a != b)
fixes += abs(len(sanitized) - len(original_lines))
fd, tmp_path = tempfile.mkstemp(dir=str(env_path.parent), suffix=".tmp", prefix=".env_")
try:
with os.fdopen(fd, "w", **write_kw) as f:
f.writelines(sanitized)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, env_path)
except BaseException:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
_secure_file(env_path)
return fixes
def save_env_value(key: str, value: str):
"""Save or update a value in ~/.hermes/.env."""
if not _ENV_VAR_NAME_RE.match(key):
@@ -1138,6 +1260,8 @@ def save_env_value(key: str, value: str):
if env_path.exists():
with open(env_path, **read_kw) as f:
lines = f.readlines()
# Sanitize on every read: split concatenated keys, drop stale placeholders
lines = _sanitize_env_lines(lines)
# Find and update or append
found = False

View File

@@ -15,6 +15,8 @@ from hermes_cli.config import (
save_config,
save_env_value,
save_env_value_secure,
sanitize_env_file,
_sanitize_env_lines,
)
@@ -203,3 +205,133 @@ class TestSaveConfigAtomicity:
raw = yaml.safe_load(f)
assert raw["model"] == "test/atomic-model"
assert raw["agent"]["max_turns"] == 77
class TestSanitizeEnvLines:
"""Tests for .env file corruption repair."""
def test_splits_concatenated_keys(self):
"""Two KEY=VALUE pairs jammed on one line get split."""
lines = ["ANTHROPIC_API_KEY=sk-ant-xxxOPENAI_BASE_URL=https://api.openai.com/v1\n"]
result = _sanitize_env_lines(lines)
assert result == [
"ANTHROPIC_API_KEY=sk-ant-xxx\n",
"OPENAI_BASE_URL=https://api.openai.com/v1\n",
]
def test_drops_stale_placeholder(self):
"""KEY=*** entries are removed."""
lines = [
"OPENROUTER_API_KEY=sk-or-real\n",
"ANTHROPIC_TOKEN=***\n",
"FAL_KEY=fal-real\n",
]
result = _sanitize_env_lines(lines)
assert result == [
"OPENROUTER_API_KEY=sk-or-real\n",
"FAL_KEY=fal-real\n",
]
def test_drops_quoted_placeholder(self):
"""KEY='***' and KEY=\"***\" are also removed."""
lines = ['ANTHROPIC_TOKEN="***"\n', "OTHER_KEY='***'\n"]
result = _sanitize_env_lines(lines)
assert result == []
def test_preserves_clean_file(self):
"""A well-formed .env file passes through unchanged (modulo trailing newlines)."""
lines = [
"OPENROUTER_API_KEY=sk-or-xxx\n",
"FIRECRAWL_API_KEY=fc-xxx\n",
"# a comment\n",
"\n",
]
result = _sanitize_env_lines(lines)
assert result == lines
def test_preserves_comments_and_blanks(self):
lines = ["# comment\n", "\n", "KEY=val\n"]
result = _sanitize_env_lines(lines)
assert result == lines
def test_adds_missing_trailing_newline(self):
"""Lines missing trailing newline get one added."""
lines = ["FOO_BAR=baz"]
result = _sanitize_env_lines(lines)
assert result == ["FOO_BAR=baz\n"]
def test_three_concatenated_keys(self):
"""Three known keys on one line all get separated."""
lines = ["FAL_KEY=111FIRECRAWL_API_KEY=222GITHUB_TOKEN=333\n"]
result = _sanitize_env_lines(lines)
assert result == [
"FAL_KEY=111\n",
"FIRECRAWL_API_KEY=222\n",
"GITHUB_TOKEN=333\n",
]
def test_value_with_equals_sign_not_split(self):
"""A value containing '=' shouldn't be falsely split (lowercase in value)."""
lines = ["OPENAI_BASE_URL=https://api.example.com/v1?key=abc123\n"]
result = _sanitize_env_lines(lines)
assert result == lines
def test_unknown_keys_not_split(self):
"""Unknown key names on one line are NOT split (avoids false positives)."""
lines = ["CUSTOM_VAR=value123OTHER_THING=value456\n"]
result = _sanitize_env_lines(lines)
# Unknown keys stay on one line — no false split
assert len(result) == 1
def test_value_ending_with_digits_still_splits(self):
"""Concatenation is detected even when value ends with digits."""
lines = ["OPENROUTER_API_KEY=sk-or-v1-abc123OPENAI_BASE_URL=https://api.openai.com/v1\n"]
result = _sanitize_env_lines(lines)
assert len(result) == 2
assert result[0].startswith("OPENROUTER_API_KEY=")
assert result[1].startswith("OPENAI_BASE_URL=")
def test_save_env_value_fixes_corruption_on_write(self, tmp_path):
"""save_env_value sanitizes corrupted lines when writing a new key."""
env_file = tmp_path / ".env"
env_file.write_text(
"ANTHROPIC_API_KEY=sk-antOPENAI_BASE_URL=https://api.openai.com/v1\n"
"STALE_KEY=***\n"
)
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}):
save_env_value("NEW_KEY", "new-value")
content = env_file.read_text()
lines = content.strip().split("\n")
# Corrupted line should be split, placeholder removed, new key added
assert "ANTHROPIC_API_KEY=sk-ant" in lines
assert "OPENAI_BASE_URL=https://api.openai.com/v1" in lines
assert "NEW_KEY=new-value" in lines
assert "STALE_KEY=***" not in content
def test_sanitize_env_file_returns_fix_count(self, tmp_path):
"""sanitize_env_file reports how many entries were fixed."""
env_file = tmp_path / ".env"
env_file.write_text(
"FAL_KEY=good\n"
"OPENROUTER_API_KEY=valFIRECRAWL_API_KEY=val2\n"
"STALE=***\n"
)
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}):
fixes = sanitize_env_file()
assert fixes > 0
# Verify file is now clean
content = env_file.read_text()
assert "STALE=***" not in content
assert "OPENROUTER_API_KEY=val\n" in content
assert "FIRECRAWL_API_KEY=val2\n" in content
def test_sanitize_env_file_noop_on_clean_file(self, tmp_path):
"""No changes when file is already clean."""
env_file = tmp_path / ".env"
env_file.write_text("GOOD_KEY=good\nOTHER_KEY=other\n")
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}):
fixes = sanitize_env_file()
assert fixes == 0