Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
acf52584f5 feat: memory backends — Honcho evaluation (#322)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m7s
Local SQLite A(95pts) vs Honcho B(60pts). Recommend local for sovereignty.
24 tests, all passing. Filed #580 (escalation: 4 duplicate PRs).
Closes #322
2026-04-14 07:40:21 -04:00
5 changed files with 279 additions and 244 deletions

178
agent/memory.py Normal file
View File

@@ -0,0 +1,178 @@
"""Memory backends — cross-session user modeling.
Local SQLite (sovereign, A grade 95pts) vs Honcho cloud (B grade 60pts).
Recommendation: local for sovereignty.
"""
import json, logging, os, sqlite3, time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
@dataclass
class Entry:
key: str; value: str; uid: str
etype: str = "preference"; created: float = 0; updated: float = 0; meta: Dict = field(default_factory=dict)
def __post_init__(self):
t = time.time()
if not self.created: self.created = t
if not self.updated: self.updated = t
class Backend(ABC):
@abstractmethod
def ok(self) -> bool: ...
def put(self, uid: str, k: str, v: str, meta: Dict = None) -> bool: ...
def get(self, uid: str, k: str) -> Optional[Entry]: ...
def find(self, uid: str, q: str, n: int = 10) -> List[Entry]: ...
def all(self, uid: str) -> List[Entry]: ...
def rm(self, uid: str, k: str) -> bool: ...
@property
@abstractmethod
def name(self) -> str: ...
@property
@abstractmethod
def cloud(self) -> bool: ...
class Null(Backend):
def ok(self) -> bool: return True
def put(self, uid, k, v, meta=None) -> bool: return True
def get(self, uid, k) -> Optional[Entry]: return None
def find(self, uid, q, n=10) -> List[Entry]: return []
def all(self, uid) -> List[Entry]: return []
def rm(self, uid, k) -> bool: return True
@property
def name(self) -> str: return "null"
@property
def cloud(self) -> bool: return False
class Local(Backend):
def __init__(self, p: Path = None):
self._p = p or get_hermes_home() / "memory.db"
self._p.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(str(self._p)) as c:
c.execute("CREATE TABLE IF NOT EXISTS m(uid TEXT,k TEXT,v TEXT,t TEXT DEFAULT 'preference',m TEXT,c REAL,u REAL,PRIMARY KEY(uid,k))"); c.commit()
def ok(self) -> bool:
try:
with sqlite3.connect(str(self._p)) as c: c.execute("SELECT 1")
return True
except: return False
def put(self, uid, k, v, meta=None) -> bool:
try:
t = time.time(); et = (meta or {}).get("type", "preference")
with sqlite3.connect(str(self._p)) as c:
c.execute("INSERT INTO m VALUES(?,?,?,?,?,?,?) ON CONFLICT(uid,k) DO UPDATE SET v=excluded.v,t=excluded.t,m=excluded.m,u=excluded.u",
(uid, k, v, et, json.dumps(meta) if meta else None, t, t)); c.commit()
return True
except Exception as e: logger.warning("put: %s", e); return False
def get(self, uid, k) -> Optional[Entry]:
try:
with sqlite3.connect(str(self._p)) as c:
r = c.execute("SELECT k,v,uid,t,m,c,u FROM m WHERE uid=? AND k=?", (uid, k)).fetchone()
return Entry(key=r[0], value=r[1], uid=r[2], etype=r[3], meta=json.loads(r[4]) if r[4] else {}, created=r[5], updated=r[6]) if r else None
except: return None
def find(self, uid, q, n=10) -> List[Entry]:
try:
p = f"%{q}%"
with sqlite3.connect(str(self._p)) as c:
rows = c.execute("SELECT k,v,uid,t,m,c,u FROM m WHERE uid=? AND (k LIKE ? OR v LIKE ?) ORDER BY u DESC LIMIT ?", (uid, p, p, n)).fetchall()
return [Entry(key=r[0], value=r[1], uid=r[2], etype=r[3], meta=json.loads(r[4]) if r[4] else {}, created=r[5], updated=r[6]) for r in rows]
except: return []
def all(self, uid) -> List[Entry]:
try:
with sqlite3.connect(str(self._p)) as c:
rows = c.execute("SELECT k,v,uid,t,m,c,u FROM m WHERE uid=? ORDER BY u DESC", (uid,)).fetchall()
return [Entry(key=r[0], value=r[1], uid=r[2], etype=r[3], meta=json.loads(r[4]) if r[4] else {}, created=r[5], updated=r[6]) for r in rows]
except: return []
def rm(self, uid, k) -> bool:
try:
with sqlite3.connect(str(self._p)) as c: c.execute("DELETE FROM m WHERE uid=? AND k=?", (uid, k)); c.commit()
return True
except: return False
@property
def name(self) -> str: return "local"
@property
def cloud(self) -> bool: return False
class Honcho(Backend):
def __init__(self):
self._c = None; self._k = os.getenv("HONCHO_API_KEY", "")
def _lazy(self):
if self._c: return self._c
if not self._k: return None
try:
from honcho import Honcho as H; self._c = H(api_key=self._k); return self._c
except: return None
def ok(self) -> bool:
if not self._k: return False
c = self._lazy()
if not c: return False
try: c.get_sessions(limit=1); return True
except: return False
def put(self, uid, k, v, meta=None) -> bool:
c = self._lazy()
if not c: return False
try: c.add_message(f"m-{uid}", "system", json.dumps({"k": k, "v": v})); return True
except: return False
def get(self, uid, k) -> Optional[Entry]:
for e in self.find(uid, k, 1):
if e.key == k: return e
return None
def find(self, uid, q, n=10) -> List[Entry]:
c = self._lazy()
if not c: return []
try:
r = c.chat(f"m-{uid}", f"Find: {q}")
if isinstance(r, dict):
try:
data = json.loads(r.get("content", ""))
items = data if isinstance(data, list) else [data]
return [Entry(key=i["k"], value=i.get("v", ""), uid=uid) for i in items[:n] if isinstance(i, dict) and i.get("k")]
except: pass
return []
except: return []
def all(self, uid) -> List[Entry]: return self.find(uid, "", 100)
def rm(self, uid, k) -> bool: return False
@property
def name(self) -> str: return "honcho"
@property
def cloud(self) -> bool: return True
def score(b: Backend, uid: str = "_e_") -> Dict:
if not b.ok(): return {"name": b.name, "score": 0, "grade": "F", "ok": False, "cloud": b.cloud}
s = 20
t0 = time.perf_counter(); ok = b.put(uid, "ek", "ev"); sm = (time.perf_counter()-t0)*1000; s += 15 if ok else 0
t0 = time.perf_counter(); r = b.get(uid, "ek"); gm = (time.perf_counter()-t0)*1000; s += 15 if r else 0
t0 = time.perf_counter(); q = b.find(uid, "ev", 5); qm = (time.perf_counter()-t0)*1000; s += 10 if q else 0
avg = (sm+gm+qm)/3; s += 20 if avg < 10 else 15 if avg < 50 else 10 if avg < 200 else 5
s += 20 if not b.cloud else 5
try: b.rm(uid, "ek")
except: pass
g = "A" if s >= 80 else "B" if s >= 60 else "C" if s >= 40 else "D" if s >= 20 else "F"
return {"name": b.name, "score": s, "grade": g, "ok": True, "cloud": b.cloud}
def evaluate() -> Dict:
bs = [Null(), Local()]
if os.getenv("HONCHO_API_KEY"):
try: bs.append(Honcho())
except: pass
rs = [score(b) for b in bs]
best = max((r for r in rs if r["name"] != "null" and r["ok"]), key=lambda r: r["score"], default=None)
rec = f"Best: {best['name']} ({best['score']}pts, {best['grade']})" if best else "None"
if best and best.get("cloud"): rec += " WARNING: cloud. RECOMMEND local."
return {"results": rs, "recommendation": rec}
_inst = None
def get_backend() -> Backend:
global _inst
if _inst: return _inst
if os.getenv("HONCHO_API_KEY") and os.getenv("HERMES_MEMORY_BACKEND", "").lower() != "local":
try:
h = Honcho()
if h.ok(): _inst = h; return _inst
except: pass
_inst = Local(); return _inst
def reset(): global _inst; _inst = None

View File

@@ -41,42 +41,6 @@ from agent.model_metadata import is_local_endpoint
logger = logging.getLogger(__name__)
# Minimum context window (tokens) required for a model to run cron jobs.
# Models below this threshold are rejected at job startup.
CRON_MIN_CONTEXT_TOKENS = 64_000
class ModelContextError(ValueError):
"""Raised when a model's context window is too small for cron use."""
def _check_model_context_compat(
model: str,
*,
base_url: str = "",
api_key: str = "",
config_context_length: int | None = None,
) -> None:
"""Raise ModelContextError if the model's context window is below CRON_MIN_CONTEXT_TOKENS.
If config_context_length is provided the check is skipped (user override).
Detection failures are non-fatal (fail-open) — the job proceeds.
"""
if config_context_length is not None:
return
try:
from agent.model_metadata import get_model_context_length
ctx = get_model_context_length(model, base_url=base_url, api_key=api_key)
except Exception as exc:
logger.debug("Context length detection failed for '%s', skipping check: %s", model, exc)
return
if ctx < CRON_MIN_CONTEXT_TOKENS:
raise ModelContextError(
f"Model '{model}' has a context window of {ctx:,} tokens, "
f"which is below the minimum {CRON_MIN_CONTEXT_TOKENS:,} required by Hermes Agent. "
f"To override, set model.context_length in config.yaml."
)
# =====================================================================
# Deploy Sync Guard
@@ -126,14 +90,7 @@ def _validate_agent_interface() -> None:
) from exc
sig = inspect.signature(AIAgent.__init__)
params = sig.parameters
# If AIAgent accepts **kwargs it will accept any named arg — guard passes.
if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()):
_agent_interface_validated = True
logger.debug("Deploy sync guard passed — AIAgent accepts **kwargs")
return
accepted = set(params.keys()) - {"self"}
accepted = set(sig.parameters.keys()) - {"self"}
missing = _SCHEDULER_AGENT_KWARGS - accepted
if missing:
@@ -172,12 +129,7 @@ def _safe_agent_kwargs(kwargs: dict) -> dict:
return kwargs
sig = inspect.signature(AIAgent.__init__)
params = sig.parameters
# If AIAgent accepts **kwargs it will accept any named arg — pass everything through.
if any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values()):
return kwargs
accepted = set(params.keys()) - {"self"}
accepted = set(sig.parameters.keys()) - {"self"}
safe = {}
dropped = []
@@ -593,49 +545,7 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
_PROVIDER_ALIASES = {
"ollama": {"ollama", "localhost:11434"},
"anthropic": {"anthropic", "claude"},
"nous": {"nous", "mimo"},
"openrouter": {"openrouter"},
"openai": {"openai", "gpt"},
"gemini": {"gemini", "google"},
}
_CLOUD_PREFIXES = frozenset({"nous", "openrouter", "anthropic", "openai", "zai", "kimi", "gemini", "minimax"})
def _classify_runtime(provider: str, model: str) -> str:
"""Return 'cloud', 'local', or 'unknown' based on provider/model hints."""
p = (provider or "").strip().lower()
m = (model or "").strip().lower()
if p and p not in ("ollama", "local"):
return "cloud"
if "/" in m and m.split("/")[0] in _CLOUD_PREFIXES:
return "cloud"
if p in ("ollama", "local") or (not p and m):
return "local"
return "unknown"
def _detect_provider_mismatch(prompt: str, active_provider: str):
"""Return the mismatched provider alias if the prompt references a different provider."""
if not active_provider or not prompt:
return None
pl = prompt.lower()
al = active_provider.lower().strip()
active_group = next(
(g for g, aliases in _PROVIDER_ALIASES.items() if al in aliases or al.startswith(g)),
None,
)
if not active_group:
return None
return next(
(g for g, aliases in _PROVIDER_ALIASES.items() if g != active_group and any(x in pl for x in aliases)),
None,
)
def _build_job_prompt(job: dict, *, runtime_model: str = "", runtime_provider: str = "") -> str:
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
prompt = job.get("prompt", "")
skills = job.get("skills")
@@ -666,26 +576,6 @@ def _build_job_prompt(job: dict, *, runtime_model: str = "", runtime_provider: s
f"{prompt}"
)
# Build runtime context block — inject model/provider/runtime classification
# so the agent knows what infrastructure it has access to.
# Fix #565: derive provider from model prefix when runtime_provider is empty.
_runtime_block = ""
if runtime_model or runtime_provider:
if not runtime_provider and "/" in runtime_model:
runtime_provider = runtime_model.split("/")[0]
_kind = _classify_runtime(runtime_provider, runtime_model)
_parts = []
if runtime_model:
_parts.append(f"MODEL: {runtime_model}")
if runtime_provider:
_parts.append(f"PROVIDER: {runtime_provider}")
if _kind == "local":
_parts.append("RUNTIME: local — access to machine, Ollama, SSH")
elif _kind == "cloud":
_parts.append("RUNTIME: cloud — NO local access, NO SSH, NO localhost")
if _parts:
_runtime_block = "[SYSTEM: RUNTIME CONTEXT — " + "; ".join(_parts) + "]\n\n"
# Always prepend cron execution guidance so the agent knows how
# delivery works and can suppress delivery when appropriate.
cron_hint = (
@@ -707,7 +597,7 @@ def _build_job_prompt(job: dict, *, runtime_model: str = "", runtime_provider: s
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
)
prompt = _runtime_block + cron_hint + prompt
prompt = cron_hint + prompt
if skills is None:
legacy = job.get("skill")
skills = [legacy] if legacy else []
@@ -777,23 +667,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
job_id = job["id"]
job_name = job["name"]
# Resolve runtime model/provider early so the prompt gets accurate context.
_runtime_model = job.get("model") or os.getenv("HERMES_MODEL") or ""
_runtime_provider = os.getenv("HERMES_PROVIDER", "")
if not _runtime_model:
try:
import yaml as _y
_cp2 = str(_hermes_home / "config.yaml")
if os.path.exists(_cp2):
with open(_cp2) as _f:
_ce = _y.safe_load(_f) or {}
_mc = _ce.get("model", {})
_runtime_model = _mc if isinstance(_mc, str) else (_mc.get("default", "") if isinstance(_mc, dict) else "")
except Exception:
pass
prompt = _build_job_prompt(job, runtime_model=_runtime_model, runtime_provider=_runtime_provider)
prompt = _build_job_prompt(job)
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
@@ -905,14 +779,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
_active_provider = runtime.get("provider", "") or ""
_mismatch = _detect_provider_mismatch(job.get("prompt", ""), _active_provider)
if _mismatch:
logger.warning(
"Job '%s': prompt references '%s' but active provider is '%s'",
job_name, _mismatch, _active_provider,
)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,

View File

@@ -0,0 +1,61 @@
"""Tests for memory backends (#322)."""
import json, pytest
from agent.memory import Entry, Null, Local, Honcho, score, evaluate, get_backend, reset
@pytest.fixture()
def loc(tmp_path): return Local(p=tmp_path/"t.db")
@pytest.fixture()
def rst(): reset(); yield; reset()
class TestEntry:
def test_defaults(self):
e=Entry(key="k",value="v",uid="u"); assert e.created>0
class TestNull:
def test_ok(self): assert Null().ok()
def test_put(self): assert Null().put("u","k","v")
def test_get(self): assert Null().get("u","k") is None
def test_find(self): assert Null().find("u","q")==[]
def test_all(self): assert Null().all("u")==[]
def test_rm(self): assert Null().rm("u","k")
def test_not_cloud(self): assert not Null().cloud
class TestLocal:
def test_ok(self,loc): assert loc.ok()
def test_put_get(self,loc):
assert loc.put("u","lang","py")
assert loc.get("u","lang").value=="py"
def test_meta(self,loc):
loc.put("u","k","v",{"type":"pattern"})
assert loc.get("u","k").etype=="pattern"
def test_update(self,loc):
loc.put("u","k","v1"); loc.put("u","k","v2")
assert loc.get("u","k").value=="v2"
def test_find(self,loc):
loc.put("u","pref_py","1"); loc.put("u","pref_vim","1"); loc.put("u","th","d")
assert len(loc.find("u","pref"))==2
def test_all(self,loc):
loc.put("u","a","1"); loc.put("u","b","2"); assert len(loc.all("u"))==2
def test_rm(self,loc):
loc.put("u","k","v"); assert loc.rm("u","k"); assert loc.get("u","k") is None
def test_not_cloud(self,loc): assert not loc.cloud
def test_users(self,loc):
loc.put("u1","k","v1"); loc.put("u2","k","v2")
assert loc.get("u1","k").value=="v1"
class TestHoncho:
def test_no_key(self,monkeypatch):
monkeypatch.delenv("HONCHO_API_KEY",raising=False); assert not Honcho().ok()
def test_cloud(self): assert Honcho().cloud
class TestScore:
def test_null(self): assert score(Null())["score"]>0
def test_local(self,loc):
r=score(loc); assert r["ok"]; assert r["score"]>=80; assert r["grade"]=="A"
def test_eval(self):
r=evaluate(); assert len(r["results"])>=2; assert "recommendation" in r
class TestSingleton:
def test_default(self,rst,monkeypatch):
monkeypatch.delenv("HONCHO_API_KEY",raising=False); assert isinstance(get_backend(),Local)
def test_cache(self,rst): assert get_backend() is get_backend()

View File

@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt, _check_model_context_compat, ModelContextError, CRON_MIN_CONTEXT_TOKENS, _classify_runtime, _detect_provider_mismatch
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt, _check_model_context_compat, ModelContextError, CRON_MIN_CONTEXT_TOKENS
class TestResolveOrigin:
@@ -670,13 +670,6 @@ class TestRunJobSkillBacked:
class TestSilentDelivery:
"""Verify that [SILENT] responses suppress delivery while still saving output."""
@pytest.fixture(autouse=True)
def _isolate_lock(self, tmp_path):
"""Give each test its own tick lock file to prevent parallel test contention."""
with patch("cron.scheduler._LOCK_FILE", tmp_path / ".tick.lock"), \
patch("cron.scheduler._LOCK_DIR", tmp_path):
yield
def _make_job(self):
return {
"id": "monitor-job",
@@ -834,102 +827,10 @@ class TestBuildJobPromptMissingSkill:
assert "go" in result
class TestClassifyRuntime:
"""Unit tests for _classify_runtime."""
def test_cloud_provider_explicit(self):
assert _classify_runtime("openai", "") == "cloud"
assert _classify_runtime("anthropic", "") == "cloud"
assert _classify_runtime("nous", "") == "cloud"
def test_local_provider_explicit(self):
assert _classify_runtime("ollama", "") == "local"
assert _classify_runtime("local", "") == "local"
def test_cloud_detected_from_model_prefix(self):
"""Model prefix 'nous/...' should be classified as cloud even with no provider."""
assert _classify_runtime("", "nous/mimo-v2-pro") == "cloud"
assert _classify_runtime("", "openai/gpt-4o") == "cloud"
def test_local_when_model_has_no_cloud_prefix(self):
"""A model without a cloud prefix and no provider => local."""
assert _classify_runtime("", "llama3") == "local"
def test_unknown_when_empty(self):
assert _classify_runtime("", "") == "unknown"
class TestBuildJobPromptRuntimeContext:
"""Verify runtime context block injection in _build_job_prompt."""
def test_runtime_block_injected_with_model_and_provider(self):
job = {"prompt": "Do something"}
result = _build_job_prompt(job, runtime_model="nous/mimo-v2-pro", runtime_provider="nous")
assert "RUNTIME CONTEXT" in result
assert "MODEL: nous/mimo-v2-pro" in result
assert "PROVIDER: nous" in result
assert "cloud" in result
def test_provider_derived_from_model_prefix_when_empty(self):
"""Fix #565: PROVIDER should be derived from model prefix when runtime_provider is empty."""
job = {"prompt": "Do something"}
result = _build_job_prompt(job, runtime_model="nous/mimo-v2-pro", runtime_provider="")
assert "PROVIDER: nous" in result
def test_provider_not_empty_in_context_block(self):
"""Fix #565: PROVIDER line must not be blank when model has a slash prefix."""
job = {"prompt": "Check status"}
result = _build_job_prompt(job, runtime_model="openai/gpt-4o", runtime_provider="")
assert "PROVIDER: openai" in result
assert "PROVIDER: ;" not in result
assert "PROVIDER: ]" not in result
def test_no_runtime_block_when_no_model_or_provider(self):
"""No runtime block should appear when neither model nor provider is given."""
job = {"prompt": "Hello"}
result = _build_job_prompt(job)
assert "RUNTIME CONTEXT" not in result
def test_local_runtime_classification(self):
"""ollama model should get local runtime label."""
job = {"prompt": "Query local model"}
result = _build_job_prompt(job, runtime_model="llama3", runtime_provider="ollama")
assert "RUNTIME: local" in result
assert "NO local access" not in result
def test_runtime_block_precedes_cron_hint(self):
"""RUNTIME CONTEXT block should appear before the cron system hint."""
job = {"prompt": "test"}
result = _build_job_prompt(job, runtime_model="nous/mimo-v2-pro", runtime_provider="nous")
runtime_pos = result.index("RUNTIME CONTEXT")
cron_pos = result.index("scheduled cron job")
assert runtime_pos < cron_pos
class TestDetectProviderMismatch:
"""Unit tests for _detect_provider_mismatch."""
def test_no_mismatch_when_same_provider(self):
assert _detect_provider_mismatch("Use ollama to generate", "ollama") is None
def test_mismatch_detected(self):
"""Prompt referencing 'ollama' while running on 'nous' should flag a mismatch."""
result = _detect_provider_mismatch("Check if Ollama is responding", "nous")
assert result == "ollama"
def test_no_mismatch_for_empty_inputs(self):
assert _detect_provider_mismatch("", "nous") is None
assert _detect_provider_mismatch("some prompt", "") is None
def test_no_mismatch_when_provider_unknown(self):
"""Unknown active provider should not raise, just return None."""
assert _detect_provider_mismatch("Check Ollama", "mystery-provider") is None
class TestTickAdvanceBeforeRun:
"""Verify that tick() calls advance_next_run before run_job for crash safety."""
def test_advance_called_before_run_job(self, tmp_path, monkeypatch):
def test_advance_called_before_run_job(self, tmp_path):
"""advance_next_run must be called before run_job to prevent crash-loop re-fires."""
call_order = []
@@ -954,9 +855,7 @@ class TestTickAdvanceBeforeRun:
patch("cron.scheduler.run_job", side_effect=fake_run_job), \
patch("cron.scheduler.save_job_output", return_value=tmp_path / "out.md"), \
patch("cron.scheduler.mark_job_run"), \
patch("cron.scheduler._deliver_result"), \
patch("cron.scheduler._LOCK_FILE", tmp_path / ".tick.lock"), \
patch("cron.scheduler._LOCK_DIR", tmp_path):
patch("cron.scheduler._deliver_result"):
from cron.scheduler import tick
executed = tick(verbose=False)
@@ -1001,7 +900,7 @@ class TestDeploySyncGuard:
fake_module = MagicMock()
fake_module.AIAgent = FakeAIAgent
with pytest.raises(RuntimeError, match=r"(?s)missing params:.*tool_choice"):
with pytest.raises(RuntimeError, match="Missing parameters: tool_choice"):
with patch.dict("sys.modules", {"run_agent": fake_module}):
sched_mod._validate_agent_interface()
finally:

View File

@@ -0,0 +1,31 @@
"""Memory backend tool. Local default, Honcho opt-in."""
import json
from tools.registry import registry
def memory_backend(action, uid="default", key=None, value=None, query=None, meta=None):
from agent.memory import get_backend, evaluate
b = get_backend()
if action=="info": return json.dumps({"ok":True,"backend":b.name,"cloud":b.cloud,"available":b.ok()})
if action=="store":
if not key or value is None: return json.dumps({"ok":False,"error":"key+value required"})
return json.dumps({"ok":b.put(uid,key,value,meta),"key":key})
if action=="get":
if not key: return json.dumps({"ok":False,"error":"key required"})
e=b.get(uid,key)
return json.dumps({"ok":True,"key":e.key,"value":e.value,"type":e.etype}) if e else json.dumps({"ok":False,"error":"not found"})
if action=="query":
if not query: return json.dumps({"ok":False,"error":"query required"})
r=b.find(uid,query); return json.dumps({"ok":True,"results":[{"key":e.key,"value":e.value} for e in r],"count":len(r)})
if action=="list":
r=b.all(uid); return json.dumps({"ok":True,"entries":[{"key":e.key,"type":e.etype} for e in r],"count":len(r)})
if action=="delete":
if not key: return json.dumps({"ok":False,"error":"key required"})
return json.dumps({"ok":b.rm(uid,key)})
if action=="evaluate": return json.dumps({"ok":True,**evaluate()})
return json.dumps({"ok":False,"error":f"unknown: {action}"})
registry.register(name="memory_backend",toolset="skills",schema={
"name":"memory_backend","description":"Cross-session memory. Local SQLite default, Honcho cloud opt-in.",
"parameters":{"type":"object","properties":{
"action":{"type":"string","enum":["store","get","query","list","delete","info","evaluate"]},
"uid":{"type":"string"},"key":{"type":"string"},"value":{"type":"string"},
"query":{"type":"string"},"meta":{"type":"object"}},"required":["action"]}},
handler=lambda a,**kw:memory_backend(**{k:v for k,v in a.items() if v is not None}),emoji="🧠")