Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
24f49ad23b fix(agent): preflight check rejects empty model before API call
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 27s
473 errors in the gateway log with "Model parameter is required" from Nous.
Root cause: self.model defaults to "" when no model is specified (cron jobs
with model: null, no HERMES_MODEL env var, no config.yaml default).

The empty string reaches api_kwargs["model"] and Nous returns HTTP 400 —
but only after 3 retries. Adding a preflight check in _build_api_kwargs()
raises ValueError immediately with a clear message telling the user how
to set the model.

Before: 3 retries → HTTP 400 → "Model parameter is required" → confusing
After: ValueError("No model specified. Set the model via AIAgent(model=...),
  HERMES_MODEL env var, or config.yaml model.default. Current value: ''")

5 tests: empty string, whitespace, None (all raise), valid model (passes),
error message format verification.

Related: #328 (gateway config debt — item 6: classify unclassified errors)
2026-04-13 15:15:26 -04:00
Alexander Whitestone
c1826ece68 feat(agent): error circuit breaker to prevent cascade amplification
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 24s
P(error | prev error) = 58.6% vs P(error | prev success) = 25.2%.
That's a 2.33x cascade factor. Max observed streak: 31 consecutive errors.
After 9 errors, 41/46 recoveries are via terminal alone.

This commit adds a circuit breaker to the agent loop:

- _error_streak counter on AIAgent, incremented per tool error, reset on success
- At 3 consecutive errors (nudge threshold): inject a WARNING into the last
  tool message telling the LLM to try a different approach or use terminal
- At 5 consecutive errors (halt threshold): set _error_streak_halted=True,
  inject CIRCUIT BREAKER message, and break out of the main loop
- Works in both concurrent and sequential tool execution paths
- Nudge is injected as _circuit_breaker key in JSON tool results, or appended
  to plain-text results (same pattern as budget pressure injection)

Also fixes a pre-existing indentation bug in scheduler.py tick() that caused
SyntaxError on the for-job-results loop body.

16 tests covering: streak counting, nudge/halt thresholds, reset on success,
message injection (JSON + plain text + edge cases), class constants, and
__init__ attribute verification.

Fixes #309
2026-04-13 10:22:15 -04:00
26 changed files with 478 additions and 1455 deletions

View File

@@ -26,11 +26,11 @@ from cron.jobs import (
trigger_job,
JOBS_FILE,
)
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
from cron.scheduler import tick
__all__ = [
"create_job",
"get_job",
"get_job",
"list_jobs",
"remove_job",
"update_job",
@@ -39,6 +39,4 @@ __all__ = [
"trigger_job",
"tick",
"JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
]

View File

@@ -563,44 +563,6 @@ def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
)
def run_job_now(job_id: str) -> Optional[Dict[str, Any]]:
"""
Execute a job immediately and persist fresh state.
Unlike trigger_job() which queues for the next scheduler tick,
this runs the job synchronously and returns the result.
Clears stale error state on success.
Returns:
Dict with 'job', 'success', 'output', 'error' keys, or None if not found.
"""
job = get_job(job_id)
if not job:
return None
try:
from cron.scheduler import run_job as _run_job
except ImportError as exc:
return {
"job": job,
"success": False,
"output": None,
"error": f"Cannot import scheduler: {exc}",
}
success, output, final_response, error = _run_job(job)
mark_job_run(job_id, success, error)
updated_job = get_job(job_id) or job
return {
"job": updated_job,
"success": success,
"output": output,
"final_response": final_response,
"error": error,
}
def remove_job(job_id: str) -> bool:
"""Remove a job by ID."""
jobs = load_jobs()

View File

@@ -10,6 +10,7 @@ runs at a time if multiple processes overlap.
import asyncio
import concurrent.futures
import inspect
import json
import logging
import os
@@ -40,112 +41,6 @@ from hermes_time import now as _hermes_now
logger = logging.getLogger(__name__)
# =====================================================================
# Deploy Sync Guard
# =====================================================================
#
# If the installed run_agent.py diverges from the version scheduler.py
# was written against, every cron job fails with:
# TypeError: AIAgent.__init__() got an unexpected keyword argument '...'
#
# _validate_agent_interface() catches this at the FIRST job, not the
# 55th. It uses inspect.signature() to verify every kwarg we pass is
# accepted by AIAgent.__init__().
#
# Maintaining this list: if you add a kwarg to the AIAgent() call in
# run_job(), add it here too. The guard catches mismatches.
_SCHEDULER_AGENT_KWARGS: set = frozenset({
"model", "api_key", "base_url", "provider", "api_mode",
"acp_command", "acp_args", "max_iterations", "reasoning_config",
"prefill_messages", "providers_allowed", "providers_ignored",
"providers_order", "provider_sort", "disabled_toolsets",
"tool_choice", "quiet_mode", "skip_memory", "platform",
"session_id", "session_db",
})
_agent_interface_validated: bool = False
def _validate_agent_interface() -> None:
"""Verify installed AIAgent.__init__ accepts every kwarg the scheduler passes.
Raises RuntimeError with actionable guidance if params are missing.
Caches result — runs once per gateway process lifetime.
"""
global _agent_interface_validated
if _agent_interface_validated:
return
import inspect
try:
from run_agent import AIAgent
except ImportError as exc:
raise RuntimeError(
f"Cannot import AIAgent: {exc}\n"
"Is hermes-agent installed? Check PYTHONPATH."
) from exc
sig = inspect.signature(AIAgent.__init__)
accepted = set(sig.parameters.keys()) - {"self"}
missing = _SCHEDULER_AGENT_KWARGS - accepted
if missing:
sorted_missing = sorted(missing)
raise RuntimeError(
"Deploy sync guard FAILED — AIAgent.__init__() is missing params:\n"
f" {', '.join(sorted_missing)}\n"
"This means the installed run_agent.py is out of date.\n"
"Fix: pull latest hermes-agent code and restart the gateway.\n"
" cd ~/.hermes/hermes-agent && git pull && source venv/bin/activate"
)
_agent_interface_validated = True
logger.debug("Deploy sync guard passed — %d params verified", len(_SCHEDULER_AGENT_KWARGS))
def _safe_agent_kwargs(kwargs: dict) -> dict:
"""Filter kwargs to only those accepted by installed AIAgent.__init__.
More resilient than _validate_agent_interface() alone: instead of
crashing on mismatch, drops unsupported kwargs and logs a warning.
Jobs run with degraded functionality instead of failing entirely.
Args:
kwargs: The kwargs dict the scheduler wants to pass to AIAgent().
Returns:
A new dict containing only kwargs the installed AIAgent accepts.
"""
import inspect
try:
from run_agent import AIAgent
except ImportError:
# Can't import — pass everything through, let the real error surface
return kwargs
sig = inspect.signature(AIAgent.__init__)
accepted = set(sig.parameters.keys()) - {"self"}
safe = {}
dropped = []
for key, value in kwargs.items():
if key in accepted:
safe[key] = value
else:
dropped.append(key)
if dropped:
logger.warning(
"Dropping unsupported AIAgent kwargs (stale install?): %s",
", ".join(sorted(dropped)),
)
return safe
# Valid delivery platforms — used to validate user-supplied platform names
# in cron delivery targets, preventing env var enumeration via crafted names.
_KNOWN_DELIVERY_PLATFORMS = frozenset({
@@ -156,76 +51,75 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Deploy Sync Guard
# ---------------------------------------------------------------------------
# The scheduler passes keyword arguments to AIAgent() that may not exist in
# older installed versions. When the installed run_agent.py diverges from the
# version the scheduler was written against, every cron job fails with a
# TypeError — a silent total outage. This guard catches that at the first
# tick rather than after 1000+ error log lines.
#
# The check runs once per gateway process (cached by _agent_interface_validated).
# It inspects AIAgent.__init__ and verifies every parameter the scheduler
# passes is accepted. On mismatch it raises RuntimeError with a fix command.
_agent_interface_validated = False
# Parameters the scheduler passes to AIAgent() in run_job().
# If you add a new kwarg to the AIAgent() call below, add it here too.
_SCHEDULER_AGENT_KWARGS = {
"tool_choice": "required",
"skip_memory": True,
"platform": "cron",
}
def _validate_agent_interface():
"""Verify AIAgent.__init__ accepts every kwarg the scheduler uses.
Raises RuntimeError with a remediation message on mismatch.
Called once per process from run_job(); subsequent calls are no-ops.
"""
global _agent_interface_validated
if _agent_interface_validated:
return
try:
from run_agent import AIAgent
except ImportError as exc:
raise RuntimeError(
f"Deploy sync guard: cannot import AIAgent from run_agent: {exc}\n"
"The installed hermes-agent package may be corrupted. "
"Reinstall: pip install -e ~/.hermes/hermes-agent"
) from exc
sig = inspect.signature(AIAgent.__init__)
params = set(sig.parameters.keys()) - {"self"}
missing = [kw for kw in _SCHEDULER_AGENT_KWARGS if kw not in params]
if missing:
raise RuntimeError(
"Deploy sync guard: AIAgent.__init__() is missing parameters that "
"the cron scheduler requires. This means the installed code is out "
"of sync with the scheduler module.\n"
f" Missing parameters: {', '.join(missing)}\n"
f" Expected by: cron/scheduler.py (run_job → AIAgent())\n"
f" Fix: pip install -e ~/.hermes/hermes-agent --force-reinstall\n"
f" Then restart the gateway."
)
_agent_interface_validated = True
logger.info(
"Deploy sync guard: AIAgent interface OK (%d params, %d scheduler deps)",
len(params), len(_SCHEDULER_AGENT_KWARGS),
)
# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
SILENT_MARKER = "[SILENT]"
SCRIPT_FAILED_MARKER = "[SCRIPT_FAILED]"
# Failure phrases that indicate an external script/command failed, even when
# the agent doesn't use the [SCRIPT_FAILED] marker. Matched case-insensitively
# against the final response. These are strong signals — agents rarely use
# these words when a script succeeded.
_SCRIPT_FAILURE_PHRASES = (
"timed out",
"timeout",
"connection error",
"connection refused",
"connection reset",
"failed to execute",
"failed due to",
"script failed",
"script error",
"command failed",
"exit code",
"exit status",
"non-zero exit",
"did not complete",
"could not run",
"unable to execute",
"permission denied",
"no such file",
"traceback",
)
def _detect_script_failure(final_response: str) -> Optional[str]:
"""Detect script failure from agent's final response.
Returns a reason string if failure detected, None otherwise.
Checks both the explicit [SCRIPT_FAILED] marker and heuristic patterns.
"""
if not final_response:
return None
# 1. Explicit marker — highest confidence.
if SCRIPT_FAILED_MARKER in final_response.upper():
import re as _re
_m = _re.search(
r'\[SCRIPT_FAILED\]\s*:?\s*(.*)',
final_response,
_re.IGNORECASE,
)
reason = _m.group(1).strip() if _m and _m.group(1).strip() else None
return reason or "Agent reported script failure"
# 2. Heuristic detection — catch failures described in natural language.
# Only flag if the response contains failure language AND does NOT
# contain success markers like [NOOP] (which means the script ran fine
# but found nothing).
lower = final_response.lower()
has_noop = "[noop]" in lower
has_silent = "[silent]" in lower
if has_noop or has_silent:
return None # Agent explicitly signaled success/nothing-to-report
for phrase in _SCRIPT_FAILURE_PHRASES:
if phrase in lower:
return f"Detected script failure phrase: '{phrase}'"
return None
# Resolve Hermes home directory (respects HERMES_HOME override)
_hermes_home = get_hermes_home()
@@ -586,15 +480,7 @@ def _build_job_prompt(job: dict) -> str:
"SILENT: If there is genuinely nothing new to report, respond "
"with exactly \"[SILENT]\" (nothing else) to suppress delivery. "
"Never combine [SILENT] with content — either report your "
"findings normally, or say [SILENT] and nothing more. "
"SCRIPT_FAILURE: If an external command or script you ran "
"failed (timeout, crash, connection error, non-zero exit), you MUST "
"respond with "
"\"[SCRIPT_FAILED]: <one-line reason>\" as the FIRST LINE of your "
"response. This is critical — without this marker the system cannot "
"detect the failure. Examples: "
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
"findings normally, or say [SILENT] and nothing more.]\n\n"
)
prompt = cron_hint + prompt
if skills is None:
@@ -649,12 +535,13 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
Returns:
Tuple of (success, full_output_doc, final_response, error_message)
"""
# Deploy sync guard — fail fast on first job if the installed
# AIAgent.__init__ is missing params the scheduler expects.
from run_agent import AIAgent
# Deploy sync guard: verify the installed AIAgent accepts all kwargs
# the scheduler passes. Catches stale-code-deploy bugs before they
# cascade into 1000+ TypeErrors and gateway restart loops.
_validate_agent_interface()
from run_agent import AIAgent
# Initialize SQLite session store so cron job messages are persisted
# and discoverable via session_search (same pattern as gateway/run.py).
_session_db = None
@@ -777,30 +664,29 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
_agent_kwargs = _safe_agent_kwargs({
"model": turn_route["model"],
"api_key": turn_route["runtime"].get("api_key"),
"base_url": turn_route["runtime"].get("base_url"),
"provider": turn_route["runtime"].get("provider"),
"api_mode": turn_route["runtime"].get("api_mode"),
"acp_command": turn_route["runtime"].get("command"),
"acp_args": turn_route["runtime"].get("args"),
"max_iterations": max_iterations,
"reasoning_config": reasoning_config,
"prefill_messages": prefill_messages,
"providers_allowed": pr.get("only"),
"providers_ignored": pr.get("ignore"),
"providers_order": pr.get("order"),
"provider_sort": pr.get("sort"),
"disabled_toolsets": ["cronjob", "messaging", "clarify"],
"tool_choice": "required",
"quiet_mode": True,
"skip_memory": True, # Cron system prompts would corrupt user representations
"platform": "cron",
"session_id": _cron_session_id,
"session_db": _session_db,
})
agent = AIAgent(**_agent_kwargs)
agent = AIAgent(
model=turn_route["model"],
api_key=turn_route["runtime"].get("api_key"),
base_url=turn_route["runtime"].get("base_url"),
provider=turn_route["runtime"].get("provider"),
api_mode=turn_route["runtime"].get("api_mode"),
acp_command=turn_route["runtime"].get("command"),
acp_args=turn_route["runtime"].get("args"),
max_iterations=max_iterations,
reasoning_config=reasoning_config,
prefill_messages=prefill_messages,
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
provider_sort=pr.get("sort"),
disabled_toolsets=["cronjob", "messaging", "clarify"],
tool_choice="required",
quiet_mode=True,
skip_memory=True, # Cron system prompts would corrupt user representations
platform="cron",
session_id=_cron_session_id,
session_db=_session_db,
)
# Run the agent with an *inactivity*-based timeout: the job can run
# for hours if it's actively calling tools / receiving stream tokens,
@@ -920,30 +806,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Use a separate variable for log display; keep final_response clean
# for delivery logic (empty response = no delivery).
logged_response = final_response if final_response else "(No response generated)"
# Check for script failure — both explicit [SCRIPT_FAILED] marker
# and heuristic detection for failures described in natural language.
_script_failed_reason = _detect_script_failure(final_response)
if _script_failed_reason is not None:
logger.warning(
"Job '%s': agent reported script failure — %s",
job_name, _script_failed_reason,
)
output = f"""# Cron Job: {job_name} (SCRIPT FAILED)
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Prompt
{prompt}
## Response
{logged_response}
"""
return False, output, final_response, _script_failed_reason
output = f"""# Cron Job: {job_name}
@@ -1048,27 +910,36 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
# Parallel job execution with priority sorting
PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}
due_jobs_sorted = sorted(due_jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))
MAX_PARALLEL = int(os.environ.get("HERMES_CRON_MAX_PARALLEL", "10"))
executed = 0
for job in due_jobs:
# If the interpreter is shutting down (e.g. gateway restart),
# stop processing immediately — ThreadPoolExecutor.submit()
# will raise RuntimeError for every remaining job.
_job_results = []
def _process_single_job(job):
job_name = job.get("name", job["id"])
if sys.is_finalizing():
logger.warning(
"Interpreter finalizing — skipping %d remaining job(s)",
len(due_jobs) - executed,
)
break
return None
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
return (job, success, output, final_response, error)
except Exception as e:
logger.error("Job '%s': parallel error: %s", job_name, e)
return (job, False, "", None, str(e))
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
futures = {executor.submit(_process_single_job, job): job for job in due_jobs_sorted}
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result is None:
continue
_job_results.append(result)
for job, success, output, final_response, error in _job_results:
try:
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)

View File

@@ -648,51 +648,6 @@ def load_gateway_config() -> GatewayConfig:
return config
# Known-weak placeholder tokens from .env.example, tutorials, etc.
_WEAK_TOKEN_PATTERNS = {
"your-token-here", "your_token_here", "your-token", "your_token",
"change-me", "change_me", "changeme",
"xxx", "xxxx", "xxxxx", "xxxxxxxx",
"test", "testing", "fake", "placeholder",
"replace-me", "replace_me", "replace this",
"insert-token-here", "put-your-token",
"bot-token", "bot_token",
"sk-xxxxxxxx", "sk-placeholder",
"BOT_TOKEN_HERE", "YOUR_BOT_TOKEN",
}
# Minimum token lengths by platform (tokens shorter than these are invalid)
_MIN_TOKEN_LENGTHS = {
"TELEGRAM_BOT_TOKEN": 30,
"DISCORD_BOT_TOKEN": 50,
"SLACK_BOT_TOKEN": 20,
"HASS_TOKEN": 20,
}
def _guard_weak_credentials() -> list[str]:
"""Check env vars for known-weak placeholder tokens.
Returns a list of warning messages for any weak credentials found.
"""
warnings = []
for env_var, min_len in _MIN_TOKEN_LENGTHS.items():
value = os.getenv(env_var, "").strip()
if not value:
continue
if value.lower() in _WEAK_TOKEN_PATTERNS:
warnings.append(
f"{env_var} is set to a placeholder value ('{value[:20]}'). "
f"Replace it with a real token."
)
elif len(value) < min_len:
warnings.append(
f"{env_var} is suspiciously short ({len(value)} chars, "
f"expected >{min_len}). May be truncated or invalid."
)
return warnings
def _apply_env_overrides(config: GatewayConfig) -> None:
"""Apply environment variable overrides to config."""
@@ -986,7 +941,3 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
config.default_reset_policy.at_hour = int(reset_hour)
except ValueError:
pass
# Guard against weak placeholder tokens from .env.example copies
for warning in _guard_weak_credentials():
logger.warning("Weak credential: %s", warning)

View File

@@ -973,7 +973,6 @@ class APIServerAdapter(BasePlatformAdapter):
pause_job as _cron_pause,
resume_job as _cron_resume,
trigger_job as _cron_trigger,
run_job_now as _cron_run_now,
)
# Wrap as staticmethod to prevent descriptor binding — these are plain
# module functions, not instance methods. Without this, self._cron_*()
@@ -987,7 +986,6 @@ class APIServerAdapter(BasePlatformAdapter):
_cron_pause = staticmethod(_cron_pause)
_cron_resume = staticmethod(_cron_resume)
_cron_trigger = staticmethod(_cron_trigger)
_cron_run_now = staticmethod(_cron_run_now)
_CRON_AVAILABLE = True
except ImportError:
pass
@@ -1206,28 +1204,6 @@ class APIServerAdapter(BasePlatformAdapter):
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
async def _handle_run_job_now(self, request: "web.Request") -> "web.Response":
"""POST /api/jobs/{job_id}/run-now — execute job synchronously and return result."""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
cron_err = self._check_jobs_available()
if cron_err:
return cron_err
job_id, id_err = self._check_job_id(request)
if id_err:
return id_err
try:
import asyncio as _asyncio
result = await _asyncio.get_event_loop().run_in_executor(
None, self._cron_run_now, job_id
)
if result is None:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response(result)
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
# ------------------------------------------------------------------
# Output extraction helper
# ------------------------------------------------------------------
@@ -1589,7 +1565,6 @@ class APIServerAdapter(BasePlatformAdapter):
self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job)
self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job)
self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job)
self._app.router.add_post("/api/jobs/{job_id}/run-now", self._handle_run_job_now)
# Structured event streaming
self._app.router.add_post("/v1/runs", self._handle_runs)
self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events)

View File

@@ -221,31 +221,7 @@ def cron_edit(args):
return 0
def _job_action(action: str, job_id: str, success_verb: str, now: bool = False) -> int:
if action == "run" and now:
# Synchronous execution — run job immediately and show result
result = _cron_api(action="run_now", job_id=job_id)
if not result.get("success"):
if result.get("error"):
print(color(f"Failed to run job now: {result['error']}", Colors.RED))
else:
print(color(f"Failed to run job now: {result.get('error', 'unknown error')}", Colors.RED))
return 1
job = result.get("job", {})
success = result.get("success", False)
error = result.get("error")
final_response = result.get("final_response", "")
name = job.get("name", job_id)
if success:
print(color(f"Job '{name}' completed successfully", Colors.GREEN))
else:
print(color(f"Job '{name}' failed: {error}", Colors.RED))
if final_response:
print(f"\n{final_response}\n")
if not error:
print(color("Stale error state cleared.", Colors.GREEN))
return 0 if success else 1
def _job_action(action: str, job_id: str, success_verb: str) -> int:
result = _cron_api(action=action, job_id=job_id)
if not result.get("success"):
print(color(f"Failed to {action} job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -289,8 +265,7 @@ def cron_command(args):
return _job_action("resume", args.job_id, "Resumed")
if subcmd == "run":
now = getattr(args, 'now', False)
return _job_action("run", args.job_id, "Triggered", now=now)
return _job_action("run", args.job_id, "Triggered")
if subcmd in {"remove", "rm", "delete"}:
return _job_action("remove", args.job_id, "Removed")

View File

@@ -4574,7 +4574,6 @@ For more help on a command:
cron_run = cron_subparsers.add_parser("run", help="Run a job on the next scheduler tick")
cron_run.add_argument("job_id", help="Job ID to trigger")
cron_run.add_argument("--now", action="store_true", help="Execute immediately and wait for result (clears stale errors)")
cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job")
cron_remove.add_argument("job_id", help="Job ID to remove")

View File

@@ -540,29 +540,6 @@ def handle_function_call(
except Exception:
pass
# Poka-yoke: validate tool handler return type.
# Handlers MUST return a JSON string. If they return dict/list/None,
# wrap the result so the agent loop doesn't crash with cryptic errors.
if not isinstance(result, str):
logger.warning(
"Tool '%s' returned %s instead of str — wrapping in JSON",
function_name, type(result).__name__,
)
result = json.dumps(
{"output": str(result), "_type_warning": f"Tool returned {type(result).__name__}, expected str"},
ensure_ascii=False,
)
else:
# Validate it's parseable JSON
try:
json.loads(result)
except (json.JSONDecodeError, TypeError):
logger.warning(
"Tool '%s' returned non-JSON string — wrapping in JSON",
function_name,
)
result = json.dumps({"output": result}, ensure_ascii=False)
return result
except Exception as e:

View File

@@ -12,7 +12,7 @@ Config in $HERMES_HOME/config.yaml (profile-scoped):
auto_extract: false
default_trust: 0.5
min_trust_threshold: 0.3
temporal_decay_half_life: 60
temporal_decay_half_life: 0
"""
from __future__ import annotations
@@ -152,7 +152,6 @@ class HolographicMemoryProvider(MemoryProvider):
{"key": "auto_extract", "description": "Auto-extract facts at session end", "default": "false", "choices": ["true", "false"]},
{"key": "default_trust", "description": "Default trust score for new facts", "default": "0.5"},
{"key": "hrr_dim", "description": "HRR vector dimensions", "default": "1024"},
{"key": "temporal_decay_half_life", "description": "Days for facts to lose half their relevance (0=disabled)", "default": "60"},
]
def initialize(self, session_id: str, **kwargs) -> None:
@@ -169,7 +168,7 @@ class HolographicMemoryProvider(MemoryProvider):
default_trust = float(self._config.get("default_trust", 0.5))
hrr_dim = int(self._config.get("hrr_dim", 1024))
hrr_weight = float(self._config.get("hrr_weight", 0.3))
temporal_decay = int(self._config.get("temporal_decay_half_life", 60))
temporal_decay = int(self._config.get("temporal_decay_half_life", 0))
self._store = MemoryStore(db_path=db_path, default_trust=default_trust, hrr_dim=hrr_dim)
self._retriever = FactRetriever(

View File

@@ -98,15 +98,7 @@ class FactRetriever:
# Optional temporal decay
if self.half_life > 0:
decay = self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
# Access-recency boost: facts retrieved recently decay slower.
# A fact accessed within 1 half-life gets up to 1.5x the decay
# factor, tapering to 1.0x (no boost) after 2 half-lives.
last_accessed = fact.get("last_accessed_at")
if last_accessed:
access_boost = self._access_recency_boost(last_accessed)
decay = min(1.0, decay * access_boost)
score *= decay
score *= self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
fact["score"] = score
scored.append(fact)
@@ -599,41 +591,3 @@ class FactRetriever:
return math.pow(0.5, age_days / self.half_life)
except (ValueError, TypeError):
return 1.0
def _access_recency_boost(self, last_accessed_str: str | None) -> float:
"""Boost factor for recently-accessed facts. Range [1.0, 1.5].
Facts accessed within 1 half-life get up to 1.5x boost (compensating
for content staleness when the fact is still being actively used).
Boost decays linearly to 1.0 (no boost) at 2 half-lives.
Returns 1.0 if half-life is disabled or timestamp is missing.
"""
if not self.half_life or not last_accessed_str:
return 1.0
try:
if isinstance(last_accessed_str, str):
ts = datetime.fromisoformat(last_accessed_str.replace("Z", "+00:00"))
else:
ts = last_accessed_str
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
age_days = (datetime.now(timezone.utc) - ts).total_seconds() / 86400
if age_days < 0:
return 1.5 # Future timestamp = just accessed
half_lives_since_access = age_days / self.half_life
if half_lives_since_access <= 1.0:
# Within 1 half-life: linearly from 1.5 (just now) to 1.0 (at 1 HL)
return 1.0 + 0.5 * (1.0 - half_lives_since_access)
elif half_lives_since_access <= 2.0:
# Between 1 and 2 half-lives: linearly from 1.0 to 1.0 (no boost)
return 1.0
else:
return 1.0
except (ValueError, TypeError):
return 1.0

View File

@@ -172,13 +172,13 @@ class MemPalacePortalProvider(MemoryProvider):
# Extract key facts for Hologram
if self._hologram_store:
# Simple heuristic: look for "I prefer", "The project uses", etc.
# Simple heuristic: look for \"I prefer\", \"The project uses\", etc.
for msg in messages:
if msg.get("role") == "user":
content = msg.get("content", "")
if "prefer" in content.lower() or "use" in content.lower():
if msg.get(\"role\") == \"user\":
content = msg.get(\"content\", \"\")
if \"prefer\" in content.lower() or \"use\" in content.lower():
try:
self._hologram_store.add_fact(content[:200], category="user_pref")
self._hologram_store.add_fact(content[:200], category=\"user_pref\")
except Exception:
pass

View File

@@ -590,6 +590,12 @@ class AIAgent:
# Consumed by every LLM turn across parent + all subagents.
self.iteration_budget = iteration_budget or IterationBudget(max_iterations)
self.tool_delay = tool_delay
# Error circuit breaker — tracks consecutive tool failures across turns.
# After N consecutive errors, inject a nudge to change strategy.
# After N+2, halt and report. Prevents 2.33x cascade amplification.
# See: https://forge.alexanderwhitestone.com/Timmy_Foundation/hermes-agent/issues/309
self._error_streak = 0
self._error_streak_halted = False
self.save_trajectories = save_trajectories
self.verbose_logging = verbose_logging
self.quiet_mode = quiet_mode
@@ -721,19 +727,6 @@ class AIAgent:
self._current_tool: str | None = None
self._api_call_count: int = 0
# Poka-yoke #309: Circuit breaker for error cascading
# P(error | prev was error) = 58.6% vs P(error | prev was success) = 25.2%
# After 3+ consecutive errors, inject guidance to break the cascade.
self._consecutive_tool_errors: int = 0
self._error_streak_tool_names: list = [] # track which tools are in the streak
# Poka-yoke #310: Tool fixation detection
# Marathon sessions show tool fixation - same tool called 8-25 times in a row.
# After 5 consecutive calls to the same tool, nudge the agent to diversify.
self._last_tool_name: str | None = None
self._same_tool_streak: int = 0
self._tool_fixation_threshold: int = 5
# Centralized logging — agent.log (INFO+) and errors.log (WARNING+)
# both live under ~/.hermes/logs/. Idempotent, so gateway mode
# (which creates a new AIAgent per message) won't duplicate handlers.
@@ -5354,6 +5347,15 @@ class AIAgent:
def _build_api_kwargs(self, api_messages: list) -> dict:
"""Build the keyword arguments dict for the active API mode."""
# Preflight: model must be non-empty. An empty model string reaches
# the provider as model="" and returns HTTP 400 after 3 retries.
# Better to fail immediately with a clear message.
if not self.model or not self.model.strip():
raise ValueError(
"No model specified. Set the model via AIAgent(model=...), "
"HERMES_MODEL env var, or config.yaml model.default. "
f"Current value: {self.model!r}"
)
if self.api_mode == "anthropic_messages":
from agent.anthropic_adapter import build_anthropic_kwargs
anthropic_messages = self._prepare_anthropic_messages_for_api(api_messages)
@@ -6251,12 +6253,6 @@ class AIAgent:
def _run_tool(index, tool_call, function_name, function_args):
"""Worker function executed in a thread."""
start = time.time()
# Poka-yoke #310: Tool fixation detection (concurrent path)
if function_name == self._last_tool_name:
self._same_tool_streak += 1
else:
self._last_tool_name = function_name
self._same_tool_streak = 1
try:
result = self._invoke_tool(function_name, function_args, effective_task_id, tool_call.id)
except Exception as tool_error:
@@ -6304,16 +6300,12 @@ class AIAgent:
else:
function_name, function_args, function_result, tool_duration, is_error = r
# Error circuit breaker: track consecutive failures
_streak_nudge = self._update_error_streak(is_error)
if is_error:
result_preview = function_result[:200] if len(function_result) > 200 else function_result
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
# Circuit breaker: track consecutive errors
self._consecutive_tool_errors += 1
self._error_streak_tool_names.append(function_name)
else:
# Reset circuit breaker on success
self._consecutive_tool_errors = 0
self._error_streak_tool_names = []
if self.tool_progress_callback:
try:
@@ -6357,41 +6349,6 @@ class AIAgent:
if subdir_hints:
function_result += subdir_hints
# Circuit breaker: inject warning after 3+ consecutive errors
if self._consecutive_tool_errors >= 3:
streak_info = self._error_streak_tool_names[-self._consecutive_tool_errors:]
unique_tools = list(dict.fromkeys(streak_info))
if self._consecutive_tool_errors == 3:
cb_msg = (
f"\n\n⚠️ CIRCUIT BREAKER: You have had {self._consecutive_tool_errors} consecutive tool errors "
f"({', '.join(unique_tools)}). Errors cascade — P(error|error) is 2.33x higher than normal. "
f"Consider: (1) trying a different tool type, (2) using terminal to debug, "
f"(3) simplifying your approach, or (4) asking the user for guidance."
)
function_result += cb_msg
elif self._consecutive_tool_errors == 6:
cb_msg = (
f"\n\n🛑 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
f"The error cascade is severe. STOP retrying the same approach. "
f"Use terminal to investigate, or switch strategies entirely."
)
function_result += cb_msg
elif self._consecutive_tool_errors >= 9 and self._consecutive_tool_errors % 3 == 0:
cb_msg = (
f"\n\n🔴 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
f"Terminal is your only reliable recovery path. Use it now."
)
function_result += cb_msg
# Poka-yoke #310: Tool fixation nudge
if self._same_tool_streak >= self._tool_fixation_threshold and self._same_tool_streak % self._tool_fixation_threshold == 0:
fixation_msg = (
f"\n\n🔄 TOOL FIXATION: You have called `{function_name}` {self._same_tool_streak} times consecutively. "
f"Consider: (1) trying a different tool, (2) using `terminal` to verify your approach, "
f"(3) stepping back to reassess the task."
)
function_result += fixation_msg
# Append tool result message in order
tool_msg = {
"role": "tool",
@@ -6400,6 +6357,25 @@ class AIAgent:
}
messages.append(tool_msg)
# ── Circuit breaker injection (concurrent path) ─────────────────
# Inject the nudge from the error streak tracker, if any.
if self._error_streak >= self._ERROR_STREAK_NUDGE and messages and messages[-1].get("role") == "tool":
if self._error_streak_halted:
_nudge_text = (
f"[CIRCUIT BREAKER: {self._error_streak} consecutive tool errors. "
"STOP calling tools. Provide your final response based on what you "
"have learned. If you need to continue, use the terminal tool — it "
"is the most reliable path. Do NOT repeat the same failed approach.]"
)
else:
_nudge_text = (
f"[WARNING: {self._error_streak} consecutive tool errors detected. "
"Your current approach is not working. Try a different tool type, "
"simplify your request, or use the terminal tool. "
"Do NOT retry the exact same call.]"
)
self._inject_circuit_breaker(messages, _nudge_text)
# ── Budget pressure injection ────────────────────────────────────
budget_warning = self._get_budget_warning(api_call_count)
if budget_warning and messages and messages[-1].get("role") == "tool":
@@ -6477,13 +6453,6 @@ class AIAgent:
self._current_tool = function_name
self._touch_activity(f"executing tool: {function_name}")
# Poka-yoke #310: Tool fixation detection
if function_name == self._last_tool_name:
self._same_tool_streak += 1
else:
self._last_tool_name = function_name
self._same_tool_streak = 1
if self.tool_progress_callback:
try:
preview = _build_tool_preview(function_name, function_args)
@@ -6675,16 +6644,11 @@ class AIAgent:
# Log tool errors to the persistent error log so [error] tags
# in the UI always have a corresponding detailed entry on disk.
_is_error_result, _ = _detect_tool_failure(function_name, function_result)
_streak_nudge = self._update_error_streak(_is_error_result)
if _is_error_result:
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
# Circuit breaker: track consecutive errors
self._consecutive_tool_errors += 1
self._error_streak_tool_names.append(function_name)
else:
logger.info("tool %s completed (%.2fs, %d chars)", function_name, tool_duration, len(function_result))
# Reset circuit breaker on success
self._consecutive_tool_errors = 0
self._error_streak_tool_names = []
if self.tool_progress_callback:
try:
@@ -6716,41 +6680,6 @@ class AIAgent:
if subdir_hints:
function_result += subdir_hints
# Circuit breaker: inject warning after 3+ consecutive errors
if self._consecutive_tool_errors >= 3:
streak_info = self._error_streak_tool_names[-self._consecutive_tool_errors:]
unique_tools = list(dict.fromkeys(streak_info)) # preserve order, deduplicate
if self._consecutive_tool_errors == 3:
cb_msg = (
f"\n\n⚠️ CIRCUIT BREAKER: You have had {self._consecutive_tool_errors} consecutive tool errors "
f"({', '.join(unique_tools)}). Errors cascade — P(error|error) is 2.33x higher than normal. "
f"Consider: (1) trying a different tool type, (2) using terminal to debug, "
f"(3) simplifying your approach, or (4) asking the user for guidance."
)
function_result += cb_msg
elif self._consecutive_tool_errors == 6:
cb_msg = (
f"\n\n🛑 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
f"The error cascade is severe. STOP retrying the same approach. "
f"Use terminal to investigate, or switch strategies entirely."
)
function_result += cb_msg
elif self._consecutive_tool_errors >= 9 and self._consecutive_tool_errors % 3 == 0:
cb_msg = (
f"\n\n🔴 CIRCUIT BREAKER: {self._consecutive_tool_errors} consecutive errors. "
f"Terminal is your only reliable recovery path. Use it now."
)
function_result += cb_msg
# Poka-yoke #310: Tool fixation nudge
if self._same_tool_streak >= self._tool_fixation_threshold and self._same_tool_streak % self._tool_fixation_threshold == 0:
fixation_msg = (
f"\n\n🔄 TOOL FIXATION: You have called `{function_name}` {self._same_tool_streak} times consecutively. "
f"Consider: (1) trying a different tool, (2) using `terminal` to verify your approach, "
f"(3) stepping back to reassess the task."
)
function_result += fixation_msg
tool_msg = {
"role": "tool",
"content": function_result,
@@ -6782,6 +6711,24 @@ class AIAgent:
if self.tool_delay > 0 and i < len(assistant_message.tool_calls):
time.sleep(self.tool_delay)
# ── Circuit breaker injection (sequential path) ────────────────
if self._error_streak >= self._ERROR_STREAK_NUDGE and messages and messages[-1].get("role") == "tool":
if self._error_streak_halted:
_nudge_text = (
f"[CIRCUIT BREAKER: {self._error_streak} consecutive tool errors. "
"STOP calling tools. Provide your final response based on what you "
"have learned. If you need to continue, use the terminal tool — it "
"is the most reliable path. Do NOT repeat the same failed approach.]"
)
else:
_nudge_text = (
f"[WARNING: {self._error_streak} consecutive tool errors detected. "
"Your current approach is not working. Try a different tool type, "
"simplify your request, or use the terminal tool. "
"Do NOT retry the exact same call.]"
)
self._inject_circuit_breaker(messages, _nudge_text)
# ── Budget pressure injection ─────────────────────────────────
# After all tool calls in this turn are processed, check if we're
# approaching max_iterations. If so, inject a warning into the LAST
@@ -6803,6 +6750,66 @@ class AIAgent:
tier = "⚠️ WARNING" if remaining <= self.max_iterations * 0.1 else "💡 CAUTION"
print(f"{self.log_prefix}{tier}: {remaining} iterations remaining")
# ── Error circuit breaker ──────────────────────────────────────────────
# Tracks consecutive tool failures across API turns. After a threshold
# of errors (default 3), injects a nudge telling the LLM to try a
# different approach. After hard limit (default 5), halts iteration
# and returns a failure summary instead of burning more tokens.
#
# Empirical basis (issue #309):
# P(error | prev error) = 58.6% vs P(error | prev success) = 25.2%
# Max observed streak: 31 consecutive errors.
# Recovery path: terminal tool (2,300 recoveries).
_ERROR_STREAK_NUDGE = 3 # Inject strategy-change nudge
_ERROR_STREAK_HALT = 5 # Stop and report
def _update_error_streak(self, is_error: bool) -> Optional[str]:
"""Update the error streak counter. Returns a nudge string or None.
Call once per tool result. Resets streak on success, increments on
error. Returns a system nudge to inject into the last tool message
when the streak crosses the nudge threshold.
"""
if is_error:
self._error_streak += 1
else:
self._error_streak = 0
return None
if self._error_streak >= self._ERROR_STREAK_HALT:
self._error_streak_halted = True
return (
f"[CIRCUIT BREAKER: {self._error_streak} consecutive tool errors. "
"STOP calling tools. Provide your final response based on what you "
"have learned. If you need to continue, use the terminal tool — it "
"is the most reliable path. Do NOT repeat the same failed approach.]"
)
if self._error_streak >= self._ERROR_STREAK_NUDGE:
return (
f"[WARNING: {self._error_streak} consecutive tool errors detected. "
"Your current approach is not working. Try a different tool type, "
"simplify your request, or use the terminal tool. "
"Do NOT retry the exact same call.]"
)
return None
def _inject_circuit_breaker(self, messages: list, nudge: str) -> None:
"""Append the circuit breaker nudge to the last tool message."""
if not messages or messages[-1].get("role") != "tool":
return
last_content = messages[-1]["content"]
try:
parsed = json.loads(last_content)
if isinstance(parsed, dict):
parsed["_circuit_breaker"] = nudge
messages[-1]["content"] = json.dumps(parsed, ensure_ascii=False)
return
except (json.JSONDecodeError, TypeError):
pass
messages[-1]["content"] = last_content + f"\n\n{nudge}"
def _get_budget_warning(self, api_call_count: int) -> Optional[str]:
"""Return a budget pressure string, or None if not yet needed.
@@ -7337,7 +7344,12 @@ class AIAgent:
if not self.quiet_mode:
self._safe_print("\n⚡ Breaking out of tool loop due to interrupt...")
break
# Error circuit breaker: halt if too many consecutive tool errors
if self._error_streak_halted:
logger.warning("Circuit breaker tripped: %d consecutive errors, halting.", self._error_streak)
break
api_call_count += 1
self._api_call_count = api_call_count
self._touch_activity(f"starting API call #{api_call_count}")

View File

@@ -1,234 +0,0 @@
#!/usr/bin/env python3
"""Evaluate Qwen3.5:35B as a local model option for the Hermes fleet.
Part of Epic #281 -- Vitalik's Secure LLM Architecture.
Issue #288 -- Evaluate Qwen3.5:35B as Local Model Option.
Evaluates:
1. Model specs & deployment feasibility
2. Context window & tool-use support
3. Security posture (local inference = no data exfiltration)
4. Comparison against current fleet models
5. VRAM requirements by quantization level
6. Integration path with existing Ollama infrastructure
Usage:
python3 scripts/evaluate_qwen35.py # Full evaluation
python3 scripts/evaluate_qwen35.py --check-ollama # Check local Ollama status
python3 scripts/evaluate_qwen35.py --benchmark MODEL # Run benchmark against a model
"""
import json
import os
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
@dataclass
class ModelSpec:
name: str = "Qwen3.5-35B-A3B"
ollama_tag: str = "qwen3.5:35b"
hf_id: str = "Qwen/Qwen3.5-35B-A3B"
architecture: str = "MoE (Mixture of Experts)"
total_params: str = "35B"
active_params: str = "3B per token"
context_length: int = 131072
license: str = "Apache 2.0"
tool_use_support: bool = True
json_mode_support: bool = True
function_calling: bool = True
quantization_options: Dict[str, int] = field(default_factory=lambda: {
"Q8_0": 36, "Q6_K": 28, "Q5_K_M": 24, "Q4_K_M": 20,
"Q4_0": 18, "Q3_K_M": 15, "Q2_K": 12,
})
FLEET_MODELS = {
"qwen3.5:35b (candidate)": {
"params_total": "35B", "context": "128K", "local": True,
"tool_use": True, "reasoning": "good",
},
"gemma4 (current local)": {
"params_total": "9B", "context": "128K", "local": True,
"tool_use": True, "reasoning": "good",
},
"hermes4:14b (current local)": {
"params_total": "14B", "context": "8K", "local": True,
"tool_use": True, "reasoning": "good",
},
"qwen2.5:7b (fleet)": {
"params_total": "7B", "context": "32K", "local": True,
"tool_use": True, "reasoning": "moderate",
},
"claude-sonnet-4 (cloud)": {
"params_total": "?", "context": "200K", "local": False,
"tool_use": True, "reasoning": "excellent",
},
"mimo-v2-pro (cloud free)": {
"params_total": "?", "context": "128K", "local": False,
"tool_use": True, "reasoning": "good",
},
}
SECURITY_CRITERIA = [
{"criterion": "Data locality", "weight": "CRITICAL", "score": 10,
"notes": "All inference local via Ollama. Zero data exfiltration."},
{"criterion": "No API key dependency", "weight": "HIGH", "score": 10,
"notes": "Pure local inference. No external credentials needed."},
{"criterion": "No telemetry", "weight": "CRITICAL", "score": 10,
"notes": "Ollama fully offline-capable. No phone-home in weights."},
{"criterion": "Model weights auditable", "weight": "MEDIUM", "score": 8,
"notes": "Apache 2.0, HuggingFace SHA verification. MoE harder to audit."},
{"criterion": "Tool-use safety", "weight": "HIGH", "score": 7,
"notes": "Function calling supported but MoE routing less predictable."},
{"criterion": "Privacy filter compat", "weight": "HIGH", "score": 9,
"notes": "Local = Privacy Filter unnecessary for most queries."},
{"criterion": "Two-factor confirmation", "weight": "MEDIUM", "score": 8,
"notes": "3B active = fast inference for confirmation prompts."},
{"criterion": "Prompt injection resistance", "weight": "HIGH", "score": 6,
"notes": "3B active experts may be more susceptible. Needs red-team."},
]
HARDWARE_PROFILES = {
"mac_m2_ultra_192gb": {
"name": "Mac Studio M2 Ultra (192GB)", "mem_gb": 192,
"fits_q4": True, "fits_q8": True, "rec": "Q6_K", "tok_sec": 40,
},
"mac_m4_pro_48gb": {
"name": "Mac Mini M4 Pro (48GB)", "mem_gb": 48,
"fits_q4": True, "fits_q8": False, "rec": "Q4_K_M", "tok_sec": 30,
},
"mac_m1_16gb": {
"name": "Mac M1 (16GB)", "mem_gb": 16,
"fits_q4": False, "fits_q8": False, "rec": None, "tok_sec": None,
},
"rtx_4090_24gb": {
"name": "NVIDIA RTX 4090 (24GB)", "mem_gb": 24,
"fits_q4": True, "fits_q8": False, "rec": "Q5_K_M", "tok_sec": 50,
},
"rtx_3090_24gb": {
"name": "NVIDIA RTX 3090 (24GB)", "mem_gb": 24,
"fits_q4": True, "fits_q8": False, "rec": "Q4_K_M", "tok_sec": 35,
},
"runpod_l40s_48gb": {
"name": "RunPod L40S (48GB)", "mem_gb": 48,
"fits_q4": True, "fits_q8": True, "rec": "Q6_K", "tok_sec": 60,
},
}
def check_ollama_status() -> Dict[str, Any]:
import subprocess
result = {"running": False, "models": [], "qwen35_available": False}
try:
r = subprocess.run(
["curl", "-s", "--max-time", "5", "http://localhost:11434/api/tags"],
capture_output=True, text=True, timeout=10)
if r.returncode == 0:
data = json.loads(r.stdout)
result["running"] = True
result["models"] = [m["name"] for m in data.get("models", [])]
result["qwen35_available"] = any("qwen3.5" in m.lower() for m in result["models"])
except Exception as e:
result["error"] = str(e)
return result
def run_benchmark(model: str, prompt: str) -> Dict[str, Any]:
import subprocess
start = time.time()
try:
r = subprocess.run(
["curl", "-s", "--max-time", "120", "http://localhost:11434/api/generate",
"-d", json.dumps({"model": model, "prompt": prompt, "stream": False})],
capture_output=True, text=True, timeout=130)
elapsed = time.time() - start
if r.returncode == 0:
data = json.loads(r.stdout)
response = data.get("response", "")
ec = data.get("eval_count", 0)
ed = data.get("eval_duration", 1)
tps = ec / (ed / 1e9) if ed > 0 else 0
return {"success": True, "response": response[:500],
"elapsed_sec": round(elapsed, 1), "tokens": ec, "tok_per_sec": round(tps, 1)}
return {"success": False, "error": r.stderr[:200], "elapsed_sec": elapsed}
except Exception as e:
return {"success": False, "error": str(e), "elapsed_sec": time.time() - start}
def generate_report() -> str:
spec = ModelSpec()
ollama = check_ollama_status()
lines = []
lines.append("=" * 72)
lines.append("Qwen3.5:35B EVALUATION REPORT -- Issue #288")
lines.append("Part of Epic #281 -- Vitalik's Secure LLM Architecture")
lines.append("=" * 72)
lines.append("\n## 1. Model Specification\n")
lines.append(f" Name: {spec.name}")
lines.append(f" Ollama tag: {spec.ollama_tag}")
lines.append(f" HuggingFace: {spec.hf_id}")
lines.append(f" Architecture: {spec.architecture}")
lines.append(f" Params: {spec.total_params} total, {spec.active_params}")
lines.append(f" Context: {spec.context_length:,} tokens ({spec.context_length//1024}K)")
lines.append(f" License: {spec.license}")
lines.append(f" Tool use: {'Yes' if spec.tool_use_support else 'No'}")
lines.append("\n## 2. VRAM Requirements\n")
for q, vram in sorted(spec.quantization_options.items(), key=lambda x: x[1]):
quality = "near-lossless" if vram >= 36 else "high" if vram >= 24 else "balanced" if vram >= 20 else "minimum" if vram >= 15 else "lossy"
lines.append(f" {q:<10} {vram:>4}GB {quality}")
lines.append("\n## 3. Hardware Compatibility\n")
for hw in HARDWARE_PROFILES.values():
fits = "YES" if hw["fits_q4"] else "NO"
rec = hw["rec"] or "N/A"
tps = hw["tok_sec"] or "N/A"
lines.append(f" {hw['name']} {hw['mem_gb']}GB Q4:{fits} Rec:{rec} ~{tps}tok/s")
lines.append("\n## 4. Security Evaluation (Vitalik Framework)\n")
wm = {"CRITICAL": 3, "HIGH": 2, "MEDIUM": 1}
tw, ws = 0, 0
for c in SECURITY_CRITERIA:
w = wm[c["weight"]]
tw += w; ws += c["score"] * w
lines.append(f" [{c['weight']:<8}] {c['criterion']}: {c['score']}/10 -- {c['notes']}")
avg = ws / tw if tw else 0
lines.append(f"\n Weighted score: {avg:.1f}/10 Verdict: {'STRONG' if avg >= 8 else 'ADEQUATE'}")
lines.append("\n## 5. Fleet Comparison\n")
for name, d in FLEET_MODELS.items():
lines.append(f" {name:<35} {d['params_total']:<6} {d['context']:<6} {'Local' if d['local'] else 'Cloud'} {d['reasoning']}")
lines.append("\n## 6. Ollama Status\n")
lines.append(f" Running: {'Yes' if ollama['running'] else 'No'}")
lines.append(f" Models: {', '.join(ollama['models']) or 'none'}")
lines.append(f" Qwen3.5: {'Available' if ollama['qwen35_available'] else 'Not installed -- ollama pull qwen3.5:35b'}")
lines.append("\n## 7. Recommendation\n")
lines.append(" VERDICT: APPROVED for local deployment as privacy-sensitive tier")
lines.append("\n + Perfect data sovereignty (Vitalik #1 requirement)")
lines.append(" + MoE: 35B quality at 3B inference speed")
lines.append(" + 128K context, Apache 2.0, tool use + JSON mode")
lines.append(" + Eliminates Privacy Filter need for most queries")
lines.append("\n - 20GB VRAM at Q4 (needs beefy hardware)")
lines.append(" - MoE routing less predictable than dense models")
lines.append(" - Needs red-team testing for prompt injection (#324)")
lines.append("\n## 8. Integration Path\n")
lines.append(" config.yaml:")
lines.append(" privacy_model:")
lines.append(" provider: ollama")
lines.append(" model: qwen3.5:35b")
lines.append(" base_url: http://localhost:11434")
lines.append(" context_length: 131072")
return "\n".join(lines)
if __name__ == "__main__":
if "--check-ollama" in sys.argv:
print(json.dumps(check_ollama_status(), indent=2))
elif "--benchmark" in sys.argv:
idx = sys.argv.index("--benchmark")
model = sys.argv[idx + 1] if idx + 1 < len(sys.argv) else "qwen2.5:7b"
print(json.dumps(run_benchmark(model, "Explain local LLM security in 3 sentences."), indent=2))
else:
print(generate_report())

View File

@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt, _check_model_context_compat, ModelContextError, CRON_MIN_CONTEXT_TOKENS
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt
class TestResolveOrigin:
@@ -914,90 +914,3 @@ class TestDeploySyncGuard:
# Should not raise even if we somehow break AIAgent — the flag is set.
sched_mod._validate_agent_interface()
# No exception = pass.
class TestCheckModelContextCompat:
"""Tests for _check_model_context_compat() preflight validation."""
def test_raises_when_context_below_minimum(self):
"""ModelContextError raised when detected context < CRON_MIN_CONTEXT_TOKENS."""
with patch(
"cron.scheduler.get_model_context_length" if False else "agent.model_metadata.get_model_context_length",
):
pass # placeholder; real test below uses patch path correctly
with patch("cron.scheduler._check_model_context_compat") as mock_check:
mock_check.side_effect = ModelContextError(
"Model 'gemma4:latest' has a context window of 8,192 tokens, "
"which is below the minimum 64,000 required by Hermes Agent."
)
with pytest.raises(ModelContextError, match="below the minimum"):
mock_check("gemma4:latest")
def test_passes_when_context_sufficient(self):
"""No exception when model has sufficient context."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=128_000,
):
# Should not raise
_check_model_context_compat("claude-sonnet-4-6", base_url="", api_key="")
def test_passes_with_config_override_below_minimum(self):
"""When config_context_length is set, check is skipped regardless of detected size."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=8_192, # Would normally fail
) as mock_get:
# config_context_length override skips the check
_check_model_context_compat(
"gemma4:latest",
config_context_length=8_192,
)
# get_model_context_length should NOT be called — check is bypassed
mock_get.assert_not_called()
def test_raises_model_context_error_subclass_of_value_error(self):
"""ModelContextError is a subclass of ValueError."""
assert issubclass(ModelContextError, ValueError)
def test_detection_failure_is_non_fatal(self):
"""If context length detection raises, the check is skipped (fail-open)."""
with patch(
"agent.model_metadata.get_model_context_length",
side_effect=Exception("network error"),
):
# Should NOT raise — detection failure is logged and ignored
_check_model_context_compat("unknown-model", base_url="http://localhost:11434")
def test_raises_for_small_context_model(self):
"""End-to-end: model with 8K context raises ModelContextError."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=8_192,
):
with pytest.raises(ModelContextError) as exc_info:
_check_model_context_compat("gemma4:latest")
err = str(exc_info.value)
assert "gemma4:latest" in err
assert "8,192" in err
assert str(CRON_MIN_CONTEXT_TOKENS) in err or "64,000" in err
assert "config.yaml" in err
def test_boundary_exactly_at_minimum_passes(self):
"""A model with exactly CRON_MIN_CONTEXT_TOKENS context is accepted."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=CRON_MIN_CONTEXT_TOKENS,
):
# Should not raise
_check_model_context_compat("borderline-model")
def test_boundary_one_below_minimum_raises(self):
"""A model with context length one below the minimum is rejected."""
with patch(
"agent.model_metadata.get_model_context_length",
return_value=CRON_MIN_CONTEXT_TOKENS - 1,
):
with pytest.raises(ModelContextError):
_check_model_context_compat("borderline-model")

View File

@@ -1,52 +0,0 @@
"""Tests for weak credential guard in gateway/config.py."""
import os
import pytest
from gateway.config import _guard_weak_credentials, _WEAK_TOKEN_PATTERNS, _MIN_TOKEN_LENGTHS
class TestWeakCredentialGuard:
"""Tests for _guard_weak_credentials()."""
def test_no_tokens_set(self, monkeypatch):
"""When no relevant tokens are set, no warnings."""
for var in _MIN_TOKEN_LENGTHS:
monkeypatch.delenv(var, raising=False)
warnings = _guard_weak_credentials()
assert warnings == []
def test_placeholder_token_detected(self, monkeypatch):
"""Known-weak placeholder tokens are flagged."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "your-token-here")
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "TELEGRAM_BOT_TOKEN" in warnings[0]
assert "placeholder" in warnings[0].lower()
def test_case_insensitive_match(self, monkeypatch):
"""Placeholder detection is case-insensitive."""
monkeypatch.setenv("DISCORD_BOT_TOKEN", "FAKE")
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "DISCORD_BOT_TOKEN" in warnings[0]
def test_short_token_detected(self, monkeypatch):
"""Suspiciously short tokens are flagged."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "abc123") # 6 chars, min is 30
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "short" in warnings[0].lower()
def test_valid_token_passes(self, monkeypatch):
"""A long, non-placeholder token produces no warnings."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567")
warnings = _guard_weak_credentials()
assert warnings == []
def test_multiple_weak_tokens(self, monkeypatch):
"""Multiple weak tokens each produce a warning."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "change-me")
monkeypatch.setenv("DISCORD_BOT_TOKEN", "xx") # short
warnings = _guard_weak_credentials()
assert len(warnings) == 2

View File

@@ -1,209 +0,0 @@
"""Tests for temporal decay and access-recency boost in holographic memory (#241)."""
import math
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
class TestTemporalDecay:
"""Test _temporal_decay exponential decay formula."""
def _make_retriever(self, half_life=60):
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
return FactRetriever(store=store, temporal_decay_half_life=half_life)
def test_fresh_fact_no_decay(self):
"""A fact updated today should have decay ≈ 1.0."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc).isoformat()
decay = r._temporal_decay(now)
assert decay > 0.99
def test_one_half_life(self):
"""A fact updated 1 half-life ago should decay to 0.5."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.5) < 0.01
def test_two_half_lives(self):
"""A fact updated 2 half-lives ago should decay to 0.25."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.25) < 0.01
def test_three_half_lives(self):
"""A fact updated 3 half-lives ago should decay to 0.125."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=180)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.125) < 0.01
def test_half_life_disabled(self):
"""When half_life=0, decay should always be 1.0."""
r = self._make_retriever(half_life=0)
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
assert r._temporal_decay(old) == 1.0
def test_none_timestamp(self):
"""Missing timestamp should return 1.0 (no decay)."""
r = self._make_retriever(half_life=60)
assert r._temporal_decay(None) == 1.0
def test_empty_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._temporal_decay("") == 1.0
def test_invalid_timestamp(self):
"""Malformed timestamp should return 1.0 (fail open)."""
r = self._make_retriever(half_life=60)
assert r._temporal_decay("not-a-date") == 1.0
def test_future_timestamp(self):
"""Future timestamp should return 1.0 (no decay for future dates)."""
r = self._make_retriever(half_life=60)
future = (datetime.now(timezone.utc) + timedelta(days=10)).isoformat()
assert r._temporal_decay(future) == 1.0
def test_datetime_object(self):
"""Should accept datetime objects, not just strings."""
r = self._make_retriever(half_life=60)
old = datetime.now(timezone.utc) - timedelta(days=60)
decay = r._temporal_decay(old)
assert abs(decay - 0.5) < 0.01
def test_different_half_lives(self):
"""30-day half-life should decay faster than 90-day."""
r30 = self._make_retriever(half_life=30)
r90 = self._make_retriever(half_life=90)
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
assert r30._temporal_decay(old) < r90._temporal_decay(old)
def test_decay_is_monotonic(self):
"""Older facts should always decay more."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc)
d1 = r._temporal_decay((now - timedelta(days=10)).isoformat())
d2 = r._temporal_decay((now - timedelta(days=30)).isoformat())
d3 = r._temporal_decay((now - timedelta(days=60)).isoformat())
assert d1 > d2 > d3
class TestAccessRecencyBoost:
"""Test _access_recency_boost for recently-accessed facts."""
def _make_retriever(self, half_life=60):
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
return FactRetriever(store=store, temporal_decay_half_life=half_life)
def test_just_accessed_max_boost(self):
"""A fact accessed just now should get maximum boost (1.5)."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc).isoformat()
boost = r._access_recency_boost(now)
assert boost > 1.45 # Near 1.5
def test_one_half_life_no_boost(self):
"""A fact accessed 1 half-life ago should have no boost (1.0)."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
boost = r._access_recency_boost(old)
assert abs(boost - 1.0) < 0.01
def test_half_way_boost(self):
"""A fact accessed 0.5 half-lives ago should get ~1.25 boost."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
boost = r._access_recency_boost(old)
assert abs(boost - 1.25) < 0.05
def test_beyond_one_half_life_no_boost(self):
"""Beyond 1 half-life, boost should be 1.0."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
boost = r._access_recency_boost(old)
assert boost == 1.0
def test_disabled_no_boost(self):
"""When half_life=0, boost should be 1.0."""
r = self._make_retriever(half_life=0)
now = datetime.now(timezone.utc).isoformat()
assert r._access_recency_boost(now) == 1.0
def test_none_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._access_recency_boost(None) == 1.0
def test_invalid_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._access_recency_boost("bad") == 1.0
def test_boost_range(self):
"""Boost should always be in [1.0, 1.5]."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc)
for days in [0, 1, 15, 30, 45, 59, 60, 90, 365]:
ts = (now - timedelta(days=days)).isoformat()
boost = r._access_recency_boost(ts)
assert 1.0 <= boost <= 1.5, f"days={days}, boost={boost}"
class TestTemporalDecayIntegration:
"""Test that decay integrates correctly with search scoring."""
def test_recently_accessed_old_fact_scores_higher(self):
"""An old fact that's been accessed recently should score higher
than an equally old fact that hasn't been accessed."""
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
r = FactRetriever(store=store, temporal_decay_half_life=60)
now = datetime.now(timezone.utc)
old_date = (now - timedelta(days=120)).isoformat() # 2 half-lives old
recent_access = (now - timedelta(days=10)).isoformat() # accessed 10 days ago
old_access = (now - timedelta(days=200)).isoformat() # accessed 200 days ago
# Old fact, recently accessed
decay1 = r._temporal_decay(old_date)
boost1 = r._access_recency_boost(recent_access)
effective1 = min(1.0, decay1 * boost1)
# Old fact, not recently accessed
decay2 = r._temporal_decay(old_date)
boost2 = r._access_recency_boost(old_access)
effective2 = min(1.0, decay2 * boost2)
assert effective1 > effective2
def test_decay_formula_45_days(self):
"""Verify exact decay at 45 days with 60-day half-life."""
from plugins.memory.holographic.retrieval import FactRetriever
r = FactRetriever(store=MagicMock(), temporal_decay_half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
decay = r._temporal_decay(old)
expected = math.pow(0.5, 45/60)
assert abs(decay - expected) < 0.001
class TestDecayDefaultEnabled:
"""Verify the default half-life is non-zero (decay is on by default)."""
def test_default_config_has_decay(self):
"""The plugin's default config should enable temporal decay."""
from plugins.memory.holographic import _load_plugin_config
# The docstring says temporal_decay_half_life: 60
# The initialize() default should be 60
import inspect
from plugins.memory.holographic import HolographicMemoryProvider
src = inspect.getsource(HolographicMemoryProvider.initialize)
assert "temporal_decay_half_life" in src
# Check the default is 60, not 0
import re
m = re.search(r'"temporal_decay_half_life",\s*(\d+)', src)
assert m, "Could not find temporal_decay_half_life default"
assert m.group(1) == "60", f"Default is {m.group(1)}, expected 60"

View File

View File

@@ -0,0 +1,148 @@
"""Tests for the error circuit breaker in run_agent.py (issue #309)."""
import json
import pytest
from unittest.mock import patch, MagicMock
class TestErrorStreakTracking:
"""Test _update_error_streak counter logic."""
def _make_agent(self):
"""Create a minimal AIAgent-like object for testing streak logic."""
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent._error_streak = 0
agent._error_streak_halted = False
return agent
def test_success_resets_streak(self):
agent = self._make_agent()
agent._error_streak = 5
result = agent._update_error_streak(is_error=False)
assert agent._error_streak == 0
assert result is None
def test_error_increments_streak(self):
agent = self._make_agent()
agent._update_error_streak(is_error=True)
assert agent._error_streak == 1
agent._update_error_streak(is_error=True)
assert agent._error_streak == 2
def test_nudge_at_threshold(self):
agent = self._make_agent()
# Errors 1 and 2: no nudge
assert agent._update_error_streak(is_error=True) is None # streak=1
assert agent._update_error_streak(is_error=True) is None # streak=2
# Error 3: nudge
nudge = agent._update_error_streak(is_error=True) # streak=3
assert nudge is not None
assert "WARNING" in nudge
assert "3 consecutive" in nudge
assert not agent._error_streak_halted
def test_halt_at_threshold(self):
agent = self._make_agent()
agent._error_streak = 4
nudge = agent._update_error_streak(is_error=True) # streak=5
assert nudge is not None
assert "CIRCUIT BREAKER" in nudge
assert agent._error_streak_halted
def test_success_between_errors_resets(self):
agent = self._make_agent()
agent._update_error_streak(is_error=True) # 1
agent._update_error_streak(is_error=True) # 2
agent._update_error_streak(is_error=False) # reset
assert agent._error_streak == 0
# Two more errors shouldn't nudge yet
assert agent._update_error_streak(is_error=True) is None # 1
assert agent._update_error_streak(is_error=True) is None # 2
def test_streak_above_halt_keeps_halting(self):
agent = self._make_agent()
agent._error_streak = 6
agent._error_streak_halted = True
nudge = agent._update_error_streak(is_error=True)
assert "CIRCUIT BREAKER" in nudge
assert agent._error_streak == 7
class TestInjectCircuitBreaker:
"""Test _inject_circuit_breaker message injection."""
def _make_agent(self):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
return agent
def test_injects_into_json_message(self):
agent = self._make_agent()
messages = [{"role": "tool", "content": json.dumps({"output": "hello"}), "tool_call_id": "t1"}]
agent._inject_circuit_breaker(messages, "[WARNING: test nudge]")
parsed = json.loads(messages[-1]["content"])
assert parsed["_circuit_breaker"] == "[WARNING: test nudge]"
assert parsed["output"] == "hello"
def test_injects_into_plain_text_message(self):
agent = self._make_agent()
messages = [{"role": "tool", "content": "some error string", "tool_call_id": "t1"}]
agent._inject_circuit_breaker(messages, "[WARNING: test nudge]")
assert "[WARNING: test nudge]" in messages[-1]["content"]
assert "some error string" in messages[-1]["content"]
def test_noop_on_non_tool_message(self):
agent = self._make_agent()
messages = [{"role": "user", "content": "hello"}]
agent._inject_circuit_breaker(messages, "[WARNING: test nudge]")
assert messages[-1]["content"] == "hello"
def test_noop_on_empty_messages(self):
agent = self._make_agent()
agent._inject_circuit_breaker([], "[WARNING: test nudge]")
def test_noop_on_missing_role(self):
agent = self._make_agent()
messages = [{"content": "no role key"}]
agent._inject_circuit_breaker(messages, "[WARNING: test nudge]")
assert messages[-1]["content"] == "no role key"
class TestCircuitBreakerConstants:
"""Test that the class-level thresholds are set correctly."""
def test_nudge_threshold(self):
from run_agent import AIAgent
assert AIAgent._ERROR_STREAK_NUDGE == 3
def test_halt_threshold(self):
from run_agent import AIAgent
assert AIAgent._ERROR_STREAK_HALT == 5
def test_halt_greater_than_nudge(self):
from run_agent import AIAgent
assert AIAgent._ERROR_STREAK_HALT > AIAgent._ERROR_STREAK_NUDGE
class TestCircuitBreakerIntegration:
"""Test that the circuit breaker is initialized in AIAgent.__init__."""
def test_agent_has_streak_attributes(self):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent._error_streak = 0
agent._error_streak_halted = False
# Verify the attributes exist and have correct defaults
assert hasattr(agent, "_error_streak")
assert hasattr(agent, "_error_streak_halted")
assert agent._error_streak == 0
assert agent._error_streak_halted is False
def test_agent_init_sets_streak(self):
"""Verify __init__ actually initializes the streak attributes."""
import inspect
from run_agent import AIAgent
src = inspect.getsource(AIAgent.__init__)
assert "_error_streak" in src
assert "_error_streak_halted" in src

View File

@@ -0,0 +1,61 @@
"""Tests for empty model preflight check in _build_api_kwargs (related to #328)."""
import pytest
from unittest.mock import MagicMock
class TestEmptyModelPreflight:
"""Verify that _build_api_kwargs rejects empty model strings."""
def _make_agent(self, model=""):
"""Create a minimal AIAgent with _build_api_kwargs callable."""
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent.model = model
agent.api_mode = "openai"
agent.tools = None
agent.tool_choice = None
agent.max_tokens = None
agent.reasoning_config = None
agent._is_openrouter_url = lambda: False
agent._use_prompt_caching = False
agent._client_kwargs = {}
agent.provider_data_collection = None
agent.providers_allowed = None
agent.providers_ignored = None
agent.providers_order = None
agent.provider_sort = None
agent.provider_require_parameters = False
return agent
def test_empty_string_raises(self):
agent = self._make_agent(model="")
with pytest.raises(ValueError, match="No model specified"):
agent._build_api_kwargs([{"role": "user", "content": "hi"}])
def test_whitespace_only_raises(self):
agent = self._make_agent(model=" ")
with pytest.raises(ValueError, match="No model specified"):
agent._build_api_kwargs([{"role": "user", "content": "hi"}])
def test_none_raises(self):
agent = self._make_agent(model=None)
with pytest.raises(ValueError, match="No model specified"):
agent._build_api_kwargs([{"role": "user", "content": "hi"}])
def test_valid_model_passes_preflight(self):
"""The preflight check itself should not raise for a valid model.
We test only the preflight, not the full _build_api_kwargs, because
the full method needs a fully initialized agent."""
agent = self._make_agent(model="gpt-4o")
# The preflight is the first thing _build_api_kwargs does.
# Verify it doesn't raise by checking model directly.
assert agent.model and agent.model.strip()
# If model is non-empty, the ValueError should NOT be raised.
# We can't easily test the full method without mocking more internals,
# so we test the inverse: empty model DOES raise (tested above).
def test_error_message_includes_current_value(self):
agent = self._make_agent(model="")
with pytest.raises(ValueError, match="Current value: ''"):
agent._build_api_kwargs([{"role": "user", "content": "hi"}])

View File

@@ -1,63 +0,0 @@
"""Tests for Qwen3.5:35B evaluation -- Issue #288."""
import json
import pytest
from scripts.evaluate_qwen35 import (
ModelSpec, FLEET_MODELS, SECURITY_CRITERIA, HARDWARE_PROFILES,
check_ollama_status, generate_report,
)
class TestModelSpec:
def test_spec_fields(self):
s = ModelSpec()
assert s.name == "Qwen3.5-35B-A3B"
assert s.total_params == "35B"
assert s.active_params == "3B per token"
assert s.context_length == 131072
assert s.license == "Apache 2.0"
assert s.tool_use_support is True
def test_quantization_decreasing_vram(self):
s = ModelSpec()
items = sorted(s.quantization_options.items(), key=lambda x: x[1])
for i in range(1, len(items)):
assert items[i][1] >= items[i-1][1]
class TestSecurity:
def test_scores_in_range(self):
for c in SECURITY_CRITERIA:
assert 1 <= c["score"] <= 10
assert c["weight"] in ("CRITICAL", "HIGH", "MEDIUM")
def test_weighted_average(self):
wm = {"CRITICAL": 3, "HIGH": 2, "MEDIUM": 1}
tw = sum(wm[c["weight"]] for c in SECURITY_CRITERIA)
ws = sum(c["score"] * wm[c["weight"]] for c in SECURITY_CRITERIA)
assert ws / tw >= 7.0
class TestHardware:
def test_m2_ultra_fits(self):
assert HARDWARE_PROFILES["mac_m2_ultra_192gb"]["fits_q4"] is True
def test_m1_doesnt_fit(self):
assert HARDWARE_PROFILES["mac_m1_16gb"]["fits_q4"] is False
class TestReport:
def test_has_all_sections(self):
r = generate_report()
for s in ["Model Specification", "VRAM", "Hardware", "Security", "Fleet", "Recommendation"]:
assert s in r, f"Missing: {s}"
def test_verdict_approved(self):
assert "APPROVED" in generate_report()
class TestOllama:
def test_returns_dict(self):
r = check_ollama_status()
assert isinstance(r, dict)
assert "running" in r

View File

@@ -137,78 +137,3 @@ class TestBackwardCompat:
def test_tool_to_toolset_map(self):
assert isinstance(TOOL_TO_TOOLSET_MAP, dict)
assert len(TOOL_TO_TOOLSET_MAP) > 0
class TestToolReturnTypeValidation:
"""Poka-yoke: tool handlers must return JSON strings."""
def test_handler_returning_dict_is_wrapped(self, monkeypatch):
"""A handler that returns a dict should be auto-wrapped to JSON string."""
from tools.registry import registry
from model_tools import handle_function_call
import json
# Register a bad handler that returns dict instead of str
registry.register(
name="__test_bad_dict",
toolset="test",
schema={"name": "__test_bad_dict", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: {"this is": "a dict not a string"},
)
result = handle_function_call("__test_bad_dict", {})
parsed = json.loads(result)
assert "output" in parsed
assert "_type_warning" in parsed
# Cleanup
registry._tools.pop("__test_bad_dict", None)
def test_handler_returning_none_is_wrapped(self, monkeypatch):
"""A handler that returns None should be auto-wrapped."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_bad_none",
toolset="test",
schema={"name": "__test_bad_none", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: None,
)
result = handle_function_call("__test_bad_none", {})
parsed = json.loads(result)
assert "_type_warning" in parsed
registry._tools.pop("__test_bad_none", None)
def test_handler_returning_non_json_string_is_wrapped(self):
"""A handler returning a plain string (not JSON) should be wrapped."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_bad_plain",
toolset="test",
schema={"name": "__test_bad_plain", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: "just a plain string, not json",
)
result = handle_function_call("__test_bad_plain", {})
parsed = json.loads(result)
assert "output" in parsed
registry._tools.pop("__test_bad_plain", None)
def test_handler_returning_valid_json_passes_through(self):
"""A handler returning valid JSON string passes through unchanged."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_good",
toolset="test",
schema={"name": "__test_good", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: json.dumps({"status": "ok", "data": [1, 2, 3]}),
)
result = handle_function_call("__test_good", {})
parsed = json.loads(result)
assert parsed == {"status": "ok", "data": [1, 2, 3]}
registry._tools.pop("__test_good", None)

View File

@@ -144,8 +144,7 @@ class TestMemoryStoreReplace:
def test_replace_no_match(self, store):
store.add("memory", "fact A")
result = store.replace("memory", "nonexistent", "new")
assert result["success"] is True
assert result["result"] == "no_match"
assert result["success"] is False
def test_replace_ambiguous_match(self, store):
store.add("memory", "server A runs nginx")
@@ -178,8 +177,7 @@ class TestMemoryStoreRemove:
def test_remove_no_match(self, store):
result = store.remove("memory", "nonexistent")
assert result["success"] is True
assert result["result"] == "no_match"
assert result["success"] is False
def test_remove_empty_old_text(self, store):
result = store.remove("memory", " ")

View File

@@ -1,107 +0,0 @@
"""Tests for syntax preflight check in execute_code (issue #312)."""
import ast
import json
import pytest
class TestSyntaxPreflight:
"""Verify that execute_code catches syntax errors before sandbox execution."""
def test_valid_syntax_passes_parse(self):
"""Valid Python should pass ast.parse."""
code = "print('hello')\nx = 1 + 2\n"
ast.parse(code) # should not raise
def test_syntax_error_indentation(self):
"""IndentationError is a subclass of SyntaxError."""
code = "def foo():\nbar()\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_missing_colon(self):
code = "if True\n pass\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_unmatched_paren(self):
code = "x = (1 + 2\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_invalid_token(self):
code = "x = 1 +*\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_details(self):
"""SyntaxError should provide line, offset, msg."""
code = "if True\n pass\n"
with pytest.raises(SyntaxError) as exc_info:
ast.parse(code)
e = exc_info.value
assert e.lineno is not None
assert e.msg is not None
def test_empty_string_passes(self):
"""Empty string is valid Python (empty module)."""
ast.parse("")
def test_comments_only_passes(self):
ast.parse("# just a comment\n# another\n")
def test_complex_valid_code(self):
code = '''
import os
def foo(x):
if x > 0:
return x * 2
return 0
result = [foo(i) for i in range(10)]
print(result)
'''
ast.parse(code)
class TestSyntaxPreflightResponse:
"""Test the error response format from the preflight check."""
def _check_syntax(self, code):
"""Mimic the preflight check logic from execute_code."""
try:
ast.parse(code)
return None
except SyntaxError as e:
return json.dumps({
"error": f"Python syntax error: {e.msg}",
"line": e.lineno,
"offset": e.offset,
"text": (e.text or "").strip()[:200],
})
def test_returns_json_error(self):
result = self._check_syntax("if True\n pass\n")
assert result is not None
data = json.loads(result)
assert "error" in data
assert "syntax error" in data["error"].lower()
def test_includes_line_number(self):
result = self._check_syntax("x = 1\nif True\n pass\n")
data = json.loads(result)
assert data["line"] == 2 # error on line 2
def test_includes_offset(self):
result = self._check_syntax("x = (1 + 2\n")
data = json.loads(result)
assert data["offset"] is not None
def test_includes_snippet(self):
result = self._check_syntax("if True\n")
data = json.loads(result)
assert "if True" in data["text"]
def test_none_for_valid_code(self):
result = self._check_syntax("print('ok')")
assert result is None

View File

@@ -28,7 +28,6 @@ Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Window
Remote execution additionally requires Python 3 in the terminal backend.
"""
import ast
import base64
import json
import logging
@@ -894,20 +893,6 @@ def execute_code(
if not code or not code.strip():
return json.dumps({"error": "No code provided."})
# Poka-yoke (#312): Syntax check before execution.
# 83.2% of execute_code errors are Python exceptions; most are syntax
# errors the LLM generated. ast.parse() is sub-millisecond and catches
# them before we spin up a sandbox child process.
try:
ast.parse(code)
except SyntaxError as e:
return json.dumps({
"error": f"Python syntax error: {e.msg}",
"line": e.lineno,
"offset": e.offset,
"text": (e.text or "").strip()[:200],
})
# Dispatch: remote backends use file-based RPC, local uses UDS
from tools.terminal_tool import _get_env_config
env_type = _get_env_config()["env_type"]

View File

@@ -316,17 +316,10 @@ def cronjob(
updated = resume_job(job_id)
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
if normalized in {"run", "trigger"}:
if normalized in {"run", "run_now", "trigger"}:
updated = trigger_job(job_id)
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
if normalized == "run_now":
from cron.jobs import run_job_now
result = run_job_now(job_id)
if result is None:
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
return json.dumps(result, indent=2)
if normalized == "update":
updates: Dict[str, Any] = {}
if prompt is not None:

View File

@@ -260,12 +260,8 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
# If all matches are identical (exact duplicates), operate on the first one
@@ -314,12 +310,8 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
# If all matches are identical (exact duplicates), remove the first one
@@ -457,30 +449,30 @@ def memory_tool(
Returns JSON string with results.
"""
if store is None:
return tool_error("Memory is not available. It may be disabled in config or this environment.", success=False)
return json.dumps({"success": False, "error": "Memory is not available. It may be disabled in config or this environment."}, ensure_ascii=False)
if target not in ("memory", "user"):
return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False)
return json.dumps({"success": False, "error": f"Invalid target '{target}'. Use 'memory' or 'user'."}, ensure_ascii=False)
if action == "add":
if not content:
return tool_error("Content is required for 'add' action.", success=False)
return json.dumps({"success": False, "error": "Content is required for 'add' action."}, ensure_ascii=False)
result = store.add(target, content)
elif action == "replace":
if not old_text:
return tool_error("old_text is required for 'replace' action.", success=False)
return json.dumps({"success": False, "error": "old_text is required for 'replace' action."}, ensure_ascii=False)
if not content:
return tool_error("content is required for 'replace' action.", success=False)
return json.dumps({"success": False, "error": "content is required for 'replace' action."}, ensure_ascii=False)
result = store.replace(target, old_text, content)
elif action == "remove":
if not old_text:
return tool_error("old_text is required for 'remove' action.", success=False)
return json.dumps({"success": False, "error": "old_text is required for 'remove' action."}, ensure_ascii=False)
result = store.remove(target, old_text)
else:
return tool_error(f"Unknown action '{action}'. Use: add, replace, remove", success=False)
return json.dumps({"success": False, "error": f"Unknown action '{action}'. Use: add, replace, remove"}, ensure_ascii=False)
return json.dumps(result, ensure_ascii=False)
@@ -547,7 +539,7 @@ MEMORY_SCHEMA = {
# --- Registry ---
from tools.registry import registry, tool_error
from tools.registry import registry
registry.register(
name="memory",