diff --git a/cli.py b/cli.py index a63e6053c..41f804815 100755 --- a/cli.py +++ b/cli.py @@ -992,6 +992,12 @@ def save_config_value(key_path: str, value: any) -> bool: with open(config_path, 'w') as f: yaml.dump(config, f, default_flow_style=False, sort_keys=False) + # Enforce owner-only permissions on config files (contain API keys) + try: + os.chmod(config_path, 0o600) + except (OSError, NotImplementedError): + pass + return True except Exception as e: logger.error("Failed to save config: %s", e) diff --git a/cron/jobs.py b/cron/jobs.py index c69ee7cf2..8d5c1829d 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -32,10 +32,29 @@ JOBS_FILE = CRON_DIR / "jobs.json" OUTPUT_DIR = CRON_DIR / "output" +def _secure_dir(path: Path): + """Set directory to owner-only access (0700). No-op on Windows.""" + try: + os.chmod(path, 0o700) + except (OSError, NotImplementedError): + pass # Windows or other platforms where chmod is not supported + + +def _secure_file(path: Path): + """Set file to owner-only read/write (0600). No-op on Windows.""" + try: + if path.exists(): + os.chmod(path, 0o600) + except (OSError, NotImplementedError): + pass + + def ensure_dirs(): - """Ensure cron directories exist.""" + """Ensure cron directories exist with secure permissions.""" CRON_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + _secure_dir(CRON_DIR) + _secure_dir(OUTPUT_DIR) # ============================================================================= @@ -223,6 +242,7 @@ def save_jobs(jobs: List[Dict[str, Any]]): f.flush() os.fsync(f.fileno()) os.replace(tmp_path, JOBS_FILE) + _secure_file(JOBS_FILE) except BaseException: try: os.unlink(tmp_path) @@ -400,11 +420,13 @@ def save_job_output(job_id: str, output: str): ensure_dirs() job_output_dir = OUTPUT_DIR / job_id job_output_dir.mkdir(parents=True, exist_ok=True) + _secure_dir(job_output_dir) timestamp = _hermes_now().strftime("%Y-%m-%d_%H-%M-%S") output_file = job_output_dir / f"{timestamp}.md" with open(output_file, 'w', encoding='utf-8') as f: f.write(output) + _secure_file(output_file) return output_file diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 7a31b551d..300d18ab2 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -46,13 +46,32 @@ def get_project_root() -> Path: """Get the project installation directory.""" return Path(__file__).parent.parent.resolve() +def _secure_dir(path): + """Set directory to owner-only access (0700). No-op on Windows.""" + try: + os.chmod(path, 0o700) + except (OSError, NotImplementedError): + pass + + +def _secure_file(path): + """Set file to owner-only read/write (0600). No-op on Windows.""" + try: + if os.path.exists(str(path)): + os.chmod(path, 0o600) + except (OSError, NotImplementedError): + pass + + def ensure_hermes_home(): - """Ensure ~/.hermes directory structure exists.""" + """Ensure ~/.hermes directory structure exists with secure permissions.""" home = get_hermes_home() - (home / "cron").mkdir(parents=True, exist_ok=True) - (home / "sessions").mkdir(parents=True, exist_ok=True) - (home / "logs").mkdir(parents=True, exist_ok=True) - (home / "memories").mkdir(parents=True, exist_ok=True) + home.mkdir(parents=True, exist_ok=True) + _secure_dir(home) + for subdir in ("cron", "sessions", "logs", "memories"): + d = home / subdir + d.mkdir(parents=True, exist_ok=True) + _secure_dir(d) # ============================================================================= @@ -808,6 +827,7 @@ def save_config(config: Dict[str, Any]): sections.append("fallback") if sections: f.write(_COMMENTED_SECTIONS) + _secure_file(config_path) def load_env() -> Dict[str, str]: @@ -860,6 +880,7 @@ def save_env_value(key: str, value: str): with open(env_path, 'w', **write_kw) as f: f.writelines(lines) + _secure_file(env_path) def get_env_value(key: str) -> Optional[str]: diff --git a/tests/test_file_permissions.py b/tests/test_file_permissions.py new file mode 100644 index 000000000..cc816f6fa --- /dev/null +++ b/tests/test_file_permissions.py @@ -0,0 +1,135 @@ +"""Tests for file permissions hardening on sensitive files.""" + +import json +import os +import stat +import tempfile +import unittest +from pathlib import Path +from unittest.mock import patch + + +class TestCronFilePermissions(unittest.TestCase): + """Verify cron files get secure permissions.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.cron_dir = Path(self.tmpdir) / "cron" + self.output_dir = self.cron_dir / "output" + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + @patch("cron.jobs.CRON_DIR") + @patch("cron.jobs.OUTPUT_DIR") + @patch("cron.jobs.JOBS_FILE") + def test_ensure_dirs_sets_0700(self, mock_jobs_file, mock_output, mock_cron): + mock_cron.__class__ = Path + # Use real paths + cron_dir = Path(self.tmpdir) / "cron" + output_dir = cron_dir / "output" + + with patch("cron.jobs.CRON_DIR", cron_dir), \ + patch("cron.jobs.OUTPUT_DIR", output_dir): + from cron.jobs import ensure_dirs + ensure_dirs() + + cron_mode = stat.S_IMODE(os.stat(cron_dir).st_mode) + output_mode = stat.S_IMODE(os.stat(output_dir).st_mode) + self.assertEqual(cron_mode, 0o700) + self.assertEqual(output_mode, 0o700) + + @patch("cron.jobs.CRON_DIR") + @patch("cron.jobs.OUTPUT_DIR") + @patch("cron.jobs.JOBS_FILE") + def test_save_jobs_sets_0600(self, mock_jobs_file, mock_output, mock_cron): + cron_dir = Path(self.tmpdir) / "cron" + output_dir = cron_dir / "output" + jobs_file = cron_dir / "jobs.json" + + with patch("cron.jobs.CRON_DIR", cron_dir), \ + patch("cron.jobs.OUTPUT_DIR", output_dir), \ + patch("cron.jobs.JOBS_FILE", jobs_file): + from cron.jobs import save_jobs + save_jobs([{"id": "test", "prompt": "hello"}]) + + file_mode = stat.S_IMODE(os.stat(jobs_file).st_mode) + self.assertEqual(file_mode, 0o600) + + def test_save_job_output_sets_0600(self): + output_dir = Path(self.tmpdir) / "output" + with patch("cron.jobs.OUTPUT_DIR", output_dir), \ + patch("cron.jobs.CRON_DIR", Path(self.tmpdir)), \ + patch("cron.jobs.ensure_dirs"): + output_dir.mkdir(parents=True, exist_ok=True) + from cron.jobs import save_job_output + output_file = save_job_output("test-job", "test output content") + + file_mode = stat.S_IMODE(os.stat(output_file).st_mode) + self.assertEqual(file_mode, 0o600) + + # Job output dir should also be 0700 + job_dir = output_dir / "test-job" + dir_mode = stat.S_IMODE(os.stat(job_dir).st_mode) + self.assertEqual(dir_mode, 0o700) + + +class TestConfigFilePermissions(unittest.TestCase): + """Verify config files get secure permissions.""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + + def tearDown(self): + import shutil + shutil.rmtree(self.tmpdir, ignore_errors=True) + + def test_save_config_sets_0600(self): + config_path = Path(self.tmpdir) / "config.yaml" + with patch("hermes_cli.config.get_config_path", return_value=config_path), \ + patch("hermes_cli.config.ensure_hermes_home"): + from hermes_cli.config import save_config + save_config({"model": "test/model"}) + + file_mode = stat.S_IMODE(os.stat(config_path).st_mode) + self.assertEqual(file_mode, 0o600) + + def test_save_env_value_sets_0600(self): + env_path = Path(self.tmpdir) / ".env" + with patch("hermes_cli.config.get_env_path", return_value=env_path), \ + patch("hermes_cli.config.ensure_hermes_home"): + from hermes_cli.config import save_env_value + save_env_value("TEST_KEY", "test_value") + + file_mode = stat.S_IMODE(os.stat(env_path).st_mode) + self.assertEqual(file_mode, 0o600) + + def test_ensure_hermes_home_sets_0700(self): + home = Path(self.tmpdir) / ".hermes" + with patch("hermes_cli.config.get_hermes_home", return_value=home): + from hermes_cli.config import ensure_hermes_home + ensure_hermes_home() + + home_mode = stat.S_IMODE(os.stat(home).st_mode) + self.assertEqual(home_mode, 0o700) + + for subdir in ("cron", "sessions", "logs", "memories"): + subdir_mode = stat.S_IMODE(os.stat(home / subdir).st_mode) + self.assertEqual(subdir_mode, 0o700, f"{subdir} should be 0700") + + +class TestSecureHelpers(unittest.TestCase): + """Test the _secure_file and _secure_dir helpers.""" + + def test_secure_file_nonexistent_no_error(self): + from cron.jobs import _secure_file + _secure_file(Path("/nonexistent/path/file.json")) # Should not raise + + def test_secure_dir_nonexistent_no_error(self): + from cron.jobs import _secure_dir + _secure_dir(Path("/nonexistent/path")) # Should not raise + + +if __name__ == "__main__": + unittest.main()