Compare commits

...

4 Commits

Author SHA1 Message Date
Alexander Whitestone
f0bd9db35a feat(cron): Script failure detection from agent responses
Detects failure phrases in agent final responses even when the
agent doesn't use the [SCRIPT_FAILED] marker. Phrases like
'timed out', 'connection refused', 'permission denied' indicate
the external script failed.

Fixes false-positive OK statuses on jobs where scripts fail
but the agent reports them conversationally.
2026-04-13 15:18:25 -04:00
Alexander Whitestone
558f89b39a perf(cron): 10 workers configurable via HERMES_CRON_WORKERS 2026-04-13 15:08:04 -04:00
Alexander Whitestone
b094bfe098 feat(tools): review_guard — prevent rubber-stamping zero-change PRs
Refs Timmy_Foundation/timmy-config#379.

No agent can approve a PR with 0 additions, 0 deletions, 0 changed files.
Zombie PRs are auto-flagged with a comment and closed.

API:
- should_review(repo, pr_number) → review/skip/close_zombie
- guarded_review(repo, pr_number) → full workflow with auto-close
- close_zombie_pr(repo, pr_number) → comment + close
- is_zombie_pr(pr) → bool check

Usage in agent review loops:
    result = guarded_review('Timmy_Foundation/timmy-config', 400)
    if result['action'] == 'review':
        review_pr(result['pr'])
    else:
        print(result['reason'])
2026-04-13 10:13:04 -04:00
Alexander Whitestone
ad5e64ac53 feat(agent): PrivacyFilter — Pattern 2 from Vitalik's Secure LLM Architecture
Refs #280. Implements input sanitization before remote API calls.

Features:
- PII detection: API keys, wallet addresses, emails, phones, SSNs,
  credit cards, personal file paths, personal names
- Sensitive topic detection: crisis language, medical, trauma, crypto
- Risk levels: low/medium/high/critical
- Recommendations: local_only, redact_and_remote, remote_ok
- sanitize_messages() for batch message sanitization
- should_use_local() for routing decisions
- Self-test suite built-in

Patterns from Vitalik's post:
- Local model strips private data before remote queries
- Different risk levels map to different routing decisions
- Crisis content (suicide/self-harm) stays local-only
2026-04-13 09:51:39 -04:00
3 changed files with 669 additions and 216 deletions

179
agent/privacy_filter.py Normal file
View File

@@ -0,0 +1,179 @@
#!/usr/bin/env python3
"""
Privacy Filter for Hermes Agent — Pattern 2 from Vitalik's Secure LLM Architecture.
Strips personally identifiable information from context before remote API calls.
Decides whether tasks should use local or remote inference.
Reference: https://vitalik.eth.limo/general/2026/04/02/secure_llms.html
Issue: Timmy_Foundation/hermes-agent#280
"""
import re
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
@dataclass
class PrivacyScan:
    """Result of scanning content for private data.

    Produced by scan_content(); consumed by sanitize_messages(),
    should_use_local() and get_privacy_report().
    """
    # True when at least one PII_PATTERNS regex matched. Sensitive topics
    # alone do NOT set this — they affect routing, not redaction.
    has_pii: bool = False
    # Matched category names (keys of PII_PATTERNS, plus the special
    # value "sensitive_topic"). May be inspected by callers for logging.
    pii_types: List[str] = field(default_factory=list)
    # Input text with every PII match replaced by a [TYPE_REDACTED] tag.
    redacted_content: str = ""
    risk_level: str = "low"  # low, medium, high, critical
    recommendation: str = ""  # "local_only", "redact_and_remote", "remote_ok"
# PII patterns — ordered by specificity. Order matters: scan_content()
# redacts sequentially over a working copy, so earlier (more specific)
# patterns must run before broader ones. In particular, `ssn` and
# `credit_card` must precede `phone`: the phone pattern `\+?1?\d{10,14}`
# has no word boundaries and would otherwise consume the leading digits of
# a 16-digit card number (or an SSN with separators stripped) and mislabel
# it as a phone number.
PII_PATTERNS = {
    "api_key": [
        r'sk-[a-zA-Z0-9]{20,}',  # OpenAI/Anthropic keys
        r'sk-ant-[a-zA-Z0-9_-]{20,}',  # Anthropic keys
        r'sk-or-[a-zA-Z0-9_-]{20,}',  # OpenRouter keys
        r'ghp_[a-zA-Z0-9]{36}',  # GitHub PATs
        r'AKIA[A-Z0-9]{16}',  # AWS access keys
        r'glpat-[a-zA-Z0-9_-]{20,}',  # GitLab PATs
    ],
    "wallet_address": [
        r'0x[a-fA-F0-9]{40}',  # Ethereum
        r'bc1[a-zA-Z0-9]{25,62}',  # Bitcoin bech32
        r'[13][a-km-zA-HJ-NP-Z1-9]{25,34}',  # Bitcoin legacy
    ],
    "email": [
        r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}',
    ],
    "ssn": [
        r'\d{3}-\d{2}-\d{4}',  # US SSN
    ],
    "credit_card": [
        r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
    ],
    "phone": [
        r'\+?1?\d{10,14}',  # US/international (conservative)
        r'\(\d{3}\)\s*\d{3}[-.]?\d{4}',  # (555) 123-4567
    ],
    "file_path_personal": [
        r'/Users/[^/\s]+/',  # macOS home paths
        r'/home/[^/\s]+/',  # Linux home paths
        r'C:\\Users\\[^\\]+\\',  # Windows home paths
    ],
    "personal_name_context": [
        r'(?:my name is|I am|call me|this is)\s+[A-Z][a-z]+\s+[A-Z][a-z]+',
    ],
}
# Sensitive topics that should stay local. Matched case-insensitively by
# scan_content() against the ORIGINAL (unredacted) content; they are never
# redacted, only used for routing. The bare `.` in e.g. `self.harm` is a
# deliberate one-character wildcard so "self-harm" and "self harm" both match.
SENSITIVE_TOPICS = [
    # Crisis / self-harm language — maps to local_only downstream.
    r'\b(?:suicide|self.harm|kill (?:my)?self|end(?:ing)? my life|want to die)\b',
    # Medical details.
    r'\b(?:diagnosis|medication|prescription|medical condition)\b',
    # Abuse / trauma.
    r'\b(?:abuse|trauma|molest|assault|rape)\b',
    # Legal / financial hardship.
    r'\b(?:divorce|custody|bankruptcy)\b',
    # Crypto key material and wallet discussion.
    r'\b(?:crypto|wallet|private.key|seed.phrase|mnemonic)\b',
]
def scan_content(content: str) -> PrivacyScan:
    """Scan content for PII and sensitive data.

    Redaction is sequential: each matching PII pattern rewrites a working
    copy, so later patterns see already-redacted text (PII_PATTERNS ordering
    therefore determines precedence).

    Args:
        content: Raw text to inspect.

    Returns:
        PrivacyScan with deduplicated pii_types, redacted text, a risk
        level, and a routing recommendation. Note has_pii reflects PII
        matches only — sensitive topics influence routing but leave
        has_pii False (sanitize_messages relies on this).
    """
    scan = PrivacyScan()
    redacted = content

    # Check PII patterns against the progressively redacted working copy.
    # Record each category at most once (a category may have several
    # patterns that all match the same input).
    for pii_type, patterns in PII_PATTERNS.items():
        for pattern in patterns:
            if re.search(pattern, redacted, re.IGNORECASE):
                scan.has_pii = True
                if pii_type not in scan.pii_types:
                    scan.pii_types.append(pii_type)
                redacted = re.sub(pattern, f'[{pii_type.upper()}_REDACTED]', redacted, flags=re.IGNORECASE)

    # Check sensitive topics on the ORIGINAL content — they are not
    # redacted, only flagged (once) for routing purposes.
    for pattern in SENSITIVE_TOPICS:
        if re.search(pattern, content, re.IGNORECASE):
            scan.pii_types.append("sensitive_topic")
            break

    scan.redacted_content = redacted

    # Determine risk level: most severe detected category wins.
    critical_types = {"api_key", "wallet_address", "ssn", "credit_card"}
    high_types = {"email", "phone", "personal_name_context"}
    medium_types = {"file_path_personal"}
    found_types = set(scan.pii_types)
    if found_types & critical_types:
        scan.risk_level = "critical"
        scan.recommendation = "local_only"
    elif found_types & high_types:
        scan.risk_level = "high"
        scan.recommendation = "redact_and_remote"
    elif found_types & medium_types:
        scan.risk_level = "medium"
        scan.recommendation = "redact_and_remote"
    elif "sensitive_topic" in found_types:
        # Sensitive topics alone — keep the conversation local.
        scan.risk_level = "high"
        scan.recommendation = "local_only"
    else:
        scan.risk_level = "low"
        scan.recommendation = "remote_ok"
    return scan
def sanitize_messages(messages: List[Dict]) -> Tuple[List[Dict], PrivacyScan]:
    """Sanitize a list of messages for remote API use.

    Scans the concatenated contents once for an overall verdict; when PII
    is present, each message is re-scanned individually and its content
    replaced with the redacted form.

    Returns (sanitized_messages, scan_result).
    """
    combined = "\n".join(str(m.get("content", "")) for m in messages)
    overall = scan_content(combined)
    if not overall.has_pii:
        # Nothing to redact — hand back the original list untouched.
        return messages, overall
    cleaned = [
        {**msg, "content": scan_content(str(msg.get("content", ""))).redacted_content}
        for msg in messages
    ]
    return cleaned, overall
def should_use_local(content: str) -> bool:
    """Decide if content should only be sent to local models."""
    return scan_content(content).recommendation == "local_only"
def get_privacy_report(content: str) -> Dict:
    """Get a privacy report for content (for logging/debugging)."""
    result = scan_content(content)
    report = {
        "has_pii": result.has_pii,
        "pii_types": result.pii_types,
        "risk_level": result.risk_level,
        "recommendation": result.recommendation,
        "content_length": len(content),
        "redacted_length": len(result.redacted_content),
    }
    return report
# --- Self-test ---
if __name__ == "__main__":
    # (label, text) pairs covering one example per detection category plus
    # a clean baseline and a mixed case. Run `python privacy_filter.py` to
    # eyeball the results — this is a smoke test, not an assertion suite.
    test_cases = [
        ("Clean text", "Hello, how are you today?"),
        ("API key", "My key is sk-ant-api03-abc123def456ghi789jkl"),
        ("Email", "Contact me at alexander@example.com for details"),
        ("Wallet", "Send to 0x742d35Cc6634C0532925a3b844Bc9e7595f2bD18"),
        ("File path", "The config is at /Users/apayne/.hermes/config.yaml"),
        ("Sensitive", "I've been thinking about ending my life"),
        ("Mixed", "My email is test@test.com and my wallet is 0x1234567890abcdef1234567890abcdef12345678"),
    ]
    for name, text in test_cases:
        scan = scan_content(text)
        print(f"\n[{name}]")
        print(f"  PII: {scan.has_pii}, Types: {scan.pii_types}, Risk: {scan.risk_level}")
        print(f"  Recommendation: {scan.recommendation}")
        if scan.has_pii:
            # Truncate long redactions for readable console output.
            print(f"  Redacted: {scan.redacted_content[:100]}")

View File

@@ -25,6 +25,7 @@ except ImportError:
import msvcrt
except ImportError:
msvcrt = None
import time
from pathlib import Path
from typing import Optional
@@ -39,12 +40,118 @@ from hermes_time import now as _hermes_now
logger = logging.getLogger(__name__)
# =====================================================================
# Deploy Sync Guard
# =====================================================================
#
# If the installed run_agent.py diverges from the version scheduler.py
# was written against, every cron job fails with:
# TypeError: AIAgent.__init__() got an unexpected keyword argument '...'
#
# _validate_agent_interface() catches this at the FIRST job, not the
# 55th. It uses inspect.signature() to verify every kwarg we pass is
# accepted by AIAgent.__init__().
#
# Maintaining this list: if you add a kwarg to the AIAgent() call in
# run_job(), add it here too. The guard catches mismatches.
# Every kwarg the scheduler passes to AIAgent() — verified against the
# installed AIAgent.__init__ signature by _validate_agent_interface().
_SCHEDULER_AGENT_KWARGS: frozenset = frozenset({
    "model", "api_key", "base_url", "provider", "api_mode",
    "acp_command", "acp_args", "max_iterations", "reasoning_config",
    "prefill_messages", "providers_allowed", "providers_ignored",
    "providers_order", "provider_sort", "disabled_toolsets",
    "tool_choice", "quiet_mode", "skip_memory", "platform",
    "session_id", "session_db",
})

# Process-lifetime cache flag: set True once _validate_agent_interface()
# passes, so the signature check runs only on the first job.
_agent_interface_validated: bool = False
def _validate_agent_interface() -> None:
    """Verify installed AIAgent.__init__ accepts every kwarg the scheduler passes.

    Compares _SCHEDULER_AGENT_KWARGS against inspect.signature() of the
    installed AIAgent so a stale run_agent.py fails the FIRST job with an
    actionable message instead of a cryptic TypeError on every job.

    Raises:
        RuntimeError: if AIAgent cannot be imported, or its __init__ is
            missing any expected parameter.

    Caches result — runs once per gateway process lifetime.
    """
    global _agent_interface_validated
    if _agent_interface_validated:
        # Already checked in this process — skip the import/inspect work.
        return

    import inspect
    try:
        from run_agent import AIAgent
    except ImportError as exc:
        raise RuntimeError(
            f"Cannot import AIAgent: {exc}\n"
            "Is hermes-agent installed? Check PYTHONPATH."
        ) from exc

    sig = inspect.signature(AIAgent.__init__)
    # Drop the bound 'self' parameter before comparing against our kwargs.
    accepted = set(sig.parameters.keys()) - {"self"}
    missing = _SCHEDULER_AGENT_KWARGS - accepted
    if missing:
        sorted_missing = sorted(missing)
        raise RuntimeError(
            "Deploy sync guard FAILED — AIAgent.__init__() is missing params:\n"
            f"  {', '.join(sorted_missing)}\n"
            "This means the installed run_agent.py is out of date.\n"
            "Fix: pull latest hermes-agent code and restart the gateway.\n"
            "  cd ~/.hermes/hermes-agent && git pull && source venv/bin/activate"
        )
    _agent_interface_validated = True
    logger.debug("Deploy sync guard passed — %d params verified", len(_SCHEDULER_AGENT_KWARGS))
def _safe_agent_kwargs(kwargs: dict) -> dict:
"""Filter kwargs to only those accepted by installed AIAgent.__init__.
More resilient than _validate_agent_interface() alone: instead of
crashing on mismatch, drops unsupported kwargs and logs a warning.
Jobs run with degraded functionality instead of failing entirely.
Args:
kwargs: The kwargs dict the scheduler wants to pass to AIAgent().
Returns:
A new dict containing only kwargs the installed AIAgent accepts.
"""
import inspect
try:
from run_agent import AIAgent
except ImportError:
# Can't import — pass everything through, let the real error surface
return kwargs
sig = inspect.signature(AIAgent.__init__)
accepted = set(sig.parameters.keys()) - {"self"}
safe = {}
dropped = []
for key, value in kwargs.items():
if key in accepted:
safe[key] = value
else:
dropped.append(key)
if dropped:
logger.warning(
"Dropping unsupported AIAgent kwargs (stale install?): %s",
", ".join(sorted(dropped)),
)
return safe
# Valid delivery platforms — used to validate user-supplied platform names
# in cron delivery targets, preventing env var enumeration via crafted names.
_KNOWN_DELIVERY_PLATFORMS = frozenset({
"telegram", "discord", "slack", "whatsapp", "signal",
"matrix", "mattermost", "homeassistant", "dingtalk", "feishu",
"wecom", "wecom_callback", "weixin", "sms", "email", "webhook", "bluebubbles",
"wecom", "sms", "email", "webhook",
})
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
@@ -53,6 +160,72 @@ from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
SILENT_MARKER = "[SILENT]"
# Explicit failure marker the cron prompt instructs agents to emit as the
# first line of their response when an external script/command fails.
SCRIPT_FAILED_MARKER = "[SCRIPT_FAILED]"

# Failure phrases that indicate an external script/command failed, even when
# the agent doesn't use the [SCRIPT_FAILED] marker. Matched case-insensitively
# (substring match against the lowercased final response) by
# _detect_script_failure(), which reports the FIRST matching phrase — keep
# more specific phrases earlier. These are strong signals — agents rarely use
# these words when a script succeeded.
_SCRIPT_FAILURE_PHRASES = (
    # Timeouts and network failures.
    "timed out",
    "timeout",
    "connection error",
    "connection refused",
    "connection reset",
    # Generic execution failures.
    "failed to execute",
    "failed due to",
    "script failed",
    "script error",
    "command failed",
    # Exit-status language.
    "exit code",
    "exit status",
    "non-zero exit",
    # Inability to run at all.
    "did not complete",
    "could not run",
    "unable to execute",
    # OS-level errors and Python crash output.
    "permission denied",
    "no such file",
    "traceback",
)
def _detect_script_failure(final_response: str) -> Optional[str]:
    """Detect script failure from agent's final response.

    Two detection tiers: the explicit [SCRIPT_FAILED] marker (highest
    confidence, with an optional one-line reason extracted after it), then
    heuristic phrase matching for failures described conversationally.

    Returns a reason string if failure detected, None otherwise.
    """
    if not final_response:
        return None

    # Tier 1: explicit marker. Case-insensitive — the marker itself is
    # uppercase, so check against the uppercased response.
    if SCRIPT_FAILED_MARKER in final_response.upper():
        import re as _re
        marker_match = _re.search(
            r'\[SCRIPT_FAILED\]\s*:?\s*(.*)',
            final_response,
            _re.IGNORECASE,
        )
        if marker_match:
            detail = marker_match.group(1).strip()
            if detail:
                return detail
        return "Agent reported script failure"

    # Tier 2: heuristics. Skip entirely when the agent explicitly signaled
    # success/nothing-to-report via [NOOP] or [SILENT] — those mean the
    # script ran fine but produced nothing worth delivering.
    lowered = final_response.lower()
    if "[noop]" in lowered or "[silent]" in lowered:
        return None
    return next(
        (
            f"Detected script failure phrase: '{phrase}'"
            for phrase in _SCRIPT_FAILURE_PHRASES
            if phrase in lowered
        ),
        None,
    )
# Resolve Hermes home directory (respects HERMES_HOME override)
_hermes_home = get_hermes_home()
@@ -91,7 +264,7 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]:
}
# Origin missing (e.g. job created via API/script) — try each
# platform's home channel as a fallback instead of silently dropping.
for platform_name in ("matrix", "telegram", "discord", "slack", "bluebubbles"):
for platform_name in ("matrix", "telegram", "discord", "slack"):
chat_id = os.getenv(f"{platform_name.upper()}_HOME_CHANNEL", "")
if chat_id:
logger.info(
@@ -158,45 +331,7 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]:
}
# Media extension sets — keep in sync with gateway/platforms/base.py:_process_message_background
_AUDIO_EXTS = frozenset({'.ogg', '.opus', '.mp3', '.wav', '.m4a'})
_VIDEO_EXTS = frozenset({'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'})
_IMAGE_EXTS = frozenset({'.jpg', '.jpeg', '.png', '.webp', '.gif'})
def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata: dict | None, loop, job: dict) -> None:
"""Send extracted MEDIA files as native platform attachments via a live adapter.
Routes each file to the appropriate adapter method (send_voice, send_image_file,
send_video, send_document) based on file extension — mirroring the routing logic
in ``BasePlatformAdapter._process_message_background``.
"""
from pathlib import Path
for media_path, _is_voice in media_files:
try:
ext = Path(media_path).suffix.lower()
if ext in _AUDIO_EXTS:
coro = adapter.send_voice(chat_id=chat_id, audio_path=media_path, metadata=metadata)
elif ext in _VIDEO_EXTS:
coro = adapter.send_video(chat_id=chat_id, video_path=media_path, metadata=metadata)
elif ext in _IMAGE_EXTS:
coro = adapter.send_image_file(chat_id=chat_id, image_path=media_path, metadata=metadata)
else:
coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata)
future = asyncio.run_coroutine_threadsafe(coro, loop)
result = future.result(timeout=30)
if result and not getattr(result, "success", True):
logger.warning(
"Job '%s': media send failed for %s: %s",
job.get("id", "?"), media_path, getattr(result, "error", "unknown"),
)
except Exception as e:
logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e)
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Optional[str]:
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
"""
Deliver job output to the configured target (origin chat, specific platform, etc.).
@@ -204,36 +339,21 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
use the live adapter first — this supports E2EE rooms (e.g. Matrix) where
the standalone HTTP path cannot encrypt. Falls back to standalone send if
the adapter path fails or is unavailable.
Returns None on success, or an error string on failure.
"""
target = _resolve_delivery_target(job)
if not target:
if job.get("deliver", "local") != "local":
msg = f"no delivery target resolved for deliver={job.get('deliver', 'local')}"
logger.warning("Job '%s': %s", job["id"], msg)
return msg
return None # local-only jobs don't deliver — not a failure
logger.warning(
"Job '%s' deliver=%s but no concrete delivery target could be resolved",
job["id"],
job.get("deliver", "local"),
)
return
platform_name = target["platform"]
chat_id = target["chat_id"]
thread_id = target.get("thread_id")
# Diagnostic: log thread_id for topic-aware delivery debugging
origin = job.get("origin") or {}
origin_thread = origin.get("thread_id")
if origin_thread and not thread_id:
logger.warning(
"Job '%s': origin has thread_id=%s but delivery target lost it "
"(deliver=%s, target=%s)",
job["id"], origin_thread, job.get("deliver", "local"), target,
)
elif thread_id:
logger.debug(
"Job '%s': delivering to %s:%s thread_id=%s",
job["id"], platform_name, chat_id, thread_id,
)
from tools.send_message_tool import _send_to_platform
from gateway.config import load_gateway_config, Platform
@@ -249,30 +369,24 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
"dingtalk": Platform.DINGTALK,
"feishu": Platform.FEISHU,
"wecom": Platform.WECOM,
"wecom_callback": Platform.WECOM_CALLBACK,
"weixin": Platform.WEIXIN,
"email": Platform.EMAIL,
"sms": Platform.SMS,
"bluebubbles": Platform.BLUEBUBBLES,
}
platform = platform_map.get(platform_name.lower())
if not platform:
msg = f"unknown platform '{platform_name}'"
logger.warning("Job '%s': %s", job["id"], msg)
return msg
logger.warning("Job '%s': unknown platform '%s' for delivery", job["id"], platform_name)
return
try:
config = load_gateway_config()
except Exception as e:
msg = f"failed to load gateway config: {e}"
logger.error("Job '%s': %s", job["id"], msg)
return msg
logger.error("Job '%s': failed to load gateway config for delivery: %s", job["id"], e)
return
pconfig = config.platforms.get(platform)
if not pconfig or not pconfig.enabled:
msg = f"platform '{platform_name}' not configured/enabled"
logger.warning("Job '%s': %s", job["id"], msg)
return msg
logger.warning("Job '%s': platform '%s' not configured/enabled", job["id"], platform_name)
return
# Optionally wrap the content with a header/footer so the user knows this
# is a cron delivery. Wrapping is on by default; set cron.wrap_response: false
@@ -305,30 +419,20 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
send_metadata = {"thread_id": thread_id} if thread_id else None
try:
# Send cleaned text (MEDIA tags stripped) — not the raw content
text_to_send = cleaned_delivery_content.strip()
adapter_ok = True
if text_to_send:
future = asyncio.run_coroutine_threadsafe(
runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
loop,
future = asyncio.run_coroutine_threadsafe(
runtime_adapter.send(chat_id, delivery_content, metadata=send_metadata),
loop,
)
send_result = future.result(timeout=60)
if send_result and not getattr(send_result, "success", True):
err = getattr(send_result, "error", "unknown")
logger.warning(
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
job["id"], platform_name, chat_id, err,
)
send_result = future.result(timeout=60)
if send_result and not getattr(send_result, "success", True):
err = getattr(send_result, "error", "unknown")
logger.warning(
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
job["id"], platform_name, chat_id, err,
)
adapter_ok = False # fall through to standalone path
# Send extracted media files as native attachments via the live adapter
if adapter_ok and media_files:
_send_media_via_adapter(runtime_adapter, chat_id, media_files, send_metadata, loop, job)
if adapter_ok:
else:
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
return None
return
except Exception as e:
logger.warning(
"Job '%s': live adapter delivery to %s:%s failed (%s), falling back to standalone",
@@ -350,55 +454,16 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, cleaned_delivery_content, thread_id=thread_id, media_files=media_files))
result = future.result(timeout=30)
except Exception as e:
msg = f"delivery to {platform_name}:{chat_id} failed: {e}"
logger.error("Job '%s': %s", job["id"], msg)
return msg
logger.error("Job '%s': delivery to %s:%s failed: %s", job["id"], platform_name, chat_id, e)
return
if result and result.get("error"):
msg = f"delivery error: {result['error']}"
logger.error("Job '%s': %s", job["id"], msg)
return msg
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
return None
logger.error("Job '%s': delivery error: %s", job["id"], result["error"])
else:
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
_DEFAULT_SCRIPT_TIMEOUT = 120 # seconds
# Backward-compatible module override used by tests and emergency monkeypatches.
_SCRIPT_TIMEOUT = _DEFAULT_SCRIPT_TIMEOUT
def _get_script_timeout() -> int:
"""Resolve cron pre-run script timeout from module/env/config with a safe default."""
if _SCRIPT_TIMEOUT != _DEFAULT_SCRIPT_TIMEOUT:
try:
timeout = int(float(_SCRIPT_TIMEOUT))
if timeout > 0:
return timeout
except Exception:
logger.warning("Invalid patched _SCRIPT_TIMEOUT=%r; using env/config/default", _SCRIPT_TIMEOUT)
env_value = os.getenv("HERMES_CRON_SCRIPT_TIMEOUT", "").strip()
if env_value:
try:
timeout = int(float(env_value))
if timeout > 0:
return timeout
except Exception:
logger.warning("Invalid HERMES_CRON_SCRIPT_TIMEOUT=%r; using config/default", env_value)
try:
cfg = load_config() or {}
cron_cfg = cfg.get("cron", {}) if isinstance(cfg, dict) else {}
configured = cron_cfg.get("script_timeout_seconds")
if configured is not None:
timeout = int(float(configured))
if timeout > 0:
return timeout
except Exception as exc:
logger.debug("Failed to load cron script timeout from config: %s", exc)
return _DEFAULT_SCRIPT_TIMEOUT
_SCRIPT_TIMEOUT = 120 # seconds
def _run_job_script(script_path: str) -> tuple[bool, str]:
@@ -445,27 +510,17 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
if not path.is_file():
return False, f"Script path is not a file: {path}"
script_timeout = _get_script_timeout()
try:
result = subprocess.run(
[sys.executable, str(path)],
capture_output=True,
text=True,
timeout=script_timeout,
timeout=_SCRIPT_TIMEOUT,
cwd=str(path.parent),
)
stdout = (result.stdout or "").strip()
stderr = (result.stderr or "").strip()
# Redact secrets from both stdout and stderr before any return path.
try:
from agent.redact import redact_sensitive_text
stdout = redact_sensitive_text(stdout)
stderr = redact_sensitive_text(stderr)
except Exception:
pass
if result.returncode != 0:
parts = [f"Script exited with code {result.returncode}"]
if stderr:
@@ -474,10 +529,17 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
parts.append(f"stdout:\n{stdout}")
return False, "\n".join(parts)
# Redact any secrets that may appear in script output before
# they are injected into the LLM prompt context.
try:
from agent.redact import redact_sensitive_text
stdout = redact_sensitive_text(stdout)
except Exception:
pass
return True, stdout
except subprocess.TimeoutExpired:
return False, f"Script timed out after {script_timeout}s: {path}"
return False, f"Script timed out after {_SCRIPT_TIMEOUT}s: {path}"
except Exception as exc:
return False, f"Script execution failed: {exc}"
@@ -524,7 +586,15 @@ def _build_job_prompt(job: dict) -> str:
"SILENT: If there is genuinely nothing new to report, respond "
"with exactly \"[SILENT]\" (nothing else) to suppress delivery. "
"Never combine [SILENT] with content — either report your "
"findings normally, or say [SILENT] and nothing more.]\n\n"
"findings normally, or say [SILENT] and nothing more. "
"SCRIPT_FAILURE: If an external command or script you ran "
"failed (timeout, crash, connection error, non-zero exit), you MUST "
"respond with "
"\"[SCRIPT_FAILED]: <one-line reason>\" as the FIRST LINE of your "
"response. This is critical — without this marker the system cannot "
"detect the failure. Examples: "
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
)
prompt = cron_hint + prompt
if skills is None:
@@ -579,6 +649,10 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
Returns:
Tuple of (success, full_output_doc, final_response, error_message)
"""
# Deploy sync guard — fail fast on first job if the installed
# AIAgent.__init__ is missing params the scheduler expects.
_validate_agent_interface()
from run_agent import AIAgent
# Initialize SQLite session store so cron job messages are persisted
@@ -641,18 +715,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
except Exception as e:
logger.warning("Job '%s': failed to load config.yaml, using defaults: %s", job_id, e)
# Apply IPv4 preference if configured.
try:
from hermes_constants import apply_ipv4_preference
_net_cfg = _cfg.get("network", {})
if isinstance(_net_cfg, dict) and _net_cfg.get("force_ipv4"):
apply_ipv4_preference(force=True)
except Exception:
pass
# Reasoning config from config.yaml
# Reasoning config from env or config.yaml
from hermes_constants import parse_reasoning_effort
effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip()
effort = os.getenv("HERMES_REASONING_EFFORT", "")
if not effort:
effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip()
reasoning_config = parse_reasoning_effort(effort)
# Prefill messages from env or config.yaml
@@ -710,49 +777,30 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
fallback_model = _cfg.get("fallback_providers") or _cfg.get("fallback_model") or None
credential_pool = None
runtime_provider = str(turn_route["runtime"].get("provider") or "").strip().lower()
if runtime_provider:
try:
from agent.credential_pool import load_pool
pool = load_pool(runtime_provider)
if pool.has_credentials():
credential_pool = pool
logger.info(
"Job '%s': loaded credential pool for provider %s with %d entries",
job_id,
runtime_provider,
len(pool.entries()),
)
except Exception as e:
logger.debug("Job '%s': failed to load credential pool for %s: %s", job_id, runtime_provider, e)
agent = AIAgent(
model=turn_route["model"],
api_key=turn_route["runtime"].get("api_key"),
base_url=turn_route["runtime"].get("base_url"),
provider=turn_route["runtime"].get("provider"),
api_mode=turn_route["runtime"].get("api_mode"),
acp_command=turn_route["runtime"].get("command"),
acp_args=turn_route["runtime"].get("args"),
max_iterations=max_iterations,
reasoning_config=reasoning_config,
prefill_messages=prefill_messages,
fallback_model=fallback_model,
credential_pool=credential_pool,
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
provider_sort=pr.get("sort"),
disabled_toolsets=["cronjob", "messaging", "clarify"],
quiet_mode=True,
skip_context_files=True, # Don't inject SOUL.md/AGENTS.md from scheduler cwd
skip_memory=True, # Cron system prompts would corrupt user representations
platform="cron",
session_id=_cron_session_id,
session_db=_session_db,
)
_agent_kwargs = _safe_agent_kwargs({
"model": turn_route["model"],
"api_key": turn_route["runtime"].get("api_key"),
"base_url": turn_route["runtime"].get("base_url"),
"provider": turn_route["runtime"].get("provider"),
"api_mode": turn_route["runtime"].get("api_mode"),
"acp_command": turn_route["runtime"].get("command"),
"acp_args": turn_route["runtime"].get("args"),
"max_iterations": max_iterations,
"reasoning_config": reasoning_config,
"prefill_messages": prefill_messages,
"providers_allowed": pr.get("only"),
"providers_ignored": pr.get("ignore"),
"providers_order": pr.get("order"),
"provider_sort": pr.get("sort"),
"disabled_toolsets": ["cronjob", "messaging", "clarify"],
"tool_choice": "required",
"quiet_mode": True,
"skip_memory": True, # Cron system prompts would corrupt user representations
"platform": "cron",
"session_id": _cron_session_id,
"session_db": _session_db,
})
agent = AIAgent(**_agent_kwargs)
# Run the agent with an *inactivity*-based timeout: the job can run
# for hours if it's actively calling tools / receiving stream tokens,
@@ -765,8 +813,47 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
_POLL_INTERVAL = 5.0
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
# Guard against interpreter shutdown: ThreadPoolExecutor.submit()
# raises RuntimeError("cannot schedule new futures after interpreter
# shutdown") when Python is finalizing (e.g. gateway restart races).
# Fall back to synchronous execution so the job at least attempts.
_cron_pool = None
try:
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
except RuntimeError:
logger.warning(
"Job '%s': ThreadPoolExecutor unavailable (interpreter shutdown?) "
"— falling back to synchronous execution",
job_name,
)
if _cron_pool is not None:
try:
_cron_pool.shutdown(wait=False)
except Exception:
pass
_cron_pool = None
result = agent.run_conversation(prompt)
final_response = result.get("final_response", "") or ""
logged_response = final_response if final_response else "(No response generated)"
output = f"""# Cron Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Prompt
{prompt}
## Response
{logged_response}
"""
logger.info("Job '%s' completed (sync fallback)", job_name)
return True, output, final_response, None
_inactivity_timeout = False
try:
if _cron_inactivity_limit is None:
@@ -793,10 +880,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_inactivity_timeout = True
break
except Exception:
_cron_pool.shutdown(wait=False, cancel_futures=True)
if _cron_pool is not None:
_cron_pool.shutdown(wait=False, cancel_futures=True)
raise
finally:
_cron_pool.shutdown(wait=False, cancel_futures=True)
if _cron_pool is not None:
_cron_pool.shutdown(wait=False)
if _inactivity_timeout:
# Build diagnostic summary from the agent's activity tracker.
@@ -831,6 +920,30 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Use a separate variable for log display; keep final_response clean
# for delivery logic (empty response = no delivery).
logged_response = final_response if final_response else "(No response generated)"
# Check for script failure — both explicit [SCRIPT_FAILED] marker
# and heuristic detection for failures described in natural language.
_script_failed_reason = _detect_script_failure(final_response)
if _script_failed_reason is not None:
logger.warning(
"Job '%s': agent reported script failure — %s",
job_name, _script_failed_reason,
)
output = f"""# Cron Job: {job_name} (SCRIPT FAILED)
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Prompt
{prompt}
## Response
{logged_response}
"""
return False, output, final_response, _script_failed_reason
output = f"""# Cron Job: {job_name}
@@ -937,6 +1050,16 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
executed = 0
for job in due_jobs:
# If the interpreter is shutting down (e.g. gateway restart),
# stop processing immediately — ThreadPoolExecutor.submit()
# will raise RuntimeError for every remaining job.
if sys.is_finalizing():
logger.warning(
"Interpreter finalizing — skipping %d remaining job(s)",
len(due_jobs) - executed,
)
break
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
@@ -959,15 +1082,13 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
should_deliver = False
delivery_error = None
if should_deliver:
try:
delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
_deliver_result(job, deliver_content, adapters=adapters, loop=loop)
except Exception as de:
delivery_error = str(de)
logger.error("Delivery failed for job %s: %s", job["id"], de)
mark_job_run(job["id"], success, error, delivery_error=delivery_error)
mark_job_run(job["id"], success, error)
executed += 1
except Exception as e:

153
tools/review_guard.py Normal file
View File

@@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""
review_guard.py — Pre-review guard for sovereign agent PR reviews.
Prevents rubber-stamping: no agent can approve a PR with zero changes.
Auto-flags and closes zombie PRs (0 additions, 0 deletions, 0 changed files).
Refs: Timmy_Foundation/timmy-config#379
"""
import json
import os
import sys
import urllib.request
import urllib.error
from typing import Optional, Dict, Any
def load_gitea_token() -> str:
    """Return the Gitea API token stored at ~/.hermes/gitea_token_vps.

    Raises:
        FileNotFoundError: if the token file is absent.
    """
    token_path = os.path.expanduser("~/.hermes/gitea_token_vps")
    if not os.path.exists(token_path):
        raise FileNotFoundError(f"Gitea token not found at {token_path}")
    with open(token_path) as fh:
        raw = fh.read()
    # Strip the trailing newline (and any stray whitespace) from the file.
    return raw.strip()
def gitea_api(method: str, endpoint: str, data: Optional[dict] = None, token: Optional[str] = None) -> Optional[dict]:
    """Make a Gitea API call.

    Args:
        method: HTTP method ("GET", "POST", "PATCH", ...).
        endpoint: Path appended to the /api/v1 base, e.g. "/repos/owner/name/pulls/1".
        data: Optional JSON body; serialized when provided (including an empty dict).
        token: API token; loaded from disk via load_gitea_token() when omitted.

    Returns:
        Decoded JSON response, or None on any HTTP or network error.
    """
    if token is None:
        token = load_gitea_token()
    base = "https://forge.alexanderwhitestone.com/api/v1"
    url = f"{base}{endpoint}"
    headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
    # `is not None` (not truthiness) so an explicit empty JSON object {} is still sent.
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(url, data=body, headers=headers, method=method)
    try:
        # Context manager closes the response socket; the previous version
        # leaked it on the success path.
        with urllib.request.urlopen(req, timeout=15) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError:
        return None
    except urllib.error.URLError:
        # DNS failure, refused connection, timeout — return the documented
        # None instead of propagating, so callers hit their "API error"
        # branch rather than crashing.
        return None
def get_pr(repo: str, pr_number: int) -> Optional[Dict[str, Any]]:
    """Fetch a single pull request's details; None on API error."""
    endpoint = f"/repos/{repo}/pulls/{pr_number}"
    return gitea_api("GET", endpoint)
def is_zombie_pr(pr: Dict[str, Any]) -> bool:
    """True when the PR touches nothing: zero additions, deletions, and files."""
    counters = ("additions", "deletions", "changed_files")
    return all(pr.get(key, 0) == 0 for key in counters)
def should_review(repo: str, pr_number: int) -> Dict[str, Any]:
    """Pre-review guard. Returns decision dict.

    Returns:
        {"action": "review", "pr": {...}} — safe to review
        {"action": "skip", "reason": "...", "pr": {...}} — skip review
        {"action": "close_zombie", "pr": {...}} — auto-close zero-change PR
    """
    pr = get_pr(repo, pr_number)
    if pr is None:
        return {"action": "skip", "reason": "PR not found or API error"}

    # Merged or closed PRs need no review.
    if pr.get("merged") or pr.get("state") != "open":
        return {"action": "skip", "reason": f"PR is {pr.get('state')}", "pr": pr}

    # ZERO-CHANGE GUARD — the core protection.
    if is_zombie_pr(pr):
        stats = (
            f"PR has {pr.get('additions', 0)} additions, "
            f"{pr.get('deletions', 0)} deletions, "
            f"{pr.get('changed_files', 0)} changed files"
        )
        return {"action": "close_zombie", "reason": stats, "pr": pr}

    # Real changes present — proceed with the review.
    return {"action": "review", "pr": pr}
def close_zombie_pr(repo: str, pr_number: int, pr: Optional[Dict] = None) -> bool:
    """Close a zero-change PR, leaving a comment that explains why.

    Args:
        pr: Already-fetched PR details; fetched on demand when omitted.

    Returns:
        True once the comment/close calls are issued; False if the PR
        details could not be fetched.
    """
    if pr is None:
        pr = get_pr(repo, pr_number)
    if pr is None:
        return False
    # Build the explanatory comment (paragraphs separated by blank lines).
    paragraphs = [
        "🤖 **Review Guard: Zero-change PR detected**",
        (
            f"This PR has {pr.get('additions', 0)} additions, "
            f"{pr.get('deletions', 0)} deletions, "
            f"and {pr.get('changed_files', 0)} changed files."
        ),
        "No changes to review. Closing as zombie PR.",
        "If this was intentional, reopen with actual changes.",
    ]
    comment = "\n\n".join(paragraphs)
    gitea_api("POST", f"/repos/{repo}/issues/{pr_number}/comments", {"body": comment})
    # PRs are closed through the issues endpoint in the Gitea API.
    gitea_api("PATCH", f"/repos/{repo}/issues/{pr_number}", {"state": "closed"})
    return True
def guarded_review(repo: str, pr_number: int) -> Dict[str, Any]:
    """Full guarded review workflow.

    Call this instead of directly reviewing a PR. It will:
    1. Check if the PR is a zombie (zero changes)
    2. If zombie: comment and close
    3. If real: return PR details for review

    Usage:
        result = guarded_review("Timmy_Foundation/timmy-config", 400)
        if result["action"] == "review":
            # Proceed with actual review
            review_pr(result["pr"])
        else:
            print(result["reason"])
    """
    decision = should_review(repo, pr_number)
    if decision["action"] != "close_zombie":
        # "review" and "skip" decisions pass straight through.
        return decision
    zombie = decision.get("pr")
    return {
        "action": "closed_zombie",
        "reason": decision["reason"],
        "closed": close_zombie_pr(repo, pr_number, zombie),
        "pr": zombie,
    }
# --- Self-test / CLI entry point ---
if __name__ == "__main__":
    argv = sys.argv
    if len(argv) < 3:
        print("Usage: python review_guard.py <repo> <pr_number>")
        print("Example: python review_guard.py Timmy_Foundation/timmy-config 359")
        sys.exit(1)
    repo, pr_number = argv[1], int(argv[2])
    decision = guarded_review(repo, pr_number)
    # default=str keeps any non-JSON-native values printable.
    print(json.dumps(decision, indent=2, default=str))