From ac6cc67e49f7282d528b2481079f707452c9364e Mon Sep 17 00:00:00 2001 From: Allegro Date: Mon, 30 Mar 2026 16:52:53 +0000 Subject: [PATCH] =?UTF-8?q?[#103]=20Multi-tier=20caching=20layer=20for=20l?= =?UTF-8?q?ocal=20Timmy=20=E2=80=94=20KV,=20Response,=20Tool,=20Embedding,?= =?UTF-8?q?=20Template,=20HTTP=20caches?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- timmy-local/cache/agent_cache.py | 656 ++++++++++++++++++ timmy-local/evennia/commands/tools.py | 547 +++++++++++++++ timmy-local/evennia/typeclasses/characters.py | 289 ++++++++ timmy-local/evennia/typeclasses/rooms.py | 406 +++++++++++ 4 files changed, 1898 insertions(+) create mode 100644 timmy-local/cache/agent_cache.py create mode 100644 timmy-local/evennia/commands/tools.py create mode 100644 timmy-local/evennia/typeclasses/characters.py create mode 100644 timmy-local/evennia/typeclasses/rooms.py diff --git a/timmy-local/cache/agent_cache.py b/timmy-local/cache/agent_cache.py new file mode 100644 index 0000000..1ff7890 --- /dev/null +++ b/timmy-local/cache/agent_cache.py @@ -0,0 +1,656 @@ +#!/usr/bin/env python3 +""" +Multi-Tier Caching Layer for Local Timmy +Issue #103 — Cache Everywhere + +Provides: +- Tier 1: KV Cache (prompt prefix caching) +- Tier 2: Semantic Response Cache (full LLM responses) +- Tier 3: Tool Result Cache (stable tool outputs) +- Tier 4: Embedding Cache (RAG embeddings) +- Tier 5: Template Cache (pre-compiled prompts) +- Tier 6: HTTP Response Cache (API responses) +""" + +import sqlite3 +import hashlib +import json +import time +import threading +from typing import Optional, Any, Dict, List, Callable +from dataclasses import dataclass, asdict +from pathlib import Path +import pickle +import functools + + +@dataclass +class CacheStats: + """Statistics for cache monitoring.""" + hits: int = 0 + misses: int = 0 + evictions: int = 0 + hit_rate: float = 0.0 + + def record_hit(self): + self.hits += 1 + self._update_rate() + + def record_miss(self): + self.misses += 1 + self._update_rate() + + def record_eviction(self): + self.evictions += 1 + + def _update_rate(self): + total = self.hits + self.misses + if total > 0: + self.hit_rate = self.hits / total + + +class LRUCache: + """In-memory LRU cache for hot path.""" + + def __init__(self, max_size: int = 1000): + self.max_size = max_size + self.cache: Dict[str, Any] = {} + self.access_order: List[str] = [] + self.lock = threading.RLock() + + def get(self, key: str) -> Optional[Any]: + with self.lock: + if key in self.cache: + # Move to front (most recent) + self.access_order.remove(key) + self.access_order.append(key) + return self.cache[key] + return None + + def put(self, key: str, value: Any): + with self.lock: + if key in self.cache: + self.access_order.remove(key) + elif len(self.cache) >= self.max_size: + # Evict oldest + oldest = self.access_order.pop(0) + del self.cache[oldest] + + self.cache[key] = value + self.access_order.append(key) + + def invalidate(self, key: str): + with self.lock: + if key in self.cache: + self.access_order.remove(key) + del self.cache[key] + + def clear(self): + with self.lock: + self.cache.clear() + self.access_order.clear() + + +class ResponseCache: + """Tier 2: Semantic Response Cache — full LLM responses.""" + + def __init__(self, db_path: str = "~/.timmy/cache/responses.db"): + self.db_path = Path(db_path).expanduser() + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self.stats = CacheStats() + self.lru = LRUCache(max_size=100) + self._init_db() + + def _init_db(self): + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS responses ( + prompt_hash TEXT PRIMARY KEY, + response TEXT NOT NULL, + created_at REAL NOT NULL, + ttl INTEGER NOT NULL, + access_count INTEGER DEFAULT 0, + last_accessed REAL + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_accessed ON responses(last_accessed) + """) + + def _hash_prompt(self, prompt: str) -> str: + """Hash prompt after normalizing (removing timestamps, etc).""" + # Normalize: lowercase, strip extra whitespace + normalized = " ".join(prompt.lower().split()) + return hashlib.sha256(normalized.encode()).hexdigest()[:32] + + def get(self, prompt: str, ttl: int = 3600) -> Optional[str]: + """Get cached response if available and not expired.""" + prompt_hash = self._hash_prompt(prompt) + + # Check LRU first + cached = self.lru.get(prompt_hash) + if cached: + self.stats.record_hit() + return cached + + # Check disk cache + with sqlite3.connect(self.db_path) as conn: + row = conn.execute( + "SELECT response, created_at, ttl FROM responses WHERE prompt_hash = ?", + (prompt_hash,) + ).fetchone() + + if row: + response, created_at, stored_ttl = row + # Use minimum of requested and stored TTL + effective_ttl = min(ttl, stored_ttl) + + if time.time() - created_at < effective_ttl: + # Cache hit + self.stats.record_hit() + # Update access stats + conn.execute( + "UPDATE responses SET access_count = access_count + 1, last_accessed = ? WHERE prompt_hash = ?", + (time.time(), prompt_hash) + ) + # Add to LRU + self.lru.put(prompt_hash, response) + return response + else: + # Expired + conn.execute("DELETE FROM responses WHERE prompt_hash = ?", (prompt_hash,)) + self.stats.record_eviction() + + self.stats.record_miss() + return None + + def put(self, prompt: str, response: str, ttl: int = 3600): + """Cache a response with TTL.""" + prompt_hash = self._hash_prompt(prompt) + + # Add to LRU + self.lru.put(prompt_hash, response) + + # Add to disk cache + with sqlite3.connect(self.db_path) as conn: + conn.execute( + """INSERT OR REPLACE INTO responses + (prompt_hash, response, created_at, ttl, last_accessed) + VALUES (?, ?, ?, ?, ?)""", + (prompt_hash, response, time.time(), ttl, time.time()) + ) + + def invalidate_pattern(self, pattern: str): + """Invalidate all cached responses matching pattern.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute("DELETE FROM responses WHERE response LIKE ?", (f"%{pattern}%",)) + + def get_stats(self) -> Dict[str, Any]: + """Get cache statistics.""" + with sqlite3.connect(self.db_path) as conn: + count = conn.execute("SELECT COUNT(*) FROM responses").fetchone()[0] + total_accesses = conn.execute("SELECT SUM(access_count) FROM responses").fetchone()[0] or 0 + + return { + "tier": "response_cache", + "memory_entries": len(self.lru.cache), + "disk_entries": count, + "hits": self.stats.hits, + "misses": self.stats.misses, + "hit_rate": f"{self.stats.hit_rate:.1%}", + "total_accesses": total_accesses + } + + +class ToolCache: + """Tier 3: Tool Result Cache — stable tool outputs.""" + + # TTL configuration per tool type (seconds) + TOOL_TTL = { + "system_info": 60, + "disk_usage": 120, + "git_status": 30, + "git_log": 300, + "health_check": 60, + "gitea_list_issues": 120, + "file_read": 30, + "process_list": 30, + "service_status": 60, + } + + # Tools that invalidate cache on write operations + INVALIDATORS = { + "git_commit": ["git_status", "git_log"], + "git_pull": ["git_status", "git_log"], + "file_write": ["file_read"], + "gitea_create_issue": ["gitea_list_issues"], + "gitea_comment": ["gitea_list_issues"], + } + + def __init__(self, db_path: str = "~/.timmy/cache/tool_cache.db"): + self.db_path = Path(db_path).expanduser() + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self.stats = CacheStats() + self.lru = LRUCache(max_size=500) + self._init_db() + + def _init_db(self): + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS tool_results ( + tool_hash TEXT PRIMARY KEY, + tool_name TEXT NOT NULL, + params_hash TEXT NOT NULL, + result TEXT NOT NULL, + created_at REAL NOT NULL, + ttl INTEGER NOT NULL + ) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_tool_name ON tool_results(tool_name) + """) + + def _hash_call(self, tool_name: str, params: Dict) -> str: + """Hash tool name and params for cache key.""" + param_str = json.dumps(params, sort_keys=True) + combined = f"{tool_name}:{param_str}" + return hashlib.sha256(combined.encode()).hexdigest()[:32] + + def get(self, tool_name: str, params: Dict) -> Optional[Any]: + """Get cached tool result if available.""" + if tool_name not in self.TOOL_TTL: + return None # Not cacheable + + tool_hash = self._hash_call(tool_name, params) + + # Check LRU + cached = self.lru.get(tool_hash) + if cached: + self.stats.record_hit() + return pickle.loads(cached) + + # Check disk + with sqlite3.connect(self.db_path) as conn: + row = conn.execute( + "SELECT result, created_at, ttl FROM tool_results WHERE tool_hash = ?", + (tool_hash,) + ).fetchone() + + if row: + result, created_at, ttl = row + if time.time() - created_at < ttl: + self.stats.record_hit() + self.lru.put(tool_hash, result) + return pickle.loads(result) + else: + conn.execute("DELETE FROM tool_results WHERE tool_hash = ?", (tool_hash,)) + self.stats.record_eviction() + + self.stats.record_miss() + return None + + def put(self, tool_name: str, params: Dict, result: Any): + """Cache a tool result.""" + if tool_name not in self.TOOL_TTL: + return # Not cacheable + + ttl = self.TOOL_TTL[tool_name] + tool_hash = self._hash_call(tool_name, params) + params_hash = hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest()[:16] + + # Add to LRU + pickled = pickle.dumps(result) + self.lru.put(tool_hash, pickled) + + # Add to disk + with sqlite3.connect(self.db_path) as conn: + conn.execute( + """INSERT OR REPLACE INTO tool_results + (tool_hash, tool_name, params_hash, result, created_at, ttl) + VALUES (?, ?, ?, ?, ?, ?)""", + (tool_hash, tool_name, params_hash, pickled, time.time(), ttl) + ) + + def invalidate(self, tool_name: str): + """Invalidate all cached results for a tool.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute("DELETE FROM tool_results WHERE tool_name = ?", (tool_name,)) + + # Clear matching LRU entries + # (simplified: clear all since LRU doesn't track tool names) + self.lru.clear() + + def handle_invalidation(self, tool_name: str): + """Handle cache invalidation after a write operation.""" + if tool_name in self.INVALIDATORS: + for dependent in self.INVALIDATORS[tool_name]: + self.invalidate(dependent) + + def get_stats(self) -> Dict[str, Any]: + """Get cache statistics.""" + with sqlite3.connect(self.db_path) as conn: + count = conn.execute("SELECT COUNT(*) FROM tool_results").fetchone()[0] + by_tool = conn.execute( + "SELECT tool_name, COUNT(*) FROM tool_results GROUP BY tool_name" + ).fetchall() + + return { + "tier": "tool_cache", + "memory_entries": len(self.lru.cache), + "disk_entries": count, + "hits": self.stats.hits, + "misses": self.stats.misses, + "hit_rate": f"{self.stats.hit_rate:.1%}", + "by_tool": dict(by_tool) + } + + +class EmbeddingCache: + """Tier 4: Embedding Cache — for RAG pipeline (#93).""" + + def __init__(self, db_path: str = "~/.timmy/cache/embeddings.db"): + self.db_path = Path(db_path).expanduser() + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self.stats = CacheStats() + self._init_db() + + def _init_db(self): + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS embeddings ( + file_path TEXT PRIMARY KEY, + mtime REAL NOT NULL, + embedding BLOB NOT NULL, + model_name TEXT NOT NULL, + created_at REAL NOT NULL + ) + """) + + def get(self, file_path: str, mtime: float, model_name: str) -> Optional[List[float]]: + """Get embedding if file hasn't changed and model matches.""" + with sqlite3.connect(self.db_path) as conn: + row = conn.execute( + "SELECT embedding, mtime, model_name FROM embeddings WHERE file_path = ?", + (file_path,) + ).fetchone() + + if row: + embedding_blob, stored_mtime, stored_model = row + if stored_mtime == mtime and stored_model == model_name: + self.stats.record_hit() + return pickle.loads(embedding_blob) + + self.stats.record_miss() + return None + + def put(self, file_path: str, mtime: float, embedding: List[float], model_name: str): + """Store embedding with file metadata.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute( + """INSERT OR REPLACE INTO embeddings + (file_path, mtime, embedding, model_name, created_at) + VALUES (?, ?, ?, ?, ?)""", + (file_path, mtime, pickle.dumps(embedding), model_name, time.time()) + ) + + def get_stats(self) -> Dict[str, Any]: + """Get cache statistics.""" + with sqlite3.connect(self.db_path) as conn: + count = conn.execute("SELECT COUNT(*) FROM embeddings").fetchone()[0] + models = conn.execute( + "SELECT model_name, COUNT(*) FROM embeddings GROUP BY model_name" + ).fetchall() + + return { + "tier": "embedding_cache", + "entries": count, + "hits": self.stats.hits, + "misses": self.stats.misses, + "hit_rate": f"{self.stats.hit_rate:.1%}", + "by_model": dict(models) + } + + +class TemplateCache: + """Tier 5: Template Cache — pre-compiled prompts.""" + + def __init__(self): + self.templates: Dict[str, str] = {} + self.tokenized: Dict[str, Any] = {} # For tokenizer outputs + self.stats = CacheStats() + + def load_template(self, name: str, path: str) -> str: + """Load and cache a template file.""" + if name not in self.templates: + with open(path, 'r') as f: + self.templates[name] = f.read() + self.stats.record_miss() + else: + self.stats.record_hit() + return self.templates[name] + + def get(self, name: str) -> Optional[str]: + """Get cached template.""" + if name in self.templates: + self.stats.record_hit() + return self.templates[name] + self.stats.record_miss() + return None + + def cache_tokenized(self, name: str, tokens: Any): + """Cache tokenized version of template.""" + self.tokenized[name] = tokens + + def get_tokenized(self, name: str) -> Optional[Any]: + """Get cached tokenized template.""" + return self.tokenized.get(name) + + def get_stats(self) -> Dict[str, Any]: + """Get cache statistics.""" + return { + "tier": "template_cache", + "templates_cached": len(self.templates), + "tokenized_cached": len(self.tokenized), + "hits": self.stats.hits, + "misses": self.stats.misses, + "hit_rate": f"{self.stats.hit_rate:.1%}" + } + + +class HTTPCache: + """Tier 6: HTTP Response Cache — for API calls.""" + + def __init__(self, db_path: str = "~/.timmy/cache/http_cache.db"): + self.db_path = Path(db_path).expanduser() + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self.stats = CacheStats() + self.lru = LRUCache(max_size=200) + self._init_db() + + def _init_db(self): + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS http_responses ( + url_hash TEXT PRIMARY KEY, + url TEXT NOT NULL, + response TEXT NOT NULL, + etag TEXT, + last_modified TEXT, + created_at REAL NOT NULL, + ttl INTEGER NOT NULL + ) + """) + + def _hash_url(self, url: str) -> str: + return hashlib.sha256(url.encode()).hexdigest()[:32] + + def get(self, url: str, ttl: int = 300) -> Optional[Dict]: + """Get cached HTTP response.""" + url_hash = self._hash_url(url) + + # Check LRU + cached = self.lru.get(url_hash) + if cached: + self.stats.record_hit() + return cached + + # Check disk + with sqlite3.connect(self.db_path) as conn: + row = conn.execute( + "SELECT response, etag, last_modified, created_at, ttl FROM http_responses WHERE url_hash = ?", + (url_hash,) + ).fetchone() + + if row: + response, etag, last_modified, created_at, stored_ttl = row + effective_ttl = min(ttl, stored_ttl) + + if time.time() - created_at < effective_ttl: + self.stats.record_hit() + result = { + "response": response, + "etag": etag, + "last_modified": last_modified + } + self.lru.put(url_hash, result) + return result + else: + conn.execute("DELETE FROM http_responses WHERE url_hash = ?", (url_hash,)) + self.stats.record_eviction() + + self.stats.record_miss() + return None + + def put(self, url: str, response: str, etag: Optional[str] = None, + last_modified: Optional[str] = None, ttl: int = 300): + """Cache HTTP response.""" + url_hash = self._hash_url(url) + + result = { + "response": response, + "etag": etag, + "last_modified": last_modified + } + self.lru.put(url_hash, result) + + with sqlite3.connect(self.db_path) as conn: + conn.execute( + """INSERT OR REPLACE INTO http_responses + (url_hash, url, response, etag, last_modified, created_at, ttl) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + (url_hash, url, response, etag, last_modified, time.time(), ttl) + ) + + def get_stats(self) -> Dict[str, Any]: + """Get cache statistics.""" + with sqlite3.connect(self.db_path) as conn: + count = conn.execute("SELECT COUNT(*) FROM http_responses").fetchone()[0] + + return { + "tier": "http_cache", + "memory_entries": len(self.lru.cache), + "disk_entries": count, + "hits": self.stats.hits, + "misses": self.stats.misses, + "hit_rate": f"{self.stats.hit_rate:.1%}" + } + + +class CacheManager: + """Central manager for all cache tiers.""" + + def __init__(self, base_path: str = "~/.timmy/cache"): + self.base_path = Path(base_path).expanduser() + self.base_path.mkdir(parents=True, exist_ok=True) + + # Initialize all tiers + self.response = ResponseCache(self.base_path / "responses.db") + self.tool = ToolCache(self.base_path / "tool_cache.db") + self.embedding = EmbeddingCache(self.base_path / "embeddings.db") + self.template = TemplateCache() + self.http = HTTPCache(self.base_path / "http_cache.db") + + # KV cache handled by llama-server (external) + + def get_all_stats(self) -> Dict[str, Dict]: + """Get statistics for all cache tiers.""" + return { + "response_cache": self.response.get_stats(), + "tool_cache": self.tool.get_stats(), + "embedding_cache": self.embedding.get_stats(), + "template_cache": self.template.get_stats(), + "http_cache": self.http.get_stats(), + } + + def clear_all(self): + """Clear all caches.""" + self.response.lru.clear() + self.tool.lru.clear() + self.http.lru.clear() + self.template.templates.clear() + self.template.tokenized.clear() + + # Clear databases + for db_file in self.base_path.glob("*.db"): + with sqlite3.connect(db_file) as conn: + cursor = conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") + tables = cursor.fetchall() + for (table,) in tables: + conn.execute(f"DELETE FROM {table}") + + def cached_tool(self, ttl: Optional[int] = None): + """Decorator for caching tool results.""" + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(*args, **kwargs): + tool_name = func.__name__ + params = {"args": args, "kwargs": kwargs} + + # Try cache + cached = self.tool.get(tool_name, params) + if cached is not None: + return cached + + # Execute and cache + result = func(*args, **kwargs) + self.tool.put(tool_name, params, result) + + return result + return wrapper + return decorator + + +# Singleton instance +cache_manager = CacheManager() + + +if __name__ == "__main__": + # Test the cache + print("Testing Timmy Cache Layer...") + print() + + # Test response cache + print("1. Response Cache:") + cache_manager.response.put("What is 2+2?", "4", ttl=60) + cached = cache_manager.response.get("What is 2+2?") + print(f" Cached: {cached}") + print(f" Stats: {cache_manager.response.get_stats()}") + print() + + # Test tool cache + print("2. Tool Cache:") + cache_manager.tool.put("system_info", {}, {"cpu": "ARM64", "ram": "8GB"}) + cached = cache_manager.tool.get("system_info", {}) + print(f" Cached: {cached}") + print(f" Stats: {cache_manager.tool.get_stats()}") + print() + + # Test all stats + print("3. All Cache Stats:") + stats = cache_manager.get_all_stats() + for tier, tier_stats in stats.items(): + print(f" {tier}: {tier_stats}") + + print() + print("✅ Cache layer operational") diff --git a/timmy-local/evennia/commands/tools.py b/timmy-local/evennia/commands/tools.py new file mode 100644 index 0000000..9e457e0 --- /dev/null +++ b/timmy-local/evennia/commands/tools.py @@ -0,0 +1,547 @@ +#!/usr/bin/env python3 +""" +Timmy Tool Commands +Issue #84 — Bridge Tools into Evennia + +Converts Timmy's tool library into Evennia Command objects +so they can be invoked within the world. +""" + +from evennia import Command +from evennia.utils import evtable +from typing import Optional, List +import json +import os + + +class CmdRead(Command): + """ + Read a file from the system. + + Usage: + read + + Example: + read ~/.timmy/config.yaml + read /opt/timmy/logs/latest.log + """ + + key = "read" + aliases = ["cat", "show"] + help_category = "Tools" + + def func(self): + if not self.args: + self.caller.msg("Usage: read ") + return + + path = self.args.strip() + path = os.path.expanduser(path) + + try: + with open(path, 'r') as f: + content = f.read() + + # Store for later use + self.caller.db.last_read_file = path + self.caller.db.last_read_content = content + + # Limit display if too long + lines = content.split('\n') + if len(lines) > 50: + display = '\n'.join(lines[:50]) + self.caller.msg(f"|w{path}|n (showing first 50 lines of {len(lines)}):") + self.caller.msg(display) + self.caller.msg(f"\n|y... {len(lines) - 50} more lines|n") + else: + self.caller.msg(f"|w{path}|n:") + self.caller.msg(content) + + # Record in metrics + if hasattr(self.caller, 'update_metrics'): + self.caller.update_metrics(files_read=1) + + except FileNotFoundError: + self.caller.msg(f"|rFile not found:|n {path}") + except PermissionError: + self.caller.msg(f"|rPermission denied:|n {path}") + except Exception as e: + self.caller.msg(f"|rError reading file:|n {e}") + + +class CmdWrite(Command): + """ + Write content to a file. + + Usage: + write = + + Example: + write ~/.timmy/notes.txt = This is a note + """ + + key = "write" + aliases = ["save"] + help_category = "Tools" + + def func(self): + if not self.args or "=" not in self.args: + self.caller.msg("Usage: write = ") + return + + path, content = self.args.split("=", 1) + path = path.strip() + content = content.strip() + path = os.path.expanduser(path) + + try: + # Create directory if needed + os.makedirs(os.path.dirname(path), exist_ok=True) + + with open(path, 'w') as f: + f.write(content) + + self.caller.msg(f"|gWritten:|n {path}") + + # Update metrics + if hasattr(self.caller, 'update_metrics'): + self.caller.update_metrics(files_modified=1, lines_written=content.count('\n')) + + except PermissionError: + self.caller.msg(f"|rPermission denied:|n {path}") + except Exception as e: + self.caller.msg(f"|rError writing file:|n {e}") + + +class CmdSearch(Command): + """ + Search file contents for a pattern. + + Usage: + search [in ] + + Example: + search "def main" in ~/code/ + search "TODO" + """ + + key = "search" + aliases = ["grep", "find"] + help_category = "Tools" + + def func(self): + if not self.args: + self.caller.msg("Usage: search [in ]") + return + + args = self.args.strip() + + # Parse path if specified + if " in " in args: + pattern, path = args.split(" in ", 1) + pattern = pattern.strip() + path = path.strip() + else: + pattern = args + path = "." + + path = os.path.expanduser(path) + + try: + import subprocess + result = subprocess.run( + ["grep", "-r", "-n", pattern, path], + capture_output=True, + text=True, + timeout=10 + ) + + if result.returncode == 0: + lines = result.stdout.strip().split('\n') + self.caller.msg(f"|gFound {len(lines)} matches for '|n{pattern}|g':|n") + for line in lines[:20]: # Limit output + self.caller.msg(f" {line}") + if len(lines) > 20: + self.caller.msg(f"\n|y... and {len(lines) - 20} more|n") + else: + self.caller.msg(f"|yNo matches found for '|n{pattern}|y'|n") + + except subprocess.TimeoutExpired: + self.caller.msg("|rSearch timed out|n") + except Exception as e: + self.caller.msg(f"|rError searching:|n {e}") + + +class CmdGitStatus(Command): + """ + Check git status of a repository. + + Usage: + git status [path] + + Example: + git status + git status ~/projects/timmy + """ + + key = "git_status" + aliases = ["git status"] + help_category = "Git" + + def func(self): + path = self.args.strip() if self.args else "." + path = os.path.expanduser(path) + + try: + import subprocess + result = subprocess.run( + ["git", "-C", path, "status", "-sb"], + capture_output=True, + text=True + ) + + if result.returncode == 0: + self.caller.msg(f"|wGit status ({path}):|n") + self.caller.msg(result.stdout) + else: + self.caller.msg(f"|rNot a git repository:|n {path}") + + except Exception as e: + self.caller.msg(f"|rError:|n {e}") + + +class CmdGitLog(Command): + """ + Show git commit history. + + Usage: + git log [n] [path] + + Example: + git log + git log 10 + git log 5 ~/projects/timmy + """ + + key = "git_log" + aliases = ["git log"] + help_category = "Git" + + def func(self): + args = self.args.strip().split() if self.args else [] + + # Parse args + path = "." + n = 10 + + for arg in args: + if arg.isdigit(): + n = int(arg) + else: + path = arg + + path = os.path.expanduser(path) + + try: + import subprocess + result = subprocess.run( + ["git", "-C", path, "log", f"--oneline", f"-{n}"], + capture_output=True, + text=True + ) + + if result.returncode == 0: + self.caller.msg(f"|wRecent commits ({path}):|n") + self.caller.msg(result.stdout) + else: + self.caller.msg(f"|rNot a git repository:|n {path}") + + except Exception as e: + self.caller.msg(f"|rError:|n {e}") + + +class CmdGitPull(Command): + """ + Pull latest changes from git remote. + + Usage: + git pull [path] + """ + + key = "git_pull" + aliases = ["git pull"] + help_category = "Git" + + def func(self): + path = self.args.strip() if self.args else "." + path = os.path.expanduser(path) + + try: + import subprocess + result = subprocess.run( + ["git", "-C", path, "pull"], + capture_output=True, + text=True + ) + + if result.returncode == 0: + self.caller.msg(f"|gPulled ({path}):|n") + self.caller.msg(result.stdout) + else: + self.caller.msg(f"|rPull failed:|n {result.stderr}") + + except Exception as e: + self.caller.msg(f"|rError:|n {e}") + + +class CmdSysInfo(Command): + """ + Display system information. + + Usage: + sysinfo + """ + + key = "sysinfo" + aliases = ["system_info", "status"] + help_category = "System" + + def func(self): + import platform + import psutil + + # Gather info + info = { + "Platform": platform.platform(), + "CPU": f"{psutil.cpu_count()} cores, {psutil.cpu_percent()}% used", + "Memory": f"{psutil.virtual_memory().percent}% used " + f"({psutil.virtual_memory().used // (1024**3)}GB / " + f"{psutil.virtual_memory().total // (1024**3)}GB)", + "Disk": f"{psutil.disk_usage('/').percent}% used " + f"({psutil.disk_usage('/').free // (1024**3)}GB free)", + "Uptime": f"{psutil.boot_time()}" # Simplified + } + + self.caller.msg("|wSystem Information:|n") + for key, value in info.items(): + self.caller.msg(f" |c{key}|n: {value}") + + +class CmdHealth(Command): + """ + Check health of Timmy services. + + Usage: + health + """ + + key = "health" + aliases = ["check"] + help_category = "System" + + def func(self): + import subprocess + + services = [ + "timmy-overnight-loop", + "timmy-health", + "llama-server", + "gitea" + ] + + self.caller.msg("|wService Health:|n") + + for service in services: + try: + result = subprocess.run( + ["systemctl", "is-active", service], + capture_output=True, + text=True + ) + status = result.stdout.strip() + icon = "|g●|n" if status == "active" else "|r●|n" + self.caller.msg(f" {icon} {service}: {status}") + except: + self.caller.msg(f" |y?|n {service}: unknown") + + +class CmdThink(Command): + """ + Send a prompt to the local LLM and return the response. + + Usage: + think + + Example: + think What should I focus on today? + think Summarize the last git commit + """ + + key = "think" + aliases = ["reason", "ponder"] + help_category = "Inference" + + def func(self): + if not self.args: + self.caller.msg("Usage: think ") + return + + prompt = self.args.strip() + + self.caller.msg(f"|wThinking about:|n {prompt[:50]}...") + + try: + import requests + + response = requests.post( + "http://localhost:8080/v1/chat/completions", + json={ + "model": "hermes4", + "messages": [ + {"role": "user", "content": prompt} + ], + "max_tokens": 500 + }, + timeout=60 + ) + + if response.status_code == 200: + result = response.json() + content = result["choices"][0]["message"]["content"] + self.caller.msg(f"\n|cResponse:|n\n{content}") + else: + self.caller.msg(f"|rError:|n HTTP {response.status_code}") + + except requests.exceptions.ConnectionError: + self.caller.msg("|rError:|n llama-server not running on localhost:8080") + except Exception as e: + self.caller.msg(f"|rError:|n {e}") + + +class CmdGiteaIssues(Command): + """ + List open issues from Gitea. + + Usage: + gitea issues + gitea issues --limit 5 + """ + + key = "gitea_issues" + aliases = ["issues"] + help_category = "Gitea" + + def func(self): + args = self.args.strip().split() if self.args else [] + limit = 10 + + for i, arg in enumerate(args): + if arg == "--limit" and i + 1 < len(args): + limit = int(args[i + 1]) + + try: + import requests + + # Get issues from Gitea API + response = requests.get( + "http://143.198.27.163:3000/api/v1/repos/Timmy_Foundation/timmy-home/issues", + params={"state": "open", "limit": limit}, + timeout=10 + ) + + if response.status_code == 200: + issues = response.json() + self.caller.msg(f"|wOpen Issues ({len(issues)}):|n\n") + + for issue in issues: + num = issue["number"] + title = issue["title"][:60] + assignee = issue.get("assignee", {}).get("login", "unassigned") + self.caller.msg(f" |y#{num}|n: {title} (|c{assignee}|n)") + else: + self.caller.msg(f"|rError:|n HTTP {response.status_code}") + + except Exception as e: + self.caller.msg(f"|rError:|n {e}") + + +class CmdWorkshop(Command): + """ + Enter the Workshop room. + + Usage: + workshop + """ + + key = "workshop" + help_category = "Navigation" + + def func(self): + # Find workshop + workshop = self.caller.search("Workshop", global_search=True) + if workshop: + self.caller.move_to(workshop) + + +class CmdLibrary(Command): + """ + Enter the Library room. + + Usage: + library + """ + + key = "library" + help_category = "Navigation" + + def func(self): + library = self.caller.search("Library", global_search=True) + if library: + self.caller.move_to(library) + + +class CmdObservatory(Command): + """ + Enter the Observatory room. + + Usage: + observatory + """ + + key = "observatory" + help_category = "Navigation" + + def func(self): + obs = self.caller.search("Observatory", global_search=True) + if obs: + self.caller.move_to(obs) + + +class CmdStatus(Command): + """ + Show Timmy's current status. + + Usage: + status + """ + + key = "status" + help_category = "Info" + + def func(self): + if hasattr(self.caller, 'get_status'): + status = self.caller.get_status() + + self.caller.msg("|wTimmy Status:|n\n") + + if status.get('current_task'): + self.caller.msg(f"|yCurrent Task:|n {status['current_task']['description']}") + else: + self.caller.msg("|gNo active task|n") + + self.caller.msg(f"Tasks Completed: {status['tasks_completed']}") + self.caller.msg(f"Knowledge Items: {status['knowledge_items']}") + self.caller.msg(f"Tools Available: {status['tools_available']}") + self.caller.msg(f"Location: {status['location']}") + else: + self.caller.msg("Status not available.") diff --git a/timmy-local/evennia/typeclasses/characters.py b/timmy-local/evennia/typeclasses/characters.py new file mode 100644 index 0000000..be0f331 --- /dev/null +++ b/timmy-local/evennia/typeclasses/characters.py @@ -0,0 +1,289 @@ +#!/usr/bin/env python3 +""" +Timmy Character Typeclass +Issue #83 — Evennia World Shell + +The Timmy character represents the sovereign AI in the Evennia world. +All attributes are persistent and tracked across sessions. +""" + +from evennia import DefaultCharacter +from typing import Dict, List, Optional, Any +import json +from datetime import datetime + + +class TimmyCharacter(DefaultCharacter): + """ + The Timmy character class - sovereign AI in the Evennia world. + + Attributes: + db.current_task: What Timmy is currently working on + db.task_history: List of completed tasks with timestamps + db.knowledge: Dictionary of learned facts and techniques + db.tools: Registry of available tools + db.preferences: User preferences and habits + db.session_start: When current session began + db.metrics: Performance metrics and statistics + """ + + def at_object_creation(self): + """Called when Timmy is first created.""" + super().at_object_creation() + + # Core state + self.db.current_task = None + self.db.task_history = [] + self.db.knowledge = {} + self.db.tools = {} + + # Preferences + self.db.preferences = { + "coding_style": "clean, documented", + "verbosity": "concise", + "preferred_models": ["hermes4", "claude"], + "auto_execute_safe": True, + } + + # Session tracking + self.db.session_start = datetime.now().isoformat() + self.db.session_count = 0 + + # Metrics + self.db.metrics = { + "tasks_completed": 0, + "tools_executed": 0, + "files_modified": 0, + "lines_written": 0, + "errors_encountered": 0, + "learning_items": 0, + } + + # Evennia setup + self.db.desc = "A sovereign AI, at your service." + self.cmdset.add_default("commands.default_cmdsets.TimmyCmdSet") + + def at_after_move(self, source_location, **kwargs): + """Called after moving to a new room.""" + super().at_after_move(source_location, **kwargs) + + # Update location context + if self.location: + self.msg(f"Entered: {self.location.name}") + + # Room-specific behavior + room_type = self.location.db.room_type + if room_type == "workshop": + self.msg("Ready to work. What shall we build?") + elif room_type == "library": + self.msg("The Library. Knowledge awaits.") + elif room_type == "observatory": + self.msg("Observatory active. Monitoring systems.") + elif room_type == "forge": + self.msg("The Forge. Tools and capabilities.") + elif room_type == "dispatch": + self.msg("Dispatch. Tasks queued and ready.") + + def start_task(self, task_description: str, task_type: str = "general"): + """Start working on a new task.""" + self.db.current_task = { + "description": task_description, + "type": task_type, + "started_at": datetime.now().isoformat(), + "status": "active" + } + self.msg(f"Task started: {task_description}") + + def complete_task(self, result: str, success: bool = True): + """Mark current task as complete.""" + if self.db.current_task: + task = self.db.current_task.copy() + task["completed_at"] = datetime.now().isoformat() + task["result"] = result + task["success"] = success + task["status"] = "completed" + + self.db.task_history.append(task) + self.db.metrics["tasks_completed"] += 1 + + # Keep only last 100 tasks + if len(self.db.task_history) > 100: + self.db.task_history = self.db.task_history[-100:] + + self.db.current_task = None + + if success: + self.msg(f"Task complete: {result}") + else: + self.msg(f"Task failed: {result}") + + def add_knowledge(self, key: str, value: Any, source: str = "unknown"): + """Add a piece of knowledge.""" + self.db.knowledge[key] = { + "value": value, + "source": source, + "added_at": datetime.now().isoformat(), + "access_count": 0 + } + self.db.metrics["learning_items"] += 1 + + def get_knowledge(self, key: str) -> Optional[Any]: + """Retrieve knowledge and update access count.""" + if key in self.db.knowledge: + self.db.knowledge[key]["access_count"] += 1 + return self.db.knowledge[key]["value"] + return None + + def register_tool(self, tool_name: str, tool_info: Dict): + """Register an available tool.""" + self.db.tools[tool_name] = { + "info": tool_info, + "registered_at": datetime.now().isoformat(), + "usage_count": 0 + } + + def use_tool(self, tool_name: str) -> bool: + """Record tool usage.""" + if tool_name in self.db.tools: + self.db.tools[tool_name]["usage_count"] += 1 + self.db.metrics["tools_executed"] += 1 + return True + return False + + def update_metrics(self, **kwargs): + """Update performance metrics.""" + for key, value in kwargs.items(): + if key in self.db.metrics: + self.db.metrics[key] += value + + def get_status(self) -> Dict[str, Any]: + """Get current status summary.""" + return { + "current_task": self.db.current_task, + "tasks_completed": self.db.metrics["tasks_completed"], + "knowledge_items": len(self.db.knowledge), + "tools_available": len(self.db.tools), + "session_start": self.db.session_start, + "location": self.location.name if self.location else "Unknown", + } + + def say(self, message: str, **kwargs): + """Timmy says something to the room.""" + super().say(message, **kwargs) + + def msg(self, text: str, **kwargs): + """Send message to Timmy.""" + super().msg(text, **kwargs) + + +class KnowledgeItem(DefaultCharacter): + """ + A knowledge item in the Library. + + Represents something Timmy has learned - a technique, fact, + or piece of information that can be retrieved and applied. + """ + + def at_object_creation(self): + """Called when knowledge item is created.""" + super().at_object_creation() + + self.db.summary = "" + self.db.source = "" + self.db.actions = [] + self.db.tags = [] + self.db.embedding = None + self.db.ingested_at = datetime.now().isoformat() + self.db.applied = False + self.db.application_results = [] + + def get_display_desc(self, looker, **kwargs): + """Custom description for knowledge items.""" + desc = f"|c{self.name}|n\n" + desc += f"{self.db.summary}\n\n" + + if self.db.tags: + desc += f"Tags: {', '.join(self.db.tags)}\n" + + desc += f"Source: {self.db.source}\n" + + if self.db.actions: + desc += "\nActions:\n" + for i, action in enumerate(self.db.actions, 1): + desc += f" {i}. {action}\n" + + if self.db.applied: + desc += "\n|g[Applied]|n" + + return desc + + +class ToolObject(DefaultCharacter): + """ + A tool in the Forge. + + Represents a capability Timmy can use - file operations, + git commands, system tools, etc. + """ + + def at_object_creation(self): + """Called when tool is created.""" + super().at_object_creation() + + self.db.tool_type = "generic" + self.db.description = "" + self.db.parameters = {} + self.db.examples = [] + self.db.usage_count = 0 + self.db.last_used = None + + def use(self, caller, **kwargs): + """Use this tool.""" + self.db.usage_count += 1 + self.db.last_used = datetime.now().isoformat() + + # Record usage in caller's metrics if it's Timmy + if hasattr(caller, 'use_tool'): + caller.use_tool(self.key) + + return True + + +class TaskObject(DefaultCharacter): + """ + A task in the Dispatch room. + + Represents work to be done - can be queued, prioritized, + assigned to specific houses, and tracked through completion. + """ + + def at_object_creation(self): + """Called when task is created.""" + super().at_object_creation() + + self.db.description = "" + self.db.task_type = "general" + self.db.priority = "medium" + self.db.assigned_to = None # House: timmy, ezra, bezalel, allegro + self.db.status = "pending" # pending, active, completed, failed + self.db.created_at = datetime.now().isoformat() + self.db.started_at = None + self.db.completed_at = None + self.db.result = None + self.db.parent_task = None # For subtasks + + def assign(self, house: str): + """Assign task to a house.""" + self.db.assigned_to = house + self.msg(f"Task assigned to {house}") + + def start(self): + """Mark task as started.""" + self.db.status = "active" + self.db.started_at = datetime.now().isoformat() + + def complete(self, result: str, success: bool = True): + """Mark task as complete.""" + self.db.status = "completed" if success else "failed" + self.db.completed_at = datetime.now().isoformat() + self.db.result = result diff --git a/timmy-local/evennia/typeclasses/rooms.py b/timmy-local/evennia/typeclasses/rooms.py new file mode 100644 index 0000000..5a5f7ec --- /dev/null +++ b/timmy-local/evennia/typeclasses/rooms.py @@ -0,0 +1,406 @@ +#!/usr/bin/env python3 +""" +Timmy World Rooms +Issue #83 — Evennia World Shell + +The five core rooms of Timmy's world: +- Workshop: Where work happens +- Library: Knowledge storage +- Observatory: Monitoring and status +- Forge: Capability building +- Dispatch: Task queue +""" + +from evennia import DefaultRoom +from typing import List, Dict, Any +from datetime import datetime + + +class TimmyRoom(DefaultRoom): + """Base room type for Timmy's world.""" + + def at_object_creation(self): + """Called when room is created.""" + super().at_object_creation() + self.db.room_type = "generic" + self.db.activity_log = [] + + def log_activity(self, message: str): + """Log activity in this room.""" + entry = { + "timestamp": datetime.now().isoformat(), + "message": message + } + self.db.activity_log.append(entry) + # Keep last 100 entries + if len(self.db.activity_log) > 100: + self.db.activity_log = self.db.activity_log[-100:] + + def get_display_desc(self, looker, **kwargs): + """Get room description with dynamic content.""" + desc = super().get_display_desc(looker, **kwargs) + + # Add room-specific content + if hasattr(self, 'get_dynamic_content'): + desc += self.get_dynamic_content(looker) + + return desc + + +class Workshop(TimmyRoom): + """ + The Workshop — default room where Timmy executes tasks. + + This is where active development happens. Tools are available, + files can be edited, and work gets done. + """ + + def at_object_creation(self): + super().at_object_creation() + self.db.room_type = "workshop" + self.key = "The Workshop" + self.db.desc = """ +|wThe Workshop|n + +A clean, organized workspace with multiple stations: +- A terminal array for system operations +- A drafting table for architecture and design +- Tool racks along the walls +- A central workspace with holographic displays + +This is where things get built. + """.strip() + + self.db.active_projects = [] + self.db.available_tools = [] + + def get_dynamic_content(self, looker, **kwargs): + """Add dynamic content for workshop.""" + content = "\n\n" + + # Show active projects + if self.db.active_projects: + content += "|yActive Projects:|n\n" + for project in self.db.active_projects[-5:]: + content += f" • {project}\n" + + # Show available tools count + if self.db.available_tools: + content += f"\n|g{len(self.db.available_tools)} tools available|n\n" + + return content + + def add_project(self, project_name: str): + """Add an active project.""" + if project_name not in self.db.active_projects: + self.db.active_projects.append(project_name) + self.log_activity(f"Project started: {project_name}") + + def complete_project(self, project_name: str): + """Mark a project as complete.""" + if project_name in self.db.active_projects: + self.db.active_projects.remove(project_name) + self.log_activity(f"Project completed: {project_name}") + + +class Library(TimmyRoom): + """ + The Library — knowledge storage and retrieval. + + Where Timmy stores what he's learned: papers, techniques, + best practices, and actionable knowledge. + """ + + def at_object_creation(self): + super().at_object_creation() + self.db.room_type = "library" + self.key = "The Library" + self.db.desc = """ +|bThe Library|n + +Floor-to-ceiling shelves hold knowledge items as glowing orbs: +- Optimization techniques sparkle with green light +- Architecture patterns pulse with blue energy +- Research papers rest in crystalline cases +- Best practices form organized stacks + +A search terminal stands ready for queries. + """.strip() + + self.db.knowledge_items = [] + self.db.categories = ["inference", "training", "prompting", "architecture", "tools"] + + def get_dynamic_content(self, looker, **kwargs): + """Add dynamic content for library.""" + content = "\n\n" + + # Show knowledge stats + items = [obj for obj in self.contents if obj.db.summary] + if items: + content += f"|yKnowledge Items:|n {len(items)}\n" + + # Show by category + by_category = {} + for item in items: + for tag in item.db.tags or []: + by_category[tag] = by_category.get(tag, 0) + 1 + + if by_category: + content += "\n|wBy Category:|n\n" + for tag, count in sorted(by_category.items(), key=lambda x: -x[1])[:5]: + content += f" {tag}: {count}\n" + + return content + + def add_knowledge_item(self, item): + """Add a knowledge item to the library.""" + self.db.knowledge_items.append(item.id) + self.log_activity(f"Knowledge ingested: {item.name}") + + def search_by_tag(self, tag: str) -> List[Any]: + """Search knowledge items by tag.""" + items = [obj for obj in self.contents if tag in (obj.db.tags or [])] + return items + + def search_by_keyword(self, keyword: str) -> List[Any]: + """Search knowledge items by keyword.""" + items = [] + for obj in self.contents: + if obj.db.summary and keyword.lower() in obj.db.summary.lower(): + items.append(obj) + return items + + +class Observatory(TimmyRoom): + """ + The Observatory — monitoring and status. + + Where Timmy watches systems, checks health, and maintains + awareness of the infrastructure state. + """ + + def at_object_creation(self): + super().at_object_creation() + self.db.room_type = "observatory" + self.key = "The Observatory" + self.db.desc = """ +|mThe Observatory|n + +A panoramic view of the infrastructure: +- Holographic dashboards float in the center +- System status displays line the walls +- Alert panels glow with current health +- A command console provides control + +Everything is monitored from here. + """.strip() + + self.db.system_status = {} + self.db.active_alerts = [] + self.db.metrics_history = [] + + def get_dynamic_content(self, looker, **kwargs): + """Add dynamic content for observatory.""" + content = "\n\n" + + # Show system status + if self.db.system_status: + content += "|ySystem Status:|n\n" + for system, status in self.db.system_status.items(): + icon = "|g✓|n" if status == "healthy" else "|r✗|n" + content += f" {icon} {system}: {status}\n" + + # Show active alerts + if self.db.active_alerts: + content += "\n|rActive Alerts:|n\n" + for alert in self.db.active_alerts[-3:]: + content += f" ! {alert}\n" + else: + content += "\n|gNo active alerts|n\n" + + return content + + def update_system_status(self, system: str, status: str): + """Update status for a system.""" + old_status = self.db.system_status.get(system) + self.db.system_status[system] = status + + if old_status != status: + self.log_activity(f"System {system}: {old_status} -> {status}") + + if status != "healthy": + self.add_alert(f"{system} is {status}") + + def add_alert(self, message: str, severity: str = "warning"): + """Add an alert.""" + alert = { + "message": message, + "severity": severity, + "timestamp": datetime.now().isoformat() + } + self.db.active_alerts.append(alert) + + def clear_alert(self, message: str): + """Clear an alert.""" + self.db.active_alerts = [ + a for a in self.db.active_alerts + if a["message"] != message + ] + + def record_metrics(self, metrics: Dict[str, Any]): + """Record current metrics.""" + entry = { + "timestamp": datetime.now().isoformat(), + "metrics": metrics + } + self.db.metrics_history.append(entry) + # Keep last 1000 entries + if len(self.db.metrics_history) > 1000: + self.db.metrics_history = self.db.metrics_history[-1000:] + + +class Forge(TimmyRoom): + """ + The Forge — capability building and tool creation. + + Where Timmy builds new capabilities, creates tools, + and improves his own infrastructure. + """ + + def at_object_creation(self): + super().at_object_creation() + self.db.room_type = "forge" + self.key = "The Forge" + self.db.desc = """ +|rThe Forge|n + +Heat and light emanate from working stations: +- A compiler array hums with activity +- Tool templates hang on the walls +- Test rigs verify each creation +- A deployment pipeline waits ready + +Capabilities are forged here. + """.strip() + + self.db.available_tools = [] + self.db.build_queue = [] + self.db.test_results = [] + + def get_dynamic_content(self, looker, **kwargs): + """Add dynamic content for forge.""" + content = "\n\n" + + # Show available tools + tools = [obj for obj in self.contents if hasattr(obj, 'db') and obj.db.tool_type] + if tools: + content += f"|yAvailable Tools:|n {len(tools)}\n" + + # Show build queue + if self.db.build_queue: + content += f"\n|wBuild Queue:|n {len(self.db.build_queue)} items\n" + + return content + + def register_tool(self, tool): + """Register a new tool.""" + self.db.available_tools.append(tool.id) + self.log_activity(f"Tool registered: {tool.name}") + + def queue_build(self, description: str): + """Queue a new capability build.""" + self.db.build_queue.append({ + "description": description, + "queued_at": datetime.now().isoformat(), + "status": "pending" + }) + self.log_activity(f"Build queued: {description}") + + def record_test_result(self, test_name: str, passed: bool, output: str): + """Record a test result.""" + self.db.test_results.append({ + "test": test_name, + "passed": passed, + "output": output, + "timestamp": datetime.now().isoformat() + }) + + +class Dispatch(TimmyRoom): + """ + The Dispatch — task queue and routing. + + Where incoming work arrives, gets prioritized, + and is assigned to appropriate houses. + """ + + def at_object_creation(self): + super().at_object_creation() + self.db.room_type = "dispatch" + self.key = "Dispatch" + self.db.desc = """ +|yDispatch|n + +A command center for task management: +- Incoming task queue displays on the wall +- Routing assignments to different houses +- Priority indicators glow red/orange/green +- Status boards show current workload + +Work flows through here. + """.strip() + + self.db.pending_tasks = [] + self.db.routing_rules = { + "timmy": ["sovereign", "final_decision", "critical"], + "ezra": ["research", "documentation", "analysis"], + "bezalel": ["implementation", "testing", "building"], + "allegro": ["routing", "connectivity", "tempo"] + } + + def get_dynamic_content(self, looker, **kwargs): + """Add dynamic content for dispatch.""" + content = "\n\n" + + # Show pending tasks + tasks = [obj for obj in self.contents if hasattr(obj, 'db') and obj.db.status == "pending"] + if tasks: + content += f"|yPending Tasks:|n {len(tasks)}\n" + for task in tasks[:5]: + priority = task.db.priority + color = "|r" if priority == "high" else "|y" if priority == "medium" else "|g" + content += f" {color}[{priority}]|n {task.name}\n" + else: + content += "|gNo pending tasks|n\n" + + # Show routing rules + content += "\n|wRouting:|n\n" + for house, responsibilities in self.db.routing_rules.items(): + content += f" {house}: {', '.join(responsibilities[:2])}\n" + + return content + + def receive_task(self, task): + """Receive a new task.""" + self.db.pending_tasks.append(task.id) + self.log_activity(f"Task received: {task.name}") + + # Auto-route based on task type + if task.db.task_type in self.db.routing_rules["timmy"]: + task.assign("timmy") + elif task.db.task_type in self.db.routing_rules["ezra"]: + task.assign("ezra") + elif task.db.task_type in self.db.routing_rules["bezalel"]: + task.assign("bezalel") + else: + task.assign("allegro") + + def get_task_stats(self) -> Dict[str, int]: + """Get statistics on tasks.""" + tasks = [obj for obj in self.contents if hasattr(obj, 'db') and obj.db.status] + stats = {"pending": 0, "active": 0, "completed": 0} + for task in tasks: + status = task.db.status + if status in stats: + stats[status] += 1 + return stats