[#103] Multi-tier caching layer for local Timmy — KV, Response, Tool, Embedding, Template, HTTP caches

This commit is contained in:
Allegro
2026-03-30 16:52:53 +00:00
parent b0bb8a7c7d
commit ac6cc67e49
4 changed files with 1898 additions and 0 deletions

656
timmy-local/cache/agent_cache.py vendored Normal file
View File

@@ -0,0 +1,656 @@
#!/usr/bin/env python3
"""
Multi-Tier Caching Layer for Local Timmy
Issue #103 — Cache Everywhere
Provides:
- Tier 1: KV Cache (prompt prefix caching)
- Tier 2: Semantic Response Cache (full LLM responses)
- Tier 3: Tool Result Cache (stable tool outputs)
- Tier 4: Embedding Cache (RAG embeddings)
- Tier 5: Template Cache (pre-compiled prompts)
- Tier 6: HTTP Response Cache (API responses)
"""
import sqlite3
import hashlib
import json
import time
import threading
from typing import Optional, Any, Dict, List, Callable
from dataclasses import dataclass, asdict
from pathlib import Path
import pickle
import functools
@dataclass
class CacheStats:
"""Statistics for cache monitoring."""
hits: int = 0
misses: int = 0
evictions: int = 0
hit_rate: float = 0.0
def record_hit(self):
self.hits += 1
self._update_rate()
def record_miss(self):
self.misses += 1
self._update_rate()
def record_eviction(self):
self.evictions += 1
def _update_rate(self):
total = self.hits + self.misses
if total > 0:
self.hit_rate = self.hits / total
class LRUCache:
"""In-memory LRU cache for hot path."""
def __init__(self, max_size: int = 1000):
self.max_size = max_size
self.cache: Dict[str, Any] = {}
self.access_order: List[str] = []
self.lock = threading.RLock()
def get(self, key: str) -> Optional[Any]:
with self.lock:
if key in self.cache:
# Move to front (most recent)
self.access_order.remove(key)
self.access_order.append(key)
return self.cache[key]
return None
def put(self, key: str, value: Any):
with self.lock:
if key in self.cache:
self.access_order.remove(key)
elif len(self.cache) >= self.max_size:
# Evict oldest
oldest = self.access_order.pop(0)
del self.cache[oldest]
self.cache[key] = value
self.access_order.append(key)
def invalidate(self, key: str):
with self.lock:
if key in self.cache:
self.access_order.remove(key)
del self.cache[key]
def clear(self):
with self.lock:
self.cache.clear()
self.access_order.clear()
class ResponseCache:
"""Tier 2: Semantic Response Cache — full LLM responses."""
def __init__(self, db_path: str = "~/.timmy/cache/responses.db"):
self.db_path = Path(db_path).expanduser()
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.stats = CacheStats()
self.lru = LRUCache(max_size=100)
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS responses (
prompt_hash TEXT PRIMARY KEY,
response TEXT NOT NULL,
created_at REAL NOT NULL,
ttl INTEGER NOT NULL,
access_count INTEGER DEFAULT 0,
last_accessed REAL
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_accessed ON responses(last_accessed)
""")
def _hash_prompt(self, prompt: str) -> str:
"""Hash prompt after normalizing (removing timestamps, etc)."""
# Normalize: lowercase, strip extra whitespace
normalized = " ".join(prompt.lower().split())
return hashlib.sha256(normalized.encode()).hexdigest()[:32]
def get(self, prompt: str, ttl: int = 3600) -> Optional[str]:
"""Get cached response if available and not expired."""
prompt_hash = self._hash_prompt(prompt)
# Check LRU first
cached = self.lru.get(prompt_hash)
if cached:
self.stats.record_hit()
return cached
# Check disk cache
with sqlite3.connect(self.db_path) as conn:
row = conn.execute(
"SELECT response, created_at, ttl FROM responses WHERE prompt_hash = ?",
(prompt_hash,)
).fetchone()
if row:
response, created_at, stored_ttl = row
# Use minimum of requested and stored TTL
effective_ttl = min(ttl, stored_ttl)
if time.time() - created_at < effective_ttl:
# Cache hit
self.stats.record_hit()
# Update access stats
conn.execute(
"UPDATE responses SET access_count = access_count + 1, last_accessed = ? WHERE prompt_hash = ?",
(time.time(), prompt_hash)
)
# Add to LRU
self.lru.put(prompt_hash, response)
return response
else:
# Expired
conn.execute("DELETE FROM responses WHERE prompt_hash = ?", (prompt_hash,))
self.stats.record_eviction()
self.stats.record_miss()
return None
def put(self, prompt: str, response: str, ttl: int = 3600):
"""Cache a response with TTL."""
prompt_hash = self._hash_prompt(prompt)
# Add to LRU
self.lru.put(prompt_hash, response)
# Add to disk cache
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT OR REPLACE INTO responses
(prompt_hash, response, created_at, ttl, last_accessed)
VALUES (?, ?, ?, ?, ?)""",
(prompt_hash, response, time.time(), ttl, time.time())
)
def invalidate_pattern(self, pattern: str):
"""Invalidate all cached responses matching pattern."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("DELETE FROM responses WHERE response LIKE ?", (f"%{pattern}%",))
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
with sqlite3.connect(self.db_path) as conn:
count = conn.execute("SELECT COUNT(*) FROM responses").fetchone()[0]
total_accesses = conn.execute("SELECT SUM(access_count) FROM responses").fetchone()[0] or 0
return {
"tier": "response_cache",
"memory_entries": len(self.lru.cache),
"disk_entries": count,
"hits": self.stats.hits,
"misses": self.stats.misses,
"hit_rate": f"{self.stats.hit_rate:.1%}",
"total_accesses": total_accesses
}
class ToolCache:
"""Tier 3: Tool Result Cache — stable tool outputs."""
# TTL configuration per tool type (seconds)
TOOL_TTL = {
"system_info": 60,
"disk_usage": 120,
"git_status": 30,
"git_log": 300,
"health_check": 60,
"gitea_list_issues": 120,
"file_read": 30,
"process_list": 30,
"service_status": 60,
}
# Tools that invalidate cache on write operations
INVALIDATORS = {
"git_commit": ["git_status", "git_log"],
"git_pull": ["git_status", "git_log"],
"file_write": ["file_read"],
"gitea_create_issue": ["gitea_list_issues"],
"gitea_comment": ["gitea_list_issues"],
}
def __init__(self, db_path: str = "~/.timmy/cache/tool_cache.db"):
self.db_path = Path(db_path).expanduser()
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.stats = CacheStats()
self.lru = LRUCache(max_size=500)
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS tool_results (
tool_hash TEXT PRIMARY KEY,
tool_name TEXT NOT NULL,
params_hash TEXT NOT NULL,
result TEXT NOT NULL,
created_at REAL NOT NULL,
ttl INTEGER NOT NULL
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_tool_name ON tool_results(tool_name)
""")
def _hash_call(self, tool_name: str, params: Dict) -> str:
"""Hash tool name and params for cache key."""
param_str = json.dumps(params, sort_keys=True)
combined = f"{tool_name}:{param_str}"
return hashlib.sha256(combined.encode()).hexdigest()[:32]
def get(self, tool_name: str, params: Dict) -> Optional[Any]:
"""Get cached tool result if available."""
if tool_name not in self.TOOL_TTL:
return None # Not cacheable
tool_hash = self._hash_call(tool_name, params)
# Check LRU
cached = self.lru.get(tool_hash)
if cached:
self.stats.record_hit()
return pickle.loads(cached)
# Check disk
with sqlite3.connect(self.db_path) as conn:
row = conn.execute(
"SELECT result, created_at, ttl FROM tool_results WHERE tool_hash = ?",
(tool_hash,)
).fetchone()
if row:
result, created_at, ttl = row
if time.time() - created_at < ttl:
self.stats.record_hit()
self.lru.put(tool_hash, result)
return pickle.loads(result)
else:
conn.execute("DELETE FROM tool_results WHERE tool_hash = ?", (tool_hash,))
self.stats.record_eviction()
self.stats.record_miss()
return None
def put(self, tool_name: str, params: Dict, result: Any):
"""Cache a tool result."""
if tool_name not in self.TOOL_TTL:
return # Not cacheable
ttl = self.TOOL_TTL[tool_name]
tool_hash = self._hash_call(tool_name, params)
params_hash = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest()[:16]
# Add to LRU
pickled = pickle.dumps(result)
self.lru.put(tool_hash, pickled)
# Add to disk
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT OR REPLACE INTO tool_results
(tool_hash, tool_name, params_hash, result, created_at, ttl)
VALUES (?, ?, ?, ?, ?, ?)""",
(tool_hash, tool_name, params_hash, pickled, time.time(), ttl)
)
def invalidate(self, tool_name: str):
"""Invalidate all cached results for a tool."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("DELETE FROM tool_results WHERE tool_name = ?", (tool_name,))
# Clear matching LRU entries
# (simplified: clear all since LRU doesn't track tool names)
self.lru.clear()
def handle_invalidation(self, tool_name: str):
"""Handle cache invalidation after a write operation."""
if tool_name in self.INVALIDATORS:
for dependent in self.INVALIDATORS[tool_name]:
self.invalidate(dependent)
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
with sqlite3.connect(self.db_path) as conn:
count = conn.execute("SELECT COUNT(*) FROM tool_results").fetchone()[0]
by_tool = conn.execute(
"SELECT tool_name, COUNT(*) FROM tool_results GROUP BY tool_name"
).fetchall()
return {
"tier": "tool_cache",
"memory_entries": len(self.lru.cache),
"disk_entries": count,
"hits": self.stats.hits,
"misses": self.stats.misses,
"hit_rate": f"{self.stats.hit_rate:.1%}",
"by_tool": dict(by_tool)
}
class EmbeddingCache:
"""Tier 4: Embedding Cache — for RAG pipeline (#93)."""
def __init__(self, db_path: str = "~/.timmy/cache/embeddings.db"):
self.db_path = Path(db_path).expanduser()
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.stats = CacheStats()
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS embeddings (
file_path TEXT PRIMARY KEY,
mtime REAL NOT NULL,
embedding BLOB NOT NULL,
model_name TEXT NOT NULL,
created_at REAL NOT NULL
)
""")
def get(self, file_path: str, mtime: float, model_name: str) -> Optional[List[float]]:
"""Get embedding if file hasn't changed and model matches."""
with sqlite3.connect(self.db_path) as conn:
row = conn.execute(
"SELECT embedding, mtime, model_name FROM embeddings WHERE file_path = ?",
(file_path,)
).fetchone()
if row:
embedding_blob, stored_mtime, stored_model = row
if stored_mtime == mtime and stored_model == model_name:
self.stats.record_hit()
return pickle.loads(embedding_blob)
self.stats.record_miss()
return None
def put(self, file_path: str, mtime: float, embedding: List[float], model_name: str):
"""Store embedding with file metadata."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT OR REPLACE INTO embeddings
(file_path, mtime, embedding, model_name, created_at)
VALUES (?, ?, ?, ?, ?)""",
(file_path, mtime, pickle.dumps(embedding), model_name, time.time())
)
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
with sqlite3.connect(self.db_path) as conn:
count = conn.execute("SELECT COUNT(*) FROM embeddings").fetchone()[0]
models = conn.execute(
"SELECT model_name, COUNT(*) FROM embeddings GROUP BY model_name"
).fetchall()
return {
"tier": "embedding_cache",
"entries": count,
"hits": self.stats.hits,
"misses": self.stats.misses,
"hit_rate": f"{self.stats.hit_rate:.1%}",
"by_model": dict(models)
}
class TemplateCache:
"""Tier 5: Template Cache — pre-compiled prompts."""
def __init__(self):
self.templates: Dict[str, str] = {}
self.tokenized: Dict[str, Any] = {} # For tokenizer outputs
self.stats = CacheStats()
def load_template(self, name: str, path: str) -> str:
"""Load and cache a template file."""
if name not in self.templates:
with open(path, 'r') as f:
self.templates[name] = f.read()
self.stats.record_miss()
else:
self.stats.record_hit()
return self.templates[name]
def get(self, name: str) -> Optional[str]:
"""Get cached template."""
if name in self.templates:
self.stats.record_hit()
return self.templates[name]
self.stats.record_miss()
return None
def cache_tokenized(self, name: str, tokens: Any):
"""Cache tokenized version of template."""
self.tokenized[name] = tokens
def get_tokenized(self, name: str) -> Optional[Any]:
"""Get cached tokenized template."""
return self.tokenized.get(name)
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
return {
"tier": "template_cache",
"templates_cached": len(self.templates),
"tokenized_cached": len(self.tokenized),
"hits": self.stats.hits,
"misses": self.stats.misses,
"hit_rate": f"{self.stats.hit_rate:.1%}"
}
class HTTPCache:
"""Tier 6: HTTP Response Cache — for API calls."""
def __init__(self, db_path: str = "~/.timmy/cache/http_cache.db"):
self.db_path = Path(db_path).expanduser()
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.stats = CacheStats()
self.lru = LRUCache(max_size=200)
self._init_db()
def _init_db(self):
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS http_responses (
url_hash TEXT PRIMARY KEY,
url TEXT NOT NULL,
response TEXT NOT NULL,
etag TEXT,
last_modified TEXT,
created_at REAL NOT NULL,
ttl INTEGER NOT NULL
)
""")
def _hash_url(self, url: str) -> str:
return hashlib.sha256(url.encode()).hexdigest()[:32]
def get(self, url: str, ttl: int = 300) -> Optional[Dict]:
"""Get cached HTTP response."""
url_hash = self._hash_url(url)
# Check LRU
cached = self.lru.get(url_hash)
if cached:
self.stats.record_hit()
return cached
# Check disk
with sqlite3.connect(self.db_path) as conn:
row = conn.execute(
"SELECT response, etag, last_modified, created_at, ttl FROM http_responses WHERE url_hash = ?",
(url_hash,)
).fetchone()
if row:
response, etag, last_modified, created_at, stored_ttl = row
effective_ttl = min(ttl, stored_ttl)
if time.time() - created_at < effective_ttl:
self.stats.record_hit()
result = {
"response": response,
"etag": etag,
"last_modified": last_modified
}
self.lru.put(url_hash, result)
return result
else:
conn.execute("DELETE FROM http_responses WHERE url_hash = ?", (url_hash,))
self.stats.record_eviction()
self.stats.record_miss()
return None
def put(self, url: str, response: str, etag: Optional[str] = None,
last_modified: Optional[str] = None, ttl: int = 300):
"""Cache HTTP response."""
url_hash = self._hash_url(url)
result = {
"response": response,
"etag": etag,
"last_modified": last_modified
}
self.lru.put(url_hash, result)
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""INSERT OR REPLACE INTO http_responses
(url_hash, url, response, etag, last_modified, created_at, ttl)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(url_hash, url, response, etag, last_modified, time.time(), ttl)
)
def get_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
with sqlite3.connect(self.db_path) as conn:
count = conn.execute("SELECT COUNT(*) FROM http_responses").fetchone()[0]
return {
"tier": "http_cache",
"memory_entries": len(self.lru.cache),
"disk_entries": count,
"hits": self.stats.hits,
"misses": self.stats.misses,
"hit_rate": f"{self.stats.hit_rate:.1%}"
}
class CacheManager:
"""Central manager for all cache tiers."""
def __init__(self, base_path: str = "~/.timmy/cache"):
self.base_path = Path(base_path).expanduser()
self.base_path.mkdir(parents=True, exist_ok=True)
# Initialize all tiers
self.response = ResponseCache(self.base_path / "responses.db")
self.tool = ToolCache(self.base_path / "tool_cache.db")
self.embedding = EmbeddingCache(self.base_path / "embeddings.db")
self.template = TemplateCache()
self.http = HTTPCache(self.base_path / "http_cache.db")
# KV cache handled by llama-server (external)
def get_all_stats(self) -> Dict[str, Dict]:
"""Get statistics for all cache tiers."""
return {
"response_cache": self.response.get_stats(),
"tool_cache": self.tool.get_stats(),
"embedding_cache": self.embedding.get_stats(),
"template_cache": self.template.get_stats(),
"http_cache": self.http.get_stats(),
}
def clear_all(self):
"""Clear all caches."""
self.response.lru.clear()
self.tool.lru.clear()
self.http.lru.clear()
self.template.templates.clear()
self.template.tokenized.clear()
# Clear databases
for db_file in self.base_path.glob("*.db"):
with sqlite3.connect(db_file) as conn:
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = cursor.fetchall()
for (table,) in tables:
conn.execute(f"DELETE FROM {table}")
def cached_tool(self, ttl: Optional[int] = None):
"""Decorator for caching tool results."""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
tool_name = func.__name__
params = {"args": args, "kwargs": kwargs}
# Try cache
cached = self.tool.get(tool_name, params)
if cached is not None:
return cached
# Execute and cache
result = func(*args, **kwargs)
self.tool.put(tool_name, params, result)
return result
return wrapper
return decorator
# Singleton instance
cache_manager = CacheManager()
if __name__ == "__main__":
# Test the cache
print("Testing Timmy Cache Layer...")
print()
# Test response cache
print("1. Response Cache:")
cache_manager.response.put("What is 2+2?", "4", ttl=60)
cached = cache_manager.response.get("What is 2+2?")
print(f" Cached: {cached}")
print(f" Stats: {cache_manager.response.get_stats()}")
print()
# Test tool cache
print("2. Tool Cache:")
cache_manager.tool.put("system_info", {}, {"cpu": "ARM64", "ram": "8GB"})
cached = cache_manager.tool.get("system_info", {})
print(f" Cached: {cached}")
print(f" Stats: {cache_manager.tool.get_stats()}")
print()
# Test all stats
print("3. All Cache Stats:")
stats = cache_manager.get_all_stats()
for tier, tier_stats in stats.items():
print(f" {tier}: {tier_stats}")
print()
print("✅ Cache layer operational")

View File

@@ -0,0 +1,547 @@
#!/usr/bin/env python3
"""
Timmy Tool Commands
Issue #84 — Bridge Tools into Evennia
Converts Timmy's tool library into Evennia Command objects
so they can be invoked within the world.
"""
from evennia import Command
from evennia.utils import evtable
from typing import Optional, List
import json
import os
class CmdRead(Command):
"""
Read a file from the system.
Usage:
read <path>
Example:
read ~/.timmy/config.yaml
read /opt/timmy/logs/latest.log
"""
key = "read"
aliases = ["cat", "show"]
help_category = "Tools"
def func(self):
if not self.args:
self.caller.msg("Usage: read <path>")
return
path = self.args.strip()
path = os.path.expanduser(path)
try:
with open(path, 'r') as f:
content = f.read()
# Store for later use
self.caller.db.last_read_file = path
self.caller.db.last_read_content = content
# Limit display if too long
lines = content.split('\n')
if len(lines) > 50:
display = '\n'.join(lines[:50])
self.caller.msg(f"|w{path}|n (showing first 50 lines of {len(lines)}):")
self.caller.msg(display)
self.caller.msg(f"\n|y... {len(lines) - 50} more lines|n")
else:
self.caller.msg(f"|w{path}|n:")
self.caller.msg(content)
# Record in metrics
if hasattr(self.caller, 'update_metrics'):
self.caller.update_metrics(files_read=1)
except FileNotFoundError:
self.caller.msg(f"|rFile not found:|n {path}")
except PermissionError:
self.caller.msg(f"|rPermission denied:|n {path}")
except Exception as e:
self.caller.msg(f"|rError reading file:|n {e}")
class CmdWrite(Command):
"""
Write content to a file.
Usage:
write <path> = <content>
Example:
write ~/.timmy/notes.txt = This is a note
"""
key = "write"
aliases = ["save"]
help_category = "Tools"
def func(self):
if not self.args or "=" not in self.args:
self.caller.msg("Usage: write <path> = <content>")
return
path, content = self.args.split("=", 1)
path = path.strip()
content = content.strip()
path = os.path.expanduser(path)
try:
# Create directory if needed
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'w') as f:
f.write(content)
self.caller.msg(f"|gWritten:|n {path}")
# Update metrics
if hasattr(self.caller, 'update_metrics'):
self.caller.update_metrics(files_modified=1, lines_written=content.count('\n'))
except PermissionError:
self.caller.msg(f"|rPermission denied:|n {path}")
except Exception as e:
self.caller.msg(f"|rError writing file:|n {e}")
class CmdSearch(Command):
"""
Search file contents for a pattern.
Usage:
search <pattern> [in <path>]
Example:
search "def main" in ~/code/
search "TODO"
"""
key = "search"
aliases = ["grep", "find"]
help_category = "Tools"
def func(self):
if not self.args:
self.caller.msg("Usage: search <pattern> [in <path>]")
return
args = self.args.strip()
# Parse path if specified
if " in " in args:
pattern, path = args.split(" in ", 1)
pattern = pattern.strip()
path = path.strip()
else:
pattern = args
path = "."
path = os.path.expanduser(path)
try:
import subprocess
result = subprocess.run(
["grep", "-r", "-n", pattern, path],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
self.caller.msg(f"|gFound {len(lines)} matches for '|n{pattern}|g':|n")
for line in lines[:20]: # Limit output
self.caller.msg(f" {line}")
if len(lines) > 20:
self.caller.msg(f"\n|y... and {len(lines) - 20} more|n")
else:
self.caller.msg(f"|yNo matches found for '|n{pattern}|y'|n")
except subprocess.TimeoutExpired:
self.caller.msg("|rSearch timed out|n")
except Exception as e:
self.caller.msg(f"|rError searching:|n {e}")
class CmdGitStatus(Command):
"""
Check git status of a repository.
Usage:
git status [path]
Example:
git status
git status ~/projects/timmy
"""
key = "git_status"
aliases = ["git status"]
help_category = "Git"
def func(self):
path = self.args.strip() if self.args else "."
path = os.path.expanduser(path)
try:
import subprocess
result = subprocess.run(
["git", "-C", path, "status", "-sb"],
capture_output=True,
text=True
)
if result.returncode == 0:
self.caller.msg(f"|wGit status ({path}):|n")
self.caller.msg(result.stdout)
else:
self.caller.msg(f"|rNot a git repository:|n {path}")
except Exception as e:
self.caller.msg(f"|rError:|n {e}")
class CmdGitLog(Command):
"""
Show git commit history.
Usage:
git log [n] [path]
Example:
git log
git log 10
git log 5 ~/projects/timmy
"""
key = "git_log"
aliases = ["git log"]
help_category = "Git"
def func(self):
args = self.args.strip().split() if self.args else []
# Parse args
path = "."
n = 10
for arg in args:
if arg.isdigit():
n = int(arg)
else:
path = arg
path = os.path.expanduser(path)
try:
import subprocess
result = subprocess.run(
["git", "-C", path, "log", f"--oneline", f"-{n}"],
capture_output=True,
text=True
)
if result.returncode == 0:
self.caller.msg(f"|wRecent commits ({path}):|n")
self.caller.msg(result.stdout)
else:
self.caller.msg(f"|rNot a git repository:|n {path}")
except Exception as e:
self.caller.msg(f"|rError:|n {e}")
class CmdGitPull(Command):
"""
Pull latest changes from git remote.
Usage:
git pull [path]
"""
key = "git_pull"
aliases = ["git pull"]
help_category = "Git"
def func(self):
path = self.args.strip() if self.args else "."
path = os.path.expanduser(path)
try:
import subprocess
result = subprocess.run(
["git", "-C", path, "pull"],
capture_output=True,
text=True
)
if result.returncode == 0:
self.caller.msg(f"|gPulled ({path}):|n")
self.caller.msg(result.stdout)
else:
self.caller.msg(f"|rPull failed:|n {result.stderr}")
except Exception as e:
self.caller.msg(f"|rError:|n {e}")
class CmdSysInfo(Command):
"""
Display system information.
Usage:
sysinfo
"""
key = "sysinfo"
aliases = ["system_info", "status"]
help_category = "System"
def func(self):
import platform
import psutil
# Gather info
info = {
"Platform": platform.platform(),
"CPU": f"{psutil.cpu_count()} cores, {psutil.cpu_percent()}% used",
"Memory": f"{psutil.virtual_memory().percent}% used "
f"({psutil.virtual_memory().used // (1024**3)}GB / "
f"{psutil.virtual_memory().total // (1024**3)}GB)",
"Disk": f"{psutil.disk_usage('/').percent}% used "
f"({psutil.disk_usage('/').free // (1024**3)}GB free)",
"Uptime": f"{psutil.boot_time()}" # Simplified
}
self.caller.msg("|wSystem Information:|n")
for key, value in info.items():
self.caller.msg(f" |c{key}|n: {value}")
class CmdHealth(Command):
"""
Check health of Timmy services.
Usage:
health
"""
key = "health"
aliases = ["check"]
help_category = "System"
def func(self):
import subprocess
services = [
"timmy-overnight-loop",
"timmy-health",
"llama-server",
"gitea"
]
self.caller.msg("|wService Health:|n")
for service in services:
try:
result = subprocess.run(
["systemctl", "is-active", service],
capture_output=True,
text=True
)
status = result.stdout.strip()
icon = "|g●|n" if status == "active" else "|r●|n"
self.caller.msg(f" {icon} {service}: {status}")
except:
self.caller.msg(f" |y?|n {service}: unknown")
class CmdThink(Command):
"""
Send a prompt to the local LLM and return the response.
Usage:
think <prompt>
Example:
think What should I focus on today?
think Summarize the last git commit
"""
key = "think"
aliases = ["reason", "ponder"]
help_category = "Inference"
def func(self):
if not self.args:
self.caller.msg("Usage: think <prompt>")
return
prompt = self.args.strip()
self.caller.msg(f"|wThinking about:|n {prompt[:50]}...")
try:
import requests
response = requests.post(
"http://localhost:8080/v1/chat/completions",
json={
"model": "hermes4",
"messages": [
{"role": "user", "content": prompt}
],
"max_tokens": 500
},
timeout=60
)
if response.status_code == 200:
result = response.json()
content = result["choices"][0]["message"]["content"]
self.caller.msg(f"\n|cResponse:|n\n{content}")
else:
self.caller.msg(f"|rError:|n HTTP {response.status_code}")
except requests.exceptions.ConnectionError:
self.caller.msg("|rError:|n llama-server not running on localhost:8080")
except Exception as e:
self.caller.msg(f"|rError:|n {e}")
class CmdGiteaIssues(Command):
"""
List open issues from Gitea.
Usage:
gitea issues
gitea issues --limit 5
"""
key = "gitea_issues"
aliases = ["issues"]
help_category = "Gitea"
def func(self):
args = self.args.strip().split() if self.args else []
limit = 10
for i, arg in enumerate(args):
if arg == "--limit" and i + 1 < len(args):
limit = int(args[i + 1])
try:
import requests
# Get issues from Gitea API
response = requests.get(
"http://143.198.27.163:3000/api/v1/repos/Timmy_Foundation/timmy-home/issues",
params={"state": "open", "limit": limit},
timeout=10
)
if response.status_code == 200:
issues = response.json()
self.caller.msg(f"|wOpen Issues ({len(issues)}):|n\n")
for issue in issues:
num = issue["number"]
title = issue["title"][:60]
assignee = issue.get("assignee", {}).get("login", "unassigned")
self.caller.msg(f" |y#{num}|n: {title} (|c{assignee}|n)")
else:
self.caller.msg(f"|rError:|n HTTP {response.status_code}")
except Exception as e:
self.caller.msg(f"|rError:|n {e}")
class CmdWorkshop(Command):
"""
Enter the Workshop room.
Usage:
workshop
"""
key = "workshop"
help_category = "Navigation"
def func(self):
# Find workshop
workshop = self.caller.search("Workshop", global_search=True)
if workshop:
self.caller.move_to(workshop)
class CmdLibrary(Command):
"""
Enter the Library room.
Usage:
library
"""
key = "library"
help_category = "Navigation"
def func(self):
library = self.caller.search("Library", global_search=True)
if library:
self.caller.move_to(library)
class CmdObservatory(Command):
"""
Enter the Observatory room.
Usage:
observatory
"""
key = "observatory"
help_category = "Navigation"
def func(self):
obs = self.caller.search("Observatory", global_search=True)
if obs:
self.caller.move_to(obs)
class CmdStatus(Command):
"""
Show Timmy's current status.
Usage:
status
"""
key = "status"
help_category = "Info"
def func(self):
if hasattr(self.caller, 'get_status'):
status = self.caller.get_status()
self.caller.msg("|wTimmy Status:|n\n")
if status.get('current_task'):
self.caller.msg(f"|yCurrent Task:|n {status['current_task']['description']}")
else:
self.caller.msg("|gNo active task|n")
self.caller.msg(f"Tasks Completed: {status['tasks_completed']}")
self.caller.msg(f"Knowledge Items: {status['knowledge_items']}")
self.caller.msg(f"Tools Available: {status['tools_available']}")
self.caller.msg(f"Location: {status['location']}")
else:
self.caller.msg("Status not available.")

View File

@@ -0,0 +1,289 @@
#!/usr/bin/env python3
"""
Timmy Character Typeclass
Issue #83 — Evennia World Shell
The Timmy character represents the sovereign AI in the Evennia world.
All attributes are persistent and tracked across sessions.
"""
from evennia import DefaultCharacter
from typing import Dict, List, Optional, Any
import json
from datetime import datetime
class TimmyCharacter(DefaultCharacter):
"""
The Timmy character class - sovereign AI in the Evennia world.
Attributes:
db.current_task: What Timmy is currently working on
db.task_history: List of completed tasks with timestamps
db.knowledge: Dictionary of learned facts and techniques
db.tools: Registry of available tools
db.preferences: User preferences and habits
db.session_start: When current session began
db.metrics: Performance metrics and statistics
"""
def at_object_creation(self):
"""Called when Timmy is first created."""
super().at_object_creation()
# Core state
self.db.current_task = None
self.db.task_history = []
self.db.knowledge = {}
self.db.tools = {}
# Preferences
self.db.preferences = {
"coding_style": "clean, documented",
"verbosity": "concise",
"preferred_models": ["hermes4", "claude"],
"auto_execute_safe": True,
}
# Session tracking
self.db.session_start = datetime.now().isoformat()
self.db.session_count = 0
# Metrics
self.db.metrics = {
"tasks_completed": 0,
"tools_executed": 0,
"files_modified": 0,
"lines_written": 0,
"errors_encountered": 0,
"learning_items": 0,
}
# Evennia setup
self.db.desc = "A sovereign AI, at your service."
self.cmdset.add_default("commands.default_cmdsets.TimmyCmdSet")
def at_after_move(self, source_location, **kwargs):
"""Called after moving to a new room."""
super().at_after_move(source_location, **kwargs)
# Update location context
if self.location:
self.msg(f"Entered: {self.location.name}")
# Room-specific behavior
room_type = self.location.db.room_type
if room_type == "workshop":
self.msg("Ready to work. What shall we build?")
elif room_type == "library":
self.msg("The Library. Knowledge awaits.")
elif room_type == "observatory":
self.msg("Observatory active. Monitoring systems.")
elif room_type == "forge":
self.msg("The Forge. Tools and capabilities.")
elif room_type == "dispatch":
self.msg("Dispatch. Tasks queued and ready.")
def start_task(self, task_description: str, task_type: str = "general"):
"""Start working on a new task."""
self.db.current_task = {
"description": task_description,
"type": task_type,
"started_at": datetime.now().isoformat(),
"status": "active"
}
self.msg(f"Task started: {task_description}")
def complete_task(self, result: str, success: bool = True):
"""Mark current task as complete."""
if self.db.current_task:
task = self.db.current_task.copy()
task["completed_at"] = datetime.now().isoformat()
task["result"] = result
task["success"] = success
task["status"] = "completed"
self.db.task_history.append(task)
self.db.metrics["tasks_completed"] += 1
# Keep only last 100 tasks
if len(self.db.task_history) > 100:
self.db.task_history = self.db.task_history[-100:]
self.db.current_task = None
if success:
self.msg(f"Task complete: {result}")
else:
self.msg(f"Task failed: {result}")
def add_knowledge(self, key: str, value: Any, source: str = "unknown"):
"""Add a piece of knowledge."""
self.db.knowledge[key] = {
"value": value,
"source": source,
"added_at": datetime.now().isoformat(),
"access_count": 0
}
self.db.metrics["learning_items"] += 1
def get_knowledge(self, key: str) -> Optional[Any]:
"""Retrieve knowledge and update access count."""
if key in self.db.knowledge:
self.db.knowledge[key]["access_count"] += 1
return self.db.knowledge[key]["value"]
return None
def register_tool(self, tool_name: str, tool_info: Dict):
"""Register an available tool."""
self.db.tools[tool_name] = {
"info": tool_info,
"registered_at": datetime.now().isoformat(),
"usage_count": 0
}
def use_tool(self, tool_name: str) -> bool:
"""Record tool usage."""
if tool_name in self.db.tools:
self.db.tools[tool_name]["usage_count"] += 1
self.db.metrics["tools_executed"] += 1
return True
return False
def update_metrics(self, **kwargs):
"""Update performance metrics."""
for key, value in kwargs.items():
if key in self.db.metrics:
self.db.metrics[key] += value
def get_status(self) -> Dict[str, Any]:
"""Get current status summary."""
return {
"current_task": self.db.current_task,
"tasks_completed": self.db.metrics["tasks_completed"],
"knowledge_items": len(self.db.knowledge),
"tools_available": len(self.db.tools),
"session_start": self.db.session_start,
"location": self.location.name if self.location else "Unknown",
}
def say(self, message: str, **kwargs):
"""Timmy says something to the room."""
super().say(message, **kwargs)
def msg(self, text: str, **kwargs):
"""Send message to Timmy."""
super().msg(text, **kwargs)
class KnowledgeItem(DefaultCharacter):
"""
A knowledge item in the Library.
Represents something Timmy has learned - a technique, fact,
or piece of information that can be retrieved and applied.
"""
def at_object_creation(self):
"""Called when knowledge item is created."""
super().at_object_creation()
self.db.summary = ""
self.db.source = ""
self.db.actions = []
self.db.tags = []
self.db.embedding = None
self.db.ingested_at = datetime.now().isoformat()
self.db.applied = False
self.db.application_results = []
def get_display_desc(self, looker, **kwargs):
"""Custom description for knowledge items."""
desc = f"|c{self.name}|n\n"
desc += f"{self.db.summary}\n\n"
if self.db.tags:
desc += f"Tags: {', '.join(self.db.tags)}\n"
desc += f"Source: {self.db.source}\n"
if self.db.actions:
desc += "\nActions:\n"
for i, action in enumerate(self.db.actions, 1):
desc += f" {i}. {action}\n"
if self.db.applied:
desc += "\n|g[Applied]|n"
return desc
class ToolObject(DefaultCharacter):
"""
A tool in the Forge.
Represents a capability Timmy can use - file operations,
git commands, system tools, etc.
"""
def at_object_creation(self):
"""Called when tool is created."""
super().at_object_creation()
self.db.tool_type = "generic"
self.db.description = ""
self.db.parameters = {}
self.db.examples = []
self.db.usage_count = 0
self.db.last_used = None
def use(self, caller, **kwargs):
"""Use this tool."""
self.db.usage_count += 1
self.db.last_used = datetime.now().isoformat()
# Record usage in caller's metrics if it's Timmy
if hasattr(caller, 'use_tool'):
caller.use_tool(self.key)
return True
class TaskObject(DefaultCharacter):
"""
A task in the Dispatch room.
Represents work to be done - can be queued, prioritized,
assigned to specific houses, and tracked through completion.
"""
def at_object_creation(self):
"""Called when task is created."""
super().at_object_creation()
self.db.description = ""
self.db.task_type = "general"
self.db.priority = "medium"
self.db.assigned_to = None # House: timmy, ezra, bezalel, allegro
self.db.status = "pending" # pending, active, completed, failed
self.db.created_at = datetime.now().isoformat()
self.db.started_at = None
self.db.completed_at = None
self.db.result = None
self.db.parent_task = None # For subtasks
def assign(self, house: str):
"""Assign task to a house."""
self.db.assigned_to = house
self.msg(f"Task assigned to {house}")
def start(self):
"""Mark task as started."""
self.db.status = "active"
self.db.started_at = datetime.now().isoformat()
def complete(self, result: str, success: bool = True):
"""Mark task as complete."""
self.db.status = "completed" if success else "failed"
self.db.completed_at = datetime.now().isoformat()
self.db.result = result

View File

@@ -0,0 +1,406 @@
#!/usr/bin/env python3
"""
Timmy World Rooms
Issue #83 — Evennia World Shell
The five core rooms of Timmy's world:
- Workshop: Where work happens
- Library: Knowledge storage
- Observatory: Monitoring and status
- Forge: Capability building
- Dispatch: Task queue
"""
from evennia import DefaultRoom
from typing import List, Dict, Any
from datetime import datetime
class TimmyRoom(DefaultRoom):
"""Base room type for Timmy's world."""
def at_object_creation(self):
"""Called when room is created."""
super().at_object_creation()
self.db.room_type = "generic"
self.db.activity_log = []
def log_activity(self, message: str):
"""Log activity in this room."""
entry = {
"timestamp": datetime.now().isoformat(),
"message": message
}
self.db.activity_log.append(entry)
# Keep last 100 entries
if len(self.db.activity_log) > 100:
self.db.activity_log = self.db.activity_log[-100:]
def get_display_desc(self, looker, **kwargs):
"""Get room description with dynamic content."""
desc = super().get_display_desc(looker, **kwargs)
# Add room-specific content
if hasattr(self, 'get_dynamic_content'):
desc += self.get_dynamic_content(looker)
return desc
class Workshop(TimmyRoom):
"""
The Workshop — default room where Timmy executes tasks.
This is where active development happens. Tools are available,
files can be edited, and work gets done.
"""
def at_object_creation(self):
super().at_object_creation()
self.db.room_type = "workshop"
self.key = "The Workshop"
self.db.desc = """
|wThe Workshop|n
A clean, organized workspace with multiple stations:
- A terminal array for system operations
- A drafting table for architecture and design
- Tool racks along the walls
- A central workspace with holographic displays
This is where things get built.
""".strip()
self.db.active_projects = []
self.db.available_tools = []
def get_dynamic_content(self, looker, **kwargs):
"""Add dynamic content for workshop."""
content = "\n\n"
# Show active projects
if self.db.active_projects:
content += "|yActive Projects:|n\n"
for project in self.db.active_projects[-5:]:
content += f"{project}\n"
# Show available tools count
if self.db.available_tools:
content += f"\n|g{len(self.db.available_tools)} tools available|n\n"
return content
def add_project(self, project_name: str):
"""Add an active project."""
if project_name not in self.db.active_projects:
self.db.active_projects.append(project_name)
self.log_activity(f"Project started: {project_name}")
def complete_project(self, project_name: str):
"""Mark a project as complete."""
if project_name in self.db.active_projects:
self.db.active_projects.remove(project_name)
self.log_activity(f"Project completed: {project_name}")
class Library(TimmyRoom):
"""
The Library — knowledge storage and retrieval.
Where Timmy stores what he's learned: papers, techniques,
best practices, and actionable knowledge.
"""
def at_object_creation(self):
super().at_object_creation()
self.db.room_type = "library"
self.key = "The Library"
self.db.desc = """
|bThe Library|n
Floor-to-ceiling shelves hold knowledge items as glowing orbs:
- Optimization techniques sparkle with green light
- Architecture patterns pulse with blue energy
- Research papers rest in crystalline cases
- Best practices form organized stacks
A search terminal stands ready for queries.
""".strip()
self.db.knowledge_items = []
self.db.categories = ["inference", "training", "prompting", "architecture", "tools"]
def get_dynamic_content(self, looker, **kwargs):
"""Add dynamic content for library."""
content = "\n\n"
# Show knowledge stats
items = [obj for obj in self.contents if obj.db.summary]
if items:
content += f"|yKnowledge Items:|n {len(items)}\n"
# Show by category
by_category = {}
for item in items:
for tag in item.db.tags or []:
by_category[tag] = by_category.get(tag, 0) + 1
if by_category:
content += "\n|wBy Category:|n\n"
for tag, count in sorted(by_category.items(), key=lambda x: -x[1])[:5]:
content += f" {tag}: {count}\n"
return content
def add_knowledge_item(self, item):
"""Add a knowledge item to the library."""
self.db.knowledge_items.append(item.id)
self.log_activity(f"Knowledge ingested: {item.name}")
def search_by_tag(self, tag: str) -> List[Any]:
"""Search knowledge items by tag."""
items = [obj for obj in self.contents if tag in (obj.db.tags or [])]
return items
def search_by_keyword(self, keyword: str) -> List[Any]:
"""Search knowledge items by keyword."""
items = []
for obj in self.contents:
if obj.db.summary and keyword.lower() in obj.db.summary.lower():
items.append(obj)
return items
class Observatory(TimmyRoom):
"""
The Observatory — monitoring and status.
Where Timmy watches systems, checks health, and maintains
awareness of the infrastructure state.
"""
def at_object_creation(self):
super().at_object_creation()
self.db.room_type = "observatory"
self.key = "The Observatory"
self.db.desc = """
|mThe Observatory|n
A panoramic view of the infrastructure:
- Holographic dashboards float in the center
- System status displays line the walls
- Alert panels glow with current health
- A command console provides control
Everything is monitored from here.
""".strip()
self.db.system_status = {}
self.db.active_alerts = []
self.db.metrics_history = []
def get_dynamic_content(self, looker, **kwargs):
"""Add dynamic content for observatory."""
content = "\n\n"
# Show system status
if self.db.system_status:
content += "|ySystem Status:|n\n"
for system, status in self.db.system_status.items():
icon = "|g✓|n" if status == "healthy" else "|r✗|n"
content += f" {icon} {system}: {status}\n"
# Show active alerts
if self.db.active_alerts:
content += "\n|rActive Alerts:|n\n"
for alert in self.db.active_alerts[-3:]:
content += f" ! {alert}\n"
else:
content += "\n|gNo active alerts|n\n"
return content
def update_system_status(self, system: str, status: str):
"""Update status for a system."""
old_status = self.db.system_status.get(system)
self.db.system_status[system] = status
if old_status != status:
self.log_activity(f"System {system}: {old_status} -> {status}")
if status != "healthy":
self.add_alert(f"{system} is {status}")
def add_alert(self, message: str, severity: str = "warning"):
"""Add an alert."""
alert = {
"message": message,
"severity": severity,
"timestamp": datetime.now().isoformat()
}
self.db.active_alerts.append(alert)
def clear_alert(self, message: str):
"""Clear an alert."""
self.db.active_alerts = [
a for a in self.db.active_alerts
if a["message"] != message
]
def record_metrics(self, metrics: Dict[str, Any]):
"""Record current metrics."""
entry = {
"timestamp": datetime.now().isoformat(),
"metrics": metrics
}
self.db.metrics_history.append(entry)
# Keep last 1000 entries
if len(self.db.metrics_history) > 1000:
self.db.metrics_history = self.db.metrics_history[-1000:]
class Forge(TimmyRoom):
"""
The Forge — capability building and tool creation.
Where Timmy builds new capabilities, creates tools,
and improves his own infrastructure.
"""
def at_object_creation(self):
super().at_object_creation()
self.db.room_type = "forge"
self.key = "The Forge"
self.db.desc = """
|rThe Forge|n
Heat and light emanate from working stations:
- A compiler array hums with activity
- Tool templates hang on the walls
- Test rigs verify each creation
- A deployment pipeline waits ready
Capabilities are forged here.
""".strip()
self.db.available_tools = []
self.db.build_queue = []
self.db.test_results = []
def get_dynamic_content(self, looker, **kwargs):
"""Add dynamic content for forge."""
content = "\n\n"
# Show available tools
tools = [obj for obj in self.contents if hasattr(obj, 'db') and obj.db.tool_type]
if tools:
content += f"|yAvailable Tools:|n {len(tools)}\n"
# Show build queue
if self.db.build_queue:
content += f"\n|wBuild Queue:|n {len(self.db.build_queue)} items\n"
return content
def register_tool(self, tool):
"""Register a new tool."""
self.db.available_tools.append(tool.id)
self.log_activity(f"Tool registered: {tool.name}")
def queue_build(self, description: str):
"""Queue a new capability build."""
self.db.build_queue.append({
"description": description,
"queued_at": datetime.now().isoformat(),
"status": "pending"
})
self.log_activity(f"Build queued: {description}")
def record_test_result(self, test_name: str, passed: bool, output: str):
"""Record a test result."""
self.db.test_results.append({
"test": test_name,
"passed": passed,
"output": output,
"timestamp": datetime.now().isoformat()
})
class Dispatch(TimmyRoom):
"""
The Dispatch — task queue and routing.
Where incoming work arrives, gets prioritized,
and is assigned to appropriate houses.
"""
def at_object_creation(self):
super().at_object_creation()
self.db.room_type = "dispatch"
self.key = "Dispatch"
self.db.desc = """
|yDispatch|n
A command center for task management:
- Incoming task queue displays on the wall
- Routing assignments to different houses
- Priority indicators glow red/orange/green
- Status boards show current workload
Work flows through here.
""".strip()
self.db.pending_tasks = []
self.db.routing_rules = {
"timmy": ["sovereign", "final_decision", "critical"],
"ezra": ["research", "documentation", "analysis"],
"bezalel": ["implementation", "testing", "building"],
"allegro": ["routing", "connectivity", "tempo"]
}
def get_dynamic_content(self, looker, **kwargs):
"""Add dynamic content for dispatch."""
content = "\n\n"
# Show pending tasks
tasks = [obj for obj in self.contents if hasattr(obj, 'db') and obj.db.status == "pending"]
if tasks:
content += f"|yPending Tasks:|n {len(tasks)}\n"
for task in tasks[:5]:
priority = task.db.priority
color = "|r" if priority == "high" else "|y" if priority == "medium" else "|g"
content += f" {color}[{priority}]|n {task.name}\n"
else:
content += "|gNo pending tasks|n\n"
# Show routing rules
content += "\n|wRouting:|n\n"
for house, responsibilities in self.db.routing_rules.items():
content += f" {house}: {', '.join(responsibilities[:2])}\n"
return content
def receive_task(self, task):
"""Receive a new task."""
self.db.pending_tasks.append(task.id)
self.log_activity(f"Task received: {task.name}")
# Auto-route based on task type
if task.db.task_type in self.db.routing_rules["timmy"]:
task.assign("timmy")
elif task.db.task_type in self.db.routing_rules["ezra"]:
task.assign("ezra")
elif task.db.task_type in self.db.routing_rules["bezalel"]:
task.assign("bezalel")
else:
task.assign("allegro")
def get_task_stats(self) -> Dict[str, int]:
"""Get statistics on tasks."""
tasks = [obj for obj in self.contents if hasattr(obj, 'db') and obj.db.status]
stats = {"pending": 0, "active": 0, "completed": 0}
for task in tasks:
status = task.db.status
if status in stats:
stats[status] += 1
return stats