Compare commits
2 Commits
dawn/322-1
...
queue/329-
| Author | SHA1 | Date | |
|---|---|---|---|
| a6f3ae34a3 | |||
| f94af53cee |
186
agent/memory.py
186
agent/memory.py
@@ -1,186 +0,0 @@
|
||||
"""Memory backends for cross-session user modeling.
|
||||
|
||||
Local SQLite (sovereign, recommended) vs Honcho cloud (opt-in).
|
||||
Evaluation: Local A(95pts) vs Honcho B(60pts).
|
||||
"""
|
||||
|
||||
import json, logging, os, sqlite3, time
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
|
||||
class Entry:
|
||||
key: str; value: str; uid: str
|
||||
etype: str = "preference"; created: float = 0; updated: float = 0; meta: Dict = field(default_factory=dict)
|
||||
def __post_init__(self):
|
||||
t = time.time()
|
||||
if not self.created: self.created = t
|
||||
if not self.updated: self.updated = t
|
||||
|
||||
class Backend(ABC):
|
||||
@abstractmethod
|
||||
def ok(self) -> bool: ...
|
||||
@abstractmethod
|
||||
def put(self, uid: str, k: str, v: str, meta: Dict = None) -> bool: ...
|
||||
@abstractmethod
|
||||
def get(self, uid: str, k: str) -> Optional[Entry]: ...
|
||||
@abstractmethod
|
||||
def find(self, uid: str, q: str, n: int = 10) -> List[Entry]: ...
|
||||
@abstractmethod
|
||||
def all(self, uid: str) -> List[Entry]: ...
|
||||
@abstractmethod
|
||||
def rm(self, uid: str, k: str) -> bool: ...
|
||||
@property
|
||||
@abstractmethod
|
||||
def name(self) -> str: ...
|
||||
@property
|
||||
@abstractmethod
|
||||
def cloud(self) -> bool: ...
|
||||
|
||||
class Null(Backend):
|
||||
def ok(self) -> bool: return True
|
||||
def put(self, uid, k, v, meta=None) -> bool: return True
|
||||
def get(self, uid, k) -> Optional[Entry]: return None
|
||||
def find(self, uid, q, n=10) -> List[Entry]: return []
|
||||
def all(self, uid) -> List[Entry]: return []
|
||||
def rm(self, uid, k) -> bool: return True
|
||||
@property
|
||||
def name(self) -> str: return "null"
|
||||
@property
|
||||
def cloud(self) -> bool: return False
|
||||
|
||||
class Local(Backend):
|
||||
def __init__(self, p: Path = None):
|
||||
self._p = p or get_hermes_home() / "memory.db"
|
||||
self._p.parent.mkdir(parents=True, exist_ok=True)
|
||||
with sqlite3.connect(str(self._p)) as c:
|
||||
c.execute("CREATE TABLE IF NOT EXISTS m(uid TEXT,k TEXT,v TEXT,t TEXT DEFAULT 'preference',m TEXT,c REAL,u REAL,PRIMARY KEY(uid,k))")
|
||||
c.commit()
|
||||
def ok(self) -> bool:
|
||||
try:
|
||||
with sqlite3.connect(str(self._p)) as c: c.execute("SELECT 1")
|
||||
return True
|
||||
except: return False
|
||||
def put(self, uid, k, v, meta=None) -> bool:
|
||||
try:
|
||||
t = time.time(); et = (meta or {}).get("type", "preference")
|
||||
with sqlite3.connect(str(self._p)) as c:
|
||||
c.execute("INSERT INTO m VALUES(?,?,?,?,?,?,?) ON CONFLICT(uid,k) DO UPDATE SET v=excluded.v,t=excluded.t,m=excluded.m,u=excluded.u",
|
||||
(uid, k, v, et, json.dumps(meta) if meta else None, t, t))
|
||||
c.commit()
|
||||
return True
|
||||
except Exception as e: logger.warning("put: %s", e); return False
|
||||
def get(self, uid, k) -> Optional[Entry]:
|
||||
try:
|
||||
with sqlite3.connect(str(self._p)) as c:
|
||||
r = c.execute("SELECT k,v,uid,t,m,c,u FROM m WHERE uid=? AND k=?", (uid, k)).fetchone()
|
||||
return Entry(key=r[0], value=r[1], uid=r[2], etype=r[3], meta=json.loads(r[4]) if r[4] else {}, created=r[5], updated=r[6]) if r else None
|
||||
except: return None
|
||||
def find(self, uid, q, n=10) -> List[Entry]:
|
||||
try:
|
||||
p = f"%{q}%"
|
||||
with sqlite3.connect(str(self._p)) as c:
|
||||
rows = c.execute("SELECT k,v,uid,t,m,c,u FROM m WHERE uid=? AND (k LIKE ? OR v LIKE ?) ORDER BY u DESC LIMIT ?", (uid, p, p, n)).fetchall()
|
||||
return [Entry(key=r[0], value=r[1], uid=r[2], etype=r[3], meta=json.loads(r[4]) if r[4] else {}, created=r[5], updated=r[6]) for r in rows]
|
||||
except: return []
|
||||
def all(self, uid) -> List[Entry]:
|
||||
try:
|
||||
with sqlite3.connect(str(self._p)) as c:
|
||||
rows = c.execute("SELECT k,v,uid,t,m,c,u FROM m WHERE uid=? ORDER BY u DESC", (uid,)).fetchall()
|
||||
return [Entry(key=r[0], value=r[1], uid=r[2], etype=r[3], meta=json.loads(r[4]) if r[4] else {}, created=r[5], updated=r[6]) for r in rows]
|
||||
except: return []
|
||||
def rm(self, uid, k) -> bool:
|
||||
try:
|
||||
with sqlite3.connect(str(self._p)) as c: c.execute("DELETE FROM m WHERE uid=? AND k=?", (uid, k)); c.commit()
|
||||
return True
|
||||
except: return False
|
||||
@property
|
||||
def name(self) -> str: return "local"
|
||||
@property
|
||||
def cloud(self) -> bool: return False
|
||||
|
||||
class Honcho(Backend):
|
||||
def __init__(self):
|
||||
self._c = None; self._k = os.getenv("HONCHO_API_KEY", "")
|
||||
def _lazy(self):
|
||||
if self._c: return self._c
|
||||
if not self._k: return None
|
||||
try:
|
||||
from honcho import Honcho as H; self._c = H(api_key=self._k); return self._c
|
||||
except: return None
|
||||
def ok(self) -> bool:
|
||||
if not self._k: return False
|
||||
c = self._lazy()
|
||||
if not c: return False
|
||||
try: c.get_sessions(limit=1); return True
|
||||
except: return False
|
||||
def put(self, uid, k, v, meta=None) -> bool:
|
||||
c = self._lazy()
|
||||
if not c: return False
|
||||
try: c.add_message(f"m-{uid}", "system", json.dumps({"k": k, "v": v})); return True
|
||||
except: return False
|
||||
def get(self, uid, k) -> Optional[Entry]:
|
||||
for e in self.find(uid, k, 1):
|
||||
if e.key == k: return e
|
||||
return None
|
||||
def find(self, uid, q, n=10) -> List[Entry]:
|
||||
c = self._lazy()
|
||||
if not c: return []
|
||||
try:
|
||||
r = c.chat(f"m-{uid}", f"Find: {q}")
|
||||
if isinstance(r, dict):
|
||||
try:
|
||||
data = json.loads(r.get("content", ""))
|
||||
items = data if isinstance(data, list) else [data]
|
||||
return [Entry(key=i["k"], value=i.get("v", ""), uid=uid) for i in items[:n] if isinstance(i, dict) and i.get("k")]
|
||||
except: pass
|
||||
return []
|
||||
except: return []
|
||||
def all(self, uid) -> List[Entry]: return self.find(uid, "", 100)
|
||||
def rm(self, uid, k) -> bool: return False
|
||||
@property
|
||||
def name(self) -> str: return "honcho"
|
||||
@property
|
||||
def cloud(self) -> bool: return True
|
||||
|
||||
def score(b: Backend, uid: str = "_e_") -> Dict:
|
||||
if not b.ok(): return {"name": b.name, "score": 0, "grade": "F", "ok": False, "cloud": b.cloud}
|
||||
s = 20
|
||||
t0 = time.perf_counter(); ok = b.put(uid, "ek", "ev"); sm = (time.perf_counter()-t0)*1000; s += 15 if ok else 0
|
||||
t0 = time.perf_counter(); r = b.get(uid, "ek"); gm = (time.perf_counter()-t0)*1000; s += 15 if r else 0
|
||||
t0 = time.perf_counter(); q = b.find(uid, "ev", 5); qm = (time.perf_counter()-t0)*1000; s += 10 if q else 0
|
||||
avg = (sm+gm+qm)/3; s += 20 if avg < 10 else 15 if avg < 50 else 10 if avg < 200 else 5
|
||||
s += 20 if not b.cloud else 5
|
||||
try: b.rm(uid, "ek")
|
||||
except: pass
|
||||
g = "A" if s >= 80 else "B" if s >= 60 else "C" if s >= 40 else "D" if s >= 20 else "F"
|
||||
return {"name": b.name, "score": s, "grade": g, "ok": True, "cloud": b.cloud}
|
||||
|
||||
def evaluate() -> Dict:
|
||||
bs = [Null(), Local()]
|
||||
if os.getenv("HONCHO_API_KEY"):
|
||||
try: bs.append(Honcho())
|
||||
except: pass
|
||||
rs = [score(b) for b in bs]
|
||||
best = max((r for r in rs if r["name"] != "null" and r["ok"]), key=lambda r: r["score"], default=None)
|
||||
rec = f"Best: {best['name']} ({best['score']}pts, {best['grade']})" if best else "None"
|
||||
if best and best.get("cloud"): rec += " WARNING: cloud. RECOMMEND local."
|
||||
return {"results": rs, "recommendation": rec}
|
||||
|
||||
_inst = None
|
||||
def get_backend() -> Backend:
|
||||
global _inst
|
||||
if _inst: return _inst
|
||||
if os.getenv("HONCHO_API_KEY") and os.getenv("HERMES_MEMORY_BACKEND", "").lower() != "local":
|
||||
try:
|
||||
h = Honcho()
|
||||
if h.ok(): _inst = h; return _inst
|
||||
except: pass
|
||||
_inst = Local(); return _inst
|
||||
def reset(): global _inst; _inst = None
|
||||
89
examples/session_templates_example.py
Normal file
89
examples/session_templates_example.py
Normal file
@@ -0,0 +1,89 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Example: Using session templates for code-first seeding.
|
||||
|
||||
This script demonstrates how to use the session template system
|
||||
to pre-seed new sessions with successful tool call patterns.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add the parent directory to the path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from tools.session_templates import SessionTemplates, TaskType
|
||||
|
||||
|
||||
def main():
|
||||
"""Demonstrate session template usage."""
|
||||
|
||||
# Create template manager
|
||||
templates = SessionTemplates()
|
||||
|
||||
print("Session Templates Example")
|
||||
print("=" * 50)
|
||||
|
||||
# List existing templates
|
||||
print("\n1. Existing templates:")
|
||||
template_list = templates.list_templates()
|
||||
if template_list:
|
||||
for t in template_list:
|
||||
print(f" - {t.name}: {t.task_type.value} ({len(t.examples)} examples)")
|
||||
else:
|
||||
print(" No templates found")
|
||||
|
||||
# Example: Create a template from a session
|
||||
print("\n2. Creating a template from a session:")
|
||||
print(" (This would normally use a real session ID)")
|
||||
|
||||
# Example: Get a template for code tasks
|
||||
print("\n3. Getting a template for CODE tasks:")
|
||||
code_template = templates.get_template(TaskType.CODE)
|
||||
if code_template:
|
||||
print(f" Found template: {code_template.name}")
|
||||
print(f" Type: {code_template.task_type.value}")
|
||||
print(f" Examples: {len(code_template.examples)}")
|
||||
|
||||
# Show first example
|
||||
if code_template.examples:
|
||||
example = code_template.examples[0]
|
||||
print(f" First example: {example.tool_name}")
|
||||
print(f" Arguments: {example.arguments}")
|
||||
print(f" Result preview: {example.result[:100]}...")
|
||||
else:
|
||||
print(" No CODE template found")
|
||||
|
||||
# Example: Inject template into messages
|
||||
print("\n4. Injecting template into messages:")
|
||||
if code_template:
|
||||
# Create sample messages
|
||||
messages = [
|
||||
{"role": "system", "content": "You are a helpful assistant."},
|
||||
{"role": "user", "content": "Help me write some code"}
|
||||
]
|
||||
|
||||
# Inject template
|
||||
updated_messages = templates.inject_into_messages(code_template, messages)
|
||||
|
||||
print(f" Original messages: {len(messages)}")
|
||||
print(f" Updated messages: {len(updated_messages)}")
|
||||
print(f" Template usage count: {code_template.usage_count}")
|
||||
|
||||
# Show the injection
|
||||
print("\n Injected messages:")
|
||||
for i, msg in enumerate(updated_messages[:6]): # Show first 6
|
||||
role = msg.get('role', 'unknown')
|
||||
content = msg.get('content', '')
|
||||
if content:
|
||||
content_preview = content[:50] + "..." if len(content) > 50 else content
|
||||
print(f" {i}: {role} - {content_preview}")
|
||||
else:
|
||||
print(f" {i}: {role} - (tool call)")
|
||||
|
||||
print("\n" + "=" * 50)
|
||||
print("Example complete!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,62 +0,0 @@
|
||||
"""Tests for memory backends (#322)."""
|
||||
import json, pytest
|
||||
from agent.memory import Entry, Null, Local, Honcho, score, evaluate, get_backend, reset
|
||||
|
||||
@pytest.fixture()
|
||||
def loc(tmp_path): return Local(p=tmp_path / "t.db")
|
||||
@pytest.fixture()
|
||||
def rst(): reset(); yield; reset()
|
||||
|
||||
class TestEntry:
|
||||
def test_defaults(self):
|
||||
e = Entry(key="k", value="v", uid="u"); assert e.created > 0
|
||||
|
||||
class TestNull:
|
||||
def test_ok(self): assert Null().ok()
|
||||
def test_put(self): assert Null().put("u", "k", "v")
|
||||
def test_get(self): assert Null().get("u", "k") is None
|
||||
def test_find(self): assert Null().find("u", "q") == []
|
||||
def test_all(self): assert Null().all("u") == []
|
||||
def test_rm(self): assert Null().rm("u", "k")
|
||||
def test_not_cloud(self): assert not Null().cloud
|
||||
|
||||
class TestLocal:
|
||||
def test_ok(self, loc): assert loc.ok()
|
||||
def test_put_get(self, loc):
|
||||
assert loc.put("u", "lang", "py")
|
||||
assert loc.get("u", "lang").value == "py"
|
||||
def test_meta(self, loc):
|
||||
loc.put("u", "k", "v", {"type": "pattern"})
|
||||
assert loc.get("u", "k").etype == "pattern"
|
||||
def test_update(self, loc):
|
||||
loc.put("u", "k", "v1"); loc.put("u", "k", "v2")
|
||||
assert loc.get("u", "k").value == "v2"
|
||||
def test_find(self, loc):
|
||||
loc.put("u", "pref_py", "1"); loc.put("u", "pref_vim", "1"); loc.put("u", "th", "d")
|
||||
assert len(loc.find("u", "pref")) == 2
|
||||
def test_all(self, loc):
|
||||
loc.put("u", "a", "1"); loc.put("u", "b", "2"); assert len(loc.all("u")) == 2
|
||||
def test_rm(self, loc):
|
||||
loc.put("u", "k", "v"); assert loc.rm("u", "k"); assert loc.get("u", "k") is None
|
||||
def test_not_cloud(self, loc): assert not loc.cloud
|
||||
def test_users(self, loc):
|
||||
loc.put("u1", "k", "v1"); loc.put("u2", "k", "v2")
|
||||
assert loc.get("u1", "k").value == "v1"
|
||||
assert loc.get("u2", "k").value == "v2"
|
||||
|
||||
class TestHoncho:
|
||||
def test_no_key(self, monkeypatch):
|
||||
monkeypatch.delenv("HONCHO_API_KEY", raising=False); assert not Honcho().ok()
|
||||
def test_cloud(self): assert Honcho().cloud
|
||||
|
||||
class TestScore:
|
||||
def test_null(self): assert score(Null())["score"] > 0
|
||||
def test_local(self, loc):
|
||||
r = score(loc); assert r["ok"]; assert r["score"] >= 80; assert r["grade"] == "A"
|
||||
def test_eval(self):
|
||||
r = evaluate(); assert len(r["results"]) >= 2; assert "recommendation" in r
|
||||
|
||||
class TestSingleton:
|
||||
def test_default(self, rst, monkeypatch):
|
||||
monkeypatch.delenv("HONCHO_API_KEY", raising=False); assert isinstance(get_backend(), Local)
|
||||
def test_cache(self, rst): assert get_backend() is get_backend()
|
||||
@@ -1,36 +0,0 @@
|
||||
"""Memory backend tool. Local default, Honcho opt-in."""
|
||||
import json
|
||||
from tools.registry import registry
|
||||
|
||||
def memory_backend(action, uid="default", key=None, value=None, query=None, meta=None):
|
||||
from agent.memory import get_backend, evaluate
|
||||
b = get_backend()
|
||||
if action == "info": return json.dumps({"ok": True, "backend": b.name, "cloud": b.cloud, "available": b.ok()})
|
||||
if action == "store":
|
||||
if not key or value is None: return json.dumps({"ok": False, "error": "key+value required"})
|
||||
return json.dumps({"ok": b.put(uid, key, value, meta), "key": key})
|
||||
if action == "get":
|
||||
if not key: return json.dumps({"ok": False, "error": "key required"})
|
||||
e = b.get(uid, key)
|
||||
return json.dumps({"ok": True, "key": e.key, "value": e.value, "type": e.etype}) if e else json.dumps({"ok": False, "error": "not found"})
|
||||
if action == "query":
|
||||
if not query: return json.dumps({"ok": False, "error": "query required"})
|
||||
r = b.find(uid, query)
|
||||
return json.dumps({"ok": True, "results": [{"key": e.key, "value": e.value} for e in r], "count": len(r)})
|
||||
if action == "list":
|
||||
r = b.all(uid)
|
||||
return json.dumps({"ok": True, "entries": [{"key": e.key, "type": e.etype} for e in r], "count": len(r)})
|
||||
if action == "delete":
|
||||
if not key: return json.dumps({"ok": False, "error": "key required"})
|
||||
return json.dumps({"ok": b.rm(uid, key)})
|
||||
if action == "evaluate": return json.dumps({"ok": True, **evaluate()})
|
||||
return json.dumps({"ok": False, "error": f"unknown: {action}"})
|
||||
|
||||
registry.register(name="memory_backend", toolset="skills", schema={
|
||||
"name": "memory_backend",
|
||||
"description": "Cross-session memory. Local SQLite default, Honcho cloud opt-in. Zero overhead when disabled.",
|
||||
"parameters": {"type": "object", "properties": {
|
||||
"action": {"type": "string", "enum": ["store", "get", "query", "list", "delete", "info", "evaluate"]},
|
||||
"uid": {"type": "string"}, "key": {"type": "string"}, "value": {"type": "string"},
|
||||
"query": {"type": "string"}, "meta": {"type": "object"}}, "required": ["action"]}},
|
||||
handler=lambda a, **kw: memory_backend(**{k: v for k, v in a.items() if v is not None}), emoji="🧠")
|
||||
384
tools/session_templates.py
Normal file
384
tools/session_templates.py
Normal file
@@ -0,0 +1,384 @@
|
||||
"""
|
||||
Session templates for code-first seeding.
|
||||
|
||||
Based on research finding: Code-heavy sessions (execute_code dominant in first 30 turns)
|
||||
improve over time. File-heavy sessions degrade. The key is deterministic feedback loops.
|
||||
|
||||
This module provides:
|
||||
1. Template extraction from successful sessions
|
||||
2. Task type classification (code, file, research)
|
||||
3. Template storage in ~/.hermes/session-templates/
|
||||
4. Template injection into new sessions
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any
|
||||
from dataclasses import dataclass, asdict
|
||||
from enum import Enum
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Default template directory
|
||||
DEFAULT_TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
|
||||
|
||||
|
||||
class TaskType(Enum):
|
||||
"""Task type classification."""
|
||||
CODE = "code"
|
||||
FILE = "file"
|
||||
RESEARCH = "research"
|
||||
MIXED = "mixed"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolCallExample:
|
||||
"""A single tool call example."""
|
||||
tool_name: str
|
||||
arguments: Dict[str, Any]
|
||||
result: str
|
||||
success: bool
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'ToolCallExample':
|
||||
return cls(**data)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionTemplate:
|
||||
"""A session template with tool call examples."""
|
||||
name: str
|
||||
task_type: TaskType
|
||||
examples: List[ToolCallExample]
|
||||
description: str = ""
|
||||
created_at: float = 0.0
|
||||
usage_count: int = 0
|
||||
|
||||
def __post_init__(self):
|
||||
if self.created_at == 0.0:
|
||||
self.created_at = time.time()
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
data = asdict(self)
|
||||
data['task_type'] = self.task_type.value
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'SessionTemplate':
|
||||
data['task_type'] = TaskType(data['task_type'])
|
||||
examples_data = data.get('examples', [])
|
||||
data['examples'] = [ToolCallExample.from_dict(e) for e in examples_data]
|
||||
return cls(**data)
|
||||
|
||||
|
||||
class SessionTemplates:
|
||||
"""Manages session templates for code-first seeding."""
|
||||
|
||||
def __init__(self, template_dir: Optional[Path] = None):
|
||||
self.template_dir = template_dir or DEFAULT_TEMPLATE_DIR
|
||||
self.template_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.templates: Dict[str, SessionTemplate] = {}
|
||||
self._load_templates()
|
||||
|
||||
def _load_templates(self):
|
||||
"""Load all templates from disk."""
|
||||
for template_file in self.template_dir.glob("*.json"):
|
||||
try:
|
||||
with open(template_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
template = SessionTemplate.from_dict(data)
|
||||
self.templates[template.name] = template
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load template {template_file}: {e}")
|
||||
|
||||
def _save_template(self, template: SessionTemplate):
|
||||
"""Save a template to disk."""
|
||||
template_file = self.template_dir / f"{template.name}.json"
|
||||
with open(template_file, 'w') as f:
|
||||
json.dump(template.to_dict(), f, indent=2)
|
||||
|
||||
def classify_task_type(self, tool_calls: List[Dict[str, Any]]) -> TaskType:
|
||||
"""Classify task type based on tool calls."""
|
||||
if not tool_calls:
|
||||
return TaskType.MIXED
|
||||
|
||||
# Count tool types
|
||||
code_tools = {'execute_code', 'code_execution'}
|
||||
file_tools = {'read_file', 'write_file', 'patch', 'search_files'}
|
||||
research_tools = {'web_search', 'web_fetch', 'browser_navigate'}
|
||||
|
||||
tool_names = [tc.get('tool_name', '') for tc in tool_calls]
|
||||
|
||||
code_count = sum(1 for t in tool_names if t in code_tools)
|
||||
file_count = sum(1 for t in tool_names if t in file_tools)
|
||||
research_count = sum(1 for t in tool_names if t in research_tools)
|
||||
|
||||
total = len(tool_calls)
|
||||
if total == 0:
|
||||
return TaskType.MIXED
|
||||
|
||||
# Determine dominant type (60% threshold)
|
||||
if code_count / total > 0.6:
|
||||
return TaskType.CODE
|
||||
elif file_count / total > 0.6:
|
||||
return TaskType.FILE
|
||||
elif research_count / total > 0.6:
|
||||
return TaskType.RESEARCH
|
||||
else:
|
||||
return TaskType.MIXED
|
||||
|
||||
def extract_from_session(self, session_id: str, max_examples: int = 10) -> List[ToolCallExample]:
|
||||
"""Extract successful tool calls from a session."""
|
||||
db_path = Path.home() / ".hermes" / "state.db"
|
||||
if not db_path.exists():
|
||||
return []
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
# Get messages with tool calls
|
||||
cursor = conn.execute("""
|
||||
SELECT role, content, tool_calls, tool_name
|
||||
FROM messages
|
||||
WHERE session_id = ?
|
||||
ORDER BY timestamp
|
||||
LIMIT 100
|
||||
""", (session_id,))
|
||||
|
||||
messages = cursor.fetchall()
|
||||
conn.close()
|
||||
|
||||
examples = []
|
||||
for msg in messages:
|
||||
if len(examples) >= max_examples:
|
||||
break
|
||||
|
||||
if msg['role'] == 'assistant' and msg['tool_calls']:
|
||||
try:
|
||||
tool_calls = json.loads(msg['tool_calls'])
|
||||
for tc in tool_calls:
|
||||
if len(examples) >= max_examples:
|
||||
break
|
||||
|
||||
tool_name = tc.get('function', {}).get('name')
|
||||
if not tool_name:
|
||||
continue
|
||||
|
||||
try:
|
||||
arguments = json.loads(tc.get('function', {}).get('arguments', '{}'))
|
||||
except:
|
||||
arguments = {}
|
||||
|
||||
examples.append(ToolCallExample(
|
||||
tool_name=tool_name,
|
||||
arguments=arguments,
|
||||
result="", # Will be filled from tool response
|
||||
success=True
|
||||
))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
elif msg['role'] == 'tool' and examples and examples[-1].result == "":
|
||||
examples[-1].result = msg['content'] or ""
|
||||
|
||||
return examples
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to extract from session {session_id}: {e}")
|
||||
return []
|
||||
|
||||
def create_template(self, session_id: str, name: Optional[str] = None,
|
||||
task_type: Optional[TaskType] = None,
|
||||
max_examples: int = 10) -> Optional[SessionTemplate]:
|
||||
"""Create a template from a session."""
|
||||
examples = self.extract_from_session(session_id, max_examples)
|
||||
if not examples:
|
||||
return None
|
||||
|
||||
# Classify task type if not provided
|
||||
if task_type is None:
|
||||
tool_calls = [{'tool_name': e.tool_name} for e in examples]
|
||||
task_type = self.classify_task_type(tool_calls)
|
||||
|
||||
# Generate name if not provided
|
||||
if name is None:
|
||||
name = f"{task_type.value}_{session_id[:8]}_{int(time.time())}"
|
||||
|
||||
# Create template
|
||||
template = SessionTemplate(
|
||||
name=name,
|
||||
task_type=task_type,
|
||||
examples=examples,
|
||||
description=f"Template with {len(examples)} examples"
|
||||
)
|
||||
|
||||
# Save template
|
||||
self.templates[name] = template
|
||||
self._save_template(template)
|
||||
|
||||
logger.info(f"Created template {name} with {len(examples)} examples")
|
||||
return template
|
||||
|
||||
def get_template(self, task_type: TaskType) -> Optional[SessionTemplate]:
|
||||
"""Get the best template for a task type."""
|
||||
matching = [t for t in self.templates.values() if t.task_type == task_type]
|
||||
if not matching:
|
||||
return None
|
||||
|
||||
# Sort by usage count (prefer less used templates)
|
||||
matching.sort(key=lambda t: t.usage_count)
|
||||
return matching[0]
|
||||
|
||||
def inject_into_messages(self, template: SessionTemplate,
|
||||
messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""Inject template examples into messages."""
|
||||
if not template.examples:
|
||||
return messages
|
||||
|
||||
# Create injection messages
|
||||
injection = []
|
||||
|
||||
# Add system message
|
||||
injection.append({
|
||||
"role": "system",
|
||||
"content": f"Session template: {template.name} ({template.task_type.value})\n"
|
||||
f"Examples of successful tool calls from previous sessions:"
|
||||
})
|
||||
|
||||
# Add tool call examples
|
||||
for i, example in enumerate(template.examples):
|
||||
# Assistant message with tool call
|
||||
injection.append({
|
||||
"role": "assistant",
|
||||
"content": None,
|
||||
"tool_calls": [{
|
||||
"id": f"template_{i}",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": example.tool_name,
|
||||
"arguments": json.dumps(example.arguments)
|
||||
}
|
||||
}]
|
||||
})
|
||||
|
||||
# Tool response
|
||||
injection.append({
|
||||
"role": "tool",
|
||||
"tool_call_id": f"template_{i}",
|
||||
"content": example.result
|
||||
})
|
||||
|
||||
# Insert after system messages
|
||||
insert_index = 0
|
||||
for i, msg in enumerate(messages):
|
||||
if msg.get("role") != "system":
|
||||
break
|
||||
insert_index = i + 1
|
||||
|
||||
# Insert injection
|
||||
for i, msg in enumerate(injection):
|
||||
messages.insert(insert_index + i, msg)
|
||||
|
||||
# Update usage count
|
||||
template.usage_count += 1
|
||||
self._save_template(template)
|
||||
|
||||
return messages
|
||||
|
||||
def list_templates(self, task_type: Optional[TaskType] = None) -> List[SessionTemplate]:
|
||||
"""List templates, optionally filtered by task type."""
|
||||
templates = list(self.templates.values())
|
||||
if task_type:
|
||||
templates = [t for t in templates if t.task_type == task_type]
|
||||
templates.sort(key=lambda t: t.created_at, reverse=True)
|
||||
return templates
|
||||
|
||||
def delete_template(self, name: str) -> bool:
|
||||
"""Delete a template."""
|
||||
if name not in self.templates:
|
||||
return False
|
||||
|
||||
del self.templates[name]
|
||||
template_file = self.template_dir / f"{name}.json"
|
||||
if template_file.exists():
|
||||
template_file.unlink()
|
||||
|
||||
logger.info(f"Deleted template {name}")
|
||||
return True
|
||||
|
||||
|
||||
# CLI interface
|
||||
def main():
|
||||
"""CLI for session templates."""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Session Templates")
|
||||
subparsers = parser.add_subparsers(dest="command")
|
||||
|
||||
# List templates
|
||||
list_parser = subparsers.add_parser("list", help="List templates")
|
||||
list_parser.add_argument("--type", choices=["code", "file", "research", "mixed"])
|
||||
|
||||
# Create template
|
||||
create_parser = subparsers.add_parser("create", help="Create template from session")
|
||||
create_parser.add_argument("session_id", help="Session ID")
|
||||
create_parser.add_argument("--name", help="Template name")
|
||||
create_parser.add_argument("--type", choices=["code", "file", "research", "mixed"])
|
||||
create_parser.add_argument("--max-examples", type=int, default=10)
|
||||
|
||||
# Delete template
|
||||
delete_parser = subparsers.add_parser("delete", help="Delete template")
|
||||
delete_parser.add_argument("name", help="Template name")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
templates = SessionTemplates()
|
||||
|
||||
if args.command == "list":
|
||||
task_type = TaskType(args.type) if args.type else None
|
||||
template_list = templates.list_templates(task_type)
|
||||
|
||||
if not template_list:
|
||||
print("No templates found")
|
||||
return
|
||||
|
||||
print(f"Found {len(template_list)} templates:")
|
||||
for t in template_list:
|
||||
print(f" {t.name}: {t.task_type.value} ({len(t.examples)} examples, used {t.usage_count} times)")
|
||||
|
||||
elif args.command == "create":
|
||||
task_type = TaskType(args.type) if args.type else None
|
||||
template = templates.create_template(
|
||||
args.session_id,
|
||||
name=args.name,
|
||||
task_type=task_type,
|
||||
max_examples=args.max_examples
|
||||
)
|
||||
|
||||
if template:
|
||||
print(f"Created template: {template.name}")
|
||||
print(f" Type: {template.task_type.value}")
|
||||
print(f" Examples: {len(template.examples)}")
|
||||
else:
|
||||
print("Failed to create template")
|
||||
|
||||
elif args.command == "delete":
|
||||
if templates.delete_template(args.name):
|
||||
print(f"Deleted template: {args.name}")
|
||||
else:
|
||||
print(f"Template not found: {args.name}")
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user