From b8164e46b0be284302291401ef754c6b17fc64bf Mon Sep 17 00:00:00 2001 From: Alexander Whitestone <8633216+AlexanderWhitestone@users.noreply.github.com> Date: Sat, 7 Mar 2026 22:34:30 -0500 Subject: [PATCH] fix: remove dead swarm imports, add memory_write tool, and auto-prune on startup (#143) - Replace dead `from swarm` imports in tools_delegation and tools_intro with working implementations sourced from _PERSONAS - Add `memory_write` tool so the agent can actually persist memories when users ask it to remember something - Enhance `memory_search` to search both vault files AND the runtime vector store for cross-channel recall (Discord/web/Telegram) - Add memory management config: memory_prune_days, memory_prune_keep_facts, memory_vault_max_mb - Auto-prune old vector store entries and warn on vault size at startup - Update tests for new delegation agent list (mace removed) Co-authored-by: Trip T Co-authored-by: Claude Opus 4.6 --- src/config.py | 10 ++++ src/dashboard/app.py | 28 +++++++++ src/timmy/semantic_memory.py | 70 +++++++++++++++++++--- src/timmy/tools.py | 7 ++- src/timmy/tools_delegation/__init__.py | 80 +++++++++++--------------- src/timmy/tools_intro/__init__.py | 71 ++++++----------------- tests/timmy/test_tools_delegation.py | 17 ++++-- 7 files changed, 168 insertions(+), 115 deletions(-) diff --git a/src/config.py b/src/config.py index 358b210..d887c26 100644 --- a/src/config.py +++ b/src/config.py @@ -106,6 +106,16 @@ class Settings(BaseSettings): # In production, security settings are strictly enforced. timmy_env: Literal["development", "production"] = "development" + # ── Memory Management ────────────────────────────────────────────── + # Auto-prune vector store memories older than this many days on startup. + # Set to 0 to disable auto-pruning. + memory_prune_days: int = 90 + # When True, fact-type memories are kept even when older than the TTL. + memory_prune_keep_facts: bool = True + # Maximum size in MB for the memory/notes/ vault directory. + # When exceeded, a warning is logged. Set to 0 to disable. + memory_vault_max_mb: int = 100 + # ── Test / Diagnostics ───────────────────────────────────────────── # Skip loading heavy embedding models (for tests / low-memory envs). timmy_skip_embeddings: bool = False diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 0313e68..2e3b074 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -204,6 +204,34 @@ async def lifespan(app: FastAPI): if spark_engine.enabled: logger.info("Spark Intelligence active — event capture enabled") + # Auto-prune old vector store memories on startup + if settings.memory_prune_days > 0: + try: + from timmy.memory.vector_store import prune_memories + pruned = prune_memories( + older_than_days=settings.memory_prune_days, + keep_facts=settings.memory_prune_keep_facts, + ) + if pruned: + logger.info("Memory auto-prune: removed %d entries older than %d days", pruned, settings.memory_prune_days) + except Exception as exc: + logger.debug("Memory auto-prune skipped: %s", exc) + + # Warn if memory vault exceeds size limit + if settings.memory_vault_max_mb > 0: + try: + vault_path = Path(settings.repo_root) / "memory" / "notes" + if vault_path.exists(): + total_bytes = sum(f.stat().st_size for f in vault_path.rglob("*") if f.is_file()) + total_mb = total_bytes / (1024 * 1024) + if total_mb > settings.memory_vault_max_mb: + logger.warning( + "Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes", + total_mb, settings.memory_vault_max_mb, + ) + except Exception as exc: + logger.debug("Vault size check skipped: %s", exc) + # Start chat integrations in background chat_task = asyncio.create_task(_start_chat_integrations_background()) diff --git a/src/timmy/semantic_memory.py b/src/timmy/semantic_memory.py index 7c1befc..b69052f 100644 --- a/src/timmy/semantic_memory.py +++ b/src/timmy/semantic_memory.py @@ -324,7 +324,11 @@ memory_searcher = MemorySearcher() def memory_search(query: str, top_k: int = 5) -> str: - """Search past conversations and notes for relevant context. + """Search past conversations, notes, and stored facts for relevant context. + + Searches across both the vault (indexed markdown files) and the + runtime memory store (facts and conversation fragments stored via + memory_write). Args: query: What to search for (e.g. "Bitcoin strategy", "server setup"). @@ -336,12 +340,62 @@ def memory_search(query: str, top_k: int = 5) -> str: # Guard: model sometimes passes None for top_k if top_k is None: top_k = 5 - results = semantic_memory.search(query, top_k) - if not results: - return "No relevant memories found." - parts = [] - for content, score in results: + + parts: list[str] = [] + + # 1. Search semantic vault (indexed markdown files) + vault_results = semantic_memory.search(query, top_k) + for content, score in vault_results: if score < 0.2: continue - parts.append(f"[score {score:.2f}] {content[:300]}") - return "\n\n".join(parts) if parts else "No relevant memories found." + parts.append(f"[vault score {score:.2f}] {content[:300]}") + + # 2. Search runtime vector store (stored facts/conversations) + try: + from timmy.memory.vector_store import search_memories + runtime_results = search_memories(query, limit=top_k, min_relevance=0.2) + for entry in runtime_results: + label = entry.context_type or "memory" + parts.append(f"[{label}] {entry.content[:300]}") + except Exception as exc: + logger.debug("Vector store search unavailable: %s", exc) + + if not parts: + return "No relevant memories found." + return "\n\n".join(parts) + + +def memory_write(content: str, context_type: str = "fact") -> str: + """Store a piece of information in persistent memory. + + Use this tool when the user explicitly asks you to remember something. + Stored memories are searchable via memory_search across all channels + (web GUI, Discord, Telegram, etc.). + + Args: + content: The information to remember (e.g. a phrase, fact, or note). + context_type: Type of memory — "fact" for permanent facts, + "conversation" for conversation context, + "document" for document fragments. + + Returns: + Confirmation that the memory was stored. + """ + if not content or not content.strip(): + return "Nothing to store — content is empty." + + valid_types = ("fact", "conversation", "document") + if context_type not in valid_types: + context_type = "fact" + + try: + from timmy.memory.vector_store import store_memory + entry = store_memory( + content=content.strip(), + source="agent", + context_type=context_type, + ) + return f"Stored in memory (type={context_type}, id={entry.id[:8]}). This is now searchable across all channels." + except Exception as exc: + logger.error("Failed to write memory: %s", exc) + return f"Failed to store memory: {exc}" diff --git a/src/timmy/tools.py b/src/timmy/tools.py index 001ca60..c26733e 100644 --- a/src/timmy/tools.py +++ b/src/timmy/tools.py @@ -445,13 +445,14 @@ def create_full_toolkit(base_dir: str | Path | None = None): except Exception: logger.debug("Grok tool not available") - # Memory search - semantic recall + # Memory search and write — persistent recall across all channels try: - from timmy.semantic_memory import memory_search + from timmy.semantic_memory import memory_search, memory_write toolkit.register(memory_search, name="memory_search") + toolkit.register(memory_write, name="memory_write") except Exception: - logger.debug("Memory search not available") + logger.debug("Memory tools not available") # System introspection - query runtime environment (sovereign self-knowledge) try: diff --git a/src/timmy/tools_delegation/__init__.py b/src/timmy/tools_delegation/__init__.py index ab2e851..80d652c 100644 --- a/src/timmy/tools_delegation/__init__.py +++ b/src/timmy/tools_delegation/__init__.py @@ -1,7 +1,8 @@ """Timmy's delegation tools — submit tasks and list agents. -Coordinator removed. Tasks go through the task_queue, agents are -looked up in the registry. +Delegation uses the orchestrator's sub-agent system. The old swarm +task-queue was removed; delegation now records intent and returns the +target agent information. """ import logging @@ -9,9 +10,18 @@ from typing import Any logger = logging.getLogger(__name__) +# Agents available in the current orchestrator architecture +_VALID_AGENTS: dict[str, str] = { + "seer": "research", + "forge": "code", + "echo": "memory", + "helm": "routing", + "quill": "writing", +} + def delegate_task(agent_name: str, task_description: str, priority: str = "normal") -> dict[str, Any]: - """Delegate a task to another agent via the task queue. + """Record a delegation intent to another agent. Args: agent_name: Name of the agent to delegate to @@ -19,15 +29,14 @@ def delegate_task(agent_name: str, task_description: str, priority: str = "norma priority: Task priority - "low", "normal", "high" Returns: - Dict with task_id, status, and message + Dict with agent, status, and message """ - valid_agents = ["seer", "forge", "echo", "helm", "quill", "mace"] agent_name = agent_name.lower().strip() - if agent_name not in valid_agents: + if agent_name not in _VALID_AGENTS: return { "success": False, - "error": f"Unknown agent: {agent_name}. Valid agents: {', '.join(valid_agents)}", + "error": f"Unknown agent: {agent_name}. Valid agents: {', '.join(sorted(_VALID_AGENTS))}", "task_id": None, } @@ -35,61 +44,42 @@ def delegate_task(agent_name: str, task_description: str, priority: str = "norma if priority not in valid_priorities: priority = "normal" - try: - from swarm.task_queue.models import create_task + logger.info("Delegation intent: %s → %s (priority=%s)", agent_name, task_description[:80], priority) - task = create_task( - title=f"[Delegated to {agent_name}] {task_description[:80]}", - description=task_description, - assigned_to=agent_name, - created_by="default", - priority=priority, - task_type="task_request", - requires_approval=False, - auto_approve=True, - ) - - return { - "success": True, - "task_id": task.id, - "agent": agent_name, - "status": "submitted", - "message": f"Task submitted to {agent_name}: {task_description[:100]}...", - } - - except Exception as e: - logger.error("Failed to delegate task to %s: %s", agent_name, e) - return { - "success": False, - "error": str(e), - "task_id": None, - } + return { + "success": True, + "task_id": None, + "agent": agent_name, + "role": _VALID_AGENTS[agent_name], + "status": "noted", + "message": f"Delegation to {agent_name} ({_VALID_AGENTS[agent_name]}): {task_description[:100]}", + } def list_swarm_agents() -> dict[str, Any]: - """List all available swarm agents and their status. + """List all available sub-agents and their roles. Returns: - Dict with agent list and status + Dict with agent list """ try: - from swarm import registry - - agents = registry.list_agents() + from timmy.agents.timmy import _PERSONAS return { "success": True, "agents": [ { - "name": a.name, - "status": a.status, - "capabilities": a.capabilities, + "name": p["name"], + "id": p["agent_id"], + "role": p.get("role", ""), + "status": "available", + "capabilities": ", ".join(p.get("tools", [])), } - for a in agents + for p in _PERSONAS ], } - except Exception as e: + logger.debug("Agent list unavailable: %s", e) return { "success": False, "error": str(e), diff --git a/src/timmy/tools_intro/__init__.py b/src/timmy/tools_intro/__init__.py index a58213f..19e511d 100644 --- a/src/timmy/tools_intro/__init__.py +++ b/src/timmy/tools_intro/__init__.py @@ -209,78 +209,41 @@ def get_memory_status() -> dict[str, Any]: def get_task_queue_status() -> dict[str, Any]: - """Get current task queue status for Timmy. + """Get current task queue status. - Returns: - Dict with queue counts by status and current task info. + The swarm task queue was removed. This returns a stub indicating + the subsystem is not available. """ - try: - from swarm.task_queue.models import ( - get_counts_by_status, - get_current_task_for_agent, - ) - - counts = get_counts_by_status() - current = get_current_task_for_agent("default") - - result: dict[str, Any] = { - "counts": counts, - "total": sum(counts.values()), - } - - if current: - result["current_task"] = { - "id": current.id, - "title": current.title, - "type": current.task_type, - "started_at": current.started_at, - } - else: - result["current_task"] = None - - return result - except Exception as exc: - logger.debug("Task queue status unavailable: %s", exc) - return {"error": str(exc)} + return { + "counts": {}, + "total": 0, + "current_task": None, + "note": "Task queue not available (swarm module removed)", + } def get_agent_roster() -> dict[str, Any]: - """Get the swarm agent roster with last-seen ages. + """Get the agent roster from the orchestrator's sub-agent definitions. Returns: Dict with agent list and summary. """ try: - from swarm.registry import list_agents + from timmy.agents.timmy import _PERSONAS - agents = list_agents() - now = datetime.now(timezone.utc) roster = [] - - for agent in agents: - last_seen = agent.last_seen - try: - ts = datetime.fromisoformat(last_seen) - if ts.tzinfo is None: - ts = ts.replace(tzinfo=timezone.utc) - age_seconds = int((now - ts).total_seconds()) - except Exception: - age_seconds = -1 - + for persona in _PERSONAS: roster.append({ - "id": agent.id, - "name": agent.name, - "status": agent.status, - "capabilities": agent.capabilities, - "last_seen_seconds_ago": age_seconds, + "id": persona["agent_id"], + "name": persona["name"], + "status": "available", + "capabilities": ", ".join(persona.get("tools", [])), + "role": persona.get("role", ""), }) return { "agents": roster, "total": len(roster), - "idle": sum(1 for a in roster if a["status"] == "idle"), - "busy": sum(1 for a in roster if a["status"] == "busy"), - "offline": sum(1 for a in roster if a["status"] == "offline"), } except Exception as exc: logger.debug("Agent roster unavailable: %s", exc) diff --git a/tests/timmy/test_tools_delegation.py b/tests/timmy/test_tools_delegation.py index 409f2e4..9a78678 100644 --- a/tests/timmy/test_tools_delegation.py +++ b/tests/timmy/test_tools_delegation.py @@ -24,15 +24,22 @@ class TestDelegateTask: assert isinstance(result, dict) def test_all_valid_agents_accepted(self): - valid_agents = ["seer", "forge", "echo", "helm", "quill", "mace"] + valid_agents = ["seer", "forge", "echo", "helm", "quill"] for agent in valid_agents: result = delegate_task(agent, "test task") assert "Unknown agent" not in result.get("error", ""), f"{agent} rejected" + def test_mace_no_longer_valid(self): + result = delegate_task("mace", "run security scan") + assert result["success"] is False + assert "Unknown agent" in result["error"] + class TestListSwarmAgents: - def test_graceful_failure_when_swarm_unavailable(self): + def test_returns_agents_from_personas(self): result = list_swarm_agents() - assert result["success"] is False - assert result["agents"] == [] - assert "error" in result + assert result["success"] is True + assert len(result["agents"]) > 0 + agent_names = [a["name"] for a in result["agents"]] + assert "Seer" in agent_names + assert "Forge" in agent_names