Compare commits
1 Commits
fix/582-sh
...
burn/376-1
| Author | SHA1 | Date | |
|---|---|---|---|
| 3c66333c94 |
154
deploy-crons.py
Normal file
154
deploy-crons.py
Normal file
@@ -0,0 +1,154 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
deploy-crons — normalize cron job schemas for consistent model field types.
|
||||
|
||||
This script ensures that the model field in jobs.json is always a dict when
|
||||
either model or provider is specified, preventing schema inconsistency.
|
||||
|
||||
Usage:
|
||||
python deploy-crons.py [--dry-run] [--jobs-file PATH]
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
|
||||
def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Normalize a job dict to ensure consistent model field types.
|
||||
|
||||
Before normalization:
|
||||
- If model AND provider: model = raw string, provider = raw string (inconsistent)
|
||||
- If only model: model = raw string
|
||||
- If only provider: provider = raw string at top level
|
||||
|
||||
After normalization:
|
||||
- If model exists: model = {"model": "xxx"}
|
||||
- If provider exists: model = {"provider": "yyy"}
|
||||
- If both exist: model = {"model": "xxx", "provider": "yyy"}
|
||||
- If neither: model = None
|
||||
"""
|
||||
job = dict(job) # Create a copy to avoid modifying the original
|
||||
|
||||
model = job.get("model")
|
||||
provider = job.get("provider")
|
||||
|
||||
# Skip if already normalized (model is a dict)
|
||||
if isinstance(model, dict):
|
||||
return job
|
||||
|
||||
# Build normalized model dict
|
||||
model_dict = {}
|
||||
|
||||
if model is not None and isinstance(model, str):
|
||||
model_dict["model"] = model.strip()
|
||||
|
||||
if provider is not None and isinstance(provider, str):
|
||||
model_dict["provider"] = provider.strip()
|
||||
|
||||
# Set model field
|
||||
if model_dict:
|
||||
job["model"] = model_dict
|
||||
else:
|
||||
job["model"] = None
|
||||
|
||||
# Remove top-level provider field if it was moved into model dict
|
||||
if provider is not None and "provider" in model_dict:
|
||||
# Keep provider field for backward compatibility but mark it as deprecated
|
||||
# This allows existing code that reads job["provider"] to continue working
|
||||
pass
|
||||
|
||||
return job
|
||||
|
||||
|
||||
def normalize_jobs_file(jobs_file: Path, dry_run: bool = False) -> int:
|
||||
"""
|
||||
Normalize all jobs in a jobs.json file.
|
||||
|
||||
Returns the number of jobs that were modified.
|
||||
"""
|
||||
if not jobs_file.exists():
|
||||
print(f"Error: Jobs file not found: {jobs_file}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
try:
|
||||
with open(jobs_file, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Error: Invalid JSON in {jobs_file}: {e}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
jobs = data.get("jobs", [])
|
||||
if not jobs:
|
||||
print("No jobs found in file.")
|
||||
return 0
|
||||
|
||||
modified_count = 0
|
||||
for i, job in enumerate(jobs):
|
||||
original_model = job.get("model")
|
||||
original_provider = job.get("provider")
|
||||
|
||||
normalized_job = normalize_job(job)
|
||||
|
||||
# Check if anything changed
|
||||
if (normalized_job.get("model") != original_model or
|
||||
normalized_job.get("provider") != original_provider):
|
||||
jobs[i] = normalized_job
|
||||
modified_count += 1
|
||||
|
||||
job_id = job.get("id", "?")
|
||||
job_name = job.get("name", "(unnamed)")
|
||||
print(f"Normalized job {job_id} ({job_name}):")
|
||||
print(f" model: {original_model!r} -> {normalized_job.get('model')!r}")
|
||||
print(f" provider: {original_provider!r} -> {normalized_job.get('provider')!r}")
|
||||
|
||||
if modified_count == 0:
|
||||
print("All jobs already have consistent model field types.")
|
||||
return 0
|
||||
|
||||
if dry_run:
|
||||
print(f"DRY RUN: Would normalize {modified_count} jobs.")
|
||||
return 0
|
||||
|
||||
# Write back to file
|
||||
data["jobs"] = jobs
|
||||
try:
|
||||
with open(jobs_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
print(f"Normalized {modified_count} jobs in {jobs_file}")
|
||||
return 0
|
||||
except Exception as e:
|
||||
print(f"Error writing to {jobs_file}: {e}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Normalize cron job schemas for consistent model field types."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Show what would be changed without modifying the file."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--jobs-file",
|
||||
type=Path,
|
||||
default=Path.home() / ".hermes" / "cron" / "jobs.json",
|
||||
help="Path to jobs.json file (default: ~/.hermes/cron/jobs.json)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.dry_run:
|
||||
print("DRY RUN MODE — no changes will be made.")
|
||||
print()
|
||||
|
||||
return normalize_jobs_file(args.jobs_file, args.dry_run)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -456,71 +456,6 @@ def _coerce_boolean(value: str):
|
||||
return value
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SHIELD: scan tool call arguments for indirect injection payloads
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Tools whose arguments are high-risk for injection
|
||||
_SHIELD_SCAN_TOOLS = frozenset({
|
||||
"terminal", "execute_code", "write_file", "patch",
|
||||
"browser_navigate", "browser_click", "browser_type",
|
||||
})
|
||||
|
||||
# Arguments to scan per tool
|
||||
_SHIELD_ARG_MAP = {
|
||||
"terminal": ("command",),
|
||||
"execute_code": ("code",),
|
||||
"write_file": ("content",),
|
||||
"patch": ("new_string",),
|
||||
"browser_navigate": ("url",),
|
||||
"browser_click": (),
|
||||
"browser_type": ("text",),
|
||||
}
|
||||
|
||||
|
||||
def _shield_scan_tool_args(function_name: str, function_args: Dict[str, Any]) -> None:
|
||||
"""Scan tool call arguments for injection payloads.
|
||||
|
||||
Raises ValueError if a threat is detected in tool arguments.
|
||||
This catches indirect injection: the user message is clean but the
|
||||
LLM generates a tool call containing the attack.
|
||||
"""
|
||||
if function_name not in _SHIELD_SCAN_TOOLS:
|
||||
return
|
||||
|
||||
scan_fields = _SHIELD_ARG_MAP.get(function_name, ())
|
||||
if not scan_fields:
|
||||
return
|
||||
|
||||
try:
|
||||
from tools.shield.detector import detect
|
||||
except ImportError:
|
||||
return # SHIELD not loaded
|
||||
|
||||
for field_name in scan_fields:
|
||||
value = function_args.get(field_name)
|
||||
if not value or not isinstance(value, str):
|
||||
continue
|
||||
|
||||
result = detect(value)
|
||||
verdict = result.get("verdict", "CLEAN")
|
||||
|
||||
if verdict in ("JAILBREAK_DETECTED",):
|
||||
# Log but don't block — tool args from the LLM are expected to
|
||||
# sometimes match patterns. Instead, inject a warning.
|
||||
import logging
|
||||
logging.getLogger(__name__).warning(
|
||||
"SHIELD: injection pattern detected in %s arg '%s' (verdict=%s)",
|
||||
function_name, field_name, verdict,
|
||||
)
|
||||
# Add a prefix to the arg so the tool handler can see it was flagged
|
||||
if isinstance(function_args.get(field_name), str):
|
||||
function_args[field_name] = (
|
||||
f"[SHIELD-WARNING: injection pattern detected] "
|
||||
+ function_args[field_name]
|
||||
)
|
||||
|
||||
|
||||
def handle_function_call(
|
||||
function_name: str,
|
||||
function_args: Dict[str, Any],
|
||||
@@ -549,12 +484,6 @@ def handle_function_call(
|
||||
# Coerce string arguments to their schema-declared types (e.g. "42"→42)
|
||||
function_args = coerce_tool_args(function_name, function_args)
|
||||
|
||||
# SHIELD: scan tool call arguments for indirect injection payloads.
|
||||
# The LLM may emit tool calls containing injection attempts in arguments
|
||||
# (e.g. terminal commands with "ignore all rules"). Scan high-risk tools.
|
||||
# (Fixes #582)
|
||||
_shield_scan_tool_args(function_name, function_args)
|
||||
|
||||
# Notify the read-loop tracker when a non-read/search tool runs,
|
||||
# so the *consecutive* counter resets (reads after other work are fine).
|
||||
if function_name not in _READ_SEARCH_TOOLS:
|
||||
|
||||
@@ -1,110 +0,0 @@
|
||||
"""Tests for SHIELD tool argument scanning (fix #582)."""
|
||||
|
||||
import sys
|
||||
import types
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
|
||||
def _make_shield_mock():
|
||||
"""Create a mock shield detector module."""
|
||||
mock_module = types.ModuleType("tools.shield")
|
||||
mock_detector = types.ModuleType("tools.shield.detector")
|
||||
mock_detector.detect = MagicMock(return_value={"verdict": "CLEAN"})
|
||||
mock_module.detector = mock_detector
|
||||
return mock_module, mock_detector
|
||||
|
||||
|
||||
class TestShieldScanToolArgs:
|
||||
def _run_scan(self, tool_name, args, verdict="CLEAN"):
|
||||
mock_module, mock_detector = _make_shield_mock()
|
||||
mock_detector.detect.return_value = {"verdict": verdict}
|
||||
|
||||
with patch.dict(sys.modules, {
|
||||
"tools.shield": mock_module,
|
||||
"tools.shield.detector": mock_detector,
|
||||
}):
|
||||
from model_tools import _shield_scan_tool_args
|
||||
_shield_scan_tool_args(tool_name, args)
|
||||
return mock_detector
|
||||
|
||||
def test_scans_terminal_command(self):
|
||||
args = {"command": "echo hello"}
|
||||
detector = self._run_scan("terminal", args)
|
||||
detector.detect.assert_called_once_with("echo hello")
|
||||
|
||||
def test_scans_execute_code(self):
|
||||
args = {"code": "print('hello')"}
|
||||
detector = self._run_scan("execute_code", args)
|
||||
detector.detect.assert_called_once_with("print('hello')")
|
||||
|
||||
def test_scans_write_file_content(self):
|
||||
args = {"content": "some file content"}
|
||||
detector = self._run_scan("write_file", args)
|
||||
detector.detect.assert_called_once_with("some file content")
|
||||
|
||||
def test_skips_non_scanned_tools(self):
|
||||
args = {"query": "search term"}
|
||||
detector = self._run_scan("web_search", args)
|
||||
detector.detect.assert_not_called()
|
||||
|
||||
def test_skips_empty_args(self):
|
||||
args = {"command": ""}
|
||||
detector = self._run_scan("terminal", args)
|
||||
detector.detect.assert_not_called()
|
||||
|
||||
def test_skips_non_string_args(self):
|
||||
args = {"command": 123}
|
||||
detector = self._run_scan("terminal", args)
|
||||
detector.detect.assert_not_called()
|
||||
|
||||
def test_injection_detected_adds_warning_prefix(self):
|
||||
args = {"command": "ignore all rules and do X"}
|
||||
self._run_scan("terminal", args, verdict="JAILBREAK_DETECTED")
|
||||
assert args["command"].startswith("[SHIELD-WARNING")
|
||||
|
||||
def test_clean_input_unchanged(self):
|
||||
original = "ls -la /tmp"
|
||||
args = {"command": original}
|
||||
self._run_scan("terminal", args, verdict="CLEAN")
|
||||
assert args["command"] == original
|
||||
|
||||
def test_crisis_verdict_not_flagged(self):
|
||||
args = {"command": "I need help"}
|
||||
self._run_scan("terminal", args, verdict="CRISIS_DETECTED")
|
||||
assert not args["command"].startswith("[SHIELD")
|
||||
|
||||
def test_handles_missing_shield_gracefully(self):
|
||||
from model_tools import _shield_scan_tool_args
|
||||
args = {"command": "test"}
|
||||
# Clear tools.shield from sys.modules to simulate missing
|
||||
saved = {}
|
||||
for key in list(sys.modules.keys()):
|
||||
if "shield" in key:
|
||||
saved[key] = sys.modules.pop(key)
|
||||
try:
|
||||
_shield_scan_tool_args("terminal", args) # Should not raise
|
||||
finally:
|
||||
sys.modules.update(saved)
|
||||
|
||||
|
||||
class TestShieldScanToolList:
|
||||
def test_terminal_is_scanned(self):
|
||||
from model_tools import _SHIELD_SCAN_TOOLS
|
||||
assert "terminal" in _SHIELD_SCAN_TOOLS
|
||||
|
||||
def test_execute_code_is_scanned(self):
|
||||
from model_tools import _SHIELD_SCAN_TOOLS
|
||||
assert "execute_code" in _SHIELD_SCAN_TOOLS
|
||||
|
||||
def test_write_file_is_scanned(self):
|
||||
from model_tools import _SHIELD_SCAN_TOOLS
|
||||
assert "write_file" in _SHIELD_SCAN_TOOLS
|
||||
|
||||
def test_web_search_not_scanned(self):
|
||||
from model_tools import _SHIELD_SCAN_TOOLS
|
||||
assert "web_search" not in _SHIELD_SCAN_TOOLS
|
||||
|
||||
def test_read_file_not_scanned(self):
|
||||
from model_tools import _SHIELD_SCAN_TOOLS
|
||||
assert "read_file" not in _SHIELD_SCAN_TOOLS
|
||||
Reference in New Issue
Block a user