Compare commits
1 Commits
triage/329
...
whip/322-1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3563896f86 |
171
agent/memory/__init__.py
Normal file
171
agent/memory/__init__.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""Memory Backend Interface — pluggable cross-session user modeling.
|
||||
|
||||
Provides a common interface for memory backends that persist user
|
||||
preferences and patterns across sessions. Two implementations:
|
||||
|
||||
1. LocalBackend (default): SQLite-based, zero cloud dependency
|
||||
2. HonchoBackend (opt-in): Honcho AI-native memory, requires API key
|
||||
|
||||
Both are zero-overhead when disabled — the interface returns empty
|
||||
results and no writes occur.
|
||||
|
||||
Usage:
|
||||
from agent.memory import get_memory_backend
|
||||
|
||||
backend = get_memory_backend() # returns configured backend
|
||||
backend.store_preference("user", "prefers_python", "True")
|
||||
context = backend.query_context("user", "What does this user prefer?")
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryEntry:
|
||||
"""A single memory entry."""
|
||||
key: str
|
||||
value: str
|
||||
user_id: str
|
||||
created_at: float = 0
|
||||
updated_at: float = 0
|
||||
metadata: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self):
|
||||
now = time.time()
|
||||
if not self.created_at:
|
||||
self.created_at = now
|
||||
if not self.updated_at:
|
||||
self.updated_at = now
|
||||
|
||||
|
||||
class MemoryBackend(ABC):
|
||||
"""Abstract interface for memory backends."""
|
||||
|
||||
@abstractmethod
|
||||
def is_available(self) -> bool:
|
||||
"""Check if this backend is configured and usable."""
|
||||
|
||||
@abstractmethod
|
||||
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
|
||||
"""Store a memory entry."""
|
||||
|
||||
@abstractmethod
|
||||
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
|
||||
"""Retrieve a single memory entry."""
|
||||
|
||||
@abstractmethod
|
||||
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
|
||||
"""Query memories relevant to a text query."""
|
||||
|
||||
@abstractmethod
|
||||
def list_keys(self, user_id: str) -> List[str]:
|
||||
"""List all keys for a user."""
|
||||
|
||||
@abstractmethod
|
||||
def delete(self, user_id: str, key: str) -> bool:
|
||||
"""Delete a memory entry."""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def backend_name(self) -> str:
|
||||
"""Human-readable backend name."""
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def is_cloud(self) -> bool:
|
||||
"""Whether this backend requires cloud connectivity."""
|
||||
|
||||
|
||||
class NullBackend(MemoryBackend):
|
||||
"""No-op backend when memory is disabled. Zero overhead."""
|
||||
|
||||
def is_available(self) -> bool:
|
||||
return True # always "available" as null
|
||||
|
||||
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
|
||||
return True # no-op
|
||||
|
||||
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
|
||||
return None
|
||||
|
||||
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
|
||||
return []
|
||||
|
||||
def list_keys(self, user_id: str) -> List[str]:
|
||||
return []
|
||||
|
||||
def delete(self, user_id: str, key: str) -> bool:
|
||||
return True
|
||||
|
||||
@property
|
||||
def backend_name(self) -> str:
|
||||
return "null (disabled)"
|
||||
|
||||
@property
|
||||
def is_cloud(self) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Singleton
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_backend: Optional[MemoryBackend] = None
|
||||
|
||||
|
||||
def get_memory_backend() -> MemoryBackend:
|
||||
"""Get the configured memory backend.
|
||||
|
||||
Priority:
|
||||
1. If HONCHO_API_KEY is set and honcho-ai is installed -> HonchoBackend
|
||||
2. If memory_backend config is 'local' -> LocalBackend
|
||||
3. Default -> NullBackend (zero overhead)
|
||||
"""
|
||||
global _backend
|
||||
if _backend is not None:
|
||||
return _backend
|
||||
|
||||
# Check config
|
||||
backend_type = os.getenv("HERMES_MEMORY_BACKEND", "").lower().strip()
|
||||
|
||||
if backend_type == "honcho" or os.getenv("HONCHO_API_KEY"):
|
||||
try:
|
||||
from agent.memory.honcho_backend import HonchoBackend
|
||||
backend = HonchoBackend()
|
||||
if backend.is_available():
|
||||
_backend = backend
|
||||
logger.info("Memory backend: Honcho (cloud)")
|
||||
return _backend
|
||||
except ImportError:
|
||||
logger.debug("Honcho not installed, falling back")
|
||||
|
||||
if backend_type == "local":
|
||||
try:
|
||||
from agent.memory.local_backend import LocalBackend
|
||||
_backend = LocalBackend()
|
||||
logger.info("Memory backend: Local (SQLite)")
|
||||
return _backend
|
||||
except Exception as e:
|
||||
logger.warning("Local backend failed: %s", e)
|
||||
|
||||
# Default: null (zero overhead)
|
||||
_backend = NullBackend()
|
||||
return _backend
|
||||
|
||||
|
||||
def reset_backend():
|
||||
"""Reset the singleton (for testing)."""
|
||||
global _backend
|
||||
_backend = None
|
||||
263
agent/memory/evaluation.py
Normal file
263
agent/memory/evaluation.py
Normal file
@@ -0,0 +1,263 @@
|
||||
"""Memory Backend Evaluation Framework.
|
||||
|
||||
Provides structured evaluation for comparing memory backends on:
|
||||
1. Latency (store/retrieve/query operations)
|
||||
2. Relevance (does query return useful results?)
|
||||
3. Privacy (where is data stored?)
|
||||
4. Reliability (availability, error handling)
|
||||
5. Cost (API calls, cloud dependency)
|
||||
|
||||
Usage:
|
||||
from agent.memory.evaluation import evaluate_backends
|
||||
report = evaluate_backends()
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BackendEvaluation:
|
||||
"""Evaluation results for a single backend."""
|
||||
backend_name: str
|
||||
is_cloud: bool
|
||||
available: bool
|
||||
|
||||
# Latency (milliseconds)
|
||||
store_latency_ms: float = 0
|
||||
retrieve_latency_ms: float = 0
|
||||
query_latency_ms: float = 0
|
||||
|
||||
# Functionality
|
||||
store_success: bool = False
|
||||
retrieve_success: bool = False
|
||||
query_returns_results: bool = False
|
||||
query_result_count: int = 0
|
||||
|
||||
# Privacy
|
||||
data_location: str = "unknown"
|
||||
requires_api_key: bool = False
|
||||
|
||||
# Overall
|
||||
score: float = 0 # 0-100
|
||||
recommendation: str = ""
|
||||
notes: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
def _measure_latency(func, *args, **kwargs) -> tuple:
|
||||
"""Measure function latency in milliseconds."""
|
||||
start = time.perf_counter()
|
||||
try:
|
||||
result = func(*args, **kwargs)
|
||||
elapsed = (time.perf_counter() - start) * 1000
|
||||
return elapsed, result, None
|
||||
except Exception as e:
|
||||
elapsed = (time.perf_counter() - start) * 1000
|
||||
return elapsed, None, e
|
||||
|
||||
|
||||
def evaluate_backend(backend, test_user: str = "eval_user") -> BackendEvaluation:
|
||||
"""Evaluate a single memory backend."""
|
||||
from agent.memory import MemoryBackend
|
||||
|
||||
eval_result = BackendEvaluation(
|
||||
backend_name=backend.backend_name,
|
||||
is_cloud=backend.is_cloud,
|
||||
available=backend.is_available(),
|
||||
)
|
||||
|
||||
if not eval_result.available:
|
||||
eval_result.notes.append("Backend not available")
|
||||
eval_result.score = 0
|
||||
eval_result.recommendation = "NOT AVAILABLE"
|
||||
return eval_result
|
||||
|
||||
# Privacy assessment
|
||||
if backend.is_cloud:
|
||||
eval_result.data_location = "cloud (external)"
|
||||
eval_result.requires_api_key = True
|
||||
else:
|
||||
eval_result.data_location = "local (~/.hermes/)"
|
||||
|
||||
# Test store
|
||||
latency, success, err = _measure_latency(
|
||||
backend.store,
|
||||
test_user,
|
||||
"eval_test_key",
|
||||
"eval_test_value",
|
||||
{"source": "evaluation"},
|
||||
)
|
||||
eval_result.store_latency_ms = latency
|
||||
eval_result.store_success = success is True
|
||||
if err:
|
||||
eval_result.notes.append(f"Store error: {err}")
|
||||
|
||||
# Test retrieve
|
||||
latency, result, err = _measure_latency(
|
||||
backend.retrieve,
|
||||
test_user,
|
||||
"eval_test_key",
|
||||
)
|
||||
eval_result.retrieve_latency_ms = latency
|
||||
eval_result.retrieve_success = result is not None
|
||||
if err:
|
||||
eval_result.notes.append(f"Retrieve error: {err}")
|
||||
|
||||
# Test query
|
||||
latency, results, err = _measure_latency(
|
||||
backend.query,
|
||||
test_user,
|
||||
"eval_test",
|
||||
5,
|
||||
)
|
||||
eval_result.query_latency_ms = latency
|
||||
eval_result.query_returns_results = bool(results)
|
||||
eval_result.query_result_count = len(results) if results else 0
|
||||
if err:
|
||||
eval_result.notes.append(f"Query error: {err}")
|
||||
|
||||
# Cleanup
|
||||
try:
|
||||
backend.delete(test_user, "eval_test_key")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Score calculation (0-100)
|
||||
score = 0
|
||||
|
||||
# Availability (20 points)
|
||||
score += 20
|
||||
|
||||
# Functionality (40 points)
|
||||
if eval_result.store_success:
|
||||
score += 15
|
||||
if eval_result.retrieve_success:
|
||||
score += 15
|
||||
if eval_result.query_returns_results:
|
||||
score += 10
|
||||
|
||||
# Latency (20 points) — lower is better
|
||||
avg_latency = (
|
||||
eval_result.store_latency_ms +
|
||||
eval_result.retrieve_latency_ms +
|
||||
eval_result.query_latency_ms
|
||||
) / 3
|
||||
if avg_latency < 10:
|
||||
score += 20
|
||||
elif avg_latency < 50:
|
||||
score += 15
|
||||
elif avg_latency < 200:
|
||||
score += 10
|
||||
else:
|
||||
score += 5
|
||||
|
||||
# Privacy (20 points) — local is better for sovereignty
|
||||
if not backend.is_cloud:
|
||||
score += 20
|
||||
else:
|
||||
score += 5 # cloud has privacy trade-offs
|
||||
|
||||
eval_result.score = score
|
||||
|
||||
# Recommendation
|
||||
if score >= 80:
|
||||
eval_result.recommendation = "RECOMMENDED"
|
||||
elif score >= 60:
|
||||
eval_result.recommendation = "ACCEPTABLE"
|
||||
elif score >= 40:
|
||||
eval_result.recommendation = "MARGINAL"
|
||||
else:
|
||||
eval_result.recommendation = "NOT RECOMMENDED"
|
||||
|
||||
return eval_result
|
||||
|
||||
|
||||
def evaluate_backends() -> Dict[str, Any]:
|
||||
"""Evaluate all available memory backends.
|
||||
|
||||
Returns a comparison report.
|
||||
"""
|
||||
from agent.memory import NullBackend
|
||||
from agent.memory.local_backend import LocalBackend
|
||||
|
||||
backends = []
|
||||
|
||||
# Always evaluate Null (baseline)
|
||||
backends.append(NullBackend())
|
||||
|
||||
# Evaluate Local
|
||||
try:
|
||||
backends.append(LocalBackend())
|
||||
except Exception as e:
|
||||
logger.warning("Local backend init failed: %s", e)
|
||||
|
||||
# Try Honcho if configured
|
||||
import os
|
||||
if os.getenv("HONCHO_API_KEY"):
|
||||
try:
|
||||
from agent.memory.honcho_backend import HonchoBackend
|
||||
backends.append(HonchoBackend())
|
||||
except ImportError:
|
||||
logger.debug("Honcho not installed, skipping evaluation")
|
||||
|
||||
evaluations = []
|
||||
for backend in backends:
|
||||
try:
|
||||
evaluations.append(evaluate_backend(backend))
|
||||
except Exception as e:
|
||||
logger.warning("Evaluation failed for %s: %s", backend.backend_name, e)
|
||||
|
||||
# Build report
|
||||
report = {
|
||||
"timestamp": time.time(),
|
||||
"backends_evaluated": len(evaluations),
|
||||
"evaluations": [asdict(e) for e in evaluations],
|
||||
"recommendation": _build_recommendation(evaluations),
|
||||
}
|
||||
|
||||
return report
|
||||
|
||||
|
||||
def _build_recommendation(evaluations: List[BackendEvaluation]) -> str:
|
||||
"""Build overall recommendation from evaluations."""
|
||||
if not evaluations:
|
||||
return "No backends evaluated"
|
||||
|
||||
# Find best non-null backend
|
||||
viable = [e for e in evaluations if e.backend_name != "null (disabled)" and e.available]
|
||||
if not viable:
|
||||
return "No viable backends found. Use NullBackend (default)."
|
||||
|
||||
best = max(viable, key=lambda e: e.score)
|
||||
|
||||
parts = [f"Best backend: {best.backend_name} (score: {best.score})"]
|
||||
|
||||
if best.is_cloud:
|
||||
parts.append(
|
||||
"WARNING: Cloud backend has privacy trade-offs. "
|
||||
"Data leaves your machine. Consider LocalBackend for sovereignty."
|
||||
)
|
||||
|
||||
# Compare local vs cloud if both available
|
||||
local = [e for e in viable if not e.is_cloud]
|
||||
cloud = [e for e in viable if e.is_cloud]
|
||||
if local and cloud:
|
||||
local_score = max(e.score for e in local)
|
||||
cloud_score = max(e.score for e in cloud)
|
||||
if local_score >= cloud_score:
|
||||
parts.append(
|
||||
f"Local backend (score {local_score}) matches or beats "
|
||||
f"cloud (score {cloud_score}). RECOMMEND: stay local for sovereignty."
|
||||
)
|
||||
else:
|
||||
parts.append(
|
||||
f"Cloud backend (score {cloud_score}) outperforms "
|
||||
f"local (score {local_score}) but adds cloud dependency."
|
||||
)
|
||||
|
||||
return " ".join(parts)
|
||||
171
agent/memory/honcho_backend.py
Normal file
171
agent/memory/honcho_backend.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""Honcho memory backend — opt-in cloud-based user modeling.
|
||||
|
||||
Requires:
|
||||
- pip install honcho-ai
|
||||
- HONCHO_API_KEY environment variable (from app.honcho.dev)
|
||||
|
||||
Provides dialectic user context queries via Honcho's AI-native memory.
|
||||
Zero runtime overhead when not configured — get_memory_backend() falls
|
||||
back to LocalBackend or NullBackend if this fails to initialize.
|
||||
|
||||
This is the evaluation wrapper. It adapts the Honcho SDK to our
|
||||
MemoryBackend interface so we can A/B test against LocalBackend.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from agent.memory import MemoryBackend, MemoryEntry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HonchoBackend(MemoryBackend):
|
||||
"""Honcho AI-native memory backend.
|
||||
|
||||
Wraps the honcho-ai SDK to provide cross-session user modeling
|
||||
with dialectic context queries.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._client = None
|
||||
self._api_key = os.getenv("HONCHO_API_KEY", "")
|
||||
self._app_id = os.getenv("HONCHO_APP_ID", "hermes-agent")
|
||||
self._base_url = os.getenv("HONCHO_BASE_URL", "https://api.honcho.dev")
|
||||
|
||||
def _get_client(self):
|
||||
"""Lazy-load Honcho client."""
|
||||
if self._client is not None:
|
||||
return self._client
|
||||
|
||||
if not self._api_key:
|
||||
return None
|
||||
|
||||
try:
|
||||
from honcho import Honcho
|
||||
self._client = Honcho(
|
||||
api_key=self._api_key,
|
||||
app_id=self._app_id,
|
||||
base_url=self._base_url,
|
||||
)
|
||||
return self._client
|
||||
except ImportError:
|
||||
logger.warning("honcho-ai not installed. Install with: pip install honcho-ai")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning("Failed to initialize Honcho client: %s", e)
|
||||
return None
|
||||
|
||||
def is_available(self) -> bool:
|
||||
if not self._api_key:
|
||||
return False
|
||||
client = self._get_client()
|
||||
if client is None:
|
||||
return False
|
||||
# Try a simple API call to verify connectivity
|
||||
try:
|
||||
# Honcho uses sessions — verify we can list them
|
||||
client.get_sessions(limit=1)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.debug("Honcho not available: %s", e)
|
||||
return False
|
||||
|
||||
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
|
||||
client = self._get_client()
|
||||
if client is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
# Honcho stores messages in sessions
|
||||
# We create a synthetic message to store the preference
|
||||
session_id = f"hermes-prefs-{user_id}"
|
||||
message_content = json.dumps({
|
||||
"type": "preference",
|
||||
"key": key,
|
||||
"value": value,
|
||||
"metadata": metadata or {},
|
||||
"timestamp": time.time(),
|
||||
})
|
||||
client.add_message(
|
||||
session_id=session_id,
|
||||
role="system",
|
||||
content=message_content,
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning("Honcho store failed: %s", e)
|
||||
return False
|
||||
|
||||
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
|
||||
# Honcho doesn't have direct key-value retrieval
|
||||
# We query for the key and return the latest match
|
||||
results = self.query(user_id, key, limit=1)
|
||||
for entry in results:
|
||||
if entry.key == key:
|
||||
return entry
|
||||
return None
|
||||
|
||||
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
|
||||
client = self._get_client()
|
||||
if client is None:
|
||||
return []
|
||||
|
||||
try:
|
||||
session_id = f"hermes-prefs-{user_id}"
|
||||
# Use Honcho's dialectic query
|
||||
result = client.chat(
|
||||
session_id=session_id,
|
||||
message=f"Find preferences related to: {query_text}",
|
||||
)
|
||||
|
||||
# Parse the response into memory entries
|
||||
entries = []
|
||||
if isinstance(result, dict):
|
||||
content = result.get("content", "")
|
||||
try:
|
||||
data = json.loads(content)
|
||||
if isinstance(data, list):
|
||||
for item in data[:limit]:
|
||||
entries.append(MemoryEntry(
|
||||
key=item.get("key", ""),
|
||||
value=item.get("value", ""),
|
||||
user_id=user_id,
|
||||
metadata=item.get("metadata", {}),
|
||||
))
|
||||
elif isinstance(data, dict) and data.get("key"):
|
||||
entries.append(MemoryEntry(
|
||||
key=data.get("key", ""),
|
||||
value=data.get("value", ""),
|
||||
user_id=user_id,
|
||||
metadata=data.get("metadata", {}),
|
||||
))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
return entries
|
||||
except Exception as e:
|
||||
logger.warning("Honcho query failed: %s", e)
|
||||
return []
|
||||
|
||||
def list_keys(self, user_id: str) -> List[str]:
|
||||
# Query all and extract keys
|
||||
results = self.query(user_id, "", limit=100)
|
||||
return list(dict.fromkeys(e.key for e in results if e.key))
|
||||
|
||||
def delete(self, user_id: str, key: str) -> bool:
|
||||
# Honcho doesn't support deletion of individual entries
|
||||
# This is a limitation of the cloud backend
|
||||
logger.info("Honcho does not support individual entry deletion")
|
||||
return False
|
||||
|
||||
@property
|
||||
def backend_name(self) -> str:
|
||||
return "honcho (cloud)"
|
||||
|
||||
@property
|
||||
def is_cloud(self) -> bool:
|
||||
return True
|
||||
156
agent/memory/local_backend.py
Normal file
156
agent/memory/local_backend.py
Normal file
@@ -0,0 +1,156 @@
|
||||
"""Local SQLite memory backend.
|
||||
|
||||
Zero cloud dependency. Stores user preferences and patterns in a
|
||||
local SQLite database at ~/.hermes/memory.db.
|
||||
|
||||
Provides basic key-value storage with simple text search.
|
||||
No external dependencies beyond Python stdlib.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from agent.memory import MemoryBackend, MemoryEntry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LocalBackend(MemoryBackend):
|
||||
"""SQLite-backed local memory storage."""
|
||||
|
||||
def __init__(self, db_path: Path = None):
|
||||
self._db_path = db_path or (get_hermes_home() / "memory.db")
|
||||
self._init_db()
|
||||
|
||||
def _init_db(self):
|
||||
"""Initialize the database schema."""
|
||||
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with sqlite3.connect(str(self._db_path)) as conn:
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS memories (
|
||||
user_id TEXT NOT NULL,
|
||||
key TEXT NOT NULL,
|
||||
value TEXT NOT NULL,
|
||||
metadata TEXT,
|
||||
created_at REAL NOT NULL,
|
||||
updated_at REAL NOT NULL,
|
||||
PRIMARY KEY (user_id, key)
|
||||
)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_memories_user
|
||||
ON memories(user_id)
|
||||
""")
|
||||
conn.commit()
|
||||
|
||||
def is_available(self) -> bool:
|
||||
try:
|
||||
with sqlite3.connect(str(self._db_path)) as conn:
|
||||
conn.execute("SELECT 1")
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
|
||||
try:
|
||||
now = time.time()
|
||||
meta_json = json.dumps(metadata) if metadata else None
|
||||
with sqlite3.connect(str(self._db_path)) as conn:
|
||||
conn.execute("""
|
||||
INSERT INTO memories (user_id, key, value, metadata, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(user_id, key) DO UPDATE SET
|
||||
value = excluded.value,
|
||||
metadata = excluded.metadata,
|
||||
updated_at = excluded.updated_at
|
||||
""", (user_id, key, value, meta_json, now, now))
|
||||
conn.commit()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning("Failed to store memory: %s", e)
|
||||
return False
|
||||
|
||||
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
|
||||
try:
|
||||
with sqlite3.connect(str(self._db_path)) as conn:
|
||||
row = conn.execute(
|
||||
"SELECT key, value, user_id, created_at, updated_at, metadata "
|
||||
"FROM memories WHERE user_id = ? AND key = ?",
|
||||
(user_id, key),
|
||||
).fetchone()
|
||||
if not row:
|
||||
return None
|
||||
return MemoryEntry(
|
||||
key=row[0],
|
||||
value=row[1],
|
||||
user_id=row[2],
|
||||
created_at=row[3],
|
||||
updated_at=row[4],
|
||||
metadata=json.loads(row[5]) if row[5] else {},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to retrieve memory: %s", e)
|
||||
return None
|
||||
|
||||
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
|
||||
"""Simple LIKE-based search on keys and values."""
|
||||
try:
|
||||
pattern = f"%{query_text}%"
|
||||
with sqlite3.connect(str(self._db_path)) as conn:
|
||||
rows = conn.execute("""
|
||||
SELECT key, value, user_id, created_at, updated_at, metadata
|
||||
FROM memories
|
||||
WHERE user_id = ? AND (key LIKE ? OR value LIKE ?)
|
||||
ORDER BY updated_at DESC
|
||||
LIMIT ?
|
||||
""", (user_id, pattern, pattern, limit)).fetchall()
|
||||
return [
|
||||
MemoryEntry(
|
||||
key=r[0],
|
||||
value=r[1],
|
||||
user_id=r[2],
|
||||
created_at=r[3],
|
||||
updated_at=r[4],
|
||||
metadata=json.loads(r[5]) if r[5] else {},
|
||||
)
|
||||
for r in rows
|
||||
]
|
||||
except Exception as e:
|
||||
logger.warning("Failed to query memories: %s", e)
|
||||
return []
|
||||
|
||||
def list_keys(self, user_id: str) -> List[str]:
|
||||
try:
|
||||
with sqlite3.connect(str(self._db_path)) as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT key FROM memories WHERE user_id = ? ORDER BY updated_at DESC",
|
||||
(user_id,),
|
||||
).fetchall()
|
||||
return [r[0] for r in rows]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def delete(self, user_id: str, key: str) -> bool:
|
||||
try:
|
||||
with sqlite3.connect(str(self._db_path)) as conn:
|
||||
conn.execute(
|
||||
"DELETE FROM memories WHERE user_id = ? AND key = ?",
|
||||
(user_id, key),
|
||||
)
|
||||
conn.commit()
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@property
|
||||
def backend_name(self) -> str:
|
||||
return "local (SQLite)"
|
||||
|
||||
@property
|
||||
def is_cloud(self) -> bool:
|
||||
return False
|
||||
@@ -1,206 +0,0 @@
|
||||
# Session Templates for Code-First Seeding
|
||||
|
||||
## Overview
|
||||
|
||||
Session templates pre-seed new sessions with successful tool call patterns from previous sessions. Based on research finding that code-heavy sessions (execute_code dominant in first 30 turns) improve over time, while file-heavy sessions degrade.
|
||||
|
||||
## Key Concepts
|
||||
|
||||
### Task Type Classification
|
||||
|
||||
Sessions are classified into four types based on tool call patterns:
|
||||
|
||||
- **CODE**: execute_code dominant (>60% of tool calls)
|
||||
- **FILE**: file operations dominant (read_file, write_file, patch, search_files)
|
||||
- **RESEARCH**: research tools dominant (web_search, web_fetch, browser_navigate)
|
||||
- **MIXED**: no dominant type (<60% for any category)
|
||||
|
||||
### Template Structure
|
||||
|
||||
Each template contains:
|
||||
- **Name**: Unique identifier
|
||||
- **Task Type**: CODE, FILE, RESEARCH, or MIXED
|
||||
- **Examples**: List of successful tool calls with arguments and results
|
||||
- **Description**: Human-readable description
|
||||
- **Tags**: Optional categorization tags
|
||||
- **Usage Count**: How many times the template has been used
|
||||
- **Source Session ID**: Session from which template was extracted
|
||||
|
||||
## Usage
|
||||
|
||||
### CLI Interface
|
||||
|
||||
```bash
|
||||
# List all templates
|
||||
python -m tools.session_templates list
|
||||
|
||||
# List only code templates
|
||||
python -m tools.session_templates list --type code
|
||||
|
||||
# List templates with specific tags
|
||||
python -m tools.session_templates list --tags "python,testing"
|
||||
|
||||
# Create template from session
|
||||
python -m tools.session_templates create 20260413_123456_abc123 --name my-code-template
|
||||
|
||||
# Create template with description and tags
|
||||
python -m tools.session_templates create 20260413_123456_abc123 \
|
||||
--name my-template \
|
||||
--type code \
|
||||
--description "Python development template" \
|
||||
--tags "python,development"
|
||||
|
||||
# Delete template
|
||||
python -m tools.session_templates delete my-template
|
||||
|
||||
# Show statistics
|
||||
python -m tools.session_templates stats
|
||||
```
|
||||
|
||||
### Programmatic Usage
|
||||
|
||||
```python
|
||||
from tools.session_templates import SessionTemplates, TaskType
|
||||
|
||||
# Create template manager
|
||||
templates = SessionTemplates()
|
||||
|
||||
# Get template for code tasks
|
||||
template = templates.get_template(TaskType.CODE)
|
||||
|
||||
# Inject template into messages
|
||||
messages = [
|
||||
{"role": "system", "content": "You are a helpful assistant."},
|
||||
{"role": "user", "content": "Help me write some code"}
|
||||
]
|
||||
|
||||
updated_messages = templates.inject_into_messages(template, messages)
|
||||
|
||||
# Create template from session
|
||||
template = templates.create_template(
|
||||
session_id="20260413_123456_abc123",
|
||||
name="my-template",
|
||||
task_type=TaskType.CODE,
|
||||
max_examples=10,
|
||||
description="My template",
|
||||
tags=["python", "development"]
|
||||
)
|
||||
|
||||
# List templates
|
||||
code_templates = templates.list_templates(task_type=TaskType.CODE)
|
||||
all_templates = templates.list_templates()
|
||||
|
||||
# Get statistics
|
||||
stats = templates.get_template_stats()
|
||||
print(f"Total templates: {stats['total']}")
|
||||
print(f"Total examples: {stats['total_examples']}")
|
||||
```
|
||||
|
||||
## Implementation Details
|
||||
|
||||
### Template Extraction
|
||||
|
||||
1. Query SQLite database for session messages
|
||||
2. Extract tool calls from assistant messages
|
||||
3. Match tool calls with their results from tool responses
|
||||
4. Create ToolCallExample objects with arguments and results
|
||||
|
||||
### Template Injection
|
||||
|
||||
1. Create system message about template
|
||||
2. Add assistant messages with tool calls from template
|
||||
3. Add tool responses with results
|
||||
4. Insert after existing system messages
|
||||
5. Update template usage count
|
||||
|
||||
### Storage
|
||||
|
||||
Templates are stored as JSON files in `~/.hermes/session-templates/`:
|
||||
|
||||
```json
|
||||
{
|
||||
"name": "code_python_20260413",
|
||||
"task_type": "code",
|
||||
"examples": [
|
||||
{
|
||||
"tool_name": "execute_code",
|
||||
"arguments": {"code": "print('hello world')"},
|
||||
"result": "hello world",
|
||||
"success": true,
|
||||
"turn_number": 0
|
||||
}
|
||||
],
|
||||
"description": "Python development template",
|
||||
"created_at": 1712345678.0,
|
||||
"usage_count": 5,
|
||||
"source_session_id": "20260413_123456_abc123",
|
||||
"tags": ["python", "development"]
|
||||
}
|
||||
```
|
||||
|
||||
## Research Background
|
||||
|
||||
### Finding
|
||||
|
||||
Code-heavy sessions (execute_code dominant in first 30 turns) improve over time. File-heavy sessions (search/read/patch) degrade. The key is deterministic feedback loops, not arbitrary context.
|
||||
|
||||
### Hypothesis
|
||||
|
||||
Pre-seeding new sessions with successful tool call patterns establishes feedback loops early, leading to:
|
||||
- Lower error rate in first 30 turns
|
||||
- Faster time to first success
|
||||
- Fewer total errors
|
||||
- Better tool call diversity
|
||||
|
||||
### Experiment Design
|
||||
|
||||
A/B test: cold start vs code-seeded start on same task. Measure:
|
||||
- Error rate in first 30 turns
|
||||
- Time to first success
|
||||
- Total errors
|
||||
- Tool call diversity
|
||||
|
||||
## Best Practices
|
||||
|
||||
### Template Creation
|
||||
|
||||
1. **Extract from successful sessions**: Only use sessions with high success rates
|
||||
2. **Limit examples**: 5-10 examples per template is optimal
|
||||
3. **Use descriptive names**: Include task type and context in name
|
||||
4. **Add tags**: Use tags for categorization and filtering
|
||||
5. **Update regularly**: Create new templates from recent successful sessions
|
||||
|
||||
### Template Usage
|
||||
|
||||
1. **Match task type**: Use templates that match the expected task type
|
||||
2. **Don't over-inject**: One template per session is sufficient
|
||||
3. **Monitor effectiveness**: Track whether templates improve performance
|
||||
4. **Clean up old templates**: Remove templates that are no longer effective
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### No Templates Found
|
||||
|
||||
- Check if `~/.hermes/session-templates/` directory exists
|
||||
- Verify session database exists at `~/.hermes/state.db`
|
||||
- Check if session has successful tool calls
|
||||
|
||||
### Template Injection Not Working
|
||||
|
||||
- Verify template has examples
|
||||
- Check if messages list is not empty
|
||||
- Ensure template is properly loaded
|
||||
|
||||
### Extraction Fails
|
||||
|
||||
- Verify session ID exists in database
|
||||
- Check if session has tool calls
|
||||
- Ensure database is not corrupted
|
||||
|
||||
## Future Enhancements
|
||||
|
||||
1. **Automatic template creation**: Create templates automatically from successful sessions
|
||||
2. **Template optimization**: Use ML to optimize template selection
|
||||
3. **Cross-session learning**: Share templates across users (with privacy controls)
|
||||
4. **Template versioning**: Track template effectiveness over time
|
||||
5. **Dynamic template adjustment**: Adjust templates based on task complexity
|
||||
28
run_agent.py
28
run_agent.py
@@ -1001,10 +1001,30 @@ class AIAgent:
|
||||
self._session_db = session_db
|
||||
self._parent_session_id = parent_session_id
|
||||
self._last_flushed_db_idx = 0 # tracks DB-write cursor to prevent duplicate writes
|
||||
# Lazy session creation: defer until first message flush (#314).
|
||||
# _flush_messages_to_session_db() calls ensure_session() which uses
|
||||
# INSERT OR IGNORE — creating the row only when messages arrive.
|
||||
# This eliminates 32% of sessions that are created but never used.
|
||||
if self._session_db:
|
||||
try:
|
||||
self._session_db.create_session(
|
||||
session_id=self.session_id,
|
||||
source=self.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
|
||||
model=self.model,
|
||||
model_config={
|
||||
"max_iterations": self.max_iterations,
|
||||
"reasoning_config": reasoning_config,
|
||||
"max_tokens": max_tokens,
|
||||
},
|
||||
user_id=None,
|
||||
parent_session_id=self._parent_session_id,
|
||||
)
|
||||
except Exception as e:
|
||||
# Transient SQLite lock contention (e.g. CLI and gateway writing
|
||||
# concurrently) must NOT permanently disable session_search for
|
||||
# this agent. Keep _session_db alive — subsequent message
|
||||
# flushes and session_search calls will still work once the
|
||||
# lock clears. The session row may be missing from the index
|
||||
# for this run, but that is recoverable (flushes upsert rows).
|
||||
logger.warning(
|
||||
"Session DB create_session failed (session_search still available): %s", e
|
||||
)
|
||||
|
||||
# In-memory todo list for task planning (one per agent/session)
|
||||
from tools.todo_tool import TodoStore
|
||||
|
||||
205
tests/agent/test_memory_backend.py
Normal file
205
tests/agent/test_memory_backend.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""Tests for memory backend system (#322)."""
|
||||
|
||||
import json
|
||||
import time
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from agent.memory import (
|
||||
MemoryEntry,
|
||||
NullBackend,
|
||||
get_memory_backend,
|
||||
reset_backend,
|
||||
)
|
||||
from agent.memory.local_backend import LocalBackend
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def isolated_local_backend(tmp_path, monkeypatch):
|
||||
"""Create a LocalBackend with temp DB."""
|
||||
db_path = tmp_path / "test_memory.db"
|
||||
return LocalBackend(db_path=db_path)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def reset_memory():
|
||||
"""Reset the memory backend singleton."""
|
||||
reset_backend()
|
||||
yield
|
||||
reset_backend()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MemoryEntry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMemoryEntry:
|
||||
def test_creation(self):
|
||||
entry = MemoryEntry(key="pref", value="python", user_id="u1")
|
||||
assert entry.key == "pref"
|
||||
assert entry.value == "python"
|
||||
assert entry.created_at > 0
|
||||
|
||||
def test_defaults(self):
|
||||
entry = MemoryEntry(key="k", value="v", user_id="u1")
|
||||
assert entry.metadata == {}
|
||||
assert entry.updated_at == entry.created_at
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# NullBackend
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestNullBackend:
|
||||
def test_always_available(self):
|
||||
backend = NullBackend()
|
||||
assert backend.is_available() is True
|
||||
|
||||
def test_store_noop(self):
|
||||
backend = NullBackend()
|
||||
assert backend.store("u1", "k", "v") is True
|
||||
|
||||
def test_retrieve_returns_none(self):
|
||||
backend = NullBackend()
|
||||
assert backend.retrieve("u1", "k") is None
|
||||
|
||||
def test_query_returns_empty(self):
|
||||
backend = NullBackend()
|
||||
assert backend.query("u1", "test") == []
|
||||
|
||||
def test_not_cloud(self):
|
||||
backend = NullBackend()
|
||||
assert backend.is_cloud is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LocalBackend
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLocalBackend:
|
||||
def test_available(self, isolated_local_backend):
|
||||
assert isolated_local_backend.is_available() is True
|
||||
|
||||
def test_store_and_retrieve(self, isolated_local_backend):
|
||||
assert isolated_local_backend.store("u1", "lang", "python")
|
||||
entry = isolated_local_backend.retrieve("u1", "lang")
|
||||
assert entry is not None
|
||||
assert entry.value == "python"
|
||||
assert entry.key == "lang"
|
||||
|
||||
def test_store_with_metadata(self, isolated_local_backend):
|
||||
assert isolated_local_backend.store("u1", "k", "v", {"source": "test"})
|
||||
entry = isolated_local_backend.retrieve("u1", "k")
|
||||
assert entry.metadata == {"source": "test"}
|
||||
|
||||
def test_update_existing(self, isolated_local_backend):
|
||||
isolated_local_backend.store("u1", "k", "v1")
|
||||
isolated_local_backend.store("u1", "k", "v2")
|
||||
entry = isolated_local_backend.retrieve("u1", "k")
|
||||
assert entry.value == "v2"
|
||||
|
||||
def test_query(self, isolated_local_backend):
|
||||
isolated_local_backend.store("u1", "pref_python", "True")
|
||||
isolated_local_backend.store("u1", "pref_editor", "vim")
|
||||
isolated_local_backend.store("u1", "theme", "dark")
|
||||
|
||||
results = isolated_local_backend.query("u1", "pref")
|
||||
assert len(results) == 2
|
||||
keys = {r.key for r in results}
|
||||
assert "pref_python" in keys
|
||||
assert "pref_editor" in keys
|
||||
|
||||
def test_list_keys(self, isolated_local_backend):
|
||||
isolated_local_backend.store("u1", "a", "1")
|
||||
isolated_local_backend.store("u1", "b", "2")
|
||||
keys = isolated_local_backend.list_keys("u1")
|
||||
assert set(keys) == {"a", "b"}
|
||||
|
||||
def test_delete(self, isolated_local_backend):
|
||||
isolated_local_backend.store("u1", "k", "v")
|
||||
assert isolated_local_backend.delete("u1", "k")
|
||||
assert isolated_local_backend.retrieve("u1", "k") is None
|
||||
|
||||
def test_retrieve_nonexistent(self, isolated_local_backend):
|
||||
assert isolated_local_backend.retrieve("u1", "nope") is None
|
||||
|
||||
def test_not_cloud(self, isolated_local_backend):
|
||||
assert isolated_local_backend.is_cloud is False
|
||||
|
||||
def test_separate_users(self, isolated_local_backend):
|
||||
isolated_local_backend.store("u1", "k", "user1_value")
|
||||
isolated_local_backend.store("u2", "k", "user2_value")
|
||||
assert isolated_local_backend.retrieve("u1", "k").value == "user1_value"
|
||||
assert isolated_local_backend.retrieve("u2", "k").value == "user2_value"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Singleton
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSingleton:
|
||||
def test_default_is_null(self, reset_memory, monkeypatch):
|
||||
monkeypatch.delenv("HERMES_MEMORY_BACKEND", raising=False)
|
||||
monkeypatch.delenv("HONCHO_API_KEY", raising=False)
|
||||
backend = get_memory_backend()
|
||||
assert isinstance(backend, NullBackend)
|
||||
|
||||
def test_local_when_configured(self, reset_memory, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_MEMORY_BACKEND", "local")
|
||||
backend = get_memory_backend()
|
||||
assert isinstance(backend, LocalBackend)
|
||||
|
||||
def test_caches_instance(self, reset_memory, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_MEMORY_BACKEND", "local")
|
||||
b1 = get_memory_backend()
|
||||
b2 = get_memory_backend()
|
||||
assert b1 is b2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HonchoBackend (mocked)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestHonchoBackend:
|
||||
def test_not_available_without_key(self, monkeypatch):
|
||||
monkeypatch.delenv("HONCHO_API_KEY", raising=False)
|
||||
from agent.memory.honcho_backend import HonchoBackend
|
||||
backend = HonchoBackend()
|
||||
assert backend.is_available() is False
|
||||
|
||||
def test_is_cloud(self):
|
||||
from agent.memory.honcho_backend import HonchoBackend
|
||||
backend = HonchoBackend()
|
||||
assert backend.is_cloud is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Evaluation framework
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestEvaluation:
|
||||
def test_evaluate_null_backend(self):
|
||||
from agent.memory.evaluation import evaluate_backend
|
||||
result = evaluate_backend(NullBackend())
|
||||
assert result.backend_name == "null (disabled)"
|
||||
assert result.available is True
|
||||
assert result.score > 0
|
||||
assert result.is_cloud is False
|
||||
|
||||
def test_evaluate_local_backend(self, isolated_local_backend):
|
||||
from agent.memory.evaluation import evaluate_backend
|
||||
result = evaluate_backend(isolated_local_backend)
|
||||
assert result.backend_name == "local (SQLite)"
|
||||
assert result.available is True
|
||||
assert result.store_success is True
|
||||
assert result.retrieve_success is True
|
||||
assert result.score >= 80 # local should score well
|
||||
|
||||
def test_evaluate_backends_returns_report(self, reset_memory, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_MEMORY_BACKEND", "local")
|
||||
from agent.memory.evaluation import evaluate_backends
|
||||
report = evaluate_backends()
|
||||
assert "backends_evaluated" in report
|
||||
assert report["backends_evaluated"] >= 2 # null + local
|
||||
assert "recommendation" in report
|
||||
@@ -1,343 +0,0 @@
|
||||
"""
|
||||
Tests for session templates.
|
||||
"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
|
||||
from tools.session_templates import (
|
||||
SessionTemplates,
|
||||
SessionTemplate,
|
||||
ToolCallExample,
|
||||
TaskType
|
||||
)
|
||||
|
||||
|
||||
class TestTaskTypeClassification:
|
||||
"""Test task type classification."""
|
||||
|
||||
def test_code_heavy(self):
|
||||
"""Test classification of code-heavy sessions."""
|
||||
templates = SessionTemplates()
|
||||
tool_calls = [
|
||||
{"tool_name": "execute_code"},
|
||||
{"tool_name": "execute_code"},
|
||||
{"tool_name": "execute_code"},
|
||||
{"tool_name": "read_file"},
|
||||
]
|
||||
|
||||
result = templates.classify_task_type(tool_calls)
|
||||
assert result == TaskType.CODE
|
||||
|
||||
def test_file_heavy(self):
|
||||
"""Test classification of file-heavy sessions."""
|
||||
templates = SessionTemplates()
|
||||
tool_calls = [
|
||||
{"tool_name": "read_file"},
|
||||
{"tool_name": "write_file"},
|
||||
{"tool_name": "patch"},
|
||||
{"tool_name": "search_files"},
|
||||
]
|
||||
|
||||
result = templates.classify_task_type(tool_calls)
|
||||
assert result == TaskType.FILE
|
||||
|
||||
def test_research_heavy(self):
|
||||
"""Test classification of research-heavy sessions."""
|
||||
templates = SessionTemplates()
|
||||
tool_calls = [
|
||||
{"tool_name": "web_search"},
|
||||
{"tool_name": "web_fetch"},
|
||||
{"tool_name": "browser_navigate"},
|
||||
]
|
||||
|
||||
result = templates.classify_task_type(tool_calls)
|
||||
assert result == TaskType.RESEARCH
|
||||
|
||||
def test_mixed(self):
|
||||
"""Test classification of mixed sessions."""
|
||||
templates = SessionTemplates()
|
||||
tool_calls = [
|
||||
{"tool_name": "execute_code"},
|
||||
{"tool_name": "read_file"},
|
||||
{"tool_name": "web_search"},
|
||||
]
|
||||
|
||||
result = templates.classify_task_type(tool_calls)
|
||||
assert result == TaskType.MIXED
|
||||
|
||||
def test_empty(self):
|
||||
"""Test classification of empty sessions."""
|
||||
templates = SessionTemplates()
|
||||
result = templates.classify_task_type([])
|
||||
assert result == TaskType.MIXED
|
||||
|
||||
|
||||
class TestToolCallExample:
|
||||
"""Test ToolCallExample dataclass."""
|
||||
|
||||
def test_to_dict(self):
|
||||
"""Test conversion to dictionary."""
|
||||
example = ToolCallExample(
|
||||
tool_name="execute_code",
|
||||
arguments={"code": "print('hello')"},
|
||||
result="hello",
|
||||
success=True,
|
||||
turn_number=0
|
||||
)
|
||||
|
||||
data = example.to_dict()
|
||||
assert data["tool_name"] == "execute_code"
|
||||
assert data["arguments"] == {"code": "print('hello')"}
|
||||
assert data["result"] == "hello"
|
||||
assert data["success"] is True
|
||||
|
||||
def test_from_dict(self):
|
||||
"""Test creation from dictionary."""
|
||||
data = {
|
||||
"tool_name": "execute_code",
|
||||
"arguments": {"code": "print('hello')"},
|
||||
"result": "hello",
|
||||
"success": True,
|
||||
"turn_number": 0
|
||||
}
|
||||
|
||||
example = ToolCallExample.from_dict(data)
|
||||
assert example.tool_name == "execute_code"
|
||||
assert example.arguments == {"code": "print('hello')"}
|
||||
assert example.result == "hello"
|
||||
|
||||
|
||||
class TestSessionTemplate:
|
||||
"""Test SessionTemplate dataclass."""
|
||||
|
||||
def test_to_dict(self):
|
||||
"""Test conversion to dictionary."""
|
||||
examples = [
|
||||
ToolCallExample(
|
||||
tool_name="execute_code",
|
||||
arguments={"code": "print('hello')"},
|
||||
result="hello",
|
||||
success=True
|
||||
)
|
||||
]
|
||||
|
||||
template = SessionTemplate(
|
||||
name="test_template",
|
||||
task_type=TaskType.CODE,
|
||||
examples=examples,
|
||||
description="Test template"
|
||||
)
|
||||
|
||||
data = template.to_dict()
|
||||
assert data["name"] == "test_template"
|
||||
assert data["task_type"] == "code"
|
||||
assert len(data["examples"]) == 1
|
||||
|
||||
def test_from_dict(self):
|
||||
"""Test creation from dictionary."""
|
||||
data = {
|
||||
"name": "test_template",
|
||||
"task_type": "code",
|
||||
"examples": [
|
||||
{
|
||||
"tool_name": "execute_code",
|
||||
"arguments": {"code": "print('hello')"},
|
||||
"result": "hello",
|
||||
"success": True,
|
||||
"turn_number": 0
|
||||
}
|
||||
],
|
||||
"description": "Test template",
|
||||
"created_at": 1234567890.0,
|
||||
"usage_count": 0,
|
||||
"source_session_id": None,
|
||||
"tags": []
|
||||
}
|
||||
|
||||
template = SessionTemplate.from_dict(data)
|
||||
assert template.name == "test_template"
|
||||
assert template.task_type == TaskType.CODE
|
||||
assert len(template.examples) == 1
|
||||
|
||||
|
||||
class TestSessionTemplates:
|
||||
"""Test SessionTemplates manager."""
|
||||
|
||||
def test_create_and_list(self):
|
||||
"""Test creating and listing templates."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
template_dir = Path(tmpdir)
|
||||
manager = SessionTemplates(template_dir=template_dir)
|
||||
|
||||
# Create a mock template
|
||||
examples = [
|
||||
ToolCallExample(
|
||||
tool_name="execute_code",
|
||||
arguments={"code": "print('hello')"},
|
||||
result="hello",
|
||||
success=True
|
||||
)
|
||||
]
|
||||
|
||||
template = SessionTemplate(
|
||||
name="test_template",
|
||||
task_type=TaskType.CODE,
|
||||
examples=examples
|
||||
)
|
||||
|
||||
manager.templates["test_template"] = template
|
||||
manager._save_template(template)
|
||||
|
||||
# List templates
|
||||
templates = manager.list_templates()
|
||||
assert len(templates) == 1
|
||||
assert templates[0].name == "test_template"
|
||||
|
||||
def test_get_template(self):
|
||||
"""Test getting template by task type."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
template_dir = Path(tmpdir)
|
||||
manager = SessionTemplates(template_dir=template_dir)
|
||||
|
||||
# Create templates
|
||||
code_template = SessionTemplate(
|
||||
name="code_template",
|
||||
task_type=TaskType.CODE,
|
||||
examples=[]
|
||||
)
|
||||
|
||||
file_template = SessionTemplate(
|
||||
name="file_template",
|
||||
task_type=TaskType.FILE,
|
||||
examples=[]
|
||||
)
|
||||
|
||||
manager.templates["code_template"] = code_template
|
||||
manager.templates["file_template"] = file_template
|
||||
|
||||
# Get code template
|
||||
result = manager.get_template(TaskType.CODE)
|
||||
assert result is not None
|
||||
assert result.name == "code_template"
|
||||
|
||||
# Get file template
|
||||
result = manager.get_template(TaskType.FILE)
|
||||
assert result is not None
|
||||
assert result.name == "file_template"
|
||||
|
||||
# Get non-existent template
|
||||
result = manager.get_template(TaskType.RESEARCH)
|
||||
assert result is None
|
||||
|
||||
def test_inject_into_messages(self):
|
||||
"""Test injecting template into messages."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
template_dir = Path(tmpdir)
|
||||
manager = SessionTemplates(template_dir=template_dir)
|
||||
|
||||
# Create template
|
||||
examples = [
|
||||
ToolCallExample(
|
||||
tool_name="execute_code",
|
||||
arguments={"code": "print('hello')"},
|
||||
result="hello",
|
||||
success=True
|
||||
)
|
||||
]
|
||||
|
||||
template = SessionTemplate(
|
||||
name="test_template",
|
||||
task_type=TaskType.CODE,
|
||||
examples=examples
|
||||
)
|
||||
|
||||
manager.templates["test_template"] = template
|
||||
|
||||
# Test injection
|
||||
messages = [
|
||||
{"role": "system", "content": "You are a helpful assistant."},
|
||||
{"role": "user", "content": "Hello"}
|
||||
]
|
||||
|
||||
result = manager.inject_into_messages(template, messages)
|
||||
|
||||
# Should have added template messages
|
||||
assert len(result) > len(messages)
|
||||
assert any("Session template loaded" in str(msg.get("content", ""))
|
||||
for msg in result)
|
||||
|
||||
# Usage count should be updated
|
||||
assert template.usage_count == 1
|
||||
|
||||
def test_delete_template(self):
|
||||
"""Test deleting templates."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
template_dir = Path(tmpdir)
|
||||
manager = SessionTemplates(template_dir=template_dir)
|
||||
|
||||
# Create template
|
||||
template = SessionTemplate(
|
||||
name="test_template",
|
||||
task_type=TaskType.CODE,
|
||||
examples=[]
|
||||
)
|
||||
|
||||
manager.templates["test_template"] = template
|
||||
manager._save_template(template)
|
||||
|
||||
# Verify it exists
|
||||
assert "test_template" in manager.templates
|
||||
assert (template_dir / "test_template.json").exists()
|
||||
|
||||
# Delete it
|
||||
result = manager.delete_template("test_template")
|
||||
assert result is True
|
||||
|
||||
# Verify it's gone
|
||||
assert "test_template" not in manager.templates
|
||||
assert not (template_dir / "test_template.json").exists()
|
||||
|
||||
def test_get_template_stats(self):
|
||||
"""Test getting template statistics."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
template_dir = Path(tmpdir)
|
||||
manager = SessionTemplates(template_dir=template_dir)
|
||||
|
||||
# Create templates
|
||||
code_template = SessionTemplate(
|
||||
name="code_template",
|
||||
task_type=TaskType.CODE,
|
||||
examples=[
|
||||
ToolCallExample("execute_code", {}, "", True),
|
||||
ToolCallExample("execute_code", {}, "", True)
|
||||
],
|
||||
usage_count=5
|
||||
)
|
||||
|
||||
file_template = SessionTemplate(
|
||||
name="file_template",
|
||||
task_type=TaskType.FILE,
|
||||
examples=[
|
||||
ToolCallExample("read_file", {}, "", True)
|
||||
],
|
||||
usage_count=3
|
||||
)
|
||||
|
||||
manager.templates["code_template"] = code_template
|
||||
manager.templates["file_template"] = file_template
|
||||
|
||||
stats = manager.get_template_stats()
|
||||
|
||||
assert stats["total"] == 2
|
||||
assert stats["total_examples"] == 3
|
||||
assert stats["total_usage"] == 8
|
||||
assert stats["by_type"]["code"] == 1
|
||||
assert stats["by_type"]["file"] == 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__])
|
||||
165
tools/memory_backend_tool.py
Normal file
165
tools/memory_backend_tool.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""Memory Backend Tool — manage cross-session memory backends.
|
||||
|
||||
Provides store/retrieve/query/evaluate/list actions for the
|
||||
pluggable memory backend system.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from tools.registry import registry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def memory_backend(
|
||||
action: str,
|
||||
user_id: str = "default",
|
||||
key: str = None,
|
||||
value: str = None,
|
||||
query_text: str = None,
|
||||
metadata: dict = None,
|
||||
) -> str:
|
||||
"""Manage cross-session memory backends.
|
||||
|
||||
Actions:
|
||||
store — store a user preference/pattern
|
||||
retrieve — retrieve a specific memory by key
|
||||
query — search memories by text
|
||||
list — list all keys for a user
|
||||
delete — delete a memory entry
|
||||
info — show current backend info
|
||||
evaluate — run evaluation framework comparing backends
|
||||
"""
|
||||
from agent.memory import get_memory_backend
|
||||
|
||||
backend = get_memory_backend()
|
||||
|
||||
if action == "info":
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
"backend": backend.backend_name,
|
||||
"is_cloud": backend.is_cloud,
|
||||
"available": backend.is_available(),
|
||||
})
|
||||
|
||||
if action == "store":
|
||||
if not key or value is None:
|
||||
return json.dumps({"success": False, "error": "key and value are required for 'store'."})
|
||||
success = backend.store(user_id, key, value, metadata)
|
||||
return json.dumps({"success": success, "key": key})
|
||||
|
||||
if action == "retrieve":
|
||||
if not key:
|
||||
return json.dumps({"success": False, "error": "key is required for 'retrieve'."})
|
||||
entry = backend.retrieve(user_id, key)
|
||||
if entry is None:
|
||||
return json.dumps({"success": False, "error": f"No memory found for key '{key}'."})
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
"key": entry.key,
|
||||
"value": entry.value,
|
||||
"metadata": entry.metadata,
|
||||
"updated_at": entry.updated_at,
|
||||
})
|
||||
|
||||
if action == "query":
|
||||
if not query_text:
|
||||
return json.dumps({"success": False, "error": "query_text is required for 'query'."})
|
||||
results = backend.query(user_id, query_text)
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
"results": [
|
||||
{"key": e.key, "value": e.value, "metadata": e.metadata}
|
||||
for e in results
|
||||
],
|
||||
"count": len(results),
|
||||
})
|
||||
|
||||
if action == "list":
|
||||
keys = backend.list_keys(user_id)
|
||||
return json.dumps({"success": True, "keys": keys, "count": len(keys)})
|
||||
|
||||
if action == "delete":
|
||||
if not key:
|
||||
return json.dumps({"success": False, "error": "key is required for 'delete'."})
|
||||
success = backend.delete(user_id, key)
|
||||
return json.dumps({"success": success})
|
||||
|
||||
if action == "evaluate":
|
||||
from agent.memory.evaluation import evaluate_backends
|
||||
report = evaluate_backends()
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
**report,
|
||||
})
|
||||
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
"error": f"Unknown action '{action}'. Use: store, retrieve, query, list, delete, info, evaluate",
|
||||
})
|
||||
|
||||
|
||||
MEMORY_BACKEND_SCHEMA = {
|
||||
"name": "memory_backend",
|
||||
"description": (
|
||||
"Manage cross-session memory backends for user preference persistence. "
|
||||
"Pluggable architecture supports local SQLite (default, zero cloud dependency) "
|
||||
"and optional Honcho cloud backend (requires HONCHO_API_KEY).\n\n"
|
||||
"Actions:\n"
|
||||
" store — store a user preference/pattern\n"
|
||||
" retrieve — retrieve a specific memory by key\n"
|
||||
" query — search memories by text\n"
|
||||
" list — list all keys for a user\n"
|
||||
" delete — delete a memory entry\n"
|
||||
" info — show current backend info\n"
|
||||
" evaluate — run evaluation framework comparing backends"
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"action": {
|
||||
"type": "string",
|
||||
"enum": ["store", "retrieve", "query", "list", "delete", "info", "evaluate"],
|
||||
"description": "The action to perform.",
|
||||
},
|
||||
"user_id": {
|
||||
"type": "string",
|
||||
"description": "User identifier for memory operations (default: 'default').",
|
||||
},
|
||||
"key": {
|
||||
"type": "string",
|
||||
"description": "Memory key for store/retrieve/delete.",
|
||||
},
|
||||
"value": {
|
||||
"type": "string",
|
||||
"description": "Value to store.",
|
||||
},
|
||||
"query_text": {
|
||||
"type": "string",
|
||||
"description": "Search text for query action.",
|
||||
},
|
||||
"metadata": {
|
||||
"type": "object",
|
||||
"description": "Optional metadata dict for store.",
|
||||
},
|
||||
},
|
||||
"required": ["action"],
|
||||
},
|
||||
}
|
||||
|
||||
registry.register(
|
||||
name="memory_backend",
|
||||
toolset="skills",
|
||||
schema=MEMORY_BACKEND_SCHEMA,
|
||||
handler=lambda args, **kw: memory_backend(
|
||||
action=args.get("action", ""),
|
||||
user_id=args.get("user_id", "default"),
|
||||
key=args.get("key"),
|
||||
value=args.get("value"),
|
||||
query_text=args.get("query_text"),
|
||||
metadata=args.get("metadata"),
|
||||
),
|
||||
emoji="🧠",
|
||||
)
|
||||
@@ -1,471 +0,0 @@
|
||||
"""
|
||||
Session templates for code-first seeding.
|
||||
|
||||
Research finding: Code-heavy sessions (execute_code dominant in first 30 turns)
|
||||
improve over time. File-heavy sessions degrade. Key is deterministic feedback loops.
|
||||
|
||||
This module provides:
|
||||
1. Template extraction from successful sessions
|
||||
2. Task type classification (code, file, research, mixed)
|
||||
3. Template storage in ~/.hermes/session-templates/
|
||||
4. Template injection into new sessions
|
||||
5. CLI interface for template management
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any, Tuple
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from enum import Enum
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Default template directory
|
||||
DEFAULT_TEMPLATE_DIR = Path.home() / ".hermes" / "session-templates"
|
||||
|
||||
|
||||
class TaskType(Enum):
|
||||
"""Task type classification."""
|
||||
CODE = "code"
|
||||
FILE = "file"
|
||||
RESEARCH = "research"
|
||||
MIXED = "mixed"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolCallExample:
|
||||
"""A single tool call example."""
|
||||
tool_name: str
|
||||
arguments: Dict[str, Any]
|
||||
result: str
|
||||
success: bool
|
||||
turn_number: int = 0
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'ToolCallExample':
|
||||
return cls(**data)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionTemplate:
|
||||
"""A session template with tool call examples."""
|
||||
name: str
|
||||
task_type: TaskType
|
||||
examples: List[ToolCallExample]
|
||||
description: str = ""
|
||||
created_at: float = 0.0
|
||||
usage_count: int = 0
|
||||
source_session_id: Optional[str] = None
|
||||
tags: List[str] = field(default_factory=list)
|
||||
|
||||
def __post_init__(self):
|
||||
if self.created_at == 0.0:
|
||||
self.created_at = time.time()
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
data = asdict(self)
|
||||
data['task_type'] = self.task_type.value
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> 'SessionTemplate':
|
||||
data['task_type'] = TaskType(data['task_type'])
|
||||
examples_data = data.get('examples', [])
|
||||
data['examples'] = [ToolCallExample.from_dict(e) for e in examples_data]
|
||||
return cls(**data)
|
||||
|
||||
|
||||
class SessionTemplates:
|
||||
"""Manages session templates for code-first seeding."""
|
||||
|
||||
def __init__(self, template_dir: Optional[Path] = None):
|
||||
self.template_dir = template_dir or DEFAULT_TEMPLATE_DIR
|
||||
self.template_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.templates: Dict[str, SessionTemplate] = {}
|
||||
self._load_templates()
|
||||
|
||||
def _load_templates(self):
|
||||
"""Load all templates from disk."""
|
||||
for template_file in self.template_dir.glob("*.json"):
|
||||
try:
|
||||
with open(template_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
template = SessionTemplate.from_dict(data)
|
||||
self.templates[template.name] = template
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load template {template_file}: {e}")
|
||||
|
||||
def _save_template(self, template: SessionTemplate):
|
||||
"""Save a template to disk."""
|
||||
template_file = self.template_dir / f"{template.name}.json"
|
||||
with open(template_file, 'w') as f:
|
||||
json.dump(template.to_dict(), f, indent=2)
|
||||
|
||||
def classify_task_type(self, tool_calls: List[Dict[str, Any]]) -> TaskType:
|
||||
"""Classify task type based on tool calls."""
|
||||
if not tool_calls:
|
||||
return TaskType.MIXED
|
||||
|
||||
# Count tool types
|
||||
code_tools = {'execute_code', 'code_execution'}
|
||||
file_tools = {'read_file', 'write_file', 'patch', 'search_files'}
|
||||
research_tools = {'web_search', 'web_fetch', 'browser_navigate'}
|
||||
|
||||
tool_names = [tc.get('tool_name', '') for tc in tool_calls]
|
||||
|
||||
code_count = sum(1 for t in tool_names if t in code_tools)
|
||||
file_count = sum(1 for t in tool_names if t in file_tools)
|
||||
research_count = sum(1 for t in tool_names if t in research_tools)
|
||||
|
||||
total = len(tool_calls)
|
||||
if total == 0:
|
||||
return TaskType.MIXED
|
||||
|
||||
# Determine dominant type (60% threshold)
|
||||
code_ratio = code_count / total
|
||||
file_ratio = file_count / total
|
||||
research_ratio = research_count / total
|
||||
|
||||
if code_ratio > 0.6:
|
||||
return TaskType.CODE
|
||||
elif file_ratio > 0.6:
|
||||
return TaskType.FILE
|
||||
elif research_ratio > 0.6:
|
||||
return TaskType.RESEARCH
|
||||
else:
|
||||
return TaskType.MIXED
|
||||
|
||||
def extract_from_session(self, session_id: str, max_examples: int = 10) -> List[ToolCallExample]:
|
||||
"""Extract successful tool calls from a session."""
|
||||
db_path = Path.home() / ".hermes" / "state.db"
|
||||
if not db_path.exists():
|
||||
logger.warning(f"Session database not found: {db_path}")
|
||||
return []
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
# Get messages with tool calls
|
||||
cursor = conn.execute("""
|
||||
SELECT role, content, tool_calls, tool_name, timestamp
|
||||
FROM messages
|
||||
WHERE session_id = ?
|
||||
ORDER BY timestamp
|
||||
LIMIT 100
|
||||
""", (session_id,))
|
||||
|
||||
messages = cursor.fetchall()
|
||||
conn.close()
|
||||
|
||||
examples = []
|
||||
turn_number = 0
|
||||
|
||||
for msg in messages:
|
||||
if len(examples) >= max_examples:
|
||||
break
|
||||
|
||||
if msg['role'] == 'assistant' and msg['tool_calls']:
|
||||
try:
|
||||
tool_calls = json.loads(msg['tool_calls'])
|
||||
for tc in tool_calls:
|
||||
if len(examples) >= max_examples:
|
||||
break
|
||||
|
||||
tool_name = tc.get('function', {}).get('name')
|
||||
if not tool_name:
|
||||
continue
|
||||
|
||||
try:
|
||||
arguments = json.loads(tc.get('function', {}).get('arguments', '{}'))
|
||||
except:
|
||||
arguments = {}
|
||||
|
||||
examples.append(ToolCallExample(
|
||||
tool_name=tool_name,
|
||||
arguments=arguments,
|
||||
result="", # Will be filled from tool response
|
||||
success=True,
|
||||
turn_number=turn_number
|
||||
))
|
||||
turn_number += 1
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
elif msg['role'] == 'tool' and examples and examples[-1].result == "":
|
||||
examples[-1].result = msg['content'] or ""
|
||||
|
||||
return examples
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to extract from session {session_id}: {e}")
|
||||
return []
|
||||
|
||||
def create_template(self, session_id: str, name: Optional[str] = None,
|
||||
task_type: Optional[TaskType] = None,
|
||||
max_examples: int = 10,
|
||||
description: str = "",
|
||||
tags: Optional[List[str]] = None) -> Optional[SessionTemplate]:
|
||||
"""Create a template from a session."""
|
||||
examples = self.extract_from_session(session_id, max_examples)
|
||||
if not examples:
|
||||
logger.warning(f"No successful tool calls found in session {session_id}")
|
||||
return None
|
||||
|
||||
# Classify task type if not provided
|
||||
if task_type is None:
|
||||
tool_calls = [{'tool_name': e.tool_name} for e in examples]
|
||||
task_type = self.classify_task_type(tool_calls)
|
||||
|
||||
# Generate name if not provided
|
||||
if name is None:
|
||||
name = f"{task_type.value}_{session_id[:8]}_{int(time.time())}"
|
||||
|
||||
# Create template
|
||||
template = SessionTemplate(
|
||||
name=name,
|
||||
task_type=task_type,
|
||||
examples=examples,
|
||||
description=description or f"Template with {len(examples)} examples",
|
||||
source_session_id=session_id,
|
||||
tags=tags or []
|
||||
)
|
||||
|
||||
# Save template
|
||||
self.templates[name] = template
|
||||
self._save_template(template)
|
||||
|
||||
logger.info(f"Created template {name} with {len(examples)} examples")
|
||||
return template
|
||||
|
||||
def get_template(self, task_type: TaskType, tags: Optional[List[str]] = None) -> Optional[SessionTemplate]:
|
||||
"""Get the best template for a task type and optional tags."""
|
||||
matching = [t for t in self.templates.values() if t.task_type == task_type]
|
||||
|
||||
# Filter by tags if provided
|
||||
if tags:
|
||||
matching = [t for t in matching if any(tag in t.tags for tag in tags)]
|
||||
|
||||
if not matching:
|
||||
return None
|
||||
|
||||
# Sort by usage count (prefer less used templates)
|
||||
matching.sort(key=lambda t: t.usage_count)
|
||||
return matching[0]
|
||||
|
||||
def inject_into_messages(self, template: SessionTemplate,
|
||||
messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""Inject template examples into messages."""
|
||||
if not template.examples:
|
||||
return messages
|
||||
|
||||
# Create injection messages
|
||||
injection = []
|
||||
|
||||
# Add system message about template
|
||||
injection.append({
|
||||
"role": "system",
|
||||
"content": f"Session template loaded: {template.name} ({template.task_type.value})\n"
|
||||
f"Description: {template.description}\n"
|
||||
f"This template contains {len(template.examples)} successful tool calls "
|
||||
f"to establish a feedback loop early."
|
||||
})
|
||||
|
||||
# Add tool call examples
|
||||
for i, example in enumerate(template.examples):
|
||||
# Assistant message with tool call
|
||||
injection.append({
|
||||
"role": "assistant",
|
||||
"content": None,
|
||||
"tool_calls": [{
|
||||
"id": f"template_{template.name}_{i}",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": example.tool_name,
|
||||
"arguments": json.dumps(example.arguments)
|
||||
}
|
||||
}]
|
||||
})
|
||||
|
||||
# Tool response
|
||||
injection.append({
|
||||
"role": "tool",
|
||||
"tool_call_id": f"template_{template.name}_{i}",
|
||||
"content": example.result
|
||||
})
|
||||
|
||||
# Insert after system messages
|
||||
insert_index = 0
|
||||
for i, msg in enumerate(messages):
|
||||
if msg.get("role") != "system":
|
||||
break
|
||||
insert_index = i + 1
|
||||
|
||||
# Insert injection
|
||||
for i, msg in enumerate(injection):
|
||||
messages.insert(insert_index + i, msg)
|
||||
|
||||
# Update usage count
|
||||
template.usage_count += 1
|
||||
self._save_template(template)
|
||||
|
||||
return messages
|
||||
|
||||
def list_templates(self, task_type: Optional[TaskType] = None,
|
||||
tags: Optional[List[str]] = None) -> List[SessionTemplate]:
|
||||
"""List templates, optionally filtered by task type and tags."""
|
||||
templates = list(self.templates.values())
|
||||
|
||||
if task_type:
|
||||
templates = [t for t in templates if t.task_type == task_type]
|
||||
|
||||
if tags:
|
||||
templates = [t for t in templates if any(tag in t.tags for tag in tags)]
|
||||
|
||||
templates.sort(key=lambda t: t.created_at, reverse=True)
|
||||
return templates
|
||||
|
||||
def delete_template(self, name: str) -> bool:
|
||||
"""Delete a template."""
|
||||
if name not in self.templates:
|
||||
return False
|
||||
|
||||
del self.templates[name]
|
||||
template_file = self.template_dir / f"{name}.json"
|
||||
if template_file.exists():
|
||||
template_file.unlink()
|
||||
|
||||
logger.info(f"Deleted template {name}")
|
||||
return True
|
||||
|
||||
def get_template_stats(self) -> Dict[str, Any]:
|
||||
"""Get statistics about templates."""
|
||||
if not self.templates:
|
||||
return {
|
||||
"total": 0,
|
||||
"by_type": {},
|
||||
"total_examples": 0,
|
||||
"total_usage": 0
|
||||
}
|
||||
|
||||
by_type = {}
|
||||
total_examples = 0
|
||||
total_usage = 0
|
||||
|
||||
for template in self.templates.values():
|
||||
task_type = template.task_type.value
|
||||
by_type[task_type] = by_type.get(task_type, 0) + 1
|
||||
total_examples += len(template.examples)
|
||||
total_usage += template.usage_count
|
||||
|
||||
return {
|
||||
"total": len(self.templates),
|
||||
"by_type": by_type,
|
||||
"total_examples": total_examples,
|
||||
"total_usage": total_usage
|
||||
}
|
||||
|
||||
|
||||
# CLI interface
|
||||
def main():
|
||||
"""CLI for session templates."""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Session Templates")
|
||||
subparsers = parser.add_subparsers(dest="command")
|
||||
|
||||
# List templates
|
||||
list_parser = subparsers.add_parser("list", help="List templates")
|
||||
list_parser.add_argument("--type", choices=["code", "file", "research", "mixed"],
|
||||
help="Filter by task type")
|
||||
list_parser.add_argument("--tags", help="Filter by tags (comma-separated)")
|
||||
|
||||
# Create template
|
||||
create_parser = subparsers.add_parser("create", help="Create template from session")
|
||||
create_parser.add_argument("session_id", help="Session ID")
|
||||
create_parser.add_argument("--name", help="Template name")
|
||||
create_parser.add_argument("--type", choices=["code", "file", "research", "mixed"],
|
||||
help="Task type")
|
||||
create_parser.add_argument("--max-examples", type=int, default=10,
|
||||
help="Maximum examples to extract")
|
||||
create_parser.add_argument("--description", help="Template description")
|
||||
create_parser.add_argument("--tags", help="Tags (comma-separated)")
|
||||
|
||||
# Delete template
|
||||
delete_parser = subparsers.add_parser("delete", help="Delete template")
|
||||
delete_parser.add_argument("name", help="Template name")
|
||||
|
||||
# Show stats
|
||||
stats_parser = subparsers.add_parser("stats", help="Show template statistics")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
templates = SessionTemplates()
|
||||
|
||||
if args.command == "list":
|
||||
task_type = TaskType(args.type) if args.type else None
|
||||
tags = args.tags.split(",") if args.tags else None
|
||||
template_list = templates.list_templates(task_type, tags)
|
||||
|
||||
if not template_list:
|
||||
print("No templates found")
|
||||
return
|
||||
|
||||
print(f"Found {len(template_list)} templates:")
|
||||
for t in template_list:
|
||||
tags_str = f" [tags: {', '.join(t.tags)}]" if t.tags else ""
|
||||
print(f" {t.name}: {t.task_type.value} ({len(t.examples)} examples, "
|
||||
f"used {t.usage_count} times){tags_str}")
|
||||
|
||||
elif args.command == "create":
|
||||
task_type = TaskType(args.type) if args.type else None
|
||||
tags = args.tags.split(",") if args.tags else None
|
||||
|
||||
template = templates.create_template(
|
||||
args.session_id,
|
||||
name=args.name,
|
||||
task_type=task_type,
|
||||
max_examples=args.max_examples,
|
||||
description=args.description or "",
|
||||
tags=tags
|
||||
)
|
||||
|
||||
if template:
|
||||
print(f"Created template: {template.name}")
|
||||
print(f" Type: {template.task_type.value}")
|
||||
print(f" Examples: {len(template.examples)}")
|
||||
if template.tags:
|
||||
print(f" Tags: {', '.join(template.tags)}")
|
||||
else:
|
||||
print("Failed to create template")
|
||||
|
||||
elif args.command == "delete":
|
||||
if templates.delete_template(args.name):
|
||||
print(f"Deleted template: {args.name}")
|
||||
else:
|
||||
print(f"Template not found: {args.name}")
|
||||
|
||||
elif args.command == "stats":
|
||||
stats = templates.get_template_stats()
|
||||
print("Template Statistics:")
|
||||
print(f" Total templates: {stats['total']}")
|
||||
print(f" Total examples: {stats['total_examples']}")
|
||||
print(f" Total usage: {stats['total_usage']}")
|
||||
if stats['by_type']:
|
||||
print(" By type:")
|
||||
for task_type, count in stats['by_type'].items():
|
||||
print(f" {task_type}: {count}")
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user