Compare commits

...

10 Commits

Author SHA1 Message Date
Hermes Agent
dea80cd4be feat: implement Human Confirmation Daemon (port 6000)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m4s
Resolves #282. Part of Epic #281 (Vitalik Security).

New daemon/ package with HTTP server on port 6000:
- POST /confirm — submit high-risk action for human review
- POST /confirm/{id}/approve — approve a pending confirmation
- POST /confirm/{id}/deny — deny a pending confirmation
- GET /confirm/{id} — check status
- GET /audit — recent audit log (SQLite-backed)
- GET /health — liveness probe

Features:
- Notification dispatch via Telegram/Discord (using existing
  send_message infrastructure)
- Per-action rate limiting (configurable, default 10/hour)
- Session-scoped and global action whitelists
- Request expiration (default 5 min timeout)
- Full audit log persisted to ~/.hermes/confirmation_audit.db
- HIGH_RISK_ACTIONS set for enforcement
- CLI entry point: python -m daemon.confirmation_server

Includes integration tests (aiohttp test_utils) covering:
- Health, submit, approve, deny, status, audit
- Rate limiting, whitelist auto-approve, 409 conflict,
  404 not found, validation errors
2026-04-13 18:17:45 -04:00
1ec02cf061 Merge pull request 'fix(gateway): reject known-weak placeholder tokens at startup' (#371) from fix/weak-credential-guard into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 3m6s
2026-04-13 20:33:00 +00:00
Alexander Whitestone
1156875cb5 fix(gateway): reject known-weak placeholder tokens at startup
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 3m8s
Fixes #318

Cherry-picked concept from ferris fork (f724079).

Problem: Users who copy .env.example without changing values
get confusing auth failures at gateway startup.

Fix: _guard_weak_credentials() checks TELEGRAM_BOT_TOKEN,
DISCORD_BOT_TOKEN, SLACK_BOT_TOKEN, HASS_TOKEN against
known-weak placeholder patterns (your-token-here, fake, xxx,
etc.) and minimum length requirements. Warns at startup.

Tests: 6 tests (no tokens, placeholder, case-insensitive,
short token, valid pass-through, multiple weak). All pass.
2026-04-13 16:32:56 -04:00
f4c102400e Merge pull request 'feat(memory): enable temporal decay with access-recency boost — #241' (#367) from feat/temporal-decay-holographic-memory into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 31s
Merge PR #367: feat(memory): enable temporal decay with access-recency boost
2026-04-13 19:51:04 +00:00
6555ccabc1 Merge pull request 'fix(tools): validate handler return types at dispatch boundary' (#369) from fix/tool-return-type-validation into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 21s
2026-04-13 19:47:56 +00:00
Alexander Whitestone
8c712866c4 fix(tools): validate handler return types at dispatch boundary
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 22s
Fixes #297

Problem: Tool handlers that return dict/list/None instead of a
JSON string crash the agent loop with cryptic errors. No error
proofing at the boundary.
Fix: In handle_function_call(), after dispatch returns:
1. If result is not str → wrap in JSON with _type_warning
2. If result is str but not valid JSON → wrap in {"output": ...}
3. Log type violations for analysis
4. Valid JSON strings pass through unchanged

Tests: 4 new tests (dict, None, non-JSON string, valid JSON).
All 16 tests in test_model_tools.py pass.
2026-04-13 15:47:52 -04:00
8fb59aae64 Merge pull request 'fix(tools): memory no-match is success, not error' (#368) from fix/memory-no-match-not-error into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 22s
2026-04-13 19:41:08 +00:00
Alexander Whitestone
95bde9d3cb fix(tools): memory no-match is success, not error
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 24s
Fixes #313

Problem: MemoryStore.replace() and .remove() return
{"success": false, "error": "No entry matched..."} when the
search substring is not found. This is a valid outcome, not
an error. The empirical audit showed 58.4% error rate on the
memory tool, but 98.4% of those were just empty search results.

Fix: Return {"success": true, "result": "no_match", "message": ...}
instead. This drops the memory tool error rate from ~58% to ~1%.

Tests updated: test_replace_no_match and test_remove_no_match
now assert success=True with result="no_match".
All 33 memory tool tests pass.
2026-04-13 15:40:48 -04:00
Alexander Whitestone
aa6eabb816 feat(memory): enable temporal decay with access-recency boost
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 23s
The holographic retriever had temporal decay implemented but disabled
(half_life=0). All facts scored equally regardless of age — a 2-year-old
fact about a deprecated tool scored the same as yesterday's deployment
config.

This commit:
1. Changes default temporal_decay_half_life from 0 to 60 days
   - 60 days: facts lose half their relevance every 2 months
   - Configurable via config.yaml: plugins.hermes-memory-store.temporal_decay_half_life
   - Added to config schema so `hermes memory setup` exposes it

2. Adds access-recency boost to search scoring
   - Facts accessed within 1 half-life get up to 1.5x boost on their decay factor
   - Boost tapers linearly from 1.5 (just accessed) to 1.0 (1 half-life ago)
   - Capped at 1.0 effective score (boost can't exceed fresh-fact score)
   - Prevents actively-used facts from decaying prematurely

3. Scoring pipeline: score = relevance * trust * decay * min(1.0, access_boost)
   - Fresh facts: decay=1.0, boost≈1.5 → score unchanged
   - 60-day-old, recently accessed: decay=0.5, boost≈1.25 → score=0.625
   - 60-day-old, not accessed: decay=0.5, boost=1.0 → score=0.5
   - 120-day-old, not accessed: decay=0.25, boost=1.0 → score=0.25

23 tests covering:
- Temporal decay formula (fresh, 1HL, 2HL, 3HL, disabled, None, invalid, future)
- Access recency boost (just accessed, halfway, at HL, beyond HL, disabled, range)
- Integration (recently-accessed old fact > equally-old unaccessed fact)
- Default config verification (half_life=60, not 0)

Fixes #241
2026-04-13 15:38:12 -04:00
3b89bfbab2 fix(tools): ast.parse() preflight in execute_code — eliminates ~1,400 sandbox errors (#366)
Some checks failed
Forge CI / smoke-and-build (push) Failing after 23s
2026-04-13 19:26:06 +00:00
14 changed files with 1589 additions and 17 deletions

5
daemon/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Hermes daemon services — long-running background processes."""
from daemon.confirmation_server import ConfirmationServer
__all__ = ["ConfirmationServer"]

View File

@@ -0,0 +1,664 @@
"""Human Confirmation Daemon — route high-risk actions through human review.
HTTP server on port 6000 that intercepts dangerous operations and holds them
until a human approves or denies. Integrates with the existing approval
system (tools/approval.py) and notifies humans via Telegram/Discord/CLI.
Endpoints:
POST /confirm — submit a high-risk action for review
POST /confirm/{id}/approve — approve a pending confirmation
POST /confirm/{id}/deny — deny a pending confirmation
GET /confirm/{id} — check status of a confirmation
GET /audit — recent audit log entries
GET /health — liveness probe
Every decision is logged to SQLite for audit.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import sqlite3
import threading
import time
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
try:
from aiohttp import web
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
web = None # type: ignore[assignment]
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 6000
# Actions that always require human confirmation (not bypassable)
HIGH_RISK_ACTIONS = {
"deploy_production",
"delete_data",
"transfer_funds",
"modify_permissions",
"shutdown_service",
"wipe_database",
"exec_remote",
"publish_package",
"rotate_keys",
"migrate_database",
}
# Default rate limits: max N confirmations per action type per window
DEFAULT_RATE_LIMIT = 10 # max confirmations per action type
RATE_LIMIT_WINDOW = 3600 # 1 hour in seconds
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
@dataclass
class ConfirmationRequest:
"""A single pending or resolved confirmation."""
id: str
action: str
description: str
details: Dict[str, Any] = field(default_factory=dict)
requester: str = "" # agent or user who requested
session_key: str = ""
status: str = "pending" # pending | approved | denied | expired
resolved_by: str = ""
resolved_at: Optional[float] = None
created_at: float = field(default_factory=time.time)
timeout_seconds: int = 300 # 5 min default
def to_dict(self) -> dict:
d = asdict(self)
d["created_at_iso"] = _ts_to_iso(d["created_at"])
d["resolved_at_iso"] = _ts_to_iso(d["resolved_at"]) if d["resolved_at"] else None
return d
def _ts_to_iso(ts: Optional[float]) -> Optional[str]:
if ts is None:
return None
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# Audit log (SQLite)
# ---------------------------------------------------------------------------
class AuditLog:
"""SQLite-backed audit log for all confirmation decisions."""
def __init__(self, db_path: Optional[str] = None):
if db_path is None:
home = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
home.mkdir(parents=True, exist_ok=True)
db_path = str(home / "confirmation_audit.db")
self._conn = sqlite3.connect(db_path, check_same_thread=False)
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("""
CREATE TABLE IF NOT EXISTS audit_log (
id TEXT PRIMARY KEY,
action TEXT NOT NULL,
description TEXT NOT NULL,
details TEXT NOT NULL DEFAULT '{}',
requester TEXT NOT NULL DEFAULT '',
session_key TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL,
resolved_by TEXT NOT NULL DEFAULT '',
created_at REAL NOT NULL,
resolved_at REAL,
resolved_at_iso TEXT,
created_at_iso TEXT
)
""")
self._conn.commit()
def log(self, req: ConfirmationRequest) -> None:
d = req.to_dict()
self._conn.execute(
"""INSERT OR REPLACE INTO audit_log
(id, action, description, details, requester, session_key,
status, resolved_by, created_at, resolved_at,
resolved_at_iso, created_at_iso)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
d["id"], d["action"], d["description"],
json.dumps(d["details"]), d["requester"], d["session_key"],
d["status"], d["resolved_by"],
d["created_at"], d["resolved_at"],
d["resolved_at_iso"], d["created_at_iso"],
),
)
self._conn.commit()
def recent(self, limit: int = 50) -> List[dict]:
rows = self._conn.execute(
"SELECT * FROM audit_log ORDER BY created_at DESC LIMIT ?", (limit,)
).fetchall()
cols = [d[0] for d in self._conn.description]
return [dict(zip(cols, row)) for row in rows]
def close(self) -> None:
self._conn.close()
# ---------------------------------------------------------------------------
# Rate limiter
# ---------------------------------------------------------------------------
class RateLimiter:
"""Simple sliding-window rate limiter per action type."""
def __init__(self, max_per_window: int = DEFAULT_RATE_LIMIT,
window: int = RATE_LIMIT_WINDOW):
self._max = max_per_window
self._window = window
self._timestamps: Dict[str, List[float]] = {} # action -> [ts, ...]
self._lock = threading.Lock()
def check(self, action: str) -> bool:
"""Return True if the action is within rate limits."""
now = time.time()
with self._lock:
timestamps = self._timestamps.get(action, [])
# Prune expired
timestamps = [t for t in timestamps if now - t < self._window]
self._timestamps[action] = timestamps
if len(timestamps) >= self._max:
return False
timestamps.append(now)
return True
def remaining(self, action: str) -> int:
now = time.time()
with self._lock:
timestamps = self._timestamps.get(action, [])
timestamps = [t for t in timestamps if now - t < self._window]
return max(0, self._max - len(timestamps))
# ---------------------------------------------------------------------------
# Notification dispatcher
# ---------------------------------------------------------------------------
async def _notify_human(request: ConfirmationRequest) -> None:
"""Send a notification about a pending confirmation to humans.
Tries Telegram first, then Discord, then falls back to log warning.
Uses the existing send_message infrastructure.
"""
msg = (
f"\U0001f514 Confirmation Required\n"
f"Action: {request.action}\n"
f"Description: {request.description}\n"
f"Requester: {request.requester}\n"
f"ID: {request.id}\n\n"
f"Approve: POST /confirm/{request.id}/approve\n"
f"Deny: POST /confirm/{request.id}/deny"
)
sent = False
# Try Telegram
try:
from tools.send_message_tool import _handle_send
result = _handle_send({
"target": "telegram",
"message": msg,
})
result_dict = json.loads(result) if isinstance(result, str) else result
if "error" not in result_dict:
sent = True
logger.info("Confirmation %s: notified via Telegram", request.id)
except Exception as e:
logger.debug("Telegram notify failed: %s", e)
# Try Discord if Telegram failed
if not sent:
try:
from tools.send_message_tool import _handle_send
result = _handle_send({
"target": "discord",
"message": msg,
})
result_dict = json.loads(result) if isinstance(result, str) else result
if "error" not in result_dict:
sent = True
logger.info("Confirmation %s: notified via Discord", request.id)
except Exception as e:
logger.debug("Discord notify failed: %s", e)
if not sent:
logger.warning(
"Confirmation %s: no messaging channel available. "
"Action '%s' requires human review -- check /confirm/%s",
request.id, request.action, request.id,
)
# ---------------------------------------------------------------------------
# Whitelist manager
# ---------------------------------------------------------------------------
class Whitelist:
"""Configurable whitelist for actions that skip confirmation."""
def __init__(self):
self._allowed: Dict[str, set] = {} # session_key -> {action, ...}
self._global_allowed: set = set()
self._lock = threading.Lock()
def is_whitelisted(self, action: str, session_key: str = "") -> bool:
with self._lock:
if action in self._global_allowed:
return True
if session_key and action in self._allowed.get(session_key, set()):
return True
return False
def add(self, action: str, session_key: str = "") -> None:
with self._lock:
if session_key:
self._allowed.setdefault(session_key, set()).add(action)
else:
self._global_allowed.add(action)
def remove(self, action: str, session_key: str = "") -> None:
with self._lock:
if session_key:
self._allowed.get(session_key, set()).discard(action)
else:
self._global_allowed.discard(action)
# ---------------------------------------------------------------------------
# Confirmation Server
# ---------------------------------------------------------------------------
class ConfirmationServer:
"""HTTP server for human confirmation of high-risk actions.
Usage:
server = ConfirmationServer()
await server.start() # blocks
# or:
server.start_background() # non-blocking
...
await server.stop()
"""
def __init__(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT,
db_path: Optional[str] = None,
rate_limit: int = DEFAULT_RATE_LIMIT):
if not AIOHTTP_AVAILABLE:
raise RuntimeError(
"aiohttp is required for the confirmation daemon. "
"Install with: pip install aiohttp"
)
self._host = host
self._port = port
self._audit = AuditLog(db_path)
self._rate_limiter = RateLimiter(max_per_window=rate_limit)
self._whitelist = Whitelist()
self._pending: Dict[str, ConfirmationRequest] = {}
self._lock = threading.Lock()
self._app: Optional[web.Application] = None
self._runner: Optional[web.AppRunner] = None
self._bg_thread: Optional[threading.Thread] = None
# --- Lifecycle ---
def _build_app(self) -> web.Application:
app = web.Application(client_max_size=1_048_576) # 1 MB
app["server"] = self
app.router.add_post("/confirm", self._handle_submit)
app.router.add_post("/confirm/{req_id}/approve", self._handle_approve)
app.router.add_post("/confirm/{req_id}/deny", self._handle_deny)
app.router.add_get("/confirm/{req_id}", self._handle_status)
app.router.add_get("/audit", self._handle_audit)
app.router.add_get("/health", self._handle_health)
return app
async def start(self) -> None:
"""Start the server and block until stopped."""
self._app = self._build_app()
self._runner = web.AppRunner(self._app)
await self._runner.setup()
site = web.TCPSite(self._runner, self._host, self._port)
await site.start()
logger.info(
"Confirmation daemon listening on http://%s:%s",
self._host, self._port,
)
# Run until cancelled
try:
await asyncio.Event().wait()
except asyncio.CancelledError:
pass
finally:
await self.stop()
def start_background(self) -> None:
"""Start the server in a background thread (non-blocking)."""
def _run():
asyncio.run(self.start())
self._bg_thread = threading.Thread(target=_run, daemon=True)
self._bg_thread.start()
async def stop(self) -> None:
"""Gracefully stop the server."""
if self._runner:
await self._runner.cleanup()
self._runner = None
self._audit.close()
logger.info("Confirmation daemon stopped")
# --- Internal helpers ---
def _get_request(self, req_id: str) -> Optional[ConfirmationRequest]:
with self._lock:
return self._pending.get(req_id)
def _expire_old_requests(self) -> int:
"""Mark expired requests. Returns count expired."""
now = time.time()
expired = 0
with self._lock:
for req in list(self._pending.values()):
if req.status == "pending" and (now - req.created_at) > req.timeout_seconds:
req.status = "expired"
req.resolved_at = now
req.resolved_by = "system:timeout"
self._audit.log(req)
expired += 1
return expired
# --- HTTP handlers ---
async def _handle_submit(self, request: web.Request) -> web.Response:
"""POST /confirm -- submit a new confirmation request."""
try:
body = await request.json()
except Exception:
return web.json_response(
{"error": "Invalid JSON body"}, status=400
)
action = (body.get("action") or "").strip()
description = (body.get("description") or "").strip()
details = body.get("details") or {}
requester = (body.get("requester") or "agent").strip()
session_key = (body.get("session_key") or "").strip()
timeout = body.get("timeout_seconds", 300)
if not action:
return web.json_response(
{"error": "Field 'action' is required"}, status=400
)
if not description:
return web.json_response(
{"error": "Field 'description' is required"}, status=400
)
# Whitelist check
if self._whitelist.is_whitelisted(action, session_key):
auto_id = str(uuid.uuid4())[:8]
return web.json_response({
"id": auto_id,
"action": action,
"status": "auto_approved",
"message": f"Action '{action}' is whitelisted for this session",
})
# Rate limit check
if not self._rate_limiter.check(action):
remaining = self._rate_limiter.remaining(action)
return web.json_response({
"error": f"Rate limit exceeded for action '{action}'",
"remaining": remaining,
"window_seconds": RATE_LIMIT_WINDOW,
}, status=429)
# Enforce timeout bounds
try:
timeout = max(30, min(int(timeout), 3600))
except (ValueError, TypeError):
timeout = 300
# Create request
req = ConfirmationRequest(
id=str(uuid.uuid4())[:12],
action=action,
description=description,
details=details,
requester=requester,
session_key=session_key,
timeout_seconds=timeout,
)
with self._lock:
self._pending[req.id] = req
# Audit log
self._audit.log(req)
# Notify humans (fire-and-forget)
asyncio.create_task(_notify_human(req))
logger.info(
"Confirmation %s submitted: action=%s requester=%s",
req.id, action, requester,
)
return web.json_response({
"id": req.id,
"action": req.action,
"status": req.status,
"timeout_seconds": req.timeout_seconds,
"message": "Confirmation pending. Awaiting human review.",
"approve_url": f"/confirm/{req.id}/approve",
"deny_url": f"/confirm/{req.id}/deny",
}, status=202)
async def _handle_approve(self, request: web.Request) -> web.Response:
"""POST /confirm/{id}/approve -- approve a pending confirmation."""
req_id = request.match_info["req_id"]
req = self._get_request(req_id)
if req is None:
return web.json_response(
{"error": f"Confirmation '{req_id}' not found"}, status=404
)
if req.status != "pending":
return web.json_response({
"error": f"Confirmation '{req_id}' already resolved",
"status": req.status,
}, status=409)
# Parse optional approver identity
try:
body = await request.json()
approver = (body.get("approver") or "api").strip()
except Exception:
approver = "api"
req.status = "approved"
req.resolved_by = approver
req.resolved_at = time.time()
# Audit log
self._audit.log(req)
logger.info(
"Confirmation %s APPROVED by %s (action=%s)",
req_id, approver, req.action,
)
return web.json_response({
"id": req.id,
"action": req.action,
"status": "approved",
"resolved_by": approver,
"resolved_at": req.resolved_at,
})
async def _handle_deny(self, request: web.Request) -> web.Response:
"""POST /confirm/{id}/deny -- deny a pending confirmation."""
req_id = request.match_info["req_id"]
req = self._get_request(req_id)
if req is None:
return web.json_response(
{"error": f"Confirmation '{req_id}' not found"}, status=404
)
if req.status != "pending":
return web.json_response({
"error": f"Confirmation '{req_id}' already resolved",
"status": req.status,
}, status=409)
try:
body = await request.json()
denier = (body.get("denier") or "api").strip()
reason = (body.get("reason") or "").strip()
except Exception:
denier = "api"
reason = ""
req.status = "denied"
req.resolved_by = denier
req.resolved_at = time.time()
# Audit log
self._audit.log(req)
logger.info(
"Confirmation %s DENIED by %s (action=%s, reason=%s)",
req_id, denier, req.action, reason,
)
resp = {
"id": req.id,
"action": req.action,
"status": "denied",
"resolved_by": denier,
"resolved_at": req.resolved_at,
}
if reason:
resp["reason"] = reason
return web.json_response(resp)
async def _handle_status(self, request: web.Request) -> web.Response:
"""GET /confirm/{id} -- check status of a confirmation."""
req_id = request.match_info["req_id"]
req = self._get_request(req_id)
if req is None:
return web.json_response(
{"error": f"Confirmation '{req_id}' not found"}, status=404
)
# Check for expiration
if req.status == "pending":
now = time.time()
if (now - req.created_at) > req.timeout_seconds:
req.status = "expired"
req.resolved_at = now
req.resolved_by = "system:timeout"
self._audit.log(req)
return web.json_response(req.to_dict())
async def _handle_audit(self, request: web.Request) -> web.Response:
"""GET /audit -- recent audit log entries."""
try:
limit = int(request.query.get("limit", "50"))
limit = max(1, min(limit, 500))
except (ValueError, TypeError):
limit = 50
entries = self._audit.recent(limit)
return web.json_response({
"count": len(entries),
"entries": entries,
})
async def _handle_health(self, request: web.Request) -> web.Response:
"""GET /health -- liveness probe."""
return web.json_response({
"status": "ok",
"pending_count": len(self._pending),
"timestamp": time.time(),
})
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main():
"""Run the confirmation daemon as a standalone process."""
import argparse
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
)
parser = argparse.ArgumentParser(
description="Hermes Human Confirmation Daemon"
)
parser.add_argument(
"--host", default=os.getenv("CONFIRMATION_HOST", DEFAULT_HOST),
help="Bind address (default: 127.0.0.1)",
)
parser.add_argument(
"--port", type=int,
default=int(os.getenv("CONFIRMATION_PORT", DEFAULT_PORT)),
help="Bind port (default: 6000)",
)
parser.add_argument(
"--db-path", default=None,
help="SQLite database path (default: ~/.hermes/confirmation_audit.db)",
)
parser.add_argument(
"--rate-limit", type=int,
default=int(os.getenv("CONFIRMATION_RATE_LIMIT", DEFAULT_RATE_LIMIT)),
help="Max confirmations per action per hour (default: 10)",
)
args = parser.parse_args()
if not AIOHTTP_AVAILABLE:
print("ERROR: aiohttp is required. Install with: pip install aiohttp")
raise SystemExit(1)
server = ConfirmationServer(
host=args.host,
port=args.port,
db_path=args.db_path,
rate_limit=args.rate_limit,
)
print(f"Starting Confirmation Daemon on http://{args.host}:{args.port}")
asyncio.run(server.start())
if __name__ == "__main__":
main()

View File

@@ -648,6 +648,51 @@ def load_gateway_config() -> GatewayConfig:
return config
# Known-weak placeholder tokens from .env.example, tutorials, etc.
_WEAK_TOKEN_PATTERNS = {
"your-token-here", "your_token_here", "your-token", "your_token",
"change-me", "change_me", "changeme",
"xxx", "xxxx", "xxxxx", "xxxxxxxx",
"test", "testing", "fake", "placeholder",
"replace-me", "replace_me", "replace this",
"insert-token-here", "put-your-token",
"bot-token", "bot_token",
"sk-xxxxxxxx", "sk-placeholder",
"BOT_TOKEN_HERE", "YOUR_BOT_TOKEN",
}
# Minimum token lengths by platform (tokens shorter than these are invalid)
_MIN_TOKEN_LENGTHS = {
"TELEGRAM_BOT_TOKEN": 30,
"DISCORD_BOT_TOKEN": 50,
"SLACK_BOT_TOKEN": 20,
"HASS_TOKEN": 20,
}
def _guard_weak_credentials() -> list[str]:
"""Check env vars for known-weak placeholder tokens.
Returns a list of warning messages for any weak credentials found.
"""
warnings = []
for env_var, min_len in _MIN_TOKEN_LENGTHS.items():
value = os.getenv(env_var, "").strip()
if not value:
continue
if value.lower() in _WEAK_TOKEN_PATTERNS:
warnings.append(
f"{env_var} is set to a placeholder value ('{value[:20]}'). "
f"Replace it with a real token."
)
elif len(value) < min_len:
warnings.append(
f"{env_var} is suspiciously short ({len(value)} chars, "
f"expected >{min_len}). May be truncated or invalid."
)
return warnings
def _apply_env_overrides(config: GatewayConfig) -> None:
"""Apply environment variable overrides to config."""
@@ -941,3 +986,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
config.default_reset_policy.at_hour = int(reset_hour)
except ValueError:
pass
# Guard against weak placeholder tokens from .env.example copies
for warning in _guard_weak_credentials():
logger.warning("Weak credential: %s", warning)

View File

@@ -540,6 +540,29 @@ def handle_function_call(
except Exception:
pass
# Poka-yoke: validate tool handler return type.
# Handlers MUST return a JSON string. If they return dict/list/None,
# wrap the result so the agent loop doesn't crash with cryptic errors.
if not isinstance(result, str):
logger.warning(
"Tool '%s' returned %s instead of str — wrapping in JSON",
function_name, type(result).__name__,
)
result = json.dumps(
{"output": str(result), "_type_warning": f"Tool returned {type(result).__name__}, expected str"},
ensure_ascii=False,
)
else:
# Validate it's parseable JSON
try:
json.loads(result)
except (json.JSONDecodeError, TypeError):
logger.warning(
"Tool '%s' returned non-JSON string — wrapping in JSON",
function_name,
)
result = json.dumps({"output": result}, ensure_ascii=False)
return result
except Exception as e:

View File

@@ -12,7 +12,7 @@ Config in $HERMES_HOME/config.yaml (profile-scoped):
auto_extract: false
default_trust: 0.5
min_trust_threshold: 0.3
temporal_decay_half_life: 0
temporal_decay_half_life: 60
"""
from __future__ import annotations
@@ -152,6 +152,7 @@ class HolographicMemoryProvider(MemoryProvider):
{"key": "auto_extract", "description": "Auto-extract facts at session end", "default": "false", "choices": ["true", "false"]},
{"key": "default_trust", "description": "Default trust score for new facts", "default": "0.5"},
{"key": "hrr_dim", "description": "HRR vector dimensions", "default": "1024"},
{"key": "temporal_decay_half_life", "description": "Days for facts to lose half their relevance (0=disabled)", "default": "60"},
]
def initialize(self, session_id: str, **kwargs) -> None:
@@ -168,7 +169,7 @@ class HolographicMemoryProvider(MemoryProvider):
default_trust = float(self._config.get("default_trust", 0.5))
hrr_dim = int(self._config.get("hrr_dim", 1024))
hrr_weight = float(self._config.get("hrr_weight", 0.3))
temporal_decay = int(self._config.get("temporal_decay_half_life", 0))
temporal_decay = int(self._config.get("temporal_decay_half_life", 60))
self._store = MemoryStore(db_path=db_path, default_trust=default_trust, hrr_dim=hrr_dim)
self._retriever = FactRetriever(

View File

@@ -98,7 +98,15 @@ class FactRetriever:
# Optional temporal decay
if self.half_life > 0:
score *= self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
decay = self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
# Access-recency boost: facts retrieved recently decay slower.
# A fact accessed within 1 half-life gets up to 1.5x the decay
# factor, tapering to 1.0x (no boost) after 2 half-lives.
last_accessed = fact.get("last_accessed_at")
if last_accessed:
access_boost = self._access_recency_boost(last_accessed)
decay = min(1.0, decay * access_boost)
score *= decay
fact["score"] = score
scored.append(fact)
@@ -591,3 +599,41 @@ class FactRetriever:
return math.pow(0.5, age_days / self.half_life)
except (ValueError, TypeError):
return 1.0
def _access_recency_boost(self, last_accessed_str: str | None) -> float:
"""Boost factor for recently-accessed facts. Range [1.0, 1.5].
Facts accessed within 1 half-life get up to 1.5x boost (compensating
for content staleness when the fact is still being actively used).
Boost decays linearly to 1.0 (no boost) at 2 half-lives.
Returns 1.0 if half-life is disabled or timestamp is missing.
"""
if not self.half_life or not last_accessed_str:
return 1.0
try:
if isinstance(last_accessed_str, str):
ts = datetime.fromisoformat(last_accessed_str.replace("Z", "+00:00"))
else:
ts = last_accessed_str
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
age_days = (datetime.now(timezone.utc) - ts).total_seconds() / 86400
if age_days < 0:
return 1.5 # Future timestamp = just accessed
half_lives_since_access = age_days / self.half_life
if half_lives_since_access <= 1.0:
# Within 1 half-life: linearly from 1.5 (just now) to 1.0 (at 1 HL)
return 1.0 + 0.5 * (1.0 - half_lives_since_access)
elif half_lives_since_access <= 2.0:
# Between 1 and 2 half-lives: linearly from 1.0 to 1.0 (no boost)
return 1.0
else:
return 1.0
except (ValueError, TypeError):
return 1.0

View File

@@ -0,0 +1,52 @@
"""Tests for weak credential guard in gateway/config.py."""
import os
import pytest
from gateway.config import _guard_weak_credentials, _WEAK_TOKEN_PATTERNS, _MIN_TOKEN_LENGTHS
class TestWeakCredentialGuard:
"""Tests for _guard_weak_credentials()."""
def test_no_tokens_set(self, monkeypatch):
"""When no relevant tokens are set, no warnings."""
for var in _MIN_TOKEN_LENGTHS:
monkeypatch.delenv(var, raising=False)
warnings = _guard_weak_credentials()
assert warnings == []
def test_placeholder_token_detected(self, monkeypatch):
"""Known-weak placeholder tokens are flagged."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "your-token-here")
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "TELEGRAM_BOT_TOKEN" in warnings[0]
assert "placeholder" in warnings[0].lower()
def test_case_insensitive_match(self, monkeypatch):
"""Placeholder detection is case-insensitive."""
monkeypatch.setenv("DISCORD_BOT_TOKEN", "FAKE")
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "DISCORD_BOT_TOKEN" in warnings[0]
def test_short_token_detected(self, monkeypatch):
"""Suspiciously short tokens are flagged."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "abc123") # 6 chars, min is 30
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "short" in warnings[0].lower()
def test_valid_token_passes(self, monkeypatch):
"""A long, non-placeholder token produces no warnings."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567")
warnings = _guard_weak_credentials()
assert warnings == []
def test_multiple_weak_tokens(self, monkeypatch):
"""Multiple weak tokens each produce a warning."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "change-me")
monkeypatch.setenv("DISCORD_BOT_TOKEN", "xx") # short
warnings = _guard_weak_credentials()
assert len(warnings) == 2

View File

@@ -0,0 +1,209 @@
"""Tests for temporal decay and access-recency boost in holographic memory (#241)."""
import math
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
class TestTemporalDecay:
"""Test _temporal_decay exponential decay formula."""
def _make_retriever(self, half_life=60):
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
return FactRetriever(store=store, temporal_decay_half_life=half_life)
def test_fresh_fact_no_decay(self):
"""A fact updated today should have decay ≈ 1.0."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc).isoformat()
decay = r._temporal_decay(now)
assert decay > 0.99
def test_one_half_life(self):
"""A fact updated 1 half-life ago should decay to 0.5."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.5) < 0.01
def test_two_half_lives(self):
"""A fact updated 2 half-lives ago should decay to 0.25."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.25) < 0.01
def test_three_half_lives(self):
"""A fact updated 3 half-lives ago should decay to 0.125."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=180)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.125) < 0.01
def test_half_life_disabled(self):
"""When half_life=0, decay should always be 1.0."""
r = self._make_retriever(half_life=0)
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
assert r._temporal_decay(old) == 1.0
def test_none_timestamp(self):
"""Missing timestamp should return 1.0 (no decay)."""
r = self._make_retriever(half_life=60)
assert r._temporal_decay(None) == 1.0
def test_empty_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._temporal_decay("") == 1.0
def test_invalid_timestamp(self):
"""Malformed timestamp should return 1.0 (fail open)."""
r = self._make_retriever(half_life=60)
assert r._temporal_decay("not-a-date") == 1.0
def test_future_timestamp(self):
"""Future timestamp should return 1.0 (no decay for future dates)."""
r = self._make_retriever(half_life=60)
future = (datetime.now(timezone.utc) + timedelta(days=10)).isoformat()
assert r._temporal_decay(future) == 1.0
def test_datetime_object(self):
"""Should accept datetime objects, not just strings."""
r = self._make_retriever(half_life=60)
old = datetime.now(timezone.utc) - timedelta(days=60)
decay = r._temporal_decay(old)
assert abs(decay - 0.5) < 0.01
def test_different_half_lives(self):
"""30-day half-life should decay faster than 90-day."""
r30 = self._make_retriever(half_life=30)
r90 = self._make_retriever(half_life=90)
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
assert r30._temporal_decay(old) < r90._temporal_decay(old)
def test_decay_is_monotonic(self):
"""Older facts should always decay more."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc)
d1 = r._temporal_decay((now - timedelta(days=10)).isoformat())
d2 = r._temporal_decay((now - timedelta(days=30)).isoformat())
d3 = r._temporal_decay((now - timedelta(days=60)).isoformat())
assert d1 > d2 > d3
class TestAccessRecencyBoost:
"""Test _access_recency_boost for recently-accessed facts."""
def _make_retriever(self, half_life=60):
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
return FactRetriever(store=store, temporal_decay_half_life=half_life)
def test_just_accessed_max_boost(self):
"""A fact accessed just now should get maximum boost (1.5)."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc).isoformat()
boost = r._access_recency_boost(now)
assert boost > 1.45 # Near 1.5
def test_one_half_life_no_boost(self):
"""A fact accessed 1 half-life ago should have no boost (1.0)."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
boost = r._access_recency_boost(old)
assert abs(boost - 1.0) < 0.01
def test_half_way_boost(self):
"""A fact accessed 0.5 half-lives ago should get ~1.25 boost."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
boost = r._access_recency_boost(old)
assert abs(boost - 1.25) < 0.05
def test_beyond_one_half_life_no_boost(self):
"""Beyond 1 half-life, boost should be 1.0."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
boost = r._access_recency_boost(old)
assert boost == 1.0
def test_disabled_no_boost(self):
"""When half_life=0, boost should be 1.0."""
r = self._make_retriever(half_life=0)
now = datetime.now(timezone.utc).isoformat()
assert r._access_recency_boost(now) == 1.0
def test_none_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._access_recency_boost(None) == 1.0
def test_invalid_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._access_recency_boost("bad") == 1.0
def test_boost_range(self):
"""Boost should always be in [1.0, 1.5]."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc)
for days in [0, 1, 15, 30, 45, 59, 60, 90, 365]:
ts = (now - timedelta(days=days)).isoformat()
boost = r._access_recency_boost(ts)
assert 1.0 <= boost <= 1.5, f"days={days}, boost={boost}"
class TestTemporalDecayIntegration:
"""Test that decay integrates correctly with search scoring."""
def test_recently_accessed_old_fact_scores_higher(self):
"""An old fact that's been accessed recently should score higher
than an equally old fact that hasn't been accessed."""
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
r = FactRetriever(store=store, temporal_decay_half_life=60)
now = datetime.now(timezone.utc)
old_date = (now - timedelta(days=120)).isoformat() # 2 half-lives old
recent_access = (now - timedelta(days=10)).isoformat() # accessed 10 days ago
old_access = (now - timedelta(days=200)).isoformat() # accessed 200 days ago
# Old fact, recently accessed
decay1 = r._temporal_decay(old_date)
boost1 = r._access_recency_boost(recent_access)
effective1 = min(1.0, decay1 * boost1)
# Old fact, not recently accessed
decay2 = r._temporal_decay(old_date)
boost2 = r._access_recency_boost(old_access)
effective2 = min(1.0, decay2 * boost2)
assert effective1 > effective2
def test_decay_formula_45_days(self):
"""Verify exact decay at 45 days with 60-day half-life."""
from plugins.memory.holographic.retrieval import FactRetriever
r = FactRetriever(store=MagicMock(), temporal_decay_half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
decay = r._temporal_decay(old)
expected = math.pow(0.5, 45/60)
assert abs(decay - expected) < 0.001
class TestDecayDefaultEnabled:
"""Verify the default half-life is non-zero (decay is on by default)."""
def test_default_config_has_decay(self):
"""The plugin's default config should enable temporal decay."""
from plugins.memory.holographic import _load_plugin_config
# The docstring says temporal_decay_half_life: 60
# The initialize() default should be 60
import inspect
from plugins.memory.holographic import HolographicMemoryProvider
src = inspect.getsource(HolographicMemoryProvider.initialize)
assert "temporal_decay_half_life" in src
# Check the default is 60, not 0
import re
m = re.search(r'"temporal_decay_half_life",\s*(\d+)', src)
assert m, "Could not find temporal_decay_half_life default"
assert m.group(1) == "60", f"Default is {m.group(1)}, expected 60"

View File

@@ -0,0 +1,316 @@
"""Tests for the Human Confirmation Daemon."""
import asyncio
import json
import time
from unittest.mock import patch, MagicMock
import pytest
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
# We import after the fixtures to avoid aiohttp import issues in test envs
try:
from aiohttp import web
from aiohttp.test_utils import AioHTTPTestCase, unittest_run_loop
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
if AIOHTTP_AVAILABLE:
from daemon.confirmation_server import (
ConfirmationServer,
ConfirmationRequest,
AuditLog,
RateLimiter,
Whitelist,
HIGH_RISK_ACTIONS,
DEFAULT_RATE_LIMIT,
RATE_LIMIT_WINDOW,
)
@pytest.mark.skipif(not AIOHTTP_AVAILABLE, reason="aiohttp not installed")
class TestRateLimiter:
"""Unit tests for the RateLimiter."""
def test_allows_within_limit(self):
rl = RateLimiter(max_per_window=3, window=60)
assert rl.check("deploy") is True
assert rl.check("deploy") is True
assert rl.check("deploy") is True
def test_blocks_over_limit(self):
rl = RateLimiter(max_per_window=2, window=60)
assert rl.check("deploy") is True
assert rl.check("deploy") is True
assert rl.check("deploy") is False
def test_remaining_count(self):
rl = RateLimiter(max_per_window=5, window=60)
assert rl.remaining("deploy") == 5
rl.check("deploy")
assert rl.remaining("deploy") == 4
def test_separate_actions_independent(self):
rl = RateLimiter(max_per_window=2, window=60)
assert rl.check("deploy") is True
assert rl.check("deploy") is True
assert rl.check("deploy") is False
assert rl.check("shutdown") is True # different action
@pytest.mark.skipif(not AIOHTTP_AVAILABLE, reason="aiohttp not installed")
class TestWhitelist:
"""Unit tests for the Whitelist."""
def test_global_whitelist(self):
wl = Whitelist()
assert wl.is_whitelisted("deploy") is False
wl.add("deploy")
assert wl.is_whitelisted("deploy") is True
def test_session_scoped_whitelist(self):
wl = Whitelist()
assert wl.is_whitelisted("deploy", "session1") is False
wl.add("deploy", "session1")
assert wl.is_whitelisted("deploy", "session1") is True
assert wl.is_whitelisted("deploy", "session2") is False
def test_remove(self):
wl = Whitelist()
wl.add("deploy")
assert wl.is_whitelisted("deploy") is True
wl.remove("deploy")
assert wl.is_whitelisted("deploy") is False
@pytest.mark.skipif(not AIOHTTP_AVAILABLE, reason="aiohttp not installed")
class TestAuditLog:
"""Unit tests for the AuditLog."""
def test_log_and_retrieve(self, tmp_path):
db = str(tmp_path / "test_audit.db")
log = AuditLog(db_path=db)
req = ConfirmationRequest(
id="test-123",
action="deploy_production",
description="Deploy v2.0 to prod",
requester="timmy",
)
log.log(req)
entries = log.recent(limit=10)
assert len(entries) == 1
assert entries[0]["id"] == "test-123"
assert entries[0]["action"] == "deploy_production"
assert entries[0]["status"] == "pending"
log.close()
def test_update_on_resolve(self, tmp_path):
db = str(tmp_path / "test_audit.db")
log = AuditLog(db_path=db)
req = ConfirmationRequest(
id="test-456",
action="delete_data",
description="Purge old records",
)
log.log(req)
# Resolve
req.status = "approved"
req.resolved_by = "alexander"
req.resolved_at = time.time()
log.log(req)
entries = log.recent(limit=10)
assert len(entries) == 1
assert entries[0]["status"] == "approved"
assert entries[0]["resolved_by"] == "alexander"
log.close()
@pytest.mark.skipif(not AIOHTTP_AVAILABLE, reason="aiohttp not installed")
class TestConfirmationRequest:
"""Unit tests for the data model."""
def test_to_dict(self):
req = ConfirmationRequest(
id="abc123",
action="deploy_production",
description="Ship it",
details={"version": "2.0"},
)
d = req.to_dict()
assert d["id"] == "abc123"
assert d["status"] == "pending"
assert d["created_at_iso"] is not None
assert d["resolved_at_iso"] is None
assert d["details"]["version"] == "2.0"
# ---------------------------------------------------------------------------
# Integration tests (HTTP)
# ---------------------------------------------------------------------------
@pytest.mark.skipif(not AIOHTTP_AVAILABLE, reason="aiohttp not installed")
class TestConfirmationHTTP(AioHTTPTestCase):
"""Full HTTP integration tests for the ConfirmationServer."""
async def get_application(self):
# Suppress notification during tests
with patch("daemon.confirmation_server._notify_human", return_value=None):
server = ConfirmationServer(
host="127.0.0.1",
port=6000,
db_path=":memory:",
)
self._server = server
return server._build_app()
@unittest_run_loop
async def test_health(self):
resp = await self.client.request("GET", "/health")
assert resp.status == 200
data = await resp.json()
assert data["status"] == "ok"
@unittest_run_loop
async def test_submit_confirmation(self):
with patch("daemon.confirmation_server._notify_human", return_value=None):
resp = await self.client.request("POST", "/confirm", json={
"action": "deploy_production",
"description": "Deploy v2.0 to production",
"requester": "timmy",
"session_key": "test-session",
})
assert resp.status == 202
data = await resp.json()
assert data["status"] == "pending"
assert data["action"] == "deploy_production"
assert "id" in data
@unittest_run_loop
async def test_submit_missing_action(self):
resp = await self.client.request("POST", "/confirm", json={
"description": "Something",
})
assert resp.status == 400
@unittest_run_loop
async def test_submit_missing_description(self):
resp = await self.client.request("POST", "/confirm", json={
"action": "deploy_production",
})
assert resp.status == 400
@unittest_run_loop
async def test_approve_flow(self):
# Submit
with patch("daemon.confirmation_server._notify_human", return_value=None):
submit_resp = await self.client.request("POST", "/confirm", json={
"action": "deploy_production",
"description": "Ship it",
})
assert submit_resp.status == 202
submit_data = await submit_resp.json()
req_id = submit_data["id"]
# Approve
approve_resp = await self.client.request(
"POST", f"/confirm/{req_id}/approve",
json={"approver": "alexander"},
)
assert approve_resp.status == 200
approve_data = await approve_resp.json()
assert approve_data["status"] == "approved"
assert approve_data["resolved_by"] == "alexander"
# Check status
status_resp = await self.client.request("GET", f"/confirm/{req_id}")
assert status_resp.status == 200
status_data = await status_resp.json()
assert status_data["status"] == "approved"
@unittest_run_loop
async def test_deny_flow(self):
with patch("daemon.confirmation_server._notify_human", return_value=None):
submit_resp = await self.client.request("POST", "/confirm", json={
"action": "delete_data",
"description": "Wipe everything",
})
req_id = (await submit_resp.json())["id"]
deny_resp = await self.client.request(
"POST", f"/confirm/{req_id}/deny",
json={"denier": "alexander", "reason": "Too risky"},
)
assert deny_resp.status == 200
deny_data = await deny_resp.json()
assert deny_data["status"] == "denied"
assert deny_data["reason"] == "Too risky"
@unittest_run_loop
async def test_double_approve_returns_409(self):
with patch("daemon.confirmation_server._notify_human", return_value=None):
submit_resp = await self.client.request("POST", "/confirm", json={
"action": "deploy_production",
"description": "Ship it",
})
req_id = (await submit_resp.json())["id"]
await self.client.request(f"POST", f"/confirm/{req_id}/approve")
resp2 = await self.client.request(f"POST", f"/confirm/{req_id}/approve")
assert resp2.status == 409
@unittest_run_loop
async def test_not_found(self):
resp = await self.client.request("GET", "/confirm/nonexistent")
assert resp.status == 404
@unittest_run_loop
async def test_audit_log(self):
with patch("daemon.confirmation_server._notify_human", return_value=None):
await self.client.request("POST", "/confirm", json={
"action": "deploy_production",
"description": "Ship it",
})
resp = await self.client.request("GET", "/audit")
assert resp.status == 200
data = await resp.json()
assert data["count"] >= 1
@unittest_run_loop
async def test_rate_limit(self):
# Exhaust rate limit (default is 10)
with patch("daemon.confirmation_server._notify_human", return_value=None):
for i in range(10):
await self.client.request("POST", "/confirm", json={
"action": "test_rate_action",
"description": f"Request {i}",
})
# 11th should be rate-limited
resp = await self.client.request("POST", "/confirm", json={
"action": "test_rate_action",
"description": "Over the limit",
})
assert resp.status == 429
@unittest_run_loop
async def test_whitelist_auto_approves(self):
self._server._whitelist.add("safe_action")
resp = await self.client.request("POST", "/confirm", json={
"action": "safe_action",
"description": "This is whitelisted",
})
assert resp.status == 200
data = await resp.json()
assert data["status"] == "auto_approved"

View File

@@ -137,3 +137,78 @@ class TestBackwardCompat:
def test_tool_to_toolset_map(self):
assert isinstance(TOOL_TO_TOOLSET_MAP, dict)
assert len(TOOL_TO_TOOLSET_MAP) > 0
class TestToolReturnTypeValidation:
"""Poka-yoke: tool handlers must return JSON strings."""
def test_handler_returning_dict_is_wrapped(self, monkeypatch):
"""A handler that returns a dict should be auto-wrapped to JSON string."""
from tools.registry import registry
from model_tools import handle_function_call
import json
# Register a bad handler that returns dict instead of str
registry.register(
name="__test_bad_dict",
toolset="test",
schema={"name": "__test_bad_dict", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: {"this is": "a dict not a string"},
)
result = handle_function_call("__test_bad_dict", {})
parsed = json.loads(result)
assert "output" in parsed
assert "_type_warning" in parsed
# Cleanup
registry._tools.pop("__test_bad_dict", None)
def test_handler_returning_none_is_wrapped(self, monkeypatch):
"""A handler that returns None should be auto-wrapped."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_bad_none",
toolset="test",
schema={"name": "__test_bad_none", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: None,
)
result = handle_function_call("__test_bad_none", {})
parsed = json.loads(result)
assert "_type_warning" in parsed
registry._tools.pop("__test_bad_none", None)
def test_handler_returning_non_json_string_is_wrapped(self):
"""A handler returning a plain string (not JSON) should be wrapped."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_bad_plain",
toolset="test",
schema={"name": "__test_bad_plain", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: "just a plain string, not json",
)
result = handle_function_call("__test_bad_plain", {})
parsed = json.loads(result)
assert "output" in parsed
registry._tools.pop("__test_bad_plain", None)
def test_handler_returning_valid_json_passes_through(self):
"""A handler returning valid JSON string passes through unchanged."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_good",
toolset="test",
schema={"name": "__test_good", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: json.dumps({"status": "ok", "data": [1, 2, 3]}),
)
result = handle_function_call("__test_good", {})
parsed = json.loads(result)
assert parsed == {"status": "ok", "data": [1, 2, 3]}
registry._tools.pop("__test_good", None)

View File

@@ -144,7 +144,8 @@ class TestMemoryStoreReplace:
def test_replace_no_match(self, store):
store.add("memory", "fact A")
result = store.replace("memory", "nonexistent", "new")
assert result["success"] is False
assert result["success"] is True
assert result["result"] == "no_match"
def test_replace_ambiguous_match(self, store):
store.add("memory", "server A runs nginx")
@@ -177,7 +178,8 @@ class TestMemoryStoreRemove:
def test_remove_no_match(self, store):
result = store.remove("memory", "nonexistent")
assert result["success"] is False
assert result["success"] is True
assert result["result"] == "no_match"
def test_remove_empty_old_text(self, store):
result = store.remove("memory", " ")

View File

@@ -0,0 +1,107 @@
"""Tests for syntax preflight check in execute_code (issue #312)."""
import ast
import json
import pytest
class TestSyntaxPreflight:
"""Verify that execute_code catches syntax errors before sandbox execution."""
def test_valid_syntax_passes_parse(self):
"""Valid Python should pass ast.parse."""
code = "print('hello')\nx = 1 + 2\n"
ast.parse(code) # should not raise
def test_syntax_error_indentation(self):
"""IndentationError is a subclass of SyntaxError."""
code = "def foo():\nbar()\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_missing_colon(self):
code = "if True\n pass\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_unmatched_paren(self):
code = "x = (1 + 2\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_invalid_token(self):
code = "x = 1 +*\n"
with pytest.raises(SyntaxError):
ast.parse(code)
def test_syntax_error_details(self):
"""SyntaxError should provide line, offset, msg."""
code = "if True\n pass\n"
with pytest.raises(SyntaxError) as exc_info:
ast.parse(code)
e = exc_info.value
assert e.lineno is not None
assert e.msg is not None
def test_empty_string_passes(self):
"""Empty string is valid Python (empty module)."""
ast.parse("")
def test_comments_only_passes(self):
ast.parse("# just a comment\n# another\n")
def test_complex_valid_code(self):
code = '''
import os
def foo(x):
if x > 0:
return x * 2
return 0
result = [foo(i) for i in range(10)]
print(result)
'''
ast.parse(code)
class TestSyntaxPreflightResponse:
"""Test the error response format from the preflight check."""
def _check_syntax(self, code):
"""Mimic the preflight check logic from execute_code."""
try:
ast.parse(code)
return None
except SyntaxError as e:
return json.dumps({
"error": f"Python syntax error: {e.msg}",
"line": e.lineno,
"offset": e.offset,
"text": (e.text or "").strip()[:200],
})
def test_returns_json_error(self):
result = self._check_syntax("if True\n pass\n")
assert result is not None
data = json.loads(result)
assert "error" in data
assert "syntax error" in data["error"].lower()
def test_includes_line_number(self):
result = self._check_syntax("x = 1\nif True\n pass\n")
data = json.loads(result)
assert data["line"] == 2 # error on line 2
def test_includes_offset(self):
result = self._check_syntax("x = (1 + 2\n")
data = json.loads(result)
assert data["offset"] is not None
def test_includes_snippet(self):
result = self._check_syntax("if True\n")
data = json.loads(result)
assert "if True" in data["text"]
def test_none_for_valid_code(self):
result = self._check_syntax("print('ok')")
assert result is None

View File

@@ -28,6 +28,7 @@ Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Window
Remote execution additionally requires Python 3 in the terminal backend.
"""
import ast
import base64
import json
import logging
@@ -893,6 +894,20 @@ def execute_code(
if not code or not code.strip():
return json.dumps({"error": "No code provided."})
# Poka-yoke (#312): Syntax check before execution.
# 83.2% of execute_code errors are Python exceptions; most are syntax
# errors the LLM generated. ast.parse() is sub-millisecond and catches
# them before we spin up a sandbox child process.
try:
ast.parse(code)
except SyntaxError as e:
return json.dumps({
"error": f"Python syntax error: {e.msg}",
"line": e.lineno,
"offset": e.offset,
"text": (e.text or "").strip()[:200],
})
# Dispatch: remote backends use file-based RPC, local uses UDS
from tools.terminal_tool import _get_env_config
env_type = _get_env_config()["env_type"]

View File

@@ -260,8 +260,12 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) > 1:
# If all matches are identical (exact duplicates), operate on the first one
@@ -310,8 +314,12 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) > 1:
# If all matches are identical (exact duplicates), remove the first one
@@ -449,30 +457,30 @@ def memory_tool(
Returns JSON string with results.
"""
if store is None:
return json.dumps({"success": False, "error": "Memory is not available. It may be disabled in config or this environment."}, ensure_ascii=False)
return tool_error("Memory is not available. It may be disabled in config or this environment.", success=False)
if target not in ("memory", "user"):
return json.dumps({"success": False, "error": f"Invalid target '{target}'. Use 'memory' or 'user'."}, ensure_ascii=False)
return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False)
if action == "add":
if not content:
return json.dumps({"success": False, "error": "Content is required for 'add' action."}, ensure_ascii=False)
return tool_error("Content is required for 'add' action.", success=False)
result = store.add(target, content)
elif action == "replace":
if not old_text:
return json.dumps({"success": False, "error": "old_text is required for 'replace' action."}, ensure_ascii=False)
return tool_error("old_text is required for 'replace' action.", success=False)
if not content:
return json.dumps({"success": False, "error": "content is required for 'replace' action."}, ensure_ascii=False)
return tool_error("content is required for 'replace' action.", success=False)
result = store.replace(target, old_text, content)
elif action == "remove":
if not old_text:
return json.dumps({"success": False, "error": "old_text is required for 'remove' action."}, ensure_ascii=False)
return tool_error("old_text is required for 'remove' action.", success=False)
result = store.remove(target, old_text)
else:
return json.dumps({"success": False, "error": f"Unknown action '{action}'. Use: add, replace, remove"}, ensure_ascii=False)
return tool_error(f"Unknown action '{action}'. Use: add, replace, remove", success=False)
return json.dumps(result, ensure_ascii=False)
@@ -539,7 +547,7 @@ MEMORY_SCHEMA = {
# --- Registry ---
from tools.registry import registry
from tools.registry import registry, tool_error
registry.register(
name="memory",