Three categories of cleanup, all zero-behavioral-change:
1. F-strings without placeholders (154 fixes across 29 files)
- Converted f'...' to '...' where no {expression} was present
- Heaviest files: run_agent.py (24), cli.py (20), honcho_integration/cli.py (34)
2. Simplify defensive patterns in run_agent.py
- Added explicit self._is_anthropic_oauth = False in __init__ (before
the api_mode branch that conditionally sets it)
- Replaced 7x getattr(self, '_is_anthropic_oauth', False) with direct
self._is_anthropic_oauth (attribute always initialized now)
- Added _is_openrouter_url() and _is_anthropic_url() helper methods
- Replaced 3 inline 'openrouter' in self._base_url_lower checks
3. Remove dead code in small files
- hermes_cli/claw.py: removed unused 'total' computation
- tools/fuzzy_match.py: removed unused strip_indent() function and
pattern_stripped variable
Full test suite: 6184 passed, 0 failures
E2E PTY: banner clean, tool calls work, zero garbled ANSI
671 lines
24 KiB
Python
671 lines
24 KiB
Python
"""Tirith pre-exec security scanning wrapper.
|
|
|
|
Runs the tirith binary as a subprocess to scan commands for content-level
|
|
threats (homograph URLs, pipe-to-interpreter, terminal injection, etc.).
|
|
|
|
Exit code is the verdict source of truth:
|
|
0 = allow, 1 = block, 2 = warn
|
|
|
|
JSON stdout enriches findings/summary but never overrides the verdict.
|
|
Operational failures (spawn error, timeout, unknown exit code) respect
|
|
the fail_open config setting. Programming errors propagate.
|
|
|
|
Auto-install: if tirith is not found on PATH or at the configured path,
|
|
it is automatically downloaded from GitHub releases to $HERMES_HOME/bin/tirith.
|
|
The download always verifies SHA-256 checksums. When cosign is available on
|
|
PATH, provenance verification (GitHub Actions workflow signature) is also
|
|
performed. If cosign is not installed, the download proceeds with SHA-256
|
|
verification only — still secure via HTTPS + checksum, just without supply
|
|
chain provenance proof. Installation runs in a background thread so startup
|
|
never blocks.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import tarfile
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import urllib.request
|
|
|
|
from hermes_constants import get_hermes_home
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_REPO = "sheeki03/tirith"
|
|
|
|
# Cosign provenance verification — pinned to the specific release workflow
|
|
_COSIGN_IDENTITY_REGEXP = f"^https://github.com/{_REPO}/\\.github/workflows/release\\.yml@refs/tags/v"
|
|
_COSIGN_ISSUER = "https://token.actions.githubusercontent.com"
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _env_bool(key: str, default: bool) -> bool:
|
|
val = os.getenv(key)
|
|
if val is None:
|
|
return default
|
|
return val.lower() in ("1", "true", "yes")
|
|
|
|
|
|
def _env_int(key: str, default: int) -> int:
|
|
val = os.getenv(key)
|
|
if val is None:
|
|
return default
|
|
try:
|
|
return int(val)
|
|
except ValueError:
|
|
return default
|
|
|
|
|
|
def _load_security_config() -> dict:
|
|
"""Load security settings from config.yaml, with env var overrides."""
|
|
defaults = {
|
|
"tirith_enabled": True,
|
|
"tirith_path": "tirith",
|
|
"tirith_timeout": 5,
|
|
"tirith_fail_open": True,
|
|
}
|
|
try:
|
|
from hermes_cli.config import load_config
|
|
cfg = load_config().get("security", {}) or {}
|
|
except Exception:
|
|
cfg = {}
|
|
|
|
return {
|
|
"tirith_enabled": _env_bool("TIRITH_ENABLED", cfg.get("tirith_enabled", defaults["tirith_enabled"])),
|
|
"tirith_path": os.getenv("TIRITH_BIN", cfg.get("tirith_path", defaults["tirith_path"])),
|
|
"tirith_timeout": _env_int("TIRITH_TIMEOUT", cfg.get("tirith_timeout", defaults["tirith_timeout"])),
|
|
"tirith_fail_open": _env_bool("TIRITH_FAIL_OPEN", cfg.get("tirith_fail_open", defaults["tirith_fail_open"])),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auto-install
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Cached path after first resolution (avoids repeated shutil.which per command).
|
|
# _INSTALL_FAILED means "we tried and failed" — prevents retry on every command.
|
|
_resolved_path: str | None | bool = None
|
|
_INSTALL_FAILED = False # sentinel: distinct from "not yet tried"
|
|
_install_failure_reason: str = "" # reason tag when _resolved_path is _INSTALL_FAILED
|
|
|
|
# Background install thread coordination
|
|
_install_lock = threading.Lock()
|
|
_install_thread: threading.Thread | None = None
|
|
|
|
# Disk-persistent failure marker — avoids retry across process restarts
|
|
_MARKER_TTL = 86400 # 24 hours
|
|
|
|
|
|
def _get_hermes_home() -> str:
|
|
"""Return the Hermes home directory, respecting HERMES_HOME env var."""
|
|
return str(get_hermes_home())
|
|
|
|
|
|
def _failure_marker_path() -> str:
|
|
"""Return the path to the install-failure marker file."""
|
|
return os.path.join(_get_hermes_home(), ".tirith-install-failed")
|
|
|
|
|
|
def _read_failure_reason() -> str | None:
|
|
"""Read the failure reason from the disk marker.
|
|
|
|
Returns the reason string, or None if the marker doesn't exist or is
|
|
older than _MARKER_TTL.
|
|
"""
|
|
try:
|
|
p = _failure_marker_path()
|
|
mtime = os.path.getmtime(p)
|
|
if (time.time() - mtime) >= _MARKER_TTL:
|
|
return None
|
|
with open(p, "r") as f:
|
|
return f.read().strip()
|
|
except OSError:
|
|
return None
|
|
|
|
|
|
def _is_install_failed_on_disk() -> bool:
|
|
"""Check if a recent install failure was persisted to disk.
|
|
|
|
Returns False (allowing retry) when:
|
|
- No marker exists
|
|
- Marker is older than _MARKER_TTL (24h)
|
|
- Marker reason is 'cosign_missing' and cosign is now on PATH
|
|
"""
|
|
reason = _read_failure_reason()
|
|
if reason is None:
|
|
return False
|
|
if reason == "cosign_missing" and shutil.which("cosign"):
|
|
_clear_install_failed()
|
|
return False
|
|
return True
|
|
|
|
|
|
def _mark_install_failed(reason: str = ""):
|
|
"""Persist install failure to disk to avoid retry on next process.
|
|
|
|
Args:
|
|
reason: Short tag identifying the failure cause. Use "cosign_missing"
|
|
when cosign is not on PATH so the marker can be auto-cleared
|
|
once cosign becomes available.
|
|
"""
|
|
try:
|
|
p = _failure_marker_path()
|
|
os.makedirs(os.path.dirname(p), exist_ok=True)
|
|
with open(p, "w") as f:
|
|
f.write(reason)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def _clear_install_failed():
|
|
"""Remove the failure marker after successful install."""
|
|
try:
|
|
os.unlink(_failure_marker_path())
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
def _hermes_bin_dir() -> str:
|
|
"""Return $HERMES_HOME/bin, creating it if needed."""
|
|
d = os.path.join(_get_hermes_home(), "bin")
|
|
os.makedirs(d, exist_ok=True)
|
|
return d
|
|
|
|
|
|
def _detect_target() -> str | None:
|
|
"""Return the Rust target triple for the current platform, or None."""
|
|
system = platform.system()
|
|
machine = platform.machine().lower()
|
|
|
|
if system == "Darwin":
|
|
plat = "apple-darwin"
|
|
elif system == "Linux":
|
|
plat = "unknown-linux-gnu"
|
|
else:
|
|
return None
|
|
|
|
if machine in ("x86_64", "amd64"):
|
|
arch = "x86_64"
|
|
elif machine in ("aarch64", "arm64"):
|
|
arch = "aarch64"
|
|
else:
|
|
return None
|
|
|
|
return f"{arch}-{plat}"
|
|
|
|
|
|
def _download_file(url: str, dest: str, timeout: int = 10):
|
|
"""Download a URL to a local file."""
|
|
req = urllib.request.Request(url)
|
|
token = os.getenv("GITHUB_TOKEN")
|
|
if token:
|
|
req.add_header("Authorization", f"token {token}")
|
|
with urllib.request.urlopen(req, timeout=timeout) as resp, open(dest, "wb") as f:
|
|
shutil.copyfileobj(resp, f)
|
|
|
|
|
|
def _verify_cosign(checksums_path: str, sig_path: str, cert_path: str) -> bool | None:
|
|
"""Verify cosign provenance signature on checksums.txt.
|
|
|
|
Returns:
|
|
True — cosign verified successfully
|
|
False — cosign found but verification failed
|
|
None — cosign not available (not on PATH, or execution failed)
|
|
|
|
The caller treats both False and None as "abort auto-install" — only
|
|
True allows the install to proceed.
|
|
"""
|
|
cosign = shutil.which("cosign")
|
|
if not cosign:
|
|
logger.info("cosign not found on PATH")
|
|
return None
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[cosign, "verify-blob",
|
|
"--certificate", cert_path,
|
|
"--signature", sig_path,
|
|
"--certificate-identity-regexp", _COSIGN_IDENTITY_REGEXP,
|
|
"--certificate-oidc-issuer", _COSIGN_ISSUER,
|
|
checksums_path],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=15,
|
|
)
|
|
if result.returncode == 0:
|
|
logger.info("cosign provenance verification passed")
|
|
return True
|
|
else:
|
|
logger.warning("cosign verification failed (exit %d): %s",
|
|
result.returncode, result.stderr.strip())
|
|
return False
|
|
except (OSError, subprocess.TimeoutExpired) as exc:
|
|
logger.warning("cosign execution failed: %s", exc)
|
|
return None
|
|
|
|
|
|
def _verify_checksum(archive_path: str, checksums_path: str, archive_name: str) -> bool:
|
|
"""Verify SHA-256 of the archive against checksums.txt."""
|
|
expected = None
|
|
with open(checksums_path) as f:
|
|
for line in f:
|
|
# Format: "<hash> <filename>"
|
|
parts = line.strip().split(" ", 1)
|
|
if len(parts) == 2 and parts[1] == archive_name:
|
|
expected = parts[0]
|
|
break
|
|
if not expected:
|
|
logger.warning("No checksum entry for %s", archive_name)
|
|
return False
|
|
|
|
sha = hashlib.sha256()
|
|
with open(archive_path, "rb") as f:
|
|
for chunk in iter(lambda: f.read(8192), b""):
|
|
sha.update(chunk)
|
|
actual = sha.hexdigest()
|
|
if actual != expected:
|
|
logger.warning("Checksum mismatch: expected %s, got %s", expected, actual)
|
|
return False
|
|
return True
|
|
|
|
|
|
def _install_tirith(*, log_failures: bool = True) -> tuple[str | None, str]:
|
|
"""Download and install tirith to $HERMES_HOME/bin/tirith.
|
|
|
|
Verifies provenance via cosign and SHA-256 checksum.
|
|
Returns (installed_path, failure_reason). On success failure_reason is "".
|
|
failure_reason is a short tag used by the disk marker to decide if the
|
|
failure is retryable (e.g. "cosign_missing" clears when cosign appears).
|
|
"""
|
|
log = logger.warning if log_failures else logger.debug
|
|
|
|
target = _detect_target()
|
|
if not target:
|
|
logger.info("tirith auto-install: unsupported platform %s/%s",
|
|
platform.system(), platform.machine())
|
|
return None, "unsupported_platform"
|
|
|
|
archive_name = f"tirith-{target}.tar.gz"
|
|
base_url = f"https://github.com/{_REPO}/releases/latest/download"
|
|
|
|
tmpdir = tempfile.mkdtemp(prefix="tirith-install-")
|
|
try:
|
|
archive_path = os.path.join(tmpdir, archive_name)
|
|
checksums_path = os.path.join(tmpdir, "checksums.txt")
|
|
sig_path = os.path.join(tmpdir, "checksums.txt.sig")
|
|
cert_path = os.path.join(tmpdir, "checksums.txt.pem")
|
|
|
|
logger.info("tirith not found — downloading latest release for %s...", target)
|
|
|
|
try:
|
|
_download_file(f"{base_url}/{archive_name}", archive_path)
|
|
_download_file(f"{base_url}/checksums.txt", checksums_path)
|
|
except Exception as exc:
|
|
log("tirith download failed: %s", exc)
|
|
return None, "download_failed"
|
|
|
|
# Cosign provenance verification — preferred but not mandatory.
|
|
# When cosign is available, we verify that the release was produced
|
|
# by the expected GitHub Actions workflow (full supply chain proof).
|
|
# Without cosign, SHA-256 checksum + HTTPS still provides integrity
|
|
# and transport-level authenticity.
|
|
cosign_verified = False
|
|
if shutil.which("cosign"):
|
|
try:
|
|
_download_file(f"{base_url}/checksums.txt.sig", sig_path)
|
|
_download_file(f"{base_url}/checksums.txt.pem", cert_path)
|
|
except Exception as exc:
|
|
logger.info("cosign artifacts unavailable (%s), proceeding with SHA-256 only", exc)
|
|
else:
|
|
cosign_result = _verify_cosign(checksums_path, sig_path, cert_path)
|
|
if cosign_result is True:
|
|
cosign_verified = True
|
|
elif cosign_result is False:
|
|
# Verification explicitly rejected — abort, the release
|
|
# may have been tampered with.
|
|
log("tirith install aborted: cosign provenance verification failed")
|
|
return None, "cosign_verification_failed"
|
|
else:
|
|
# None = execution failure (timeout/OSError) — proceed
|
|
# with SHA-256 only since cosign itself is broken.
|
|
logger.info("cosign execution failed, proceeding with SHA-256 only")
|
|
else:
|
|
logger.info("cosign not on PATH — installing tirith with SHA-256 verification only "
|
|
"(install cosign for full supply chain verification)")
|
|
|
|
if not _verify_checksum(archive_path, checksums_path, archive_name):
|
|
return None, "checksum_failed"
|
|
|
|
with tarfile.open(archive_path, "r:gz") as tar:
|
|
# Extract only the tirith binary (safety: reject paths with ..)
|
|
for member in tar.getmembers():
|
|
if member.name == "tirith" or member.name.endswith("/tirith"):
|
|
if ".." in member.name:
|
|
continue
|
|
member.name = "tirith"
|
|
tar.extract(member, tmpdir)
|
|
break
|
|
else:
|
|
log("tirith binary not found in archive")
|
|
return None, "binary_not_in_archive"
|
|
|
|
src = os.path.join(tmpdir, "tirith")
|
|
dest = os.path.join(_hermes_bin_dir(), "tirith")
|
|
shutil.move(src, dest)
|
|
os.chmod(dest, os.stat(dest).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
|
|
|
|
verification = "cosign + SHA-256" if cosign_verified else "SHA-256 only"
|
|
logger.info("tirith installed to %s (%s)", dest, verification)
|
|
return dest, ""
|
|
|
|
finally:
|
|
shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
|
|
def _is_explicit_path(configured_path: str) -> bool:
|
|
"""Return True if the user explicitly configured a non-default tirith path."""
|
|
return configured_path != "tirith"
|
|
|
|
|
|
def _resolve_tirith_path(configured_path: str) -> str:
|
|
"""Resolve the tirith binary path, auto-installing if necessary.
|
|
|
|
If the user explicitly set a path (anything other than the bare "tirith"
|
|
default), that path is authoritative — we never fall through to
|
|
auto-download a different binary.
|
|
|
|
For the default "tirith":
|
|
1. PATH lookup via shutil.which
|
|
2. $HERMES_HOME/bin/tirith (previously auto-installed)
|
|
3. Auto-install from GitHub releases → $HERMES_HOME/bin/tirith
|
|
|
|
Failed installs are cached for the process lifetime (and persisted to
|
|
disk for 24h) to avoid repeated network attempts.
|
|
"""
|
|
global _resolved_path, _install_failure_reason
|
|
|
|
# Fast path: successfully resolved on a previous call.
|
|
if _resolved_path is not None and _resolved_path is not _INSTALL_FAILED:
|
|
return _resolved_path
|
|
|
|
expanded = os.path.expanduser(configured_path)
|
|
explicit = _is_explicit_path(configured_path)
|
|
install_failed = _resolved_path is _INSTALL_FAILED
|
|
|
|
# Explicit path: check it and stop. Never auto-download a replacement.
|
|
if explicit:
|
|
if os.path.isfile(expanded) and os.access(expanded, os.X_OK):
|
|
_resolved_path = expanded
|
|
return expanded
|
|
# Also try shutil.which in case it's a bare name on PATH
|
|
found = shutil.which(expanded)
|
|
if found:
|
|
_resolved_path = found
|
|
return found
|
|
logger.warning("Configured tirith path %r not found; scanning disabled", configured_path)
|
|
_resolved_path = _INSTALL_FAILED
|
|
_install_failure_reason = "explicit_path_missing"
|
|
return expanded
|
|
|
|
# Default "tirith" — always re-run cheap local checks so a manual
|
|
# install is picked up even after a previous network failure (P2 fix:
|
|
# long-lived gateway/CLI recovers without restart).
|
|
found = shutil.which("tirith")
|
|
if found:
|
|
_resolved_path = found
|
|
_install_failure_reason = ""
|
|
_clear_install_failed()
|
|
return found
|
|
|
|
hermes_bin = os.path.join(_hermes_bin_dir(), "tirith")
|
|
if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK):
|
|
_resolved_path = hermes_bin
|
|
_install_failure_reason = ""
|
|
_clear_install_failed()
|
|
return hermes_bin
|
|
|
|
# Local checks failed. If a previous install attempt already failed,
|
|
# skip the network retry — UNLESS the failure was "cosign_missing" and
|
|
# cosign is now available (retryable cause resolved in-process).
|
|
if install_failed:
|
|
if _install_failure_reason == "cosign_missing" and shutil.which("cosign"):
|
|
# Retryable cause resolved — clear sentinel and fall through to retry
|
|
_resolved_path = None
|
|
_install_failure_reason = ""
|
|
_clear_install_failed()
|
|
install_failed = False
|
|
else:
|
|
return expanded
|
|
|
|
# If a background install thread is running, don't start a parallel one —
|
|
# return the configured path; the OSError handler in check_command_security
|
|
# will apply fail_open until the thread finishes.
|
|
if _install_thread is not None and _install_thread.is_alive():
|
|
return expanded
|
|
|
|
# Check disk failure marker before attempting network download.
|
|
# Preserve the marker's real reason so in-memory retry logic can
|
|
# detect retryable causes (e.g. cosign_missing) without restart.
|
|
disk_reason = _read_failure_reason()
|
|
if disk_reason is not None and _is_install_failed_on_disk():
|
|
_resolved_path = _INSTALL_FAILED
|
|
_install_failure_reason = disk_reason
|
|
return expanded
|
|
|
|
installed, reason = _install_tirith()
|
|
if installed:
|
|
_resolved_path = installed
|
|
_install_failure_reason = ""
|
|
_clear_install_failed()
|
|
return installed
|
|
|
|
# Install failed — cache the miss and persist reason to disk
|
|
_resolved_path = _INSTALL_FAILED
|
|
_install_failure_reason = reason
|
|
_mark_install_failed(reason)
|
|
return expanded
|
|
|
|
|
|
def _background_install(*, log_failures: bool = True):
|
|
"""Background thread target: download and install tirith."""
|
|
global _resolved_path, _install_failure_reason
|
|
with _install_lock:
|
|
# Double-check after acquiring lock (another thread may have resolved)
|
|
if _resolved_path is not None:
|
|
return
|
|
|
|
# Re-check local paths (may have been installed by another process)
|
|
found = shutil.which("tirith")
|
|
if found:
|
|
_resolved_path = found
|
|
_install_failure_reason = ""
|
|
return
|
|
|
|
hermes_bin = os.path.join(_hermes_bin_dir(), "tirith")
|
|
if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK):
|
|
_resolved_path = hermes_bin
|
|
_install_failure_reason = ""
|
|
return
|
|
|
|
installed, reason = _install_tirith(log_failures=log_failures)
|
|
if installed:
|
|
_resolved_path = installed
|
|
_install_failure_reason = ""
|
|
_clear_install_failed()
|
|
else:
|
|
_resolved_path = _INSTALL_FAILED
|
|
_install_failure_reason = reason
|
|
_mark_install_failed(reason)
|
|
|
|
|
|
def ensure_installed(*, log_failures: bool = True):
|
|
"""Ensure tirith is available, downloading in background if needed.
|
|
|
|
Quick PATH/local checks are synchronous; network download runs in a
|
|
daemon thread so startup never blocks. Safe to call multiple times.
|
|
Returns the resolved path immediately if available, or None.
|
|
"""
|
|
global _resolved_path, _install_thread, _install_failure_reason
|
|
|
|
cfg = _load_security_config()
|
|
if not cfg["tirith_enabled"]:
|
|
return None
|
|
|
|
# Already resolved from a previous call
|
|
if _resolved_path is not None and _resolved_path is not _INSTALL_FAILED:
|
|
path = _resolved_path
|
|
if os.path.isfile(path) and os.access(path, os.X_OK):
|
|
return path
|
|
return None
|
|
|
|
configured_path = cfg["tirith_path"]
|
|
explicit = _is_explicit_path(configured_path)
|
|
expanded = os.path.expanduser(configured_path)
|
|
|
|
# Explicit path: synchronous check only, no download
|
|
if explicit:
|
|
if os.path.isfile(expanded) and os.access(expanded, os.X_OK):
|
|
_resolved_path = expanded
|
|
return expanded
|
|
found = shutil.which(expanded)
|
|
if found:
|
|
_resolved_path = found
|
|
return found
|
|
_resolved_path = _INSTALL_FAILED
|
|
_install_failure_reason = "explicit_path_missing"
|
|
return None
|
|
|
|
# Default "tirith" — quick local checks first (no network)
|
|
found = shutil.which("tirith")
|
|
if found:
|
|
_resolved_path = found
|
|
_install_failure_reason = ""
|
|
_clear_install_failed()
|
|
return found
|
|
|
|
hermes_bin = os.path.join(_hermes_bin_dir(), "tirith")
|
|
if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK):
|
|
_resolved_path = hermes_bin
|
|
_install_failure_reason = ""
|
|
_clear_install_failed()
|
|
return hermes_bin
|
|
|
|
# If previously failed in-memory, check if the cause is now resolved
|
|
if _resolved_path is _INSTALL_FAILED:
|
|
if _install_failure_reason == "cosign_missing" and shutil.which("cosign"):
|
|
_resolved_path = None
|
|
_install_failure_reason = ""
|
|
_clear_install_failed()
|
|
else:
|
|
return None
|
|
|
|
# Check disk failure marker (skip network attempt for 24h, unless
|
|
# the cosign_missing reason was resolved — handled by _is_install_failed_on_disk).
|
|
# Preserve the marker's real reason for in-memory retry logic.
|
|
disk_reason = _read_failure_reason()
|
|
if disk_reason is not None and _is_install_failed_on_disk():
|
|
_resolved_path = _INSTALL_FAILED
|
|
_install_failure_reason = disk_reason
|
|
return None
|
|
|
|
# Need to download — launch background thread so startup doesn't block
|
|
if _install_thread is None or not _install_thread.is_alive():
|
|
_install_thread = threading.Thread(
|
|
target=_background_install,
|
|
kwargs={"log_failures": log_failures},
|
|
daemon=True,
|
|
)
|
|
_install_thread.start()
|
|
|
|
return None # Not available yet; commands will fail-open until ready
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_MAX_FINDINGS = 50
|
|
_MAX_SUMMARY_LEN = 500
|
|
|
|
|
|
def check_command_security(command: str) -> dict:
|
|
"""Run tirith security scan on a command.
|
|
|
|
Exit code determines action (0=allow, 1=block, 2=warn). JSON enriches
|
|
findings/summary. Spawn failures and timeouts respect fail_open config.
|
|
Programming errors propagate.
|
|
|
|
Returns:
|
|
{"action": "allow"|"warn"|"block", "findings": [...], "summary": str}
|
|
"""
|
|
cfg = _load_security_config()
|
|
|
|
if not cfg["tirith_enabled"]:
|
|
return {"action": "allow", "findings": [], "summary": ""}
|
|
|
|
tirith_path = _resolve_tirith_path(cfg["tirith_path"])
|
|
timeout = cfg["tirith_timeout"]
|
|
fail_open = cfg["tirith_fail_open"]
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
[tirith_path, "check", "--json", "--non-interactive",
|
|
"--shell", "posix", "--", command],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=timeout,
|
|
)
|
|
except OSError as exc:
|
|
# Covers FileNotFoundError, PermissionError, exec format error
|
|
logger.warning("tirith spawn failed: %s", exc)
|
|
if fail_open:
|
|
return {"action": "allow", "findings": [], "summary": f"tirith unavailable: {exc}"}
|
|
return {"action": "block", "findings": [], "summary": f"tirith spawn failed (fail-closed): {exc}"}
|
|
except subprocess.TimeoutExpired:
|
|
logger.warning("tirith timed out after %ds", timeout)
|
|
if fail_open:
|
|
return {"action": "allow", "findings": [], "summary": f"tirith timed out ({timeout}s)"}
|
|
return {"action": "block", "findings": [], "summary": "tirith timed out (fail-closed)"}
|
|
|
|
# Map exit code to action
|
|
exit_code = result.returncode
|
|
if exit_code == 0:
|
|
action = "allow"
|
|
elif exit_code == 1:
|
|
action = "block"
|
|
elif exit_code == 2:
|
|
action = "warn"
|
|
else:
|
|
# Unknown exit code — respect fail_open
|
|
logger.warning("tirith returned unexpected exit code %d", exit_code)
|
|
if fail_open:
|
|
return {"action": "allow", "findings": [], "summary": f"tirith exit code {exit_code} (fail-open)"}
|
|
return {"action": "block", "findings": [], "summary": f"tirith exit code {exit_code} (fail-closed)"}
|
|
|
|
# Parse JSON for enrichment (never overrides the exit code verdict)
|
|
findings = []
|
|
summary = ""
|
|
try:
|
|
data = json.loads(result.stdout) if result.stdout.strip() else {}
|
|
raw_findings = data.get("findings", [])
|
|
findings = raw_findings[:_MAX_FINDINGS]
|
|
summary = (data.get("summary", "") or "")[:_MAX_SUMMARY_LEN]
|
|
except (json.JSONDecodeError, AttributeError):
|
|
# JSON parse failure degrades findings/summary, not the verdict
|
|
logger.debug("tirith JSON parse failed, using exit code only")
|
|
if action == "block":
|
|
summary = "security issue detected (details unavailable)"
|
|
elif action == "warn":
|
|
summary = "security warning detected (details unavailable)"
|
|
|
|
return {"action": action, "findings": findings, "summary": summary}
|