"""Tests for scripts/ssh_trust.py verified SSH trust helpers.""" from __future__ import annotations import importlib.util import shlex import subprocess from pathlib import Path import pytest REPO_ROOT = Path(__file__).parent.parent spec = importlib.util.spec_from_file_location("ssh_trust", REPO_ROOT / "scripts" / "ssh_trust.py") ssh_trust = importlib.util.module_from_spec(spec) spec.loader.exec_module(ssh_trust) def test_enroll_host_key_writes_scanned_key(tmp_path): calls = [] known_hosts = tmp_path / "known_hosts" def fake_run(argv, capture_output, text, timeout): calls.append(argv) return subprocess.CompletedProcess( argv, 0, stdout="example.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAITestKey\n", stderr="", ) written_path = ssh_trust.enroll_host_key( "example.com", port=2222, known_hosts_path=known_hosts, runner=fake_run, ) assert written_path == known_hosts assert known_hosts.read_text() == "example.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAITestKey\n" assert calls == [["ssh-keyscan", "-p", "2222", "-H", "example.com"]] def test_executor_requires_known_hosts_or_auto_enroll(tmp_path): executor = ssh_trust.VerifiedSSHExecutor( known_hosts_path=tmp_path / "known_hosts", auto_enroll=False, ) with pytest.raises(ssh_trust.HostKeyEnrollmentError): executor.plan("203.0.113.10", ["echo", "ok"]) def test_remote_command_is_quoted_and_local_execution_stays_shell_free(tmp_path): known_hosts = tmp_path / "known_hosts" known_hosts.write_text("203.0.113.10 ssh-ed25519 AAAAC3NzaTest\n") executor = ssh_trust.VerifiedSSHExecutor(known_hosts_path=known_hosts) command = ["python3", "run_agent.py", "--task", "hello 'quoted' world"] plan = executor.plan("203.0.113.10", command, port=2222) expected_remote_command = shlex.join(command) assert plan.local is False assert plan.remote_command == expected_remote_command assert plan.argv[-1] == expected_remote_command assert "StrictHostKeyChecking=yes" in plan.argv assert f"UserKnownHostsFile={known_hosts}" in plan.argv assert plan.argv[-2] == "root@203.0.113.10" local_plan = executor.plan("127.0.0.1", ["python3", "-V"], local=True) assert local_plan.local is True assert local_plan.argv == ["python3", "-V"] assert local_plan.remote_command is None def test_run_raises_host_key_verification_error(tmp_path): known_hosts = tmp_path / "known_hosts" known_hosts.write_text("203.0.113.10 ssh-ed25519 AAAAC3NzaTest\n") def fake_run(argv, capture_output, text, timeout): return subprocess.CompletedProcess( argv, 255, stdout="", stderr="Host key verification failed.\n", ) executor = ssh_trust.VerifiedSSHExecutor( known_hosts_path=known_hosts, runner=fake_run, ) with pytest.raises(ssh_trust.HostKeyVerificationError): executor.run("203.0.113.10", ["true"])