security: enforce 0600/0700 file permissions on sensitive files (inspired by openclaw)
Enforce owner-only permissions on files and directories that contain secrets or sensitive data: - cron/jobs.py: jobs.json (0600), cron dirs (0700), job output files (0600) - hermes_cli/config.py: config.yaml (0600), .env (0600), ~/.hermes/* dirs (0700) - cli.py: config.yaml via save_config_value (0600) All chmod calls use try/except for Windows compatibility. Includes _secure_file() and _secure_dir() helpers with graceful fallback. 8 new tests verify permissions on all file types. Inspired by openclaw v2026.3.7 file permission enforcement.
This commit is contained in:
6
cli.py
6
cli.py
@@ -992,6 +992,12 @@ def save_config_value(key_path: str, value: any) -> bool:
|
||||
with open(config_path, 'w') as f:
|
||||
yaml.dump(config, f, default_flow_style=False, sort_keys=False)
|
||||
|
||||
# Enforce owner-only permissions on config files (contain API keys)
|
||||
try:
|
||||
os.chmod(config_path, 0o600)
|
||||
except (OSError, NotImplementedError):
|
||||
pass
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("Failed to save config: %s", e)
|
||||
|
||||
24
cron/jobs.py
24
cron/jobs.py
@@ -32,10 +32,29 @@ JOBS_FILE = CRON_DIR / "jobs.json"
|
||||
OUTPUT_DIR = CRON_DIR / "output"
|
||||
|
||||
|
||||
def _secure_dir(path: Path):
|
||||
"""Set directory to owner-only access (0700). No-op on Windows."""
|
||||
try:
|
||||
os.chmod(path, 0o700)
|
||||
except (OSError, NotImplementedError):
|
||||
pass # Windows or other platforms where chmod is not supported
|
||||
|
||||
|
||||
def _secure_file(path: Path):
|
||||
"""Set file to owner-only read/write (0600). No-op on Windows."""
|
||||
try:
|
||||
if path.exists():
|
||||
os.chmod(path, 0o600)
|
||||
except (OSError, NotImplementedError):
|
||||
pass
|
||||
|
||||
|
||||
def ensure_dirs():
|
||||
"""Ensure cron directories exist."""
|
||||
"""Ensure cron directories exist with secure permissions."""
|
||||
CRON_DIR.mkdir(parents=True, exist_ok=True)
|
||||
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
||||
_secure_dir(CRON_DIR)
|
||||
_secure_dir(OUTPUT_DIR)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
@@ -223,6 +242,7 @@ def save_jobs(jobs: List[Dict[str, Any]]):
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.replace(tmp_path, JOBS_FILE)
|
||||
_secure_file(JOBS_FILE)
|
||||
except BaseException:
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
@@ -400,11 +420,13 @@ def save_job_output(job_id: str, output: str):
|
||||
ensure_dirs()
|
||||
job_output_dir = OUTPUT_DIR / job_id
|
||||
job_output_dir.mkdir(parents=True, exist_ok=True)
|
||||
_secure_dir(job_output_dir)
|
||||
|
||||
timestamp = _hermes_now().strftime("%Y-%m-%d_%H-%M-%S")
|
||||
output_file = job_output_dir / f"{timestamp}.md"
|
||||
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
f.write(output)
|
||||
_secure_file(output_file)
|
||||
|
||||
return output_file
|
||||
|
||||
@@ -46,13 +46,32 @@ def get_project_root() -> Path:
|
||||
"""Get the project installation directory."""
|
||||
return Path(__file__).parent.parent.resolve()
|
||||
|
||||
def _secure_dir(path):
|
||||
"""Set directory to owner-only access (0700). No-op on Windows."""
|
||||
try:
|
||||
os.chmod(path, 0o700)
|
||||
except (OSError, NotImplementedError):
|
||||
pass
|
||||
|
||||
|
||||
def _secure_file(path):
|
||||
"""Set file to owner-only read/write (0600). No-op on Windows."""
|
||||
try:
|
||||
if os.path.exists(str(path)):
|
||||
os.chmod(path, 0o600)
|
||||
except (OSError, NotImplementedError):
|
||||
pass
|
||||
|
||||
|
||||
def ensure_hermes_home():
|
||||
"""Ensure ~/.hermes directory structure exists."""
|
||||
"""Ensure ~/.hermes directory structure exists with secure permissions."""
|
||||
home = get_hermes_home()
|
||||
(home / "cron").mkdir(parents=True, exist_ok=True)
|
||||
(home / "sessions").mkdir(parents=True, exist_ok=True)
|
||||
(home / "logs").mkdir(parents=True, exist_ok=True)
|
||||
(home / "memories").mkdir(parents=True, exist_ok=True)
|
||||
home.mkdir(parents=True, exist_ok=True)
|
||||
_secure_dir(home)
|
||||
for subdir in ("cron", "sessions", "logs", "memories"):
|
||||
d = home / subdir
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
_secure_dir(d)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
@@ -808,6 +827,7 @@ def save_config(config: Dict[str, Any]):
|
||||
sections.append("fallback")
|
||||
if sections:
|
||||
f.write(_COMMENTED_SECTIONS)
|
||||
_secure_file(config_path)
|
||||
|
||||
|
||||
def load_env() -> Dict[str, str]:
|
||||
@@ -860,6 +880,7 @@ def save_env_value(key: str, value: str):
|
||||
|
||||
with open(env_path, 'w', **write_kw) as f:
|
||||
f.writelines(lines)
|
||||
_secure_file(env_path)
|
||||
|
||||
|
||||
def get_env_value(key: str) -> Optional[str]:
|
||||
|
||||
135
tests/test_file_permissions.py
Normal file
135
tests/test_file_permissions.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""Tests for file permissions hardening on sensitive files."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import stat
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TestCronFilePermissions(unittest.TestCase):
|
||||
"""Verify cron files get secure permissions."""
|
||||
|
||||
def setUp(self):
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
self.cron_dir = Path(self.tmpdir) / "cron"
|
||||
self.output_dir = self.cron_dir / "output"
|
||||
|
||||
def tearDown(self):
|
||||
import shutil
|
||||
shutil.rmtree(self.tmpdir, ignore_errors=True)
|
||||
|
||||
@patch("cron.jobs.CRON_DIR")
|
||||
@patch("cron.jobs.OUTPUT_DIR")
|
||||
@patch("cron.jobs.JOBS_FILE")
|
||||
def test_ensure_dirs_sets_0700(self, mock_jobs_file, mock_output, mock_cron):
|
||||
mock_cron.__class__ = Path
|
||||
# Use real paths
|
||||
cron_dir = Path(self.tmpdir) / "cron"
|
||||
output_dir = cron_dir / "output"
|
||||
|
||||
with patch("cron.jobs.CRON_DIR", cron_dir), \
|
||||
patch("cron.jobs.OUTPUT_DIR", output_dir):
|
||||
from cron.jobs import ensure_dirs
|
||||
ensure_dirs()
|
||||
|
||||
cron_mode = stat.S_IMODE(os.stat(cron_dir).st_mode)
|
||||
output_mode = stat.S_IMODE(os.stat(output_dir).st_mode)
|
||||
self.assertEqual(cron_mode, 0o700)
|
||||
self.assertEqual(output_mode, 0o700)
|
||||
|
||||
@patch("cron.jobs.CRON_DIR")
|
||||
@patch("cron.jobs.OUTPUT_DIR")
|
||||
@patch("cron.jobs.JOBS_FILE")
|
||||
def test_save_jobs_sets_0600(self, mock_jobs_file, mock_output, mock_cron):
|
||||
cron_dir = Path(self.tmpdir) / "cron"
|
||||
output_dir = cron_dir / "output"
|
||||
jobs_file = cron_dir / "jobs.json"
|
||||
|
||||
with patch("cron.jobs.CRON_DIR", cron_dir), \
|
||||
patch("cron.jobs.OUTPUT_DIR", output_dir), \
|
||||
patch("cron.jobs.JOBS_FILE", jobs_file):
|
||||
from cron.jobs import save_jobs
|
||||
save_jobs([{"id": "test", "prompt": "hello"}])
|
||||
|
||||
file_mode = stat.S_IMODE(os.stat(jobs_file).st_mode)
|
||||
self.assertEqual(file_mode, 0o600)
|
||||
|
||||
def test_save_job_output_sets_0600(self):
|
||||
output_dir = Path(self.tmpdir) / "output"
|
||||
with patch("cron.jobs.OUTPUT_DIR", output_dir), \
|
||||
patch("cron.jobs.CRON_DIR", Path(self.tmpdir)), \
|
||||
patch("cron.jobs.ensure_dirs"):
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
from cron.jobs import save_job_output
|
||||
output_file = save_job_output("test-job", "test output content")
|
||||
|
||||
file_mode = stat.S_IMODE(os.stat(output_file).st_mode)
|
||||
self.assertEqual(file_mode, 0o600)
|
||||
|
||||
# Job output dir should also be 0700
|
||||
job_dir = output_dir / "test-job"
|
||||
dir_mode = stat.S_IMODE(os.stat(job_dir).st_mode)
|
||||
self.assertEqual(dir_mode, 0o700)
|
||||
|
||||
|
||||
class TestConfigFilePermissions(unittest.TestCase):
|
||||
"""Verify config files get secure permissions."""
|
||||
|
||||
def setUp(self):
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
|
||||
def tearDown(self):
|
||||
import shutil
|
||||
shutil.rmtree(self.tmpdir, ignore_errors=True)
|
||||
|
||||
def test_save_config_sets_0600(self):
|
||||
config_path = Path(self.tmpdir) / "config.yaml"
|
||||
with patch("hermes_cli.config.get_config_path", return_value=config_path), \
|
||||
patch("hermes_cli.config.ensure_hermes_home"):
|
||||
from hermes_cli.config import save_config
|
||||
save_config({"model": "test/model"})
|
||||
|
||||
file_mode = stat.S_IMODE(os.stat(config_path).st_mode)
|
||||
self.assertEqual(file_mode, 0o600)
|
||||
|
||||
def test_save_env_value_sets_0600(self):
|
||||
env_path = Path(self.tmpdir) / ".env"
|
||||
with patch("hermes_cli.config.get_env_path", return_value=env_path), \
|
||||
patch("hermes_cli.config.ensure_hermes_home"):
|
||||
from hermes_cli.config import save_env_value
|
||||
save_env_value("TEST_KEY", "test_value")
|
||||
|
||||
file_mode = stat.S_IMODE(os.stat(env_path).st_mode)
|
||||
self.assertEqual(file_mode, 0o600)
|
||||
|
||||
def test_ensure_hermes_home_sets_0700(self):
|
||||
home = Path(self.tmpdir) / ".hermes"
|
||||
with patch("hermes_cli.config.get_hermes_home", return_value=home):
|
||||
from hermes_cli.config import ensure_hermes_home
|
||||
ensure_hermes_home()
|
||||
|
||||
home_mode = stat.S_IMODE(os.stat(home).st_mode)
|
||||
self.assertEqual(home_mode, 0o700)
|
||||
|
||||
for subdir in ("cron", "sessions", "logs", "memories"):
|
||||
subdir_mode = stat.S_IMODE(os.stat(home / subdir).st_mode)
|
||||
self.assertEqual(subdir_mode, 0o700, f"{subdir} should be 0700")
|
||||
|
||||
|
||||
class TestSecureHelpers(unittest.TestCase):
|
||||
"""Test the _secure_file and _secure_dir helpers."""
|
||||
|
||||
def test_secure_file_nonexistent_no_error(self):
|
||||
from cron.jobs import _secure_file
|
||||
_secure_file(Path("/nonexistent/path/file.json")) # Should not raise
|
||||
|
||||
def test_secure_dir_nonexistent_no_error(self):
|
||||
from cron.jobs import _secure_dir
|
||||
_secure_dir(Path("/nonexistent/path")) # Should not raise
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user