Files
hermes-agent/tools/tirith_security.py
Teknium cbf195e806 chore: fix 154 f-strings, simplify getattr/URL patterns, remove dead code (#3119)
Three categories of cleanup, all zero-behavioral-change:

1. F-strings without placeholders (154 fixes across 29 files)
   - Converted f'...' to '...' where no {expression} was present
   - Heaviest files: run_agent.py (24), cli.py (20), honcho_integration/cli.py (34)

2. Simplify defensive patterns in run_agent.py
   - Added explicit self._is_anthropic_oauth = False in __init__ (before
     the api_mode branch that conditionally sets it)
   - Replaced 7x getattr(self, '_is_anthropic_oauth', False) with direct
     self._is_anthropic_oauth (attribute always initialized now)
   - Added _is_openrouter_url() and _is_anthropic_url() helper methods
   - Replaced 3 inline 'openrouter' in self._base_url_lower checks

3. Remove dead code in small files
   - hermes_cli/claw.py: removed unused 'total' computation
   - tools/fuzzy_match.py: removed unused strip_indent() function and
     pattern_stripped variable

Full test suite: 6184 passed, 0 failures
E2E PTY: banner clean, tool calls work, zero garbled ANSI
2026-03-25 19:47:58 -07:00

671 lines
24 KiB
Python

"""Tirith pre-exec security scanning wrapper.
Runs the tirith binary as a subprocess to scan commands for content-level
threats (homograph URLs, pipe-to-interpreter, terminal injection, etc.).
Exit code is the verdict source of truth:
0 = allow, 1 = block, 2 = warn
JSON stdout enriches findings/summary but never overrides the verdict.
Operational failures (spawn error, timeout, unknown exit code) respect
the fail_open config setting. Programming errors propagate.
Auto-install: if tirith is not found on PATH or at the configured path,
it is automatically downloaded from GitHub releases to $HERMES_HOME/bin/tirith.
The download always verifies SHA-256 checksums. When cosign is available on
PATH, provenance verification (GitHub Actions workflow signature) is also
performed. If cosign is not installed, the download proceeds with SHA-256
verification only — still secure via HTTPS + checksum, just without supply
chain provenance proof. Installation runs in a background thread so startup
never blocks.
"""
import hashlib
import json
import logging
import os
import platform
import shutil
import stat
import subprocess
import tarfile
import tempfile
import threading
import time
import urllib.request
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)

# GitHub "owner/name" slug; used to build the release download URL and the
# cosign identity pattern below.
_REPO = "sheeki03/tirith"

# Cosign provenance verification — pinned to the specific release workflow.
# Every literal dot is escaped, including the one in "github.com", so the
# pattern cannot match a look-alike host such as "githubXcom". The pattern is
# intentionally left open after "refs/tags/v" so any v-prefixed tag matches.
_COSIGN_IDENTITY_REGEXP = f"^https://github\\.com/{_REPO}/\\.github/workflows/release\\.yml@refs/tags/v"
# Issuer passed to cosign as --certificate-oidc-issuer.
_COSIGN_ISSUER = "https://token.actions.githubusercontent.com"
# ---------------------------------------------------------------------------
# Config helpers
# ---------------------------------------------------------------------------
def _env_bool(key: str, default: bool) -> bool:
    """Read environment variable *key* as a boolean flag.

    Returns *default* unchanged when the variable is unset. Otherwise the
    result is True only if the value, compared case-insensitively, is one of
    "1", "true" or "yes"; any other text (including surrounding whitespace)
    yields False.
    """
    raw = os.environ.get(key)
    if raw is not None:
        return raw.lower() in ("1", "true", "yes")
    return default
def _env_int(key: str, default: int) -> int:
    """Read environment variable *key* as an integer.

    Returns *default* when the variable is unset or when its value cannot be
    parsed by ``int()``.
    """
    raw = os.environ.get(key)
    if raw is not None:
        try:
            return int(raw)
        except ValueError:
            # Unparseable override: fall through to the default.
            pass
    return default
def _load_security_config() -> dict:
    """Load security settings from config.yaml, with env var overrides.

    Precedence per setting: environment variable, then the ``security``
    section returned by ``hermes_cli.config.load_config()``, then the
    built-in default.

    A key that is present but null (for example ``tirith_path:`` with nothing
    after it) is treated as "not set" and falls back to the default, so a
    stray empty key cannot silently disable scanning or inject ``None`` as
    the binary path or timeout. A ``security`` value that is not a mapping
    is ignored for the same reason.

    Returns:
        dict with keys ``tirith_enabled``, ``tirith_path``,
        ``tirith_timeout`` and ``tirith_fail_open``.
    """
    defaults = {
        "tirith_enabled": True,
        "tirith_path": "tirith",
        "tirith_timeout": 5,
        "tirith_fail_open": True,
    }
    try:
        from hermes_cli.config import load_config
        cfg = load_config().get("security", {}) or {}
    except Exception:
        # Best-effort: any failure to load the config means "use defaults".
        cfg = {}
    if not callable(getattr(cfg, "get", None)):
        # e.g. a scalar under "security" — there are no per-key settings.
        cfg = {}

    def _setting(name: str):
        # Null config values fall back to the built-in default.
        value = cfg.get(name)
        return defaults[name] if value is None else value

    return {
        "tirith_enabled": _env_bool("TIRITH_ENABLED", _setting("tirith_enabled")),
        "tirith_path": os.getenv("TIRITH_BIN", _setting("tirith_path")),
        "tirith_timeout": _env_int("TIRITH_TIMEOUT", _setting("tirith_timeout")),
        "tirith_fail_open": _env_bool("TIRITH_FAIL_OPEN", _setting("tirith_fail_open")),
    }
# ---------------------------------------------------------------------------
# Auto-install
# ---------------------------------------------------------------------------
# Cached path after first resolution (avoids repeated shutil.which per command).
# _resolved_path is tri-state and is always compared by identity (`is`):
#   None            — resolution has not been attempted (or was reset for retry)
#   a str           — path of a usable tirith binary
#   _INSTALL_FAILED — "we tried and failed"; prevents retry on every command
_resolved_path: str | None | bool = None
_INSTALL_FAILED = False  # sentinel: distinct from "not yet tried" (None)
_install_failure_reason: str = ""  # reason tag when _resolved_path is _INSTALL_FAILED
# Background install thread coordination:
# _install_lock is held by _background_install() for the whole install attempt;
# _install_thread is the thread most recently started by ensure_installed().
_install_lock = threading.Lock()
_install_thread: threading.Thread | None = None
# Disk-persistent failure marker — avoids retry across process restarts.
# The TTL is in seconds and is compared against the age of the marker's mtime.
_MARKER_TTL = 86400  # 24 hours
def _get_hermes_home() -> str:
    """Return the Hermes home directory as a plain string.

    Delegates to ``hermes_constants.get_hermes_home()`` and stringifies the
    result. (That helper is expected to honour the HERMES_HOME env var;
    its implementation is not visible from this module.)
    """
    home = get_hermes_home()
    return str(home)
def _failure_marker_path() -> str:
    """Return the location of the install-failure marker file.

    The marker is the file ``.tirith-install-failed`` directly inside the
    Hermes home directory.
    """
    home = _get_hermes_home()
    return os.path.join(home, ".tirith-install-failed")
def _read_failure_reason() -> str | None:
    """Return the reason tag stored in the on-disk failure marker.

    Returns:
        The marker's contents with surrounding whitespace stripped (possibly
        an empty string), or None when the marker cannot be read (e.g. it
        does not exist) or its mtime is at least _MARKER_TTL seconds old.
    """
    try:
        marker = _failure_marker_path()
        modified_at = os.path.getmtime(marker)
        age = time.time() - modified_at
        if age >= _MARKER_TTL:
            # Stale marker: treat as absent so the install may be retried.
            return None
        with open(marker, "r") as fh:
            contents = fh.read()
    except OSError:
        return None
    return contents.strip()
def _is_install_failed_on_disk() -> bool:
    """Report whether a still-valid install failure is recorded on disk.

    Returns False (so a retry is allowed) when:
    - there is no readable marker, or it is older than _MARKER_TTL (24h)
    - the recorded reason is 'cosign_missing' and cosign is now on PATH;
      in that case the marker is also deleted.
    Returns True for any other recorded failure.
    """
    reason = _read_failure_reason()
    if reason is None:
        return False
    cause_resolved = reason == "cosign_missing" and shutil.which("cosign")
    if not cause_resolved:
        return True
    # The blocker has gone away: drop the marker and permit a fresh attempt.
    _clear_install_failed()
    return False
def _mark_install_failed(reason: str = ""):
    """Record an install failure on disk so later processes skip the retry.

    Args:
        reason: Short tag identifying the failure cause; written verbatim as
            the marker file's contents. The tag "cosign_missing" is special:
            a marker carrying it is auto-cleared once cosign is on PATH.

    The write is best-effort: any OSError (creating the parent directory or
    writing the file) is ignored.
    """
    try:
        marker = _failure_marker_path()
        parent = os.path.dirname(marker)
        os.makedirs(parent, exist_ok=True)
        with open(marker, "w") as fh:
            fh.write(reason)
    except OSError:
        # Failing to persist the marker only costs an extra retry later.
        return
def _clear_install_failed():
    """Delete the on-disk failure marker, if any.

    Best-effort: an OSError (for instance, the marker not existing) is
    ignored.
    """
    try:
        marker = _failure_marker_path()
        os.unlink(marker)
    except OSError:
        return
def _hermes_bin_dir() -> str:
    """Return the ``bin`` directory under the Hermes home, creating it if absent.

    Note that this has a filesystem side effect on every call
    (``os.makedirs(..., exist_ok=True)``).
    """
    home = _get_hermes_home()
    bin_dir = os.path.join(home, "bin")
    os.makedirs(bin_dir, exist_ok=True)
    return bin_dir
def _detect_target() -> str | None:
"""Return the Rust target triple for the current platform, or None."""
system = platform.system()
machine = platform.machine().lower()
if system == "Darwin":
plat = "apple-darwin"
elif system == "Linux":
plat = "unknown-linux-gnu"
else:
return None
if machine in ("x86_64", "amd64"):
arch = "x86_64"
elif machine in ("aarch64", "arm64"):
arch = "aarch64"
else:
return None
return f"{arch}-{plat}"
def _download_file(url: str, dest: str, timeout: int = 10):
    """Download *url* to the local file *dest*.

    Args:
        url: Source URL, opened with ``urllib.request.urlopen``.
        dest: Path of the file to create or overwrite with the response body.
        timeout: Timeout in seconds passed to ``urlopen``.

    If the GITHUB_TOKEN environment variable is set, it is sent as an
    ``Authorization: token ...`` header on the initial request only.

    Raises:
        Whatever ``urlopen`` or ``open`` raise (e.g. ``urllib.error.URLError``,
        ``OSError``); nothing is caught here. On failure *dest* may be left
        empty or partially written.
    """
    req = urllib.request.Request(url)
    token = os.getenv("GITHUB_TOKEN")
    if token:
        # Use an *unredirected* header: urllib copies regular headers onto the
        # follow-up request when it follows a redirect, which would hand the
        # token to whatever host the redirect points at.
        req.add_unredirected_header("Authorization", f"token {token}")
    with urllib.request.urlopen(req, timeout=timeout) as resp, open(dest, "wb") as f:
        shutil.copyfileobj(resp, f)
def _verify_cosign(checksums_path: str, sig_path: str, cert_path: str) -> bool | None:
    """Verify the cosign provenance signature on checksums.txt.

    Runs ``cosign verify-blob`` against *checksums_path* using the given
    signature and certificate, pinned to _COSIGN_IDENTITY_REGEXP and
    _COSIGN_ISSUER, with a 15 second timeout.

    Returns:
        True  — cosign exited 0 (verification passed)
        False — cosign ran but exited non-zero (verification rejected)
        None  — cosign could not be used: not on PATH, failed to spawn, or
                timed out

    The installer in this module aborts on False and, on None, continues
    with SHA-256 verification only.
    """
    cosign = shutil.which("cosign")
    if not cosign:
        logger.info("cosign not found on PATH")
        return None

    argv = [
        cosign, "verify-blob",
        "--certificate", cert_path,
        "--signature", sig_path,
        "--certificate-identity-regexp", _COSIGN_IDENTITY_REGEXP,
        "--certificate-oidc-issuer", _COSIGN_ISSUER,
        checksums_path,
    ]
    try:
        result = subprocess.run(argv, capture_output=True, text=True, timeout=15)
    except (OSError, subprocess.TimeoutExpired) as exc:
        logger.warning("cosign execution failed: %s", exc)
        return None

    if result.returncode != 0:
        logger.warning("cosign verification failed (exit %d): %s",
                       result.returncode, result.stderr.strip())
        return False
    logger.info("cosign provenance verification passed")
    return True
def _verify_checksum(archive_path: str, checksums_path: str, archive_name: str) -> bool:
    """Verify the SHA-256 of an archive against an entry in checksums.txt.

    Each checksums line is parsed as ``<hex digest><whitespace><filename>``.
    Any run of whitespace is accepted as the separator, and a leading ``*``
    on the filename (the binary-mode marker written by ``sha256sum -b``) is
    ignored, so both the two-space and the `` *`` output formats match.
    Digests are compared case-insensitively.

    Args:
        archive_path: Path of the downloaded archive to hash.
        checksums_path: Path of the checksums file.
        archive_name: Filename to look up in the checksums file.

    Returns:
        True when an entry for *archive_name* exists and matches the
        archive's SHA-256; False when there is no entry or it differs.

    Raises:
        OSError: if either file cannot be opened or read.
    """
    expected = None
    with open(checksums_path, encoding="utf-8", errors="replace") as f:
        for line in f:
            parts = line.split(None, 1)
            if len(parts) != 2:
                continue  # blank or malformed line
            digest, name = parts[0], parts[1].strip()
            if name != archive_name and name.startswith("*"):
                name = name[1:]
            if name == archive_name:
                expected = digest.lower()
                break
    if not expected:
        logger.warning("No checksum entry for %s", archive_name)
        return False
    sha = hashlib.sha256()
    with open(archive_path, "rb") as f:
        while chunk := f.read(65536):
            sha.update(chunk)
    actual = sha.hexdigest()
    if actual != expected:
        logger.warning("Checksum mismatch: expected %s, got %s", expected, actual)
        return False
    return True
def _install_tirith(*, log_failures: bool = True) -> tuple[str | None, str]:
    """Download and install tirith to $HERMES_HOME/bin/tirith.

    Steps: detect the platform target, download the release archive and
    checksums.txt into a temporary directory, optionally verify cosign
    provenance (only when cosign is on PATH), verify the archive's SHA-256,
    extract the ``tirith`` binary, mark it executable and move it into the
    Hermes bin directory. The temporary directory is always removed.

    Args:
        log_failures: When True, failures are logged at WARNING level;
            otherwise at DEBUG level.

    Returns:
        (installed_path, failure_reason). On success failure_reason is "".
        On failure installed_path is None and failure_reason is one of
        "unsupported_platform", "download_failed",
        "cosign_verification_failed", "checksum_failed",
        "binary_not_in_archive", "extract_failed" or "install_failed".
        The tag is what gets persisted in the on-disk failure marker.
    """
    log = logger.warning if log_failures else logger.debug

    target = _detect_target()
    if not target:
        logger.info("tirith auto-install: unsupported platform %s/%s",
                    platform.system(), platform.machine())
        return None, "unsupported_platform"

    archive_name = f"tirith-{target}.tar.gz"
    base_url = f"https://github.com/{_REPO}/releases/latest/download"

    tmpdir = tempfile.mkdtemp(prefix="tirith-install-")
    try:
        archive_path = os.path.join(tmpdir, archive_name)
        checksums_path = os.path.join(tmpdir, "checksums.txt")
        sig_path = os.path.join(tmpdir, "checksums.txt.sig")
        cert_path = os.path.join(tmpdir, "checksums.txt.pem")

        logger.info("tirith not found — downloading latest release for %s...", target)

        try:
            _download_file(f"{base_url}/{archive_name}", archive_path)
            _download_file(f"{base_url}/checksums.txt", checksums_path)
        except Exception as exc:
            log("tirith download failed: %s", exc)
            return None, "download_failed"

        # Cosign provenance verification — preferred but not mandatory.
        # When cosign is available, we verify that the release was produced
        # by the expected GitHub Actions workflow (full supply chain proof).
        # Without cosign, SHA-256 checksum + HTTPS still provides integrity
        # and transport-level authenticity.
        cosign_verified = False
        if shutil.which("cosign"):
            try:
                _download_file(f"{base_url}/checksums.txt.sig", sig_path)
                _download_file(f"{base_url}/checksums.txt.pem", cert_path)
            except Exception as exc:
                logger.info("cosign artifacts unavailable (%s), proceeding with SHA-256 only", exc)
            else:
                cosign_result = _verify_cosign(checksums_path, sig_path, cert_path)
                if cosign_result is True:
                    cosign_verified = True
                elif cosign_result is False:
                    # Verification explicitly rejected — abort, the release
                    # may have been tampered with.
                    log("tirith install aborted: cosign provenance verification failed")
                    return None, "cosign_verification_failed"
                else:
                    # None = execution failure (timeout/OSError) — proceed
                    # with SHA-256 only since cosign itself is broken.
                    logger.info("cosign execution failed, proceeding with SHA-256 only")
        else:
            logger.info("cosign not on PATH — installing tirith with SHA-256 verification only "
                        "(install cosign for full supply chain verification)")

        if not _verify_checksum(archive_path, checksums_path, archive_name):
            return None, "checksum_failed"

        src = os.path.join(tmpdir, "tirith")
        try:
            with tarfile.open(archive_path, "r:gz") as tar:
                # Pick the first *regular file* named "tirith". Requiring a
                # regular file skips a directory or symlink that happens to
                # carry that name (e.g. an archive laid out as tirith/tirith).
                # Names containing ".." are rejected.
                member = next(
                    (m for m in tar.getmembers()
                     if m.isfile()
                     and ".." not in m.name
                     and (m.name == "tirith" or m.name.endswith("/tirith"))),
                    None,
                )
                if member is None:
                    log("tirith binary not found in archive")
                    return None, "binary_not_in_archive"
                # Copy the member's bytes ourselves instead of tar.extract():
                # the output path and mode are then fully under our control.
                with tar.extractfile(member) as fsrc, open(src, "wb") as fdst:
                    shutil.copyfileobj(fsrc, fdst)
        except (OSError, tarfile.TarError) as exc:
            log("tirith archive extraction failed: %s", exc)
            return None, "extract_failed"

        try:
            # Set the executable bits before the move so the binary is never
            # visible at its final location without them.
            os.chmod(src, os.stat(src).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
            dest = os.path.join(_hermes_bin_dir(), "tirith")
            shutil.move(src, dest)
        except OSError as exc:
            log("tirith install failed: %s", exc)
            return None, "install_failed"

        verification = "cosign + SHA-256" if cosign_verified else "SHA-256 only"
        logger.info("tirith installed to %s (%s)", dest, verification)
        return dest, ""

    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
def _is_explicit_path(configured_path: str) -> bool:
    """Tell whether *configured_path* is a user-chosen tirith location.

    Anything other than the bare default name "tirith" counts as explicit.
    """
    default_name = "tirith"
    return not (configured_path == default_name)
def _resolve_tirith_path(configured_path: str) -> str:
    """Resolve the tirith binary path, auto-installing if necessary.

    If the user explicitly set a path (anything other than the bare "tirith"
    default), that path is authoritative — we never fall through to
    auto-download a different binary.

    For the default "tirith":
    1. PATH lookup via shutil.which
    2. $HERMES_HOME/bin/tirith (previously auto-installed)
    3. Auto-install from GitHub releases → $HERMES_HOME/bin/tirith
       (synchronous here: this call blocks until the install finishes)

    Failed installs are cached for the process lifetime (and persisted to
    disk for 24h) to avoid repeated network attempts.

    Args:
        configured_path: The configured tirith path; "~" is expanded.

    Returns:
        Always a string. On success, the path of a usable binary (also
        cached in _resolved_path). When nothing could be resolved, the
        expanded configured path is returned anyway, so the caller's spawn
        attempt fails with OSError and its fail_open policy applies.

    Side effects: updates the module globals _resolved_path and
    _install_failure_reason, and may create or remove the on-disk failure
    marker.
    """
    global _resolved_path, _install_failure_reason
    # Fast path: successfully resolved on a previous call.
    if _resolved_path is not None and _resolved_path is not _INSTALL_FAILED:
        return _resolved_path
    expanded = os.path.expanduser(configured_path)
    explicit = _is_explicit_path(configured_path)
    # Snapshot the in-memory failure state before the branches below mutate it.
    install_failed = _resolved_path is _INSTALL_FAILED
    # Explicit path: check it and stop. Never auto-download a replacement.
    if explicit:
        if os.path.isfile(expanded) and os.access(expanded, os.X_OK):
            _resolved_path = expanded
            return expanded
        # Also try shutil.which in case it's a bare name on PATH
        found = shutil.which(expanded)
        if found:
            _resolved_path = found
            return found
        # NOTE: this branch is re-evaluated (and the warning re-logged) on
        # every call, because the failed state bypasses the fast path above.
        logger.warning("Configured tirith path %r not found; scanning disabled", configured_path)
        _resolved_path = _INSTALL_FAILED
        _install_failure_reason = "explicit_path_missing"
        return expanded
    # Default "tirith" — always re-run cheap local checks so a manual
    # install is picked up even after a previous network failure (P2 fix:
    # long-lived gateway/CLI recovers without restart).
    found = shutil.which("tirith")
    if found:
        _resolved_path = found
        _install_failure_reason = ""
        _clear_install_failed()
        return found
    hermes_bin = os.path.join(_hermes_bin_dir(), "tirith")
    if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK):
        _resolved_path = hermes_bin
        _install_failure_reason = ""
        _clear_install_failed()
        return hermes_bin
    # Local checks failed. If a previous install attempt already failed,
    # skip the network retry — UNLESS the failure was "cosign_missing" and
    # cosign is now available (retryable cause resolved in-process).
    # NOTE(review): the installer visible in this file never returns the
    # "cosign_missing" tag; presumably it can only arrive here via a marker
    # written by other code — confirm before relying on this branch.
    if install_failed:
        if _install_failure_reason == "cosign_missing" and shutil.which("cosign"):
            # Retryable cause resolved — clear sentinel and fall through to retry
            _resolved_path = None
            _install_failure_reason = ""
            _clear_install_failed()
            install_failed = False
        else:
            return expanded
    # If a background install thread is running, don't start a parallel one —
    # return the configured path; the OSError handler in check_command_security
    # will apply fail_open until the thread finishes.
    if _install_thread is not None and _install_thread.is_alive():
        return expanded
    # Check disk failure marker before attempting network download.
    # Preserve the marker's real reason so in-memory retry logic can
    # detect retryable causes (e.g. cosign_missing) without restart.
    # (The marker is read twice: once for the reason text, once inside
    # _is_install_failed_on_disk(), which may also clear it.)
    disk_reason = _read_failure_reason()
    if disk_reason is not None and _is_install_failed_on_disk():
        _resolved_path = _INSTALL_FAILED
        _install_failure_reason = disk_reason
        return expanded
    # Synchronous install attempt (does not take _install_lock).
    installed, reason = _install_tirith()
    if installed:
        _resolved_path = installed
        _install_failure_reason = ""
        _clear_install_failed()
        return installed
    # Install failed — cache the miss and persist reason to disk
    _resolved_path = _INSTALL_FAILED
    _install_failure_reason = reason
    _mark_install_failed(reason)
    return expanded
def _background_install(*, log_failures: bool = True):
    """Background thread target: download and install tirith.

    Holds _install_lock for the whole attempt. Does nothing if
    _resolved_path is already set to anything other than None — that
    includes the _INSTALL_FAILED sentinel, so a recorded failure is not
    retried here.

    Args:
        log_failures: Forwarded to _install_tirith(); selects the log level
            used for failure messages.

    Side effects: updates the module globals _resolved_path and
    _install_failure_reason; on an install attempt it also clears or writes
    the on-disk failure marker.
    """
    global _resolved_path, _install_failure_reason
    with _install_lock:
        # Double-check after acquiring lock (another thread may have resolved)
        if _resolved_path is not None:
            return
        # Re-check local paths (may have been installed by another process)
        found = shutil.which("tirith")
        if found:
            _resolved_path = found
            _install_failure_reason = ""
            return
        hermes_bin = os.path.join(_hermes_bin_dir(), "tirith")
        if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK):
            _resolved_path = hermes_bin
            _install_failure_reason = ""
            return
        # Nothing usable locally: perform the network install.
        installed, reason = _install_tirith(log_failures=log_failures)
        if installed:
            _resolved_path = installed
            _install_failure_reason = ""
            _clear_install_failed()
        else:
            # Cache the failure in memory and persist the reason to disk.
            _resolved_path = _INSTALL_FAILED
            _install_failure_reason = reason
            _mark_install_failed(reason)
def ensure_installed(*, log_failures: bool = True):
    """Ensure tirith is available, downloading in background if needed.

    Quick PATH/local checks are synchronous; network download runs in a
    daemon thread so startup never blocks. Safe to call multiple times.

    Args:
        log_failures: Forwarded to the background installer; selects the
            log level used for install failure messages.

    Returns:
        The resolved binary path if one is available right now, else None.
        None is returned when tirith is disabled in the config, when a
        previously cached path is no longer an executable file, when an
        explicit path is missing, when a failure is cached in memory or on
        disk, and when a background install has just been started or is
        still running.
    """
    global _resolved_path, _install_thread, _install_failure_reason
    cfg = _load_security_config()
    if not cfg["tirith_enabled"]:
        return None
    # Already resolved from a previous call
    if _resolved_path is not None and _resolved_path is not _INSTALL_FAILED:
        path = _resolved_path
        # Re-validate the cached path; the cache itself is left untouched
        # even if the file has disappeared.
        if os.path.isfile(path) and os.access(path, os.X_OK):
            return path
        return None
    configured_path = cfg["tirith_path"]
    explicit = _is_explicit_path(configured_path)
    expanded = os.path.expanduser(configured_path)
    # Explicit path: synchronous check only, no download
    if explicit:
        if os.path.isfile(expanded) and os.access(expanded, os.X_OK):
            _resolved_path = expanded
            return expanded
        # Also accept a bare command name that resolves via PATH.
        found = shutil.which(expanded)
        if found:
            _resolved_path = found
            return found
        _resolved_path = _INSTALL_FAILED
        _install_failure_reason = "explicit_path_missing"
        return None
    # Default "tirith" — quick local checks first (no network)
    found = shutil.which("tirith")
    if found:
        _resolved_path = found
        _install_failure_reason = ""
        _clear_install_failed()
        return found
    hermes_bin = os.path.join(_hermes_bin_dir(), "tirith")
    if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK):
        _resolved_path = hermes_bin
        _install_failure_reason = ""
        _clear_install_failed()
        return hermes_bin
    # If previously failed in-memory, check if the cause is now resolved
    if _resolved_path is _INSTALL_FAILED:
        if _install_failure_reason == "cosign_missing" and shutil.which("cosign"):
            # Reset to "not yet tried" so the background installer will run.
            _resolved_path = None
            _install_failure_reason = ""
            _clear_install_failed()
        else:
            return None
    # Check disk failure marker (skip network attempt for 24h, unless
    # the cosign_missing reason was resolved — handled by _is_install_failed_on_disk).
    # Preserve the marker's real reason for in-memory retry logic.
    disk_reason = _read_failure_reason()
    if disk_reason is not None and _is_install_failed_on_disk():
        _resolved_path = _INSTALL_FAILED
        _install_failure_reason = disk_reason
        return None
    # Need to download — launch background thread so startup doesn't block
    if _install_thread is None or not _install_thread.is_alive():
        _install_thread = threading.Thread(
            target=_background_install,
            kwargs={"log_failures": log_failures},
            daemon=True,
        )
        _install_thread.start()
    return None  # Not available yet; until ready, scans hit the spawn-failure path and follow the fail_open setting
# ---------------------------------------------------------------------------
# Main API
# ---------------------------------------------------------------------------
# Caps applied to the JSON enrichment returned by check_command_security():
# at most this many entries of the "findings" list are kept...
_MAX_FINDINGS = 50
# ...and the "summary" string is truncated to this many characters.
_MAX_SUMMARY_LEN = 500
def check_command_security(command: str) -> dict:
    """Run tirith security scan on a command.

    Exit code determines action (0=allow, 1=block, 2=warn). JSON enriches
    findings/summary. Spawn failures, timeouts and unknown exit codes
    respect the fail_open config. Programming errors propagate.

    The JSON on stdout is treated as untrusted enrichment: if it is not
    valid JSON, is not an object, or its "findings"/"summary" fields have
    unexpected types, the affected fields are left empty — the verdict from
    the exit code is never changed and no exception is raised.

    Args:
        command: The command text to scan; passed to tirith as a single
            argument after "--".

    Returns:
        {"action": "allow"|"warn"|"block", "findings": [...], "summary": str}
        "findings" holds at most _MAX_FINDINGS entries and "summary" at
        most _MAX_SUMMARY_LEN characters.
    """
    cfg = _load_security_config()

    if not cfg["tirith_enabled"]:
        return {"action": "allow", "findings": [], "summary": ""}

    tirith_path = _resolve_tirith_path(cfg["tirith_path"])
    timeout = cfg["tirith_timeout"]
    fail_open = cfg["tirith_fail_open"]

    try:
        result = subprocess.run(
            [tirith_path, "check", "--json", "--non-interactive",
             "--shell", "posix", "--", command],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except OSError as exc:
        # Covers FileNotFoundError, PermissionError, exec format error
        logger.warning("tirith spawn failed: %s", exc)
        if fail_open:
            return {"action": "allow", "findings": [], "summary": f"tirith unavailable: {exc}"}
        return {"action": "block", "findings": [], "summary": f"tirith spawn failed (fail-closed): {exc}"}
    except subprocess.TimeoutExpired:
        logger.warning("tirith timed out after %ds", timeout)
        if fail_open:
            return {"action": "allow", "findings": [], "summary": f"tirith timed out ({timeout}s)"}
        return {"action": "block", "findings": [], "summary": "tirith timed out (fail-closed)"}

    # Map exit code to action
    exit_code = result.returncode
    if exit_code == 0:
        action = "allow"
    elif exit_code == 1:
        action = "block"
    elif exit_code == 2:
        action = "warn"
    else:
        # Unknown exit code — respect fail_open
        logger.warning("tirith returned unexpected exit code %d", exit_code)
        if fail_open:
            return {"action": "allow", "findings": [], "summary": f"tirith exit code {exit_code} (fail-open)"}
        return {"action": "block", "findings": [], "summary": f"tirith exit code {exit_code} (fail-closed)"}

    # Parse JSON for enrichment (never overrides the exit code verdict)
    findings = []
    summary = ""
    try:
        data = json.loads(result.stdout) if result.stdout.strip() else {}
    except json.JSONDecodeError:
        data = None

    if isinstance(data, dict):
        # Type-check each field: well-formed JSON can still carry e.g.
        # "findings": null or a numeric "summary", which must not raise.
        raw_findings = data.get("findings")
        if isinstance(raw_findings, list):
            findings = raw_findings[:_MAX_FINDINGS]
        raw_summary = data.get("summary")
        if isinstance(raw_summary, str):
            summary = raw_summary[:_MAX_SUMMARY_LEN]
    else:
        # Invalid JSON or a non-object document degrades findings/summary,
        # not the verdict.
        logger.debug("tirith JSON parse failed, using exit code only")
        if action == "block":
            summary = "security issue detected (details unavailable)"
        elif action == "warn":
            summary = "security warning detected (details unavailable)"

    return {"action": action, "findings": findings, "summary": summary}