fix: remove dead swarm imports, add memory_write tool, and auto-prune on startup (#143)

- Replace dead `from swarm` imports in tools_delegation and tools_intro
  with working implementations sourced from _PERSONAS
- Add `memory_write` tool so the agent can actually persist memories
  when users ask it to remember something
- Enhance `memory_search` to search both vault files AND the runtime
  vector store for cross-channel recall (Discord/web/Telegram)
- Add memory management config: memory_prune_days, memory_prune_keep_facts,
  memory_vault_max_mb
- Auto-prune old vector store entries and warn on vault size at startup
- Update tests for new delegation agent list (mace removed)

Co-authored-by: Trip T <trip@local>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alexander Whitestone
2026-03-07 22:34:30 -05:00
committed by GitHub
parent 3bf7187482
commit b8164e46b0
7 changed files with 168 additions and 115 deletions

View File

@@ -106,6 +106,16 @@ class Settings(BaseSettings):
# In production, security settings are strictly enforced.
timmy_env: Literal["development", "production"] = "development"
# ── Memory Management ──────────────────────────────────────────────
# Auto-prune vector store memories older than this many days on startup.
# Set to 0 to disable auto-pruning.
memory_prune_days: int = 90
# When True, fact-type memories are kept even when older than the TTL.
memory_prune_keep_facts: bool = True
# Maximum size in MB for the memory/notes/ vault directory.
# When exceeded, a warning is logged. Set to 0 to disable.
memory_vault_max_mb: int = 100
# ── Test / Diagnostics ─────────────────────────────────────────────
# Skip loading heavy embedding models (for tests / low-memory envs).
timmy_skip_embeddings: bool = False

View File

@@ -204,6 +204,34 @@ async def lifespan(app: FastAPI):
if spark_engine.enabled:
logger.info("Spark Intelligence active — event capture enabled")
# Auto-prune old vector store memories on startup
if settings.memory_prune_days > 0:
try:
from timmy.memory.vector_store import prune_memories
pruned = prune_memories(
older_than_days=settings.memory_prune_days,
keep_facts=settings.memory_prune_keep_facts,
)
if pruned:
logger.info("Memory auto-prune: removed %d entries older than %d days", pruned, settings.memory_prune_days)
except Exception as exc:
logger.debug("Memory auto-prune skipped: %s", exc)
# Warn if memory vault exceeds size limit
if settings.memory_vault_max_mb > 0:
try:
vault_path = Path(settings.repo_root) / "memory" / "notes"
if vault_path.exists():
total_bytes = sum(f.stat().st_size for f in vault_path.rglob("*") if f.is_file())
total_mb = total_bytes / (1024 * 1024)
if total_mb > settings.memory_vault_max_mb:
logger.warning(
"Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes",
total_mb, settings.memory_vault_max_mb,
)
except Exception as exc:
logger.debug("Vault size check skipped: %s", exc)
# Start chat integrations in background
chat_task = asyncio.create_task(_start_chat_integrations_background())

View File

@@ -324,7 +324,11 @@ memory_searcher = MemorySearcher()
def memory_search(query: str, top_k: int = 5) -> str:
"""Search past conversations and notes for relevant context.
"""Search past conversations, notes, and stored facts for relevant context.
Searches across both the vault (indexed markdown files) and the
runtime memory store (facts and conversation fragments stored via
memory_write).
Args:
query: What to search for (e.g. "Bitcoin strategy", "server setup").
@@ -336,12 +340,62 @@ def memory_search(query: str, top_k: int = 5) -> str:
# Guard: model sometimes passes None for top_k
if top_k is None:
top_k = 5
results = semantic_memory.search(query, top_k)
if not results:
return "No relevant memories found."
parts = []
for content, score in results:
parts: list[str] = []
# 1. Search semantic vault (indexed markdown files)
vault_results = semantic_memory.search(query, top_k)
for content, score in vault_results:
if score < 0.2:
continue
parts.append(f"[score {score:.2f}] {content[:300]}")
return "\n\n".join(parts) if parts else "No relevant memories found."
parts.append(f"[vault score {score:.2f}] {content[:300]}")
# 2. Search runtime vector store (stored facts/conversations)
try:
from timmy.memory.vector_store import search_memories
runtime_results = search_memories(query, limit=top_k, min_relevance=0.2)
for entry in runtime_results:
label = entry.context_type or "memory"
parts.append(f"[{label}] {entry.content[:300]}")
except Exception as exc:
logger.debug("Vector store search unavailable: %s", exc)
if not parts:
return "No relevant memories found."
return "\n\n".join(parts)
def memory_write(content: str, context_type: str = "fact") -> str:
"""Store a piece of information in persistent memory.
Use this tool when the user explicitly asks you to remember something.
Stored memories are searchable via memory_search across all channels
(web GUI, Discord, Telegram, etc.).
Args:
content: The information to remember (e.g. a phrase, fact, or note).
context_type: Type of memory — "fact" for permanent facts,
"conversation" for conversation context,
"document" for document fragments.
Returns:
Confirmation that the memory was stored.
"""
if not content or not content.strip():
return "Nothing to store — content is empty."
valid_types = ("fact", "conversation", "document")
if context_type not in valid_types:
context_type = "fact"
try:
from timmy.memory.vector_store import store_memory
entry = store_memory(
content=content.strip(),
source="agent",
context_type=context_type,
)
return f"Stored in memory (type={context_type}, id={entry.id[:8]}). This is now searchable across all channels."
except Exception as exc:
logger.error("Failed to write memory: %s", exc)
return f"Failed to store memory: {exc}"

View File

@@ -445,13 +445,14 @@ def create_full_toolkit(base_dir: str | Path | None = None):
except Exception:
logger.debug("Grok tool not available")
# Memory search - semantic recall
# Memory search and write — persistent recall across all channels
try:
from timmy.semantic_memory import memory_search
from timmy.semantic_memory import memory_search, memory_write
toolkit.register(memory_search, name="memory_search")
toolkit.register(memory_write, name="memory_write")
except Exception:
logger.debug("Memory search not available")
logger.debug("Memory tools not available")
# System introspection - query runtime environment (sovereign self-knowledge)
try:

View File

@@ -1,7 +1,8 @@
"""Timmy's delegation tools — submit tasks and list agents.
Coordinator removed. Tasks go through the task_queue, agents are
looked up in the registry.
Delegation uses the orchestrator's sub-agent system. The old swarm
task-queue was removed; delegation now records intent and returns the
target agent information.
"""
import logging
@@ -9,9 +10,18 @@ from typing import Any
logger = logging.getLogger(__name__)
# Agents available in the current orchestrator architecture
_VALID_AGENTS: dict[str, str] = {
"seer": "research",
"forge": "code",
"echo": "memory",
"helm": "routing",
"quill": "writing",
}
def delegate_task(agent_name: str, task_description: str, priority: str = "normal") -> dict[str, Any]:
"""Delegate a task to another agent via the task queue.
"""Record a delegation intent to another agent.
Args:
agent_name: Name of the agent to delegate to
@@ -19,15 +29,14 @@ def delegate_task(agent_name: str, task_description: str, priority: str = "norma
priority: Task priority - "low", "normal", "high"
Returns:
Dict with task_id, status, and message
Dict with agent, status, and message
"""
valid_agents = ["seer", "forge", "echo", "helm", "quill", "mace"]
agent_name = agent_name.lower().strip()
if agent_name not in valid_agents:
if agent_name not in _VALID_AGENTS:
return {
"success": False,
"error": f"Unknown agent: {agent_name}. Valid agents: {', '.join(valid_agents)}",
"error": f"Unknown agent: {agent_name}. Valid agents: {', '.join(sorted(_VALID_AGENTS))}",
"task_id": None,
}
@@ -35,61 +44,42 @@ def delegate_task(agent_name: str, task_description: str, priority: str = "norma
if priority not in valid_priorities:
priority = "normal"
try:
from swarm.task_queue.models import create_task
logger.info("Delegation intent: %s%s (priority=%s)", agent_name, task_description[:80], priority)
task = create_task(
title=f"[Delegated to {agent_name}] {task_description[:80]}",
description=task_description,
assigned_to=agent_name,
created_by="default",
priority=priority,
task_type="task_request",
requires_approval=False,
auto_approve=True,
)
return {
"success": True,
"task_id": task.id,
"agent": agent_name,
"status": "submitted",
"message": f"Task submitted to {agent_name}: {task_description[:100]}...",
}
except Exception as e:
logger.error("Failed to delegate task to %s: %s", agent_name, e)
return {
"success": False,
"error": str(e),
"task_id": None,
}
return {
"success": True,
"task_id": None,
"agent": agent_name,
"role": _VALID_AGENTS[agent_name],
"status": "noted",
"message": f"Delegation to {agent_name} ({_VALID_AGENTS[agent_name]}): {task_description[:100]}",
}
def list_swarm_agents() -> dict[str, Any]:
"""List all available swarm agents and their status.
"""List all available sub-agents and their roles.
Returns:
Dict with agent list and status
Dict with agent list
"""
try:
from swarm import registry
agents = registry.list_agents()
from timmy.agents.timmy import _PERSONAS
return {
"success": True,
"agents": [
{
"name": a.name,
"status": a.status,
"capabilities": a.capabilities,
"name": p["name"],
"id": p["agent_id"],
"role": p.get("role", ""),
"status": "available",
"capabilities": ", ".join(p.get("tools", [])),
}
for a in agents
for p in _PERSONAS
],
}
except Exception as e:
logger.debug("Agent list unavailable: %s", e)
return {
"success": False,
"error": str(e),

View File

@@ -209,78 +209,41 @@ def get_memory_status() -> dict[str, Any]:
def get_task_queue_status() -> dict[str, Any]:
"""Get current task queue status for Timmy.
"""Get current task queue status.
Returns:
Dict with queue counts by status and current task info.
The swarm task queue was removed. This returns a stub indicating
the subsystem is not available.
"""
try:
from swarm.task_queue.models import (
get_counts_by_status,
get_current_task_for_agent,
)
counts = get_counts_by_status()
current = get_current_task_for_agent("default")
result: dict[str, Any] = {
"counts": counts,
"total": sum(counts.values()),
}
if current:
result["current_task"] = {
"id": current.id,
"title": current.title,
"type": current.task_type,
"started_at": current.started_at,
}
else:
result["current_task"] = None
return result
except Exception as exc:
logger.debug("Task queue status unavailable: %s", exc)
return {"error": str(exc)}
return {
"counts": {},
"total": 0,
"current_task": None,
"note": "Task queue not available (swarm module removed)",
}
def get_agent_roster() -> dict[str, Any]:
"""Get the swarm agent roster with last-seen ages.
"""Get the agent roster from the orchestrator's sub-agent definitions.
Returns:
Dict with agent list and summary.
"""
try:
from swarm.registry import list_agents
from timmy.agents.timmy import _PERSONAS
agents = list_agents()
now = datetime.now(timezone.utc)
roster = []
for agent in agents:
last_seen = agent.last_seen
try:
ts = datetime.fromisoformat(last_seen)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
age_seconds = int((now - ts).total_seconds())
except Exception:
age_seconds = -1
for persona in _PERSONAS:
roster.append({
"id": agent.id,
"name": agent.name,
"status": agent.status,
"capabilities": agent.capabilities,
"last_seen_seconds_ago": age_seconds,
"id": persona["agent_id"],
"name": persona["name"],
"status": "available",
"capabilities": ", ".join(persona.get("tools", [])),
"role": persona.get("role", ""),
})
return {
"agents": roster,
"total": len(roster),
"idle": sum(1 for a in roster if a["status"] == "idle"),
"busy": sum(1 for a in roster if a["status"] == "busy"),
"offline": sum(1 for a in roster if a["status"] == "offline"),
}
except Exception as exc:
logger.debug("Agent roster unavailable: %s", exc)

View File

@@ -24,15 +24,22 @@ class TestDelegateTask:
assert isinstance(result, dict)
def test_all_valid_agents_accepted(self):
valid_agents = ["seer", "forge", "echo", "helm", "quill", "mace"]
valid_agents = ["seer", "forge", "echo", "helm", "quill"]
for agent in valid_agents:
result = delegate_task(agent, "test task")
assert "Unknown agent" not in result.get("error", ""), f"{agent} rejected"
def test_mace_no_longer_valid(self):
result = delegate_task("mace", "run security scan")
assert result["success"] is False
assert "Unknown agent" in result["error"]
class TestListSwarmAgents:
def test_graceful_failure_when_swarm_unavailable(self):
def test_returns_agents_from_personas(self):
result = list_swarm_agents()
assert result["success"] is False
assert result["agents"] == []
assert "error" in result
assert result["success"] is True
assert len(result["agents"]) > 0
agent_names = [a["name"] for a in result["agents"]]
assert "Seer" in agent_names
assert "Forge" in agent_names