* fix: Anthropic OAuth compatibility — Claude Code identity fingerprinting
Anthropic routes OAuth/subscription requests based on Claude Code's
identity markers. Without them, requests get intermittent 500 errors
(~25% failure rate observed). This matches what pi-ai (clawdbot) and
OpenCode both implement for OAuth compatibility.
Changes (OAuth tokens only — API key users unaffected):
1. Headers: user-agent 'claude-cli/2.1.2 (external, cli)' + x-app 'cli'
2. System prompt: prepend 'You are Claude Code, Anthropic's official CLI'
3. System prompt sanitization: replace Hermes/Nous references
4. Tool names: prefix with 'mcp_' (Claude Code convention for non-native tools)
5. Tool name stripping: remove 'mcp_' prefix from response tool calls
Before: 9/12 OK, 1 hard fail, 4 needed retries (~25% error rate)
After: 16/16 OK, 0 failures, 0 retries (0% error rate)
* fix: three gateway issues from user error logs
1. send_animation missing metadata kwarg (base.py)
- Base class send_animation lacked the metadata parameter that the
call site in base.py line 917 passes. Telegram's override accepted
it, but any platform without an override (Discord, Slack, etc.)
hit TypeError. Added metadata to base class signature.
2. MarkdownV2 split-inside-inline-code (base.py truncate_message)
- truncate_message could split at a space inside an inline code span
(e.g. `function(arg1, arg2)`), leaving an unpaired backtick and
unescaped parentheses in the chunk. Telegram rejects with
'character ( is reserved'. Added inline code awareness to the
split-point finder — detects odd backtick counts and moves the
split before the code span.
3. tirith auto-install without cosign (tirith_security.py)
- Previously required cosign on PATH for auto-install, blocking
install entirely with a warning if missing. Now proceeds with
SHA-256 checksum verification only when cosign is unavailable.
Cosign is still used for full supply chain verification when
present. If cosign IS present but verification explicitly fails,
install is still aborted (tampered release).
675 lines
25 KiB
Python
675 lines
25 KiB
Python
"""Tirith pre-exec security scanning wrapper.
|
|
|
|
Runs the tirith binary as a subprocess to scan commands for content-level
|
|
threats (homograph URLs, pipe-to-interpreter, terminal injection, etc.).
|
|
|
|
Exit code is the verdict source of truth:
|
|
0 = allow, 1 = block, 2 = warn
|
|
|
|
JSON stdout enriches findings/summary but never overrides the verdict.
|
|
Operational failures (spawn error, timeout, unknown exit code) respect
|
|
the fail_open config setting. Programming errors propagate.
|
|
|
|
Auto-install: if tirith is not found on PATH or at the configured path,
|
|
it is automatically downloaded from GitHub releases to $HERMES_HOME/bin/tirith.
|
|
The download always verifies SHA-256 checksums. When cosign is available on
|
|
PATH, provenance verification (GitHub Actions workflow signature) is also
|
|
performed. If cosign is not installed, the download proceeds with SHA-256
|
|
verification only — still secure via HTTPS + checksum, just without supply
|
|
chain provenance proof. Installation runs in a background thread so startup
|
|
never blocks.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import tarfile
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import urllib.request
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# GitHub "owner/name" slug of the tirith project. It is interpolated into the
# cosign identity pattern below and into the release download URL.
_REPO = "sheeki03/tirith"

# Cosign provenance verification — pinned to the specific release workflow.
# The identity pattern is passed to `cosign verify-blob` as
# --certificate-identity-regexp and the issuer as --certificate-oidc-issuer.
_COSIGN_IDENTITY_REGEXP = f"^https://github.com/{_REPO}/\\.github/workflows/release\\.yml@refs/tags/v"
_COSIGN_ISSUER = "https://token.actions.githubusercontent.com"
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _env_bool(key: str, default: bool) -> bool:
    """Interpret environment variable *key* as a boolean flag.

    Args:
        key: Name of the environment variable.
        default: Value returned when the variable is not set at all.

    Returns:
        ``default`` if the variable is unset; otherwise True only when the
        value, compared case-insensitively, is "1", "true" or "yes". Any other
        value (including the empty string) yields False.
    """
    raw = os.environ.get(key)
    if raw is not None:
        return raw.lower() in {"1", "true", "yes"}
    return default
|
|
|
|
|
|
def _env_int(key: str, default: int) -> int:
    """Interpret environment variable *key* as an integer.

    Args:
        key: Name of the environment variable.
        default: Value returned when the variable is unset or not parseable.

    Returns:
        ``int(value)`` when the variable is set and parses, else ``default``.
    """
    raw = os.environ.get(key)
    if raw is None:
        return default
    try:
        parsed = int(raw)
    except ValueError:
        # A malformed override falls back to the default instead of failing.
        return default
    return parsed
|
|
|
|
|
|
def _load_security_config() -> dict:
    """Build the effective tirith settings.

    Precedence for every key is: environment variable, then the ``security``
    section returned by ``hermes_cli.config.load_config()``, then the built-in
    fallback. Any exception raised while importing or loading the config is
    swallowed and treated as an empty section.

    Returns:
        A dict with the keys ``tirith_enabled``, ``tirith_path``,
        ``tirith_timeout`` and ``tirith_fail_open``.
    """
    fallback = {
        "tirith_enabled": True,
        "tirith_path": "tirith",
        "tirith_timeout": 5,
        "tirith_fail_open": True,
    }

    try:
        from hermes_cli.config import load_config

        section = load_config().get("security", {}) or {}
    except Exception:
        # Config is optional here: fall back to env vars + built-in values.
        section = {}

    def configured(name):
        """Return the config-file value for *name*, or its built-in fallback."""
        return section.get(name, fallback[name])

    enabled = _env_bool("TIRITH_ENABLED", configured("tirith_enabled"))
    binary = os.getenv("TIRITH_BIN", configured("tirith_path"))
    timeout = _env_int("TIRITH_TIMEOUT", configured("tirith_timeout"))
    fail_open = _env_bool("TIRITH_FAIL_OPEN", configured("tirith_fail_open"))

    return {
        "tirith_enabled": enabled,
        "tirith_path": binary,
        "tirith_timeout": timeout,
        "tirith_fail_open": fail_open,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auto-install
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Cached path after first resolution (avoids repeated shutil.which per command).
# _INSTALL_FAILED means "we tried and failed" — prevents retry on every command.
#
# _resolved_path is tri-state and is always tested with `is`:
#   None             -> nothing resolved yet
#   _INSTALL_FAILED  -> a lookup/install attempt failed
#   str              -> path of the usable tirith binary
_resolved_path: str | None | bool = None
_INSTALL_FAILED = False  # sentinel: distinct from "not yet tried"
_install_failure_reason: str = ""  # reason tag when _resolved_path is _INSTALL_FAILED

# Background install thread coordination.
# _install_lock is held by _background_install for the whole attempt;
# _install_thread is the most recently started worker (None if never started).
_install_lock = threading.Lock()
_install_thread: threading.Thread | None = None

# Disk-persistent failure marker — avoids retry across process restarts.
# Compared, in seconds, against the age of the marker file's mtime.
_MARKER_TTL = 86400  # 24 hours
|
|
|
|
|
|
def _get_hermes_home() -> str:
    """Return the Hermes home directory.

    The ``HERMES_HOME`` environment variable wins when it is set to a
    non-empty value; otherwise the directory is ``~/.hermes`` under the
    current user's home. Keeping tirith state under this directory ties it to
    whichever Hermes home is active for the process.
    """
    override = os.environ.get("HERMES_HOME")
    if override:
        return override
    user_home = os.path.expanduser("~")
    return os.path.join(user_home, ".hermes")
|
|
|
|
|
|
def _failure_marker_path() -> str:
    """Return the location of the install-failure marker file.

    The marker is the file ``.tirith-install-failed`` directly inside the
    Hermes home directory.
    """
    home = _get_hermes_home()
    return os.path.join(home, ".tirith-install-failed")
|
|
|
|
|
|
def _read_failure_reason() -> str | None:
    """Return the reason tag stored in the on-disk failure marker.

    Returns:
        The marker's stripped content (possibly an empty string) when the
        marker exists and is younger than ``_MARKER_TTL`` seconds; ``None``
        when it is missing, unreadable, or has expired.
    """
    marker = _failure_marker_path()
    try:
        modified = os.path.getmtime(marker)
        age = time.time() - modified
        if age < _MARKER_TTL:
            with open(marker, "r") as handle:
                return handle.read().strip()
    except OSError:
        # Missing or unreadable marker is the same as "no recorded failure".
        pass
    return None
|
|
|
|
|
|
def _is_install_failed_on_disk() -> bool:
    """Report whether a recent, still-relevant install failure is on disk.

    Returns:
        False (a retry is allowed) when no unexpired marker exists, or when
        the recorded reason is ``"cosign_missing"`` and ``cosign`` can now be
        found on PATH — in that case the marker is removed as a side effect.
        True for every other recorded failure.
    """
    recorded = _read_failure_reason()
    if recorded is None:
        return False

    # Only a "cosign_missing" failure can be cleared by an environment change.
    cause_resolved = recorded == "cosign_missing" and bool(shutil.which("cosign"))
    if cause_resolved:
        _clear_install_failed()
    return not cause_resolved
|
|
|
|
|
|
def _mark_install_failed(reason: str = ""):
    """Record an install failure on disk so later processes skip the retry.

    Args:
        reason: Short tag naming the failure cause; it becomes the marker
            file's content. The tag "cosign_missing" is special: readers of
            the marker drop it again once cosign is found on PATH.

    Writing is best effort — filesystem errors are ignored.
    """
    marker = _failure_marker_path()
    parent = os.path.dirname(marker)
    try:
        os.makedirs(parent, exist_ok=True)
        with open(marker, "w") as handle:
            handle.write(reason)
    except OSError:
        # The marker is only an optimisation; never fail because of it.
        return
|
|
|
|
|
|
def _clear_install_failed():
    """Delete the on-disk failure marker, ignoring filesystem errors.

    Called once tirith has been located or installed, and when a retryable
    failure cause has gone away. A missing marker is not an error.
    """
    marker = _failure_marker_path()
    try:
        os.remove(marker)
    except OSError:
        return
|
|
|
|
|
|
def _hermes_bin_dir() -> str:
    """Return the ``bin`` directory inside the Hermes home, creating it if absent.

    Raises:
        OSError: if the directory cannot be created.
    """
    bin_dir = os.path.join(_get_hermes_home(), "bin")
    # exist_ok makes repeated calls harmless.
    os.makedirs(bin_dir, exist_ok=True)
    return bin_dir
|
|
|
|
|
|
def _detect_target() -> str | None:
|
|
"""Return the Rust target triple for the current platform, or None."""
|
|
system = platform.system()
|
|
machine = platform.machine().lower()
|
|
|
|
if system == "Darwin":
|
|
plat = "apple-darwin"
|
|
elif system == "Linux":
|
|
plat = "unknown-linux-gnu"
|
|
else:
|
|
return None
|
|
|
|
if machine in ("x86_64", "amd64"):
|
|
arch = "x86_64"
|
|
elif machine in ("aarch64", "arm64"):
|
|
arch = "aarch64"
|
|
else:
|
|
return None
|
|
|
|
return f"{arch}-{plat}"
|
|
|
|
|
|
def _download_file(url: str, dest: str, timeout: int = 10):
    """Fetch *url* and write the response body to the file *dest*.

    Args:
        url: Location to download.
        dest: Path of the local file to create or overwrite.
        timeout: Timeout in seconds handed to ``urllib.request.urlopen``.

    When the ``GITHUB_TOKEN`` environment variable is set to a non-empty
    value it is sent as an ``Authorization: token ...`` request header.
    Errors raised by ``urlopen`` or by opening *dest* propagate to the caller.
    """
    headers = {}
    github_token = os.getenv("GITHUB_TOKEN")
    if github_token:
        headers["Authorization"] = f"token {github_token}"

    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        with open(dest, "wb") as out:
            shutil.copyfileobj(response, out)
|
|
|
|
|
|
def _verify_cosign(checksums_path: str, sig_path: str, cert_path: str) -> bool | None:
    """Check the cosign provenance signature of the checksums file.

    Runs ``cosign verify-blob`` with the pinned certificate identity pattern
    and OIDC issuer, bounded by a 15 second timeout.

    Args:
        checksums_path: The signed blob (checksums.txt).
        sig_path: Detached signature for the blob.
        cert_path: Signing certificate.

    Returns:
        True  — cosign ran and accepted the signature (exit code 0).
        False — cosign ran and rejected it (non-zero exit code).
        None  — no verdict: cosign is not on PATH, could not be executed,
                or timed out.

    Within this module the installer aborts on False and falls back to
    SHA-256-only verification on None.
    """
    cosign_bin = shutil.which("cosign")
    if not cosign_bin:
        logger.info("cosign not found on PATH")
        return None

    argv = [
        cosign_bin, "verify-blob",
        "--certificate", cert_path,
        "--signature", sig_path,
        "--certificate-identity-regexp", _COSIGN_IDENTITY_REGEXP,
        "--certificate-oidc-issuer", _COSIGN_ISSUER,
        checksums_path,
    ]
    try:
        proc = subprocess.run(argv, capture_output=True, text=True, timeout=15)
    except (OSError, subprocess.TimeoutExpired) as exc:
        # The tool itself is broken or hung: that is "no verdict", not a rejection.
        logger.warning("cosign execution failed: %s", exc)
        return None

    if proc.returncode != 0:
        logger.warning("cosign verification failed (exit %d): %s",
                       proc.returncode, proc.stderr.strip())
        return False

    logger.info("cosign provenance verification passed")
    return True
|
|
|
|
|
|
def _verify_checksum(archive_path: str, checksums_path: str, archive_name: str) -> bool:
    """Verify the SHA-256 of *archive_path* against an entry in checksums.txt.

    Each line of the checksums file is expected to hold a hex digest followed
    by a file name. The separator may be any run of whitespace, so both the
    single-space form and the ``sha256sum`` layouts (``"<hash>  <name>"`` in
    text mode, ``"<hash> *<name>"`` in binary mode) are accepted. Digests are
    compared case-insensitively.

    Args:
        archive_path: Path of the downloaded archive to hash.
        checksums_path: Path of the downloaded checksums file.
        archive_name: File name to look up inside the checksums file.

    Returns:
        True when an entry for *archive_name* exists and matches the archive's
        digest; False (after logging a warning) when the entry is missing or
        the digests differ.

    Raises:
        OSError: if either file cannot be read.
    """
    accepted_names = (archive_name, "*" + archive_name)  # "*" = binary-mode marker
    expected = None
    with open(checksums_path, encoding="utf-8") as f:
        for line in f:
            parts = line.split(None, 1)
            if len(parts) != 2:
                continue  # blank or malformed line
            digest, name = parts[0], parts[1].strip()
            if name in accepted_names:
                expected = digest.lower()
                break
    if expected is None:
        logger.warning("No checksum entry for %s", archive_name)
        return False

    sha = hashlib.sha256()
    with open(archive_path, "rb") as f:
        while chunk := f.read(65536):
            sha.update(chunk)
    actual = sha.hexdigest()  # always lowercase hex
    if actual != expected:
        logger.warning("Checksum mismatch: expected %s, got %s", expected, actual)
        return False
    return True
|
|
|
|
|
|
def _extract_tirith_binary(archive_path: str, dest_path: str) -> bool:
    """Copy the ``tirith`` executable out of a release tarball to *dest_path*.

    Only a regular-file member named ``tirith`` (at the archive root or in a
    sub-directory) is accepted; directories, links, and names containing
    ``..`` are skipped. The member's bytes are streamed with ``extractfile``
    so no archive-controlled path, link or mode is ever applied to disk.

    Returns:
        True when the binary was written, False when no suitable member exists.

    Raises:
        OSError, tarfile.TarError: on I/O failures or a malformed archive.
    """
    with tarfile.open(archive_path, "r:gz") as tar:
        for member in tar:
            if not member.isfile() or ".." in member.name:
                continue
            if member.name != "tirith" and not member.name.endswith("/tirith"):
                continue
            payload = tar.extractfile(member)
            if payload is None:
                continue
            with payload, open(dest_path, "wb") as out:
                shutil.copyfileobj(payload, out)
            return True
    return False


def _install_tirith(*, log_failures: bool = True) -> tuple[str | None, str]:
    """Download and install tirith to $HERMES_HOME/bin/tirith.

    The archive's SHA-256 is always checked against the release's
    checksums.txt. When cosign is on PATH and its signature artifacts can be
    downloaded, the provenance of checksums.txt is verified as well; an
    explicit cosign rejection aborts the install, whereas a missing or broken
    cosign only downgrades verification to SHA-256.

    Args:
        log_failures: Log failures at WARNING when True, at DEBUG otherwise.

    Returns:
        ``(installed_path, failure_reason)``. On success ``failure_reason`` is
        "". On failure ``installed_path`` is None and ``failure_reason`` is one
        of "unsupported_platform", "download_failed",
        "cosign_verification_failed", "checksum_failed",
        "binary_not_in_archive" or "install_io_error". The tag is what callers
        persist in the on-disk failure marker.
    """
    log = logger.warning if log_failures else logger.debug

    target = _detect_target()
    if not target:
        logger.info("tirith auto-install: unsupported platform %s/%s",
                    platform.system(), platform.machine())
        return None, "unsupported_platform"

    archive_name = f"tirith-{target}.tar.gz"
    base_url = f"https://github.com/{_REPO}/releases/latest/download"

    try:
        tmpdir = tempfile.mkdtemp(prefix="tirith-install-")
    except OSError as exc:
        log("tirith install failed: %s", exc)
        return None, "install_io_error"

    try:
        archive_path = os.path.join(tmpdir, archive_name)
        checksums_path = os.path.join(tmpdir, "checksums.txt")
        sig_path = os.path.join(tmpdir, "checksums.txt.sig")
        cert_path = os.path.join(tmpdir, "checksums.txt.pem")

        logger.info("tirith not found — downloading latest release for %s...", target)

        try:
            _download_file(f"{base_url}/{archive_name}", archive_path)
            _download_file(f"{base_url}/checksums.txt", checksums_path)
        except Exception as exc:
            log("tirith download failed: %s", exc)
            return None, "download_failed"

        # Cosign provenance verification — preferred but not mandatory.
        # When cosign is available, we verify that the release was produced
        # by the expected GitHub Actions workflow (full supply chain proof).
        # Without cosign, SHA-256 checksum + HTTPS still provides integrity
        # and transport-level authenticity.
        cosign_verified = False
        if shutil.which("cosign"):
            try:
                _download_file(f"{base_url}/checksums.txt.sig", sig_path)
                _download_file(f"{base_url}/checksums.txt.pem", cert_path)
            except Exception as exc:
                logger.info("cosign artifacts unavailable (%s), proceeding with SHA-256 only", exc)
            else:
                cosign_result = _verify_cosign(checksums_path, sig_path, cert_path)
                if cosign_result is True:
                    cosign_verified = True
                elif cosign_result is False:
                    # Verification explicitly rejected — abort, the release
                    # may have been tampered with.
                    log("tirith install aborted: cosign provenance verification failed")
                    return None, "cosign_verification_failed"
                else:
                    # None = execution failure (timeout/OSError) — proceed
                    # with SHA-256 only since cosign itself is broken.
                    logger.info("cosign execution failed, proceeding with SHA-256 only")
        else:
            logger.info("cosign not on PATH — installing tirith with SHA-256 verification only "
                        "(install cosign for full supply chain verification)")

        # Everything below touches the local filesystem. Report those failures
        # through the normal (None, reason) contract instead of letting them
        # escape into the command-check path or kill the background thread.
        try:
            if not _verify_checksum(archive_path, checksums_path, archive_name):
                return None, "checksum_failed"

            src = os.path.join(tmpdir, "tirith")
            if not _extract_tirith_binary(archive_path, src):
                log("tirith binary not found in archive")
                return None, "binary_not_in_archive"

            dest = os.path.join(_hermes_bin_dir(), "tirith")
            shutil.move(src, dest)
            os.chmod(dest, os.stat(dest).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
        except (OSError, EOFError, tarfile.TarError) as exc:
            log("tirith install failed: %s", exc)
            return None, "install_io_error"

        verification = "cosign + SHA-256" if cosign_verified else "SHA-256 only"
        logger.info("tirith installed to %s (%s)", dest, verification)
        return dest, ""

    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)
|
|
|
|
|
|
def _is_explicit_path(configured_path: str) -> bool:
    """Tell whether *configured_path* differs from the default binary name.

    Returns:
        False only for the exact string "tirith"; True for anything else,
        including absolute paths, ``~``-prefixed paths and the empty string.
    """
    default_name = "tirith"
    return not configured_path == default_name
|
|
|
|
|
|
def _resolve_tirith_path(configured_path: str) -> str:
    """Return the tirith binary to execute, installing it when necessary.

    A path that was resolved successfully earlier is returned from the cache.

    An explicitly configured path (anything but the bare default "tirith") is
    authoritative: it is checked as an executable file and then as a name on
    PATH, and nothing is ever downloaded to replace it.

    For the default name the lookup order is:
      1. ``shutil.which("tirith")``
      2. ``$HERMES_HOME/bin/tirith`` (a previous auto-install)
      3. a synchronous download via ``_install_tirith`` — skipped when an
         earlier attempt failed in this process, when a background install
         thread is still alive, or when an unexpired failure marker is on disk.

    Returns:
        The resolved path, or the ``~``-expanded configured path when nothing
        usable was found (spawning it is then expected to fail in the caller).
    """
    global _resolved_path, _install_failure_reason

    # Cache hit: a real path was stored by an earlier call.
    if _resolved_path is not None and _resolved_path is not _INSTALL_FAILED:
        return _resolved_path

    expanded = os.path.expanduser(configured_path)
    explicit = _is_explicit_path(configured_path)
    previously_failed = _resolved_path is _INSTALL_FAILED

    if explicit:
        # User-chosen binary: look for it, but never substitute a download.
        if os.path.isfile(expanded) and os.access(expanded, os.X_OK):
            hit = expanded
        else:
            # It may be a bare command name that lives on PATH.
            hit = shutil.which(expanded)
        if hit:
            _resolved_path = hit
            return hit
        logger.warning("Configured tirith path %r not found; scanning disabled", configured_path)
        _resolved_path = _INSTALL_FAILED
        _install_failure_reason = "explicit_path_missing"
        return expanded

    # Default name: the cheap local checks run on every call so that a manual
    # install is noticed even after an earlier failure, without a restart.
    local_hit = shutil.which("tirith")
    if not local_hit:
        bundled = os.path.join(_hermes_bin_dir(), "tirith")
        if os.path.isfile(bundled) and os.access(bundled, os.X_OK):
            local_hit = bundled
    if local_hit:
        _resolved_path = local_hit
        _install_failure_reason = ""
        _clear_install_failed()
        return local_hit

    if previously_failed:
        # Retry only if the recorded cause was a missing cosign that has
        # since appeared on PATH; every other failure stays cached.
        cause_resolved = (
            _install_failure_reason == "cosign_missing" and shutil.which("cosign")
        )
        if not cause_resolved:
            return expanded
        _resolved_path = None
        _install_failure_reason = ""
        _clear_install_failed()

    # A background install is in flight: do not start a second download.
    # The caller's spawn-error handling covers the gap until it finishes.
    if _install_thread is not None and _install_thread.is_alive():
        return expanded

    # Honour the on-disk failure marker, keeping its real reason in memory so
    # the retry logic above can recognise retryable causes later on.
    marker_reason = _read_failure_reason()
    if marker_reason is not None and _is_install_failed_on_disk():
        _resolved_path = _INSTALL_FAILED
        _install_failure_reason = marker_reason
        return expanded

    installed, reason = _install_tirith()
    if not installed:
        # Remember the miss in memory and on disk.
        _resolved_path = _INSTALL_FAILED
        _install_failure_reason = reason
        _mark_install_failed(reason)
        return expanded

    _resolved_path = installed
    _install_failure_reason = ""
    _clear_install_failed()
    return installed
|
|
|
|
|
|
def _background_install(*, log_failures: bool = True):
    """Background thread target: locate tirith locally or download and install it.

    Runs entirely under ``_install_lock``. If the shared state is already
    resolved (successfully or as a failure) it does nothing. Otherwise it
    re-checks PATH and ``$HERMES_HOME/bin`` and only then downloads.

    Any exception raised during the attempt is caught and recorded as a failed
    install with reason "install_error". Without this the thread would die
    with the state still "not yet tried", no failure marker would be written,
    and later calls would attempt the install again.

    Args:
        log_failures: Log failures at WARNING when True, at DEBUG otherwise.
    """
    global _resolved_path, _install_failure_reason
    with _install_lock:
        # Double-check after acquiring lock (another thread may have resolved)
        if _resolved_path is not None:
            return

        # Re-check local paths (may have been installed by another process)
        found = shutil.which("tirith")
        if found:
            _resolved_path = found
            _install_failure_reason = ""
            return

        try:
            hermes_bin = os.path.join(_hermes_bin_dir(), "tirith")
            if os.path.isfile(hermes_bin) and os.access(hermes_bin, os.X_OK):
                _resolved_path = hermes_bin
                _install_failure_reason = ""
                return

            installed, reason = _install_tirith(log_failures=log_failures)
        except Exception as exc:
            # Nobody can catch an exception raised in this thread; turn it
            # into a recorded failure instead.
            log = logger.warning if log_failures else logger.debug
            log("tirith background install failed unexpectedly: %s", exc)
            installed, reason = None, "install_error"

        if installed:
            _resolved_path = installed
            _install_failure_reason = ""
            _clear_install_failed()
        else:
            _resolved_path = _INSTALL_FAILED
            _install_failure_reason = reason
            _mark_install_failed(reason)
|
|
|
|
|
|
def ensure_installed(*, log_failures: bool = True):
    """Make sure tirith is available, starting a background download if needed.

    Only local checks happen synchronously; a required network download is
    handed to a daemon thread so the caller is never blocked. Calling this
    repeatedly is safe: a new thread is started only when none is alive.

    Args:
        log_failures: Forwarded to the background installer; controls whether
            its failures are logged at WARNING or DEBUG.

    Returns:
        The path of a usable tirith binary when one is available right now,
        otherwise None (scanning disabled, nothing found, a previous failure
        is still cached, or the download has only just been started).
    """
    global _resolved_path, _install_thread, _install_failure_reason

    settings = _load_security_config()
    if not settings["tirith_enabled"]:
        return None

    # A path cached by an earlier call: report it only if it is still usable.
    if _resolved_path is not None and _resolved_path is not _INSTALL_FAILED:
        cached = _resolved_path
        still_usable = os.path.isfile(cached) and os.access(cached, os.X_OK)
        return cached if still_usable else None

    configured_path = settings["tirith_path"]
    explicit = _is_explicit_path(configured_path)
    expanded = os.path.expanduser(configured_path)

    if explicit:
        # User-chosen binary: synchronous lookup only, never a download.
        if os.path.isfile(expanded) and os.access(expanded, os.X_OK):
            hit = expanded
        else:
            hit = shutil.which(expanded)
        if hit:
            _resolved_path = hit
            return hit
        _resolved_path = _INSTALL_FAILED
        _install_failure_reason = "explicit_path_missing"
        return None

    # Default name: quick local lookups first (no network).
    local_hit = shutil.which("tirith")
    if not local_hit:
        bundled = os.path.join(_hermes_bin_dir(), "tirith")
        if os.path.isfile(bundled) and os.access(bundled, os.X_OK):
            local_hit = bundled
    if local_hit:
        _resolved_path = local_hit
        _install_failure_reason = ""
        _clear_install_failed()
        return local_hit

    # An in-memory failure sticks unless its cause (missing cosign) is gone.
    if _resolved_path is _INSTALL_FAILED:
        cause_resolved = (
            _install_failure_reason == "cosign_missing" and shutil.which("cosign")
        )
        if not cause_resolved:
            return None
        _resolved_path = None
        _install_failure_reason = ""
        _clear_install_failed()

    # An unexpired on-disk marker suppresses the network attempt; its real
    # reason is copied into memory for the retry logic above.
    marker_reason = _read_failure_reason()
    if marker_reason is not None and _is_install_failed_on_disk():
        _resolved_path = _INSTALL_FAILED
        _install_failure_reason = marker_reason
        return None

    # A download is needed: run it off-thread unless one is already running.
    worker_running = _install_thread is not None and _install_thread.is_alive()
    if not worker_running:
        _install_thread = threading.Thread(
            target=_background_install,
            kwargs={"log_failures": log_failures},
            daemon=True,
        )
        _install_thread.start()

    return None  # Not available yet; commands will fail-open until ready
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Caps applied to tirith's JSON output before it is returned to callers.
_MAX_FINDINGS = 50  # at most this many entries of the "findings" list are kept
_MAX_SUMMARY_LEN = 500  # the "summary" string is cut to this many characters
|
|
|
|
|
|
def check_command_security(command: str) -> dict:
    """Run a tirith security scan on *command*.

    The process exit code alone decides the action (0=allow, 1=block,
    2=warn). JSON on stdout only enriches ``findings`` and ``summary``: if it
    is missing, unparseable, or has fields of an unexpected type, the verdict
    from the exit code is still returned.

    Spawn failures, timeouts and unknown exit codes honour the
    ``tirith_fail_open`` setting (allow when True, block when False).
    Programming errors propagate.

    Args:
        command: The shell command text to scan.

    Returns:
        {"action": "allow"|"warn"|"block", "findings": [...], "summary": str}
    """
    cfg = _load_security_config()

    if not cfg["tirith_enabled"]:
        return {"action": "allow", "findings": [], "summary": ""}

    tirith_path = _resolve_tirith_path(cfg["tirith_path"])
    timeout = cfg["tirith_timeout"]
    fail_open = cfg["tirith_fail_open"]

    try:
        result = subprocess.run(
            [tirith_path, "check", "--json", "--non-interactive",
             "--shell", "posix", "--", command],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except OSError as exc:
        # Covers FileNotFoundError, PermissionError, exec format error
        logger.warning("tirith spawn failed: %s", exc)
        if fail_open:
            return {"action": "allow", "findings": [], "summary": f"tirith unavailable: {exc}"}
        return {"action": "block", "findings": [], "summary": f"tirith spawn failed (fail-closed): {exc}"}
    except subprocess.TimeoutExpired:
        logger.warning("tirith timed out after %ds", timeout)
        if fail_open:
            return {"action": "allow", "findings": [], "summary": f"tirith timed out ({timeout}s)"}
        return {"action": "block", "findings": [], "summary": "tirith timed out (fail-closed)"}

    # Map exit code to action
    exit_code = result.returncode
    action = {0: "allow", 1: "block", 2: "warn"}.get(exit_code)
    if action is None:
        # Unknown exit code — respect fail_open
        logger.warning("tirith returned unexpected exit code %d", exit_code)
        if fail_open:
            return {"action": "allow", "findings": [], "summary": f"tirith exit code {exit_code} (fail-open)"}
        return {"action": "block", "findings": [], "summary": f"tirith exit code {exit_code} (fail-closed)"}

    # Parse JSON for enrichment (never overrides the exit code verdict)
    findings = []
    summary = ""
    try:
        data = json.loads(result.stdout) if result.stdout.strip() else {}
    except json.JSONDecodeError:
        data = None

    if isinstance(data, dict):
        # Accept each field only when it has the expected type, so odd-shaped
        # output degrades the details instead of raising.
        raw_findings = data.get("findings")
        if isinstance(raw_findings, list):
            findings = raw_findings[:_MAX_FINDINGS]
        raw_summary = data.get("summary")
        if isinstance(raw_summary, str):
            summary = raw_summary[:_MAX_SUMMARY_LEN]
    else:
        # Unparseable or non-object JSON degrades findings/summary, not the verdict
        logger.debug("tirith JSON parse failed, using exit code only")
        if action == "block":
            summary = "security issue detected (details unavailable)"
        elif action == "warn":
            summary = "security warning detected (details unavailable)"

    return {"action": action, "findings": findings, "summary": summary}
|