Compare commits
1 Commits
burn/293-1
...
feat/505-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ddec887b75 |
@@ -1,226 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Pre-commit hook for detecting hardcoded ~/.hermes paths.
|
||||
|
||||
This is a poka-yoke (error-proofing) measure to prevent profile isolation
|
||||
failures. All code should use get_hermes_home() from hermes_constants instead
|
||||
of hardcoding ~/.hermes or Path.home() / ".hermes".
|
||||
|
||||
Installation:
|
||||
git config core.hooksPath .githooks
|
||||
|
||||
To bypass:
|
||||
git commit --no-verify
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Iterable, List
|
||||
|
||||
# ANSI color codes
|
||||
RED = "\033[0;31m"
|
||||
YELLOW = "\033[1;33m"
|
||||
GREEN = "\033[0;32m"
|
||||
NC = "\033[0m"
|
||||
|
||||
|
||||
class Finding:
|
||||
"""Represents a single hardcoded path finding."""
|
||||
|
||||
def __init__(self, filename: str, line: int, message: str, suggestion: str = "") -> None:
|
||||
self.filename = filename
|
||||
self.line = line
|
||||
self.message = message
|
||||
self.suggestion = suggestion
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"Finding({self.filename!r}, {self.line}, {self.message!r})"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Regex patterns for hardcoded paths
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Pattern 1: Path.home() / ".hermes" or Path.home() / '.hermes'
|
||||
_RE_PATH_HOME_HERMES = re.compile(
|
||||
r"""Path\.home\(\)\s*/\s*['"]\.hermes['"]"""
|
||||
)
|
||||
|
||||
# Pattern 2: Path.home() / ".hermes" / something
|
||||
_RE_PATH_HOME_HERMES_SUB = re.compile(
|
||||
r"""Path\.home\(\)\s*/\s*['"]\.hermes['"]\s*/"""
|
||||
)
|
||||
|
||||
# Pattern 3: ~/.hermes in strings (but not in comments or docs)
|
||||
_RE_TILDE_HERMES = re.compile(
|
||||
r"""['"]~/?\.hermes(/|['"])"""
|
||||
)
|
||||
|
||||
# Pattern 4: os.path.expanduser("~/.hermes")
|
||||
_RE_EXPANDUSER_HERMES = re.compile(
|
||||
r"""os\.path\.expanduser\(\s*['"]~/?\.hermes"""
|
||||
)
|
||||
|
||||
# Pattern 5: os.path.join(os.path.expanduser("~"), ".hermes")
|
||||
_RE_JOIN_EXPANDUSER = re.compile(
|
||||
r"""os\.path\.join\(\s*os\.path\.expanduser\(\s*['"]~['"]\s*\)\s*,\s*['"]\.hermes['"]"""
|
||||
)
|
||||
|
||||
# All patterns combined
|
||||
_ALL_PATTERNS = [
|
||||
(_RE_PATH_HOME_HERMES, "Path.home() / '.hermes' — use get_hermes_home() instead"),
|
||||
(_RE_PATH_HOME_HERMES_SUB, "Path.home() / '.hermes' / ... — use get_hermes_home() / '...' instead"),
|
||||
(_RE_TILDE_HERMES, "'~/.hermes' — use get_hermes_home() for paths, display_hermes_home() for display"),
|
||||
(_RE_EXPANDUSER_HERMES, "os.path.expanduser('~/.hermes') — use get_hermes_home() instead"),
|
||||
(_RE_JOIN_EXPANDUSER, "os.path.join(expanduser('~'), '.hermes') — use get_hermes_home() instead"),
|
||||
]
|
||||
|
||||
# Safe contexts (don't flag these)
|
||||
_SAFE_CONTEXTS = [
|
||||
# hermes_constants.py is allowed (it's the source of truth)
|
||||
"hermes_constants.py",
|
||||
# Test files can mock/test the behavior
|
||||
"test_",
|
||||
"_test.py",
|
||||
"/tests/",
|
||||
# Documentation files
|
||||
".md",
|
||||
"README",
|
||||
"CHANGELOG",
|
||||
"AGENTS.md",
|
||||
# Example/template files
|
||||
".example",
|
||||
"template",
|
||||
]
|
||||
|
||||
|
||||
def _is_safe_context(filename: str) -> bool:
|
||||
"""Check if the file is in a safe context where hardcoded paths are OK."""
|
||||
for safe in _SAFE_CONTEXTS:
|
||||
if safe in filename:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _is_comment_or_doc(line: str) -> bool:
|
||||
"""Check if the line is a comment or documentation."""
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("#"):
|
||||
return True
|
||||
if stripped.startswith('"""') or stripped.startswith("'''"):
|
||||
return True
|
||||
if '"""' in stripped or "'''" in stripped:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def scan_line_for_hardcoded_paths(line: str, filename: str, line_no: int) -> Iterable[Finding]:
|
||||
"""Scan a single line for hardcoded ~/.hermes paths."""
|
||||
if _is_safe_context(filename):
|
||||
return
|
||||
|
||||
stripped = line.rstrip("\n")
|
||||
if not stripped:
|
||||
return
|
||||
|
||||
# Skip comments and docstrings
|
||||
if _is_comment_or_doc(stripped):
|
||||
return
|
||||
|
||||
for pattern, message in _ALL_PATTERNS:
|
||||
if pattern.search(stripped):
|
||||
yield Finding(
|
||||
filename,
|
||||
line_no,
|
||||
message,
|
||||
"Use get_hermes_home() from hermes_constants for paths, display_hermes_home() for display",
|
||||
)
|
||||
return # One finding per line is enough
|
||||
|
||||
|
||||
def get_staged_files() -> List[str]:
|
||||
"""Get list of staged files in the git index."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
return [f.strip() for f in result.stdout.splitlines() if f.strip()]
|
||||
except subprocess.CalledProcessError:
|
||||
return []
|
||||
|
||||
|
||||
def get_staged_content(filename: str) -> str:
|
||||
"""Get the staged content of a file."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "show", f":{filename}"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
return result.stdout
|
||||
except subprocess.CalledProcessError:
|
||||
return ""
|
||||
|
||||
|
||||
def scan_file(filename: str) -> List[Finding]:
|
||||
"""Scan a file for hardcoded ~/.hermes paths."""
|
||||
if _is_safe_context(filename):
|
||||
return []
|
||||
|
||||
# Only scan Python files
|
||||
if not filename.endswith(".py"):
|
||||
return []
|
||||
|
||||
content = get_staged_content(filename)
|
||||
if not content:
|
||||
return []
|
||||
|
||||
findings = []
|
||||
for line_no, line in enumerate(content.splitlines(), start=1):
|
||||
for finding in scan_line_for_hardcoded_paths(line, filename, line_no):
|
||||
findings.append(finding)
|
||||
|
||||
return findings
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Main entry point for the pre-commit hook."""
|
||||
staged_files = get_staged_files()
|
||||
if not staged_files:
|
||||
return 0
|
||||
|
||||
all_findings = []
|
||||
for filename in staged_files:
|
||||
findings = scan_file(filename)
|
||||
all_findings.extend(findings)
|
||||
|
||||
if not all_findings:
|
||||
return 0
|
||||
|
||||
# Print findings
|
||||
print(f"\n{RED}✗ Hardcoded ~/.hermes paths detected:{NC}\n")
|
||||
for finding in all_findings:
|
||||
print(f" {YELLOW}{finding.filename}:{finding.line}{NC}")
|
||||
print(f" {finding.message}")
|
||||
if finding.suggestion:
|
||||
print(f" {GREEN}Fix: {finding.suggestion}{NC}")
|
||||
print()
|
||||
|
||||
print(f"{RED}Found {len(all_findings)} hardcoded path(s).{NC}")
|
||||
print(f"{YELLOW}Use get_hermes_home() from hermes_constants for paths.{NC}")
|
||||
print(f"{YELLOW}Use display_hermes_home() for user-facing display.{NC}")
|
||||
print(f"\n{YELLOW}To bypass: git commit --no-verify{NC}\n")
|
||||
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -295,22 +295,6 @@ def main() -> int:
|
||||
if line.startswith("+") and not line.startswith("+++"):
|
||||
findings.extend(scan_line(line[1:], "<diff>", line_no))
|
||||
|
||||
# Also check for hardcoded ~/.hermes paths
|
||||
print(f"{GREEN}🔍 Scanning for hardcoded ~/.hermes paths...{NC}")
|
||||
try:
|
||||
import subprocess as sp
|
||||
result = sp.run(
|
||||
[sys.executable, str(Path(__file__).parent / "check_hardcoded_paths.py")],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
# Print the output from the hardcoded path check
|
||||
print(result.stdout)
|
||||
return 1
|
||||
except Exception as e:
|
||||
print(f"{YELLOW}Warning: Could not run hardcoded path check: {e}{NC}")
|
||||
|
||||
if not findings:
|
||||
print(f"{GREEN}✓ No potential secret leaks detected{NC}")
|
||||
return 0
|
||||
|
||||
17
.github/workflows/tests.yml
vendored
17
.github/workflows/tests.yml
vendored
@@ -12,23 +12,6 @@ concurrency:
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
check-hardcoded-paths:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 5
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Python 3.11
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.11'
|
||||
|
||||
- name: Check for hardcoded ~/.hermes paths
|
||||
run: |
|
||||
python .githooks/check_hardcoded_paths.py
|
||||
# This will fail if any hardcoded paths are found
|
||||
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
container: catthehacker/ubuntu:act-22.04
|
||||
|
||||
104
hermes_state.py
104
hermes_state.py
@@ -32,7 +32,7 @@ T = TypeVar("T")
|
||||
|
||||
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
|
||||
|
||||
SCHEMA_VERSION = 7
|
||||
SCHEMA_VERSION = 6
|
||||
|
||||
SCHEMA_SQL = """
|
||||
CREATE TABLE IF NOT EXISTS schema_version (
|
||||
@@ -66,7 +66,6 @@ CREATE TABLE IF NOT EXISTS sessions (
|
||||
cost_source TEXT,
|
||||
pricing_version TEXT,
|
||||
title TEXT,
|
||||
profile TEXT,
|
||||
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
|
||||
);
|
||||
|
||||
@@ -87,7 +86,6 @@ CREATE TABLE IF NOT EXISTS messages (
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
|
||||
@@ -332,19 +330,6 @@ class SessionDB:
|
||||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
cursor.execute("UPDATE schema_version SET version = 6")
|
||||
if current_version < 7:
|
||||
# v7: add profile column to sessions for profile isolation (#323)
|
||||
try:
|
||||
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
|
||||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
try:
|
||||
cursor.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
pass
|
||||
cursor.execute("UPDATE schema_version SET version = 7")
|
||||
|
||||
# Unique title index — always ensure it exists (safe to run after migrations
|
||||
# since the title column is guaranteed to exist at this point)
|
||||
@@ -377,19 +362,13 @@ class SessionDB:
|
||||
system_prompt: str = None,
|
||||
user_id: str = None,
|
||||
parent_session_id: str = None,
|
||||
profile: str = None,
|
||||
) -> str:
|
||||
"""Create a new session record. Returns the session_id.
|
||||
|
||||
Args:
|
||||
profile: Profile name for session isolation. When set, sessions
|
||||
are tagged so queries can filter by profile. (#323)
|
||||
"""
|
||||
"""Create a new session record. Returns the session_id."""
|
||||
def _do(conn):
|
||||
conn.execute(
|
||||
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
|
||||
system_prompt, parent_session_id, profile, started_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
system_prompt, parent_session_id, started_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
session_id,
|
||||
source,
|
||||
@@ -398,7 +377,6 @@ class SessionDB:
|
||||
json.dumps(model_config) if model_config else None,
|
||||
system_prompt,
|
||||
parent_session_id,
|
||||
profile,
|
||||
time.time(),
|
||||
),
|
||||
)
|
||||
@@ -527,23 +505,19 @@ class SessionDB:
|
||||
session_id: str,
|
||||
source: str = "unknown",
|
||||
model: str = None,
|
||||
profile: str = None,
|
||||
) -> None:
|
||||
"""Ensure a session row exists, creating it with minimal metadata if absent.
|
||||
|
||||
Used by _flush_messages_to_session_db to recover from a failed
|
||||
create_session() call (e.g. transient SQLite lock at agent startup).
|
||||
INSERT OR IGNORE is safe to call even when the row already exists.
|
||||
|
||||
Args:
|
||||
profile: Profile name for session isolation. (#323)
|
||||
"""
|
||||
def _do(conn):
|
||||
conn.execute(
|
||||
"""INSERT OR IGNORE INTO sessions
|
||||
(id, source, model, profile, started_at)
|
||||
VALUES (?, ?, ?, ?, ?)""",
|
||||
(session_id, source, model, profile, time.time()),
|
||||
(id, source, model, started_at)
|
||||
VALUES (?, ?, ?, ?)""",
|
||||
(session_id, source, model, time.time()),
|
||||
)
|
||||
self._execute_write(_do)
|
||||
|
||||
@@ -814,7 +788,6 @@ class SessionDB:
|
||||
limit: int = 20,
|
||||
offset: int = 0,
|
||||
include_children: bool = False,
|
||||
profile: str = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""List sessions with preview (first user message) and last active timestamp.
|
||||
|
||||
@@ -826,10 +799,6 @@ class SessionDB:
|
||||
|
||||
By default, child sessions (subagent runs, compression continuations)
|
||||
are excluded. Pass ``include_children=True`` to include them.
|
||||
|
||||
Args:
|
||||
profile: Filter sessions to this profile name. Pass None to see all.
|
||||
(#323)
|
||||
"""
|
||||
where_clauses = []
|
||||
params = []
|
||||
@@ -844,9 +813,6 @@ class SessionDB:
|
||||
placeholders = ",".join("?" for _ in exclude_sources)
|
||||
where_clauses.append(f"s.source NOT IN ({placeholders})")
|
||||
params.extend(exclude_sources)
|
||||
if profile:
|
||||
where_clauses.append("s.profile = ?")
|
||||
params.append(profile)
|
||||
|
||||
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
||||
query = f"""
|
||||
@@ -1192,52 +1158,34 @@ class SessionDB:
|
||||
source: str = None,
|
||||
limit: int = 20,
|
||||
offset: int = 0,
|
||||
profile: str = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""List sessions, optionally filtered by source and profile.
|
||||
|
||||
Args:
|
||||
profile: Filter sessions to this profile name. Pass None to see all.
|
||||
(#323)
|
||||
"""
|
||||
where_clauses = []
|
||||
params = []
|
||||
if source:
|
||||
where_clauses.append("source = ?")
|
||||
params.append(source)
|
||||
if profile:
|
||||
where_clauses.append("profile = ?")
|
||||
params.append(profile)
|
||||
|
||||
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
||||
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
|
||||
params.extend([limit, offset])
|
||||
"""List sessions, optionally filtered by source."""
|
||||
with self._lock:
|
||||
cursor = self._conn.execute(query, params)
|
||||
if source:
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
|
||||
(source, limit, offset),
|
||||
)
|
||||
else:
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
|
||||
(limit, offset),
|
||||
)
|
||||
return [dict(row) for row in cursor.fetchall()]
|
||||
|
||||
# =========================================================================
|
||||
# Utility
|
||||
# =========================================================================
|
||||
|
||||
def session_count(self, source: str = None, profile: str = None) -> int:
|
||||
"""Count sessions, optionally filtered by source and profile.
|
||||
|
||||
Args:
|
||||
profile: Filter to this profile name. Pass None to count all. (#323)
|
||||
"""
|
||||
where_clauses = []
|
||||
params = []
|
||||
if source:
|
||||
where_clauses.append("source = ?")
|
||||
params.append(source)
|
||||
if profile:
|
||||
where_clauses.append("profile = ?")
|
||||
params.append(profile)
|
||||
|
||||
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
||||
def session_count(self, source: str = None) -> int:
|
||||
"""Count sessions, optionally filtered by source."""
|
||||
with self._lock:
|
||||
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
|
||||
if source:
|
||||
cursor = self._conn.execute(
|
||||
"SELECT COUNT(*) FROM sessions WHERE source = ?", (source,)
|
||||
)
|
||||
else:
|
||||
cursor = self._conn.execute("SELECT COUNT(*) FROM sessions")
|
||||
return cursor.fetchone()[0]
|
||||
|
||||
def message_count(self, session_id: str = None) -> int:
|
||||
|
||||
@@ -1,175 +0,0 @@
|
||||
"""
|
||||
Tests for hardcoded ~/.hermes path detection (poka-yoke).
|
||||
|
||||
These tests verify that the pre-commit hook correctly detects hardcoded
|
||||
paths and that the codebase uses get_hermes_home() correctly.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Import the scanner
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / ".githooks"))
|
||||
from check_hardcoded_paths import scan_line_for_hardcoded_paths, Finding
|
||||
|
||||
|
||||
class TestHardcodedPathDetection:
|
||||
"""Test the hardcoded path detection logic."""
|
||||
|
||||
def test_detects_path_home_hermes(self):
|
||||
"""Detect Path.home() / '.hermes' pattern."""
|
||||
line = ' home = Path.home() / ".hermes"'
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
|
||||
assert len(findings) == 1
|
||||
assert "Path.home()" in findings[0].message
|
||||
|
||||
def test_detects_path_home_hermes_subpath(self):
|
||||
"""Detect Path.home() / '.hermes' / 'subdir' pattern."""
|
||||
line = ' config_dir = Path.home() / ".hermes" / "config"'
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
|
||||
assert len(findings) == 1
|
||||
|
||||
def test_detects_tilde_hermes_in_string(self):
|
||||
"""Detect '~/.hermes' in string literals."""
|
||||
line = ' path = "~/.hermes/config.yaml"'
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
|
||||
assert len(findings) == 1
|
||||
|
||||
def test_detects_expanduser_hermes(self):
|
||||
"""Detect os.path.expanduser('~/.hermes') pattern."""
|
||||
line = ' home = os.path.expanduser("~/.hermes")'
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
|
||||
assert len(findings) == 1
|
||||
|
||||
def test_detects_join_expanduser(self):
|
||||
"""Detect os.path.join(expanduser('~'), '.hermes') pattern."""
|
||||
line = ' home = os.path.join(os.path.expanduser("~"), ".hermes")'
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
|
||||
assert len(findings) == 1
|
||||
|
||||
def test_ignores_comments(self):
|
||||
"""Ignore hardcoded paths in comments."""
|
||||
line = ' # This is ~/.hermes in a comment'
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
|
||||
assert len(findings) == 0
|
||||
|
||||
def test_ignores_docstrings(self):
|
||||
"""Ignore hardcoded paths in docstrings."""
|
||||
line = ' """This mentions ~/.hermes in a docstring."""'
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
|
||||
assert len(findings) == 0
|
||||
|
||||
def test_ignores_hermes_constants(self):
|
||||
"""hermes_constants.py is allowed to have hardcoded paths."""
|
||||
line = ' return Path.home() / ".hermes"'
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "hermes_constants.py", 1))
|
||||
assert len(findings) == 0
|
||||
|
||||
def test_ignores_test_files(self):
|
||||
"""Test files can have hardcoded paths for testing."""
|
||||
line = ' home = Path.home() / ".hermes"'
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "test_something.py", 1))
|
||||
assert len(findings) == 0
|
||||
|
||||
def test_ignores_markdown_files(self):
|
||||
"""Markdown files can have hardcoded paths in examples."""
|
||||
line = ' home = Path.home() / ".hermes"'
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "README.md", 1))
|
||||
assert len(findings) == 0
|
||||
|
||||
def test_ignores_empty_lines(self):
|
||||
"""Empty lines should not produce findings."""
|
||||
line = ""
|
||||
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
|
||||
assert len(findings) == 0
|
||||
|
||||
|
||||
class TestHermesHomeUsage:
|
||||
"""Test that the codebase uses get_hermes_home() correctly."""
|
||||
|
||||
def test_hermes_constants_has_get_hermes_home(self):
|
||||
"""hermes_constants.py should export get_hermes_home()."""
|
||||
from hermes_constants import get_hermes_home
|
||||
assert callable(get_hermes_home)
|
||||
|
||||
def test_hermes_constants_has_display_hermes_home(self):
|
||||
"""hermes_constants.py should export display_hermes_home()."""
|
||||
from hermes_constants import display_hermes_home
|
||||
assert callable(display_hermes_home)
|
||||
|
||||
def test_get_hermes_home_returns_path(self):
|
||||
"""get_hermes_home() should return a Path object."""
|
||||
from hermes_constants import get_hermes_home
|
||||
result = get_hermes_home()
|
||||
assert isinstance(result, Path)
|
||||
|
||||
def test_get_hermes_home_honors_env_var(self):
|
||||
"""get_hermes_home() should honor HERMES_HOME env var."""
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
with patch.dict(os.environ, {"HERMES_HOME": tmpdir}):
|
||||
result = get_hermes_home()
|
||||
assert result == Path(tmpdir)
|
||||
|
||||
def test_display_hermes_home_returns_string(self):
|
||||
"""display_hermes_home() should return a string."""
|
||||
from hermes_constants import display_hermes_home
|
||||
result = display_hermes_home()
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_display_hermes_home_uses_tilde_shorthand(self):
|
||||
"""display_hermes_home() should use ~/ shorthand for home directory."""
|
||||
from hermes_constants import display_hermes_home, get_hermes_home
|
||||
|
||||
# If HERMES_HOME is under home directory, should use ~/
|
||||
home = get_hermes_home()
|
||||
if home.is_relative_to(Path.home()):
|
||||
result = display_hermes_home()
|
||||
assert result.startswith("~/")
|
||||
|
||||
def test_profile_isolation_with_env_var(self):
|
||||
"""Each profile should have its own HERMES_HOME."""
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir1, tempfile.TemporaryDirectory() as tmpdir2:
|
||||
# Profile 1
|
||||
with patch.dict(os.environ, {"HERMES_HOME": tmpdir1}):
|
||||
home1 = get_hermes_home()
|
||||
|
||||
# Profile 2
|
||||
with patch.dict(os.environ, {"HERMES_HOME": tmpdir2}):
|
||||
home2 = get_hermes_home()
|
||||
|
||||
assert home1 != home2
|
||||
assert home1 == Path(tmpdir1)
|
||||
assert home2 == Path(tmpdir2)
|
||||
|
||||
|
||||
class TestPreCommitHookIntegration:
|
||||
"""Integration tests for the pre-commit hook."""
|
||||
|
||||
def test_hook_script_exists(self):
|
||||
"""The check_hardcoded_paths.py script should exist."""
|
||||
hook_path = Path(__file__).parent.parent / ".githooks" / "check_hardcoded_paths.py"
|
||||
assert hook_path.exists()
|
||||
|
||||
def test_hook_script_is_executable(self):
|
||||
"""The check_hardcoded_paths.py script should be executable."""
|
||||
hook_path = Path(__file__).parent.parent / ".githooks" / "check_hardcoded_paths.py"
|
||||
assert hook_path.stat().st_mode & 0o111 # Check executable bits
|
||||
|
||||
def test_pre_commit_calls_hardcoded_check(self):
|
||||
"""pre-commit.py should call the hardcoded path check."""
|
||||
pre_commit_path = Path(__file__).parent.parent / ".githooks" / "pre-commit.py"
|
||||
content = pre_commit_path.read_text()
|
||||
assert "check_hardcoded_paths.py" in content
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
188
tests/tools/test_session_templates.py
Normal file
188
tests/tools/test_session_templates.py
Normal file
@@ -0,0 +1,188 @@
|
||||
"""Tests for session templates (code-first seeding)."""
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from tools.session_templates import (
|
||||
SessionTemplate,
|
||||
SessionTemplates,
|
||||
TaskType,
|
||||
ToolCallExample,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_templates(tmp_path):
|
||||
return SessionTemplates(templates_dir=tmp_path / "templates")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Task type classification
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestClassifyTaskType:
|
||||
def test_code_dominant(self, tmp_templates):
|
||||
calls = [
|
||||
{"name": "execute_code"}, {"name": "execute_code"},
|
||||
{"name": "execute_code"}, {"name": "read_file"},
|
||||
]
|
||||
assert tmp_templates.classify_task_type(calls) == TaskType.CODE
|
||||
|
||||
def test_file_dominant(self, tmp_templates):
|
||||
calls = [
|
||||
{"name": "read_file"}, {"name": "write_file"},
|
||||
{"name": "patch"}, {"name": "read_file"},
|
||||
{"name": "execute_code"},
|
||||
]
|
||||
assert tmp_templates.classify_task_type(calls) == TaskType.FILE
|
||||
|
||||
def test_research_dominant(self, tmp_templates):
|
||||
calls = [
|
||||
{"name": "web_search"}, {"name": "web_fetch"},
|
||||
{"name": "web_search"}, {"name": "read_file"},
|
||||
]
|
||||
assert tmp_templates.classify_task_type(calls) == TaskType.RESEARCH
|
||||
|
||||
def test_mixed_no_dominant(self, tmp_templates):
|
||||
calls = [
|
||||
{"name": "execute_code"}, {"name": "read_file"},
|
||||
{"name": "web_search"},
|
||||
]
|
||||
assert tmp_templates.classify_task_type(calls) == TaskType.MIXED
|
||||
|
||||
def test_empty_returns_mixed(self, tmp_templates):
|
||||
assert tmp_templates.classify_task_type([]) == TaskType.MIXED
|
||||
|
||||
def test_threshold_is_60_percent(self, tmp_templates):
|
||||
# 59% code (5/9) should be MIXED
|
||||
calls = [{"name": "execute_code"}] * 5 + [{"name": "read_file"}] * 4
|
||||
assert tmp_templates.classify_task_type(calls) == TaskType.MIXED
|
||||
|
||||
# 60% code (6/10) should be CODE
|
||||
calls = [{"name": "execute_code"}] * 6 + [{"name": "read_file"}] * 4
|
||||
assert tmp_templates.classify_task_type(calls) == TaskType.CODE
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Template CRUD
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestTemplateCRUD:
|
||||
def test_save_and_list(self, tmp_templates):
|
||||
template = SessionTemplate(
|
||||
name="test-code",
|
||||
task_type=TaskType.CODE,
|
||||
examples=[
|
||||
ToolCallExample(tool_name="execute_code", args={"code": "print('hi')"}, success=True),
|
||||
],
|
||||
created_at="2026-01-01T00:00:00Z",
|
||||
)
|
||||
tmp_templates.save_template(template)
|
||||
|
||||
templates = tmp_templates.list_templates()
|
||||
assert len(templates) == 1
|
||||
assert templates[0].name == "test-code"
|
||||
assert templates[0].task_type == TaskType.CODE
|
||||
|
||||
def test_list_filter_by_type(self, tmp_templates):
|
||||
tmp_templates.save_template(SessionTemplate(name="t1", task_type=TaskType.CODE, examples=[]))
|
||||
tmp_templates.save_template(SessionTemplate(name="t2", task_type=TaskType.FILE, examples=[]))
|
||||
|
||||
code_templates = tmp_templates.list_templates(TaskType.CODE)
|
||||
assert len(code_templates) == 1
|
||||
assert code_templates[0].name == "t1"
|
||||
|
||||
def test_delete(self, tmp_templates):
|
||||
tmp_templates.save_template(SessionTemplate(name="delete-me", task_type=TaskType.CODE, examples=[]))
|
||||
assert tmp_templates.delete_template("delete-me") is True
|
||||
assert len(tmp_templates.list_templates()) == 0
|
||||
|
||||
def test_delete_nonexistent(self, tmp_templates):
|
||||
assert tmp_templates.delete_template("nope") is False
|
||||
|
||||
def test_get_template_returns_best(self, tmp_templates):
|
||||
tmp_templates.save_template(SessionTemplate(
|
||||
name="low-usage", task_type=TaskType.CODE, examples=[], usage_count=1,
|
||||
))
|
||||
tmp_templates.save_template(SessionTemplate(
|
||||
name="high-usage", task_type=TaskType.CODE, examples=[], usage_count=5,
|
||||
))
|
||||
best = tmp_templates.get_template(TaskType.CODE)
|
||||
assert best.name == "high-usage"
|
||||
|
||||
def test_get_template_returns_none_if_empty(self, tmp_templates):
|
||||
assert tmp_templates.get_template(TaskType.CODE) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Template injection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestInjectIntoMessages:
|
||||
def test_injects_after_system(self, tmp_templates):
|
||||
template = SessionTemplate(
|
||||
name="test-inject",
|
||||
task_type=TaskType.CODE,
|
||||
examples=[
|
||||
ToolCallExample(
|
||||
tool_name="execute_code",
|
||||
args={"code": "x=1"},
|
||||
result_preview="1",
|
||||
success=True,
|
||||
),
|
||||
],
|
||||
)
|
||||
messages = [
|
||||
{"role": "system", "content": "You are Timmy."},
|
||||
{"role": "user", "content": "Hello"},
|
||||
]
|
||||
result = tmp_templates.inject_into_messages(template, messages)
|
||||
|
||||
# Should have: system, template system note, assistant tool call, tool result, user
|
||||
assert len(result) == 5
|
||||
assert result[0]["role"] == "system"
|
||||
assert "Session Template" in result[1]["content"]
|
||||
assert result[2]["role"] == "assistant"
|
||||
assert result[3]["role"] == "tool"
|
||||
assert result[4]["role"] == "user"
|
||||
|
||||
def test_skips_failed_examples(self, tmp_templates):
|
||||
template = SessionTemplate(
|
||||
name="test-fail",
|
||||
task_type=TaskType.CODE,
|
||||
examples=[
|
||||
ToolCallExample(tool_name="execute_code", args={}, success=False),
|
||||
ToolCallExample(tool_name="read_file", args={"path": "x"}, success=True),
|
||||
],
|
||||
)
|
||||
messages = [{"role": "system", "content": "sys"}]
|
||||
result = tmp_templates.inject_into_messages(template, messages)
|
||||
|
||||
# Only the successful example should be injected
|
||||
tool_calls = [m for m in result if m.get("role") == "assistant" and m.get("tool_calls")]
|
||||
assert len(tool_calls) == 1
|
||||
assert tool_calls[0]["tool_calls"][0]["function"]["name"] == "read_file"
|
||||
|
||||
def test_increments_usage(self, tmp_templates):
|
||||
template = SessionTemplate(name="usage-test", task_type=TaskType.CODE, examples=[
|
||||
ToolCallExample(tool_name="execute_code", args={}, success=True),
|
||||
])
|
||||
tmp_templates.save_template(template)
|
||||
|
||||
tmp_templates.inject_into_messages(template, [{"role": "system", "content": "x"}])
|
||||
assert template.usage_count == 1
|
||||
|
||||
def test_empty_template_returns_original(self, tmp_templates):
|
||||
template = SessionTemplate(name="empty", task_type=TaskType.CODE, examples=[])
|
||||
messages = [{"role": "user", "content": "hi"}]
|
||||
result = tmp_templates.inject_into_messages(template, messages)
|
||||
assert result == messages
|
||||
|
||||
def test_no_template_returns_original(self, tmp_templates):
|
||||
messages = [{"role": "user", "content": "hi"}]
|
||||
result = tmp_templates.inject_into_messages(None, messages)
|
||||
assert result == messages
|
||||
418
tools/session_templates.py
Normal file
418
tools/session_templates.py
Normal file
@@ -0,0 +1,418 @@
|
||||
"""
|
||||
Session templates for code-first seeding.
|
||||
|
||||
Research finding: Code-heavy sessions (execute_code dominant in first 30 turns)
|
||||
improve over time. File-heavy sessions degrade. The key is deterministic
|
||||
feedback loops, not arbitrary context.
|
||||
|
||||
This module provides:
|
||||
1. Task type classification (CODE, FILE, RESEARCH, MIXED)
|
||||
2. Template extraction from completed sessions
|
||||
3. Template storage (~/.hermes/session-templates/)
|
||||
4. Template injection into new sessions
|
||||
5. CLI interface for template management
|
||||
|
||||
Closes #329.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
HERMES_HOME = Path(os.environ.get("HERMES_HOME", str(Path.home() / ".hermes")))
|
||||
TEMPLATES_DIR = HERMES_HOME / "session-templates"
|
||||
SESSIONS_DB = HERMES_HOME / "state.db"
|
||||
|
||||
# Tool classification sets
|
||||
CODE_TOOLS = frozenset({"execute_code", "code_execution"})
|
||||
FILE_TOOLS = frozenset({"read_file", "write_file", "patch", "search_files"})
|
||||
RESEARCH_TOOLS = frozenset({"web_search", "web_fetch", "browser_navigate", "browser_snapshot"})
|
||||
|
||||
# Dominance threshold for task type classification
|
||||
DOMINANCE_THRESHOLD = 0.6
|
||||
|
||||
# Default max examples to extract per template
|
||||
DEFAULT_MAX_EXAMPLES = 10
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data model
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TaskType(str, Enum):
|
||||
CODE = "code"
|
||||
FILE = "file"
|
||||
RESEARCH = "research"
|
||||
MIXED = "mixed"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolCallExample:
|
||||
"""A single tool call with its result, used as a template example."""
|
||||
tool_name: str
|
||||
args: dict[str, Any]
|
||||
result_preview: str = ""
|
||||
success: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class SessionTemplate:
|
||||
"""A session template containing tool call examples for seeding."""
|
||||
name: str
|
||||
task_type: TaskType
|
||||
examples: list[ToolCallExample] = field(default_factory=list)
|
||||
source_session_id: str = ""
|
||||
created_at: str = ""
|
||||
usage_count: int = 0
|
||||
description: str = ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class SessionTemplates:
|
||||
"""Manages session templates for code-first seeding."""
|
||||
|
||||
def __init__(self, templates_dir: Optional[Path] = None):
|
||||
self.templates_dir = templates_dir or TEMPLATES_DIR
|
||||
self.templates_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def classify_task_type(self, tool_calls: list[dict[str, Any]]) -> TaskType:
|
||||
"""Classify a session's task type based on tool call patterns.
|
||||
|
||||
Uses 60% threshold for dominant type.
|
||||
"""
|
||||
if not tool_calls:
|
||||
return TaskType.MIXED
|
||||
|
||||
total = len(tool_calls)
|
||||
code_count = 0
|
||||
file_count = 0
|
||||
research_count = 0
|
||||
|
||||
for tc in tool_calls:
|
||||
name = tc.get("name", tc.get("tool_name", "")).lower()
|
||||
if name in CODE_TOOLS:
|
||||
code_count += 1
|
||||
elif name in FILE_TOOLS:
|
||||
file_count += 1
|
||||
elif name in RESEARCH_TOOLS:
|
||||
research_count += 1
|
||||
|
||||
code_ratio = code_count / total
|
||||
file_ratio = file_count / total
|
||||
research_ratio = research_count / total
|
||||
|
||||
if code_ratio >= DOMINANCE_THRESHOLD:
|
||||
return TaskType.CODE
|
||||
if file_ratio >= DOMINANCE_THRESHOLD:
|
||||
return TaskType.FILE
|
||||
if research_ratio >= DOMINANCE_THRESHOLD:
|
||||
return TaskType.RESEARCH
|
||||
return TaskType.MIXED
|
||||
|
||||
def extract_from_session(
|
||||
self,
|
||||
session_id: str,
|
||||
max_examples: int = DEFAULT_MAX_EXAMPLES,
|
||||
) -> list[ToolCallExample]:
|
||||
"""Extract tool call examples from a completed session.
|
||||
|
||||
Reads from the SQLite session database.
|
||||
"""
|
||||
examples: list[ToolCallExample] = []
|
||||
|
||||
db_path = SESSIONS_DB
|
||||
if not db_path.exists():
|
||||
return examples
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
rows = conn.execute(
|
||||
"SELECT messages FROM sessions WHERE session_id = ? ORDER BY created_at DESC LIMIT 1",
|
||||
(session_id,),
|
||||
).fetchone()
|
||||
|
||||
if not rows:
|
||||
conn.close()
|
||||
return examples
|
||||
|
||||
messages = json.loads(rows["messages"])
|
||||
|
||||
# Extract tool calls from assistant messages
|
||||
for msg in messages:
|
||||
if msg.get("role") != "assistant":
|
||||
continue
|
||||
tool_calls = msg.get("tool_calls", [])
|
||||
if not tool_calls:
|
||||
continue
|
||||
|
||||
for tc in tool_calls:
|
||||
if len(examples) >= max_examples:
|
||||
break
|
||||
|
||||
fn = tc.get("function", {})
|
||||
name = fn.get("name", "")
|
||||
if not name:
|
||||
continue
|
||||
|
||||
try:
|
||||
args = json.loads(fn.get("arguments", "{}"))
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
args = {}
|
||||
|
||||
# Find the corresponding tool result
|
||||
result_preview = ""
|
||||
success = True
|
||||
tc_id = tc.get("id", "")
|
||||
|
||||
for result_msg in messages:
|
||||
if (result_msg.get("role") == "tool"
|
||||
and result_msg.get("tool_call_id") == tc_id):
|
||||
content = result_msg.get("content", "")
|
||||
result_preview = str(content)[:200]
|
||||
# Heuristic: errors contain common failure markers
|
||||
if any(marker in result_preview.lower() for marker in ("error", "failed", "traceback", "exception")):
|
||||
success = False
|
||||
break
|
||||
|
||||
examples.append(ToolCallExample(
|
||||
tool_name=name,
|
||||
args=args,
|
||||
result_preview=result_preview,
|
||||
success=success,
|
||||
))
|
||||
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return examples
|
||||
|
||||
def create_template(
|
||||
self,
|
||||
session_id: str,
|
||||
name: Optional[str] = None,
|
||||
description: str = "",
|
||||
max_examples: int = DEFAULT_MAX_EXAMPLES,
|
||||
) -> Optional[SessionTemplate]:
|
||||
"""Create a template from a session's tool call history."""
|
||||
examples = self.extract_from_session(session_id, max_examples)
|
||||
if not examples:
|
||||
return None
|
||||
|
||||
tool_calls_for_type = [{"name": e.tool_name} for e in examples]
|
||||
task_type = self.classify_task_type(tool_calls_for_type)
|
||||
|
||||
template_name = name or f"{task_type.value}_{session_id[:8]}"
|
||||
|
||||
from datetime import datetime
|
||||
template = SessionTemplate(
|
||||
name=template_name,
|
||||
task_type=task_type,
|
||||
examples=examples,
|
||||
source_session_id=session_id,
|
||||
created_at=datetime.utcnow().isoformat() + "Z",
|
||||
description=description or f"Auto-extracted from {session_id}",
|
||||
)
|
||||
|
||||
self.save_template(template)
|
||||
return template
|
||||
|
||||
def save_template(self, template: SessionTemplate) -> Path:
|
||||
"""Save a template to disk."""
|
||||
path = self.templates_dir / f"{template.name}.json"
|
||||
data = {
|
||||
"name": template.name,
|
||||
"task_type": template.task_type.value,
|
||||
"examples": [asdict(e) for e in template.examples],
|
||||
"source_session_id": template.source_session_id,
|
||||
"created_at": template.created_at,
|
||||
"usage_count": template.usage_count,
|
||||
"description": template.description,
|
||||
}
|
||||
path.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n")
|
||||
return path
|
||||
|
||||
def get_template(self, task_type: TaskType) -> Optional[SessionTemplate]:
|
||||
"""Get the best template for a given task type."""
|
||||
templates = self.list_templates(task_type)
|
||||
if not templates:
|
||||
return None
|
||||
|
||||
# Prefer templates with more usage (proven useful)
|
||||
templates.sort(key=lambda t: t.usage_count, reverse=True)
|
||||
return templates[0]
|
||||
|
||||
def list_templates(self, task_type: Optional[TaskType] = None) -> list[SessionTemplate]:
|
||||
"""List all templates, optionally filtered by type."""
|
||||
templates: list[SessionTemplate] = []
|
||||
|
||||
for path in sorted(self.templates_dir.glob("*.json")):
|
||||
try:
|
||||
data = json.loads(path.read_text())
|
||||
examples = [ToolCallExample(**e) for e in data.get("examples", [])]
|
||||
template = SessionTemplate(
|
||||
name=data["name"],
|
||||
task_type=TaskType(data["task_type"]),
|
||||
examples=examples,
|
||||
source_session_id=data.get("source_session_id", ""),
|
||||
created_at=data.get("created_at", ""),
|
||||
usage_count=data.get("usage_count", 0),
|
||||
description=data.get("description", ""),
|
||||
)
|
||||
if task_type is None or template.task_type == task_type:
|
||||
templates.append(template)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
return templates
|
||||
|
||||
def delete_template(self, name: str) -> bool:
|
||||
"""Delete a template by name."""
|
||||
path = self.templates_dir / f"{name}.json"
|
||||
if path.exists():
|
||||
path.unlink()
|
||||
return True
|
||||
return False
|
||||
|
||||
def inject_into_messages(
|
||||
self,
|
||||
template: SessionTemplate,
|
||||
messages: list[dict[str, Any]],
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Inject template examples into a session's messages.
|
||||
|
||||
Inserts tool call examples after system messages to establish
|
||||
feedback loops early.
|
||||
"""
|
||||
if not template or not template.examples:
|
||||
return messages
|
||||
|
||||
# Build injection messages
|
||||
injection: list[dict[str, Any]] = []
|
||||
|
||||
# System note about the template
|
||||
injection.append({
|
||||
"role": "system",
|
||||
"content": (
|
||||
f"[Session Template: '{template.name}' ({template.task_type.value})]\n"
|
||||
f"The following are examples of successful tool calls from a similar session. "
|
||||
f"Use them as patterns for your own tool usage."
|
||||
),
|
||||
})
|
||||
|
||||
# Add example tool call/result pairs
|
||||
for ex in template.examples:
|
||||
if not ex.success:
|
||||
continue # Only inject successful examples
|
||||
|
||||
injection.append({
|
||||
"role": "assistant",
|
||||
"content": None,
|
||||
"tool_calls": [{
|
||||
"id": f"template_{template.name}_{ex.tool_name}",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": ex.tool_name,
|
||||
"arguments": json.dumps(ex.args),
|
||||
},
|
||||
}],
|
||||
})
|
||||
injection.append({
|
||||
"role": "tool",
|
||||
"tool_call_id": f"template_{template.name}_{ex.tool_name}",
|
||||
"content": ex.result_preview or "(example result)",
|
||||
})
|
||||
|
||||
# Find insertion point: after system messages
|
||||
insert_idx = 0
|
||||
for i, msg in enumerate(messages):
|
||||
if msg.get("role") == "system":
|
||||
insert_idx = i + 1
|
||||
else:
|
||||
break
|
||||
|
||||
# Insert
|
||||
result = messages[:insert_idx] + injection + messages[insert_idx:]
|
||||
|
||||
# Update usage count
|
||||
template.usage_count += 1
|
||||
self.save_template(template)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _cli():
|
||||
"""Simple CLI for session template management."""
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
parser = argparse.ArgumentParser(description="Session template management")
|
||||
sub = parser.add_subparsers(dest="command")
|
||||
|
||||
# list
|
||||
list_cmd = sub.add_parser("list", help="List templates")
|
||||
list_cmd.add_argument("--type", choices=["code", "file", "research", "mixed"])
|
||||
|
||||
# create
|
||||
create_cmd = sub.add_parser("create", help="Create template from session")
|
||||
create_cmd.add_argument("session_id", help="Session ID to extract from")
|
||||
create_cmd.add_argument("--name", help="Template name")
|
||||
create_cmd.add_argument("--max-examples", type=int, default=10)
|
||||
|
||||
# delete
|
||||
delete_cmd = sub.add_parser("delete", help="Delete template")
|
||||
delete_cmd.add_argument("name", help="Template name")
|
||||
|
||||
args = parser.parse_args()
|
||||
tm = SessionTemplates()
|
||||
|
||||
if args.command == "list":
|
||||
task_type = TaskType(args.type) if args.type else None
|
||||
templates = tm.list_templates(task_type)
|
||||
if not templates:
|
||||
print("No templates found.")
|
||||
return
|
||||
for t in templates:
|
||||
print(f" {t.name:30s} {t.task_type.value:10s} {len(t.examples)} examples, used {t.usage_count}x")
|
||||
|
||||
elif args.command == "create":
|
||||
template = tm.create_template(args.session_id, name=args.name, max_examples=args.max_examples)
|
||||
if template:
|
||||
print(f"Created template: {template.name} ({template.task_type.value}, {len(template.examples)} examples)")
|
||||
else:
|
||||
print(f"No tool calls found in session {args.session_id}")
|
||||
sys.exit(1)
|
||||
|
||||
elif args.command == "delete":
|
||||
if tm.delete_template(args.name):
|
||||
print(f"Deleted template: {args.name}")
|
||||
else:
|
||||
print(f"Template not found: {args.name}")
|
||||
sys.exit(1)
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
_cli()
|
||||
Reference in New Issue
Block a user