[loop-cycle-43] refactor: remove 1035 lines of dead code (#136) (#146)

This commit is contained in:
2026-03-15 10:10:12 -04:00
parent 16b31b30cb
commit d2c51763d0
6 changed files with 2 additions and 1037 deletions

View File

@@ -1,193 +0,0 @@
"""Event Broadcaster - bridges event_log to WebSocket clients.
When events are logged, they are broadcast to all connected dashboard clients
via WebSocket for real-time activity feed updates.
"""
import asyncio
import logging
from typing import Optional
try:
from swarm.event_log import EventLogEntry
except ImportError:
EventLogEntry = None
logger = logging.getLogger(__name__)
class EventBroadcaster:
"""Broadcasts events to WebSocket clients.
Usage:
from infrastructure.events.broadcaster import event_broadcaster
event_broadcaster.broadcast(event)
"""
def __init__(self) -> None:
self._ws_manager: Optional = None
def _get_ws_manager(self):
"""Lazy import to avoid circular deps."""
if self._ws_manager is None:
try:
from infrastructure.ws_manager.handler import ws_manager
self._ws_manager = ws_manager
except Exception as exc:
logger.debug("WebSocket manager not available: %s", exc)
return self._ws_manager
async def broadcast(self, event: EventLogEntry) -> int:
"""Broadcast an event to all connected WebSocket clients.
Args:
event: The event to broadcast
Returns:
Number of clients notified
"""
ws_manager = self._get_ws_manager()
if not ws_manager:
return 0
# Build message payload
payload = {
"type": "event",
"payload": {
"id": event.id,
"event_type": event.event_type.value,
"source": event.source,
"task_id": event.task_id,
"agent_id": event.agent_id,
"timestamp": event.timestamp,
"data": event.data,
},
}
try:
# Broadcast to all connected clients
count = await ws_manager.broadcast_json(payload)
logger.debug("Broadcasted event %s to %d clients", event.id[:8], count)
return count
except Exception as exc:
logger.error("Failed to broadcast event: %s", exc)
return 0
def broadcast_sync(self, event: EventLogEntry) -> None:
"""Synchronous wrapper for broadcast.
Use this from synchronous code - it schedules the async broadcast
in the event loop if one is running.
"""
try:
asyncio.get_running_loop()
# Schedule in background, don't wait
asyncio.create_task(self.broadcast(event))
except RuntimeError:
# No event loop running, skip broadcast
pass
# Global singleton
event_broadcaster = EventBroadcaster()
# Event type to icon/emoji mapping
EVENT_ICONS = {
"task.created": "📝",
"task.bidding": "",
"task.assigned": "👤",
"task.started": "▶️",
"task.completed": "",
"task.failed": "",
"agent.joined": "🟢",
"agent.left": "🔴",
"agent.status_changed": "🔄",
"bid.submitted": "💰",
"auction.closed": "🏁",
"tool.called": "🔧",
"tool.completed": "⚙️",
"tool.failed": "💥",
"system.error": "⚠️",
"system.warning": "🔶",
"system.info": "",
"error.captured": "🐛",
"bug_report.created": "📋",
}
EVENT_LABELS = {
"task.created": "New task",
"task.bidding": "Bidding open",
"task.assigned": "Task assigned",
"task.started": "Task started",
"task.completed": "Task completed",
"task.failed": "Task failed",
"agent.joined": "Agent joined",
"agent.left": "Agent left",
"agent.status_changed": "Status changed",
"bid.submitted": "Bid submitted",
"auction.closed": "Auction closed",
"tool.called": "Tool called",
"tool.completed": "Tool completed",
"tool.failed": "Tool failed",
"system.error": "Error",
"system.warning": "Warning",
"system.info": "Info",
"error.captured": "Error captured",
"bug_report.created": "Bug report filed",
}
def get_event_icon(event_type: str) -> str:
"""Get emoji icon for event type."""
return EVENT_ICONS.get(event_type, "")
def get_event_label(event_type: str) -> str:
"""Get human-readable label for event type."""
return EVENT_LABELS.get(event_type, event_type)
def format_event_for_display(event: EventLogEntry) -> dict:
"""Format event for display in activity feed.
Returns dict with display-friendly fields.
"""
data = event.data or {}
# Build description based on event type
description = ""
if event.event_type.value == "task.created":
desc = data.get("description", "")
description = desc[:60] + "..." if len(desc) > 60 else desc
elif event.event_type.value == "task.assigned":
agent = event.agent_id[:8] if event.agent_id else "unknown"
bid = data.get("bid_sats", "?")
description = f"to {agent} ({bid} sats)"
elif event.event_type.value == "bid.submitted":
bid = data.get("bid_sats", "?")
description = f"{bid} sats"
elif event.event_type.value == "agent.joined":
persona = data.get("persona_id", "")
description = f"Persona: {persona}" if persona else "New agent"
else:
# Generic: use any string data
for key in ["message", "reason", "description"]:
if key in data:
val = str(data[key])
description = val[:60] + "..." if len(val) > 60 else val
break
return {
"id": event.id,
"icon": get_event_icon(event.event_type.value),
"label": get_event_label(event.event_type.value),
"type": event.event_type.value,
"source": event.source,
"description": description,
"timestamp": event.timestamp,
"time_short": event.timestamp[11:19] if event.timestamp else "",
"task_id": event.task_id,
"agent_id": event.agent_id,
}

View File

@@ -1,275 +0,0 @@
"""Ollama-based implementation of TimAgent interface.
This adapter wraps the existing Timmy Ollama agent to conform
to the substrate-agnostic TimAgent interface. It's the bridge
between the old codebase and the new embodiment-ready architecture.
Usage:
from timmy.agent_core import AgentIdentity, Perception
from timmy.agent_core.ollama_adapter import OllamaAgent
identity = AgentIdentity.generate("Timmy")
agent = OllamaAgent(identity)
perception = Perception.text("Hello!")
memory = agent.perceive(perception)
action = agent.reason("How should I respond?", [memory])
result = agent.act(action)
"""
from typing import Any
from timmy.agent import _resolve_model_with_fallback, create_timmy
from timmy.agent_core.interface import (
Action,
ActionType,
AgentCapability,
AgentEffect,
AgentIdentity,
Communication,
Memory,
Perception,
PerceptionType,
TimAgent,
)
class OllamaAgent(TimAgent):
"""TimAgent implementation using local Ollama LLM.
This is the production agent for Timmy Time v2. It uses
Ollama for reasoning and SQLite for memory persistence.
Capabilities:
- REASONING: LLM-based inference
- CODING: Code generation and analysis
- WRITING: Long-form content creation
- ANALYSIS: Data processing and insights
- COMMUNICATION: Multi-agent messaging
"""
def __init__(
self,
identity: AgentIdentity,
model: str | None = None,
effect_log: str | None = None,
require_vision: bool = False,
) -> None:
"""Initialize Ollama-based agent.
Args:
identity: Agent identity (persistent across sessions)
model: Ollama model to use (auto-resolves with fallback)
effect_log: Path to log agent effects (optional)
require_vision: Whether to select a vision-capable model
"""
super().__init__(identity)
# Resolve model with automatic pulling and fallback
resolved_model, is_fallback = _resolve_model_with_fallback(
requested_model=model,
require_vision=require_vision,
auto_pull=True,
)
if is_fallback:
import logging
logging.getLogger(__name__).info(
"OllamaAdapter using fallback model %s", resolved_model
)
# Initialize underlying Ollama agent
self._timmy = create_timmy(model=resolved_model)
# Set capabilities based on what Ollama can do
self._capabilities = {
AgentCapability.REASONING,
AgentCapability.CODING,
AgentCapability.WRITING,
AgentCapability.ANALYSIS,
AgentCapability.COMMUNICATION,
}
# Effect logging for audit/replay
self._effect_log = AgentEffect(effect_log) if effect_log else None
# Simple in-memory working memory (short term)
self._working_memory: list[Memory] = []
self._max_working_memory = 10
def perceive(self, perception: Perception) -> Memory:
"""Process perception and store in memory.
For text perceptions, we might do light preprocessing
(summarization, keyword extraction) before storage.
"""
# Create memory from perception
memory = Memory(
id=f"mem_{len(self._working_memory)}",
content={
"type": perception.type.name,
"data": perception.data,
"source": perception.source,
},
created_at=perception.timestamp,
tags=self._extract_tags(perception),
)
# Add to working memory
self._working_memory.append(memory)
if len(self._working_memory) > self._max_working_memory:
self._working_memory.pop(0) # FIFO eviction
# Log effect
if self._effect_log:
self._effect_log.log_perceive(perception, memory.id)
return memory
def reason(self, query: str, context: list[Memory]) -> Action:
"""Use LLM to reason and decide on action.
This is where the Ollama agent does its work. We construct
a prompt from the query and context, then interpret the
response as an action.
"""
# Build context string from memories
context_str = self._format_context(context)
# Construct prompt
prompt = f"""You are {self._identity.name}, an AI assistant.
Context from previous interactions:
{context_str}
Current query: {query}
Respond naturally and helpfully."""
# Run LLM inference
result = self._timmy.run(prompt, stream=False)
response_text = result.content if hasattr(result, "content") else str(result)
# Create text response action
action = Action.respond(response_text, confidence=0.9)
# Log effect
if self._effect_log:
self._effect_log.log_reason(query, action.type)
return action
def act(self, action: Action) -> Any:
"""Execute action in the Ollama substrate.
For text actions, the "execution" is just returning the
text (already generated during reasoning). For future
action types (MOVE, SPEAK), this would trigger the
appropriate Ollama tool calls.
"""
result = None
if action.type == ActionType.TEXT:
result = action.payload
elif action.type == ActionType.SPEAK:
# Would call TTS here
result = {"spoken": action.payload, "tts_engine": "pyttsx3"}
elif action.type == ActionType.CALL:
# Would make API call
result = {"status": "not_implemented", "payload": action.payload}
else:
result = {"error": f"Action type {action.type} not supported by OllamaAgent"}
# Log effect
if self._effect_log:
self._effect_log.log_act(action, result)
return result
def remember(self, memory: Memory) -> None:
"""Store memory in working memory.
Adds the memory to the sliding window and bumps its importance.
"""
memory.touch()
# Deduplicate by id
self._working_memory = [m for m in self._working_memory if m.id != memory.id]
self._working_memory.append(memory)
# Evict oldest if over capacity
if len(self._working_memory) > self._max_working_memory:
self._working_memory.pop(0)
def recall(self, query: str, limit: int = 5) -> list[Memory]:
"""Retrieve relevant memories.
Simple keyword matching for now. Future: vector similarity.
"""
query_lower = query.lower()
scored = []
for memory in self._working_memory:
score = 0
content_str = str(memory.content).lower()
# Simple keyword overlap
query_words = set(query_lower.split())
content_words = set(content_str.split())
overlap = len(query_words & content_words)
score += overlap
# Boost recent memories
score += memory.importance
scored.append((score, memory))
# Sort by score descending
scored.sort(key=lambda x: x[0], reverse=True)
# Return top N
return [m for _, m in scored[:limit]]
def communicate(self, message: Communication) -> bool:
"""Send message to another agent.
Swarm comms removed — inter-agent communication will be handled
by the unified brain memory layer.
"""
return False
def _extract_tags(self, perception: Perception) -> list[str]:
"""Extract searchable tags from perception."""
tags = [perception.type.name, perception.source]
if perception.type == PerceptionType.TEXT:
# Simple keyword extraction
text = str(perception.data).lower()
keywords = ["code", "bug", "help", "question", "task"]
for kw in keywords:
if kw in text:
tags.append(kw)
return tags
def _format_context(self, memories: list[Memory]) -> str:
"""Format memories into context string for prompt."""
if not memories:
return "No previous context."
parts = []
for mem in memories[-5:]: # Last 5 memories
if isinstance(mem.content, dict):
data = mem.content.get("data", "")
parts.append(f"- {data}")
else:
parts.append(f"- {mem.content}")
return "\n".join(parts)
def get_effect_log(self) -> list[dict] | None:
"""Export effect log if logging is enabled."""
if self._effect_log:
return self._effect_log.export()
return None

View File

@@ -1,105 +0,0 @@
"""Agent-to-agent messaging for the Timmy serve layer.
Provides a simple message-passing interface that allows agents to
communicate with each other. Messages are routed through the swarm
comms layer when available, or stored in an in-memory queue for
single-process operation.
"""
import logging
import uuid
from collections import deque
from dataclasses import dataclass, field
from datetime import UTC, datetime
logger = logging.getLogger(__name__)
@dataclass
class AgentMessage:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
from_agent: str = ""
to_agent: str = ""
content: str = ""
message_type: str = "text" # text | command | response | error
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
replied: bool = False
class InterAgentMessenger:
"""In-memory message queue for agent-to-agent communication."""
def __init__(self, max_queue_size: int = 1000) -> None:
self._queues: dict[str, deque[AgentMessage]] = {}
self._max_size = max_queue_size
self._all_messages: list[AgentMessage] = []
def send(
self,
from_agent: str,
to_agent: str,
content: str,
message_type: str = "text",
) -> AgentMessage:
"""Send a message from one agent to another."""
msg = AgentMessage(
from_agent=from_agent,
to_agent=to_agent,
content=content,
message_type=message_type,
)
queue = self._queues.setdefault(to_agent, deque(maxlen=self._max_size))
queue.append(msg)
self._all_messages.append(msg)
logger.info(
"Message %s%s: %s (%s)",
from_agent,
to_agent,
content[:50],
message_type,
)
return msg
def receive(self, agent_id: str, limit: int = 10) -> list[AgentMessage]:
"""Receive pending messages for an agent (FIFO, non-destructive peek)."""
queue = self._queues.get(agent_id, deque())
return list(queue)[:limit]
def pop(self, agent_id: str) -> AgentMessage | None:
"""Pop the oldest message from an agent's queue."""
queue = self._queues.get(agent_id, deque())
if not queue:
return None
return queue.popleft()
def pop_all(self, agent_id: str) -> list[AgentMessage]:
"""Pop all pending messages for an agent."""
queue = self._queues.get(agent_id, deque())
messages = list(queue)
queue.clear()
return messages
def broadcast(self, from_agent: str, content: str, message_type: str = "text") -> int:
"""Broadcast a message to all known agents. Returns count sent."""
count = 0
for agent_id in list(self._queues.keys()):
if agent_id != from_agent:
self.send(from_agent, agent_id, content, message_type)
count += 1
return count
def history(self, limit: int = 50) -> list[AgentMessage]:
"""Return recent message history across all agents."""
return self._all_messages[-limit:]
def clear(self, agent_id: str | None = None) -> None:
"""Clear message queue(s)."""
if agent_id:
self._queues.pop(agent_id, None)
else:
self._queues.clear()
self._all_messages.clear()
# Module-level singleton
messenger = InterAgentMessenger()

View File

@@ -1,191 +0,0 @@
"""Tests for the event broadcaster (infrastructure.events.broadcaster)."""
from dataclasses import dataclass
from enum import Enum
from unittest.mock import AsyncMock, MagicMock
from infrastructure.events.broadcaster import (
EVENT_ICONS,
EVENT_LABELS,
EventBroadcaster,
format_event_for_display,
get_event_icon,
get_event_label,
)
# ── Fake EventLogEntry for testing ──────────────────────────────────────────
class FakeEventType(Enum):
TASK_CREATED = "task.created"
TASK_ASSIGNED = "task.assigned"
BID_SUBMITTED = "bid.submitted"
AGENT_JOINED = "agent.joined"
SYSTEM_INFO = "system.info"
@dataclass
class FakeEventLogEntry:
id: str = "evt-abc123"
event_type: FakeEventType = FakeEventType.TASK_CREATED
source: str = "test"
task_id: str = "task-1"
agent_id: str = "agent-1"
timestamp: str = "2026-03-06T12:00:00Z"
data: dict = None
def __post_init__(self):
if self.data is None:
self.data = {}
class TestEventBroadcaster:
"""Test EventBroadcaster class."""
def test_init(self):
b = EventBroadcaster()
assert b._ws_manager is None
async def test_broadcast_no_ws_manager(self):
b = EventBroadcaster()
# _get_ws_manager returns None => returns 0
count = await b.broadcast(FakeEventLogEntry())
assert count == 0
async def test_broadcast_with_ws_manager(self):
b = EventBroadcaster()
mock_ws = MagicMock()
mock_ws.broadcast_json = AsyncMock(return_value=3)
b._ws_manager = mock_ws
event = FakeEventLogEntry()
count = await b.broadcast(event)
assert count == 3
mock_ws.broadcast_json.assert_awaited_once()
# Verify payload structure
payload = mock_ws.broadcast_json.call_args[0][0]
assert payload["type"] == "event"
assert payload["payload"]["id"] == "evt-abc123"
assert payload["payload"]["event_type"] == "task.created"
async def test_broadcast_ws_error_returns_zero(self):
b = EventBroadcaster()
mock_ws = MagicMock()
mock_ws.broadcast_json = AsyncMock(side_effect=RuntimeError("ws down"))
b._ws_manager = mock_ws
count = await b.broadcast(FakeEventLogEntry())
assert count == 0
def test_broadcast_sync_no_loop(self):
"""broadcast_sync should not crash when no event loop is running."""
b = EventBroadcaster()
# This should silently pass (no event loop)
b.broadcast_sync(FakeEventLogEntry())
class TestEventIcons:
"""Test icon/label lookup functions."""
def test_known_icon(self):
assert get_event_icon("task.created") == "📝"
assert get_event_icon("agent.joined") == "🟢"
def test_unknown_icon_returns_bullet(self):
assert get_event_icon("nonexistent") == ""
def test_known_label(self):
assert get_event_label("task.created") == "New task"
assert get_event_label("task.failed") == "Task failed"
def test_unknown_label_returns_type(self):
assert get_event_label("custom.event") == "custom.event"
def test_all_icons_have_labels(self):
"""Every icon key should also have a label."""
for key in EVENT_ICONS:
assert key in EVENT_LABELS, f"Missing label for icon key: {key}"
class TestFormatEventForDisplay:
"""Test format_event_for_display helper."""
def test_task_created_truncates_description(self):
event = FakeEventLogEntry(
event_type=FakeEventType.TASK_CREATED,
data={"description": "A" * 100},
)
result = format_event_for_display(event)
assert result["description"].endswith("...")
assert len(result["description"]) <= 63
def test_task_created_short_description(self):
event = FakeEventLogEntry(
event_type=FakeEventType.TASK_CREATED,
data={"description": "Short task"},
)
result = format_event_for_display(event)
assert result["description"] == "Short task"
def test_task_assigned(self):
event = FakeEventLogEntry(
event_type=FakeEventType.TASK_ASSIGNED,
agent_id="agent-12345678-long",
data={"bid_sats": 500},
)
result = format_event_for_display(event)
assert "agent-12" in result["description"]
assert "500 sats" in result["description"]
def test_bid_submitted(self):
event = FakeEventLogEntry(
event_type=FakeEventType.BID_SUBMITTED,
data={"bid_sats": 250},
)
result = format_event_for_display(event)
assert "250 sats" in result["description"]
def test_agent_joined_with_persona(self):
event = FakeEventLogEntry(
event_type=FakeEventType.AGENT_JOINED,
data={"persona_id": "forge"},
)
result = format_event_for_display(event)
assert "forge" in result["description"]
def test_agent_joined_no_persona(self):
event = FakeEventLogEntry(
event_type=FakeEventType.AGENT_JOINED,
data={},
)
result = format_event_for_display(event)
assert result["description"] == "New agent"
def test_generic_event_with_message(self):
event = FakeEventLogEntry(
event_type=FakeEventType.SYSTEM_INFO,
data={"message": "All systems go"},
)
result = format_event_for_display(event)
assert result["description"] == "All systems go"
def test_generic_event_no_data(self):
event = FakeEventLogEntry(
event_type=FakeEventType.SYSTEM_INFO,
data={},
)
result = format_event_for_display(event)
assert result["description"] == ""
def test_output_structure(self):
event = FakeEventLogEntry()
result = format_event_for_display(event)
assert "id" in result
assert "icon" in result
assert "label" in result
assert "type" in result
assert "source" in result
assert "timestamp" in result
assert "time_short" in result
assert result["time_short"] == "12:00:00"

View File

@@ -1,12 +1,10 @@
"""Functional tests for agent_core — interface and ollama_adapter.
"""Functional tests for agent_core — interface.
Covers the substrate-agnostic agent contract (data classes, enums,
factory methods, abstract enforcement) and the OllamaAgent adapter
(perceive → reason → act → remember → recall → communicate workflow).
factory methods, abstract enforcement).
"""
import uuid
from unittest.mock import MagicMock, patch
import pytest
@@ -343,160 +341,3 @@ class TestAgentEffect:
assert len(log) == 3
types = [e["type"] for e in log]
assert types == ["perceive", "reason", "act"]
# ── OllamaAgent functional tests ─────────────────────────────────────────────
class TestOllamaAgent:
"""Functional tests for the OllamaAgent adapter.
Uses mocked Ollama (create_timmy returns a mock) to exercise
the full perceive → reason → act → remember → recall pipeline.
"""
@pytest.fixture
def agent(self):
with patch("timmy.agent_core.ollama_adapter.create_timmy") as mock_ct:
mock_timmy = MagicMock()
mock_run = MagicMock()
mock_run.content = "Mocked LLM response"
mock_timmy.run.return_value = mock_run
mock_ct.return_value = mock_timmy
from timmy.agent_core.ollama_adapter import OllamaAgent
identity = AgentIdentity.generate("TestTimmy")
return OllamaAgent(identity, effect_log="/tmp/test_effects")
def test_capabilities_set(self, agent):
caps = agent.capabilities
assert AgentCapability.REASONING in caps
assert AgentCapability.CODING in caps
assert AgentCapability.WRITING in caps
assert AgentCapability.ANALYSIS in caps
assert AgentCapability.COMMUNICATION in caps
def test_perceive_creates_memory(self, agent):
p = Perception.text("Hello Timmy")
mem = agent.perceive(p)
assert mem.id == "mem_0"
assert mem.content["data"] == "Hello Timmy"
assert mem.content["type"] == "TEXT"
def test_perceive_extracts_tags(self, agent):
p = Perception.text("I need help with a bug in my code")
mem = agent.perceive(p)
assert "TEXT" in mem.tags
assert "user" in mem.tags
assert "help" in mem.tags
assert "bug" in mem.tags
assert "code" in mem.tags
def test_perceive_fifo_eviction(self, agent):
for i in range(12):
agent.perceive(Perception.text(f"msg {i}"))
assert len(agent._working_memory) == 10
# oldest two evicted
assert agent._working_memory[0].content["data"] == "msg 2"
def test_reason_returns_action(self, agent):
mem = agent.perceive(Perception.text("context"))
action = agent.reason("What should I do?", [mem])
assert action.type == ActionType.TEXT
assert action.payload == "Mocked LLM response"
assert action.confidence == 0.9
def test_act_text(self, agent):
action = Action.respond("Hello!")
result = agent.act(action)
assert result == "Hello!"
def test_act_speak(self, agent):
action = Action(type=ActionType.SPEAK, payload="Speak this")
result = agent.act(action)
assert result["spoken"] == "Speak this"
assert result["tts_engine"] == "pyttsx3"
def test_act_call(self, agent):
action = Action(type=ActionType.CALL, payload={"url": "http://example.com"})
result = agent.act(action)
assert result["status"] == "not_implemented"
def test_act_unsupported(self, agent):
action = Action(type=ActionType.MOVE, payload=(0, 0, 0))
result = agent.act(action)
assert "error" in result
def test_remember_stores_and_deduplicates(self, agent):
mem = agent.perceive(Perception.text("original"))
assert len(agent._working_memory) == 1
agent.remember(mem)
assert len(agent._working_memory) == 1 # deduplicated
assert mem.access_count == 1
def test_remember_evicts_on_overflow(self, agent):
for i in range(10):
agent.perceive(Perception.text(f"fill {i}"))
extra = Memory(id="extra", content="overflow", created_at="now")
agent.remember(extra)
assert len(agent._working_memory) == 10
# first memory evicted
assert agent._working_memory[-1].id == "extra"
def test_recall_keyword_matching(self, agent):
agent.perceive(Perception.text("python code review"))
agent.perceive(Perception.text("weather forecast"))
agent.perceive(Perception.text("python bug fix"))
results = agent.recall("python", limit=5)
# All memories returned (recall returns up to limit)
assert len(results) == 3
# Memories containing "python" should score higher and appear first
first_content = str(results[0].content)
assert "python" in first_content.lower()
def test_recall_respects_limit(self, agent):
for i in range(10):
agent.perceive(Perception.text(f"memory {i}"))
results = agent.recall("memory", limit=3)
assert len(results) == 3
def test_communicate_returns_false_comms_removed(self, agent):
"""Swarm comms removed — communicate() always returns False until brain wired."""
msg = Communication(sender="Timmy", recipient="Echo", content="hi")
assert agent.communicate(msg) is False
def test_effect_logging_full_workflow(self, agent):
p = Perception.text("test input")
mem = agent.perceive(p)
action = agent.reason("respond", [mem])
agent.act(action)
log = agent.get_effect_log()
assert len(log) == 3
assert log[0]["type"] == "perceive"
assert log[1]["type"] == "reason"
assert log[2]["type"] == "act"
def test_no_effect_log_when_disabled(self):
with patch("timmy.agent_core.ollama_adapter.create_timmy") as mock_ct:
mock_timmy = MagicMock()
mock_ct.return_value = mock_timmy
from timmy.agent_core.ollama_adapter import OllamaAgent
identity = AgentIdentity.generate("NoLog")
agent = OllamaAgent(identity) # no effect_log
assert agent.get_effect_log() is None
def test_format_context_empty(self, agent):
result = agent._format_context([])
assert result == "No previous context."
def test_format_context_with_dict_content(self, agent):
mem = Memory(id="m", content={"data": "hello"}, created_at="now")
result = agent._format_context([mem])
assert "hello" in result
def test_format_context_with_string_content(self, agent):
mem = Memory(id="m", content="plain string", created_at="now")
result = agent._format_context([mem])
assert "plain string" in result

View File

@@ -1,112 +0,0 @@
"""Tests for inter-agent messaging system."""
from timmy_serve.inter_agent import AgentMessage, InterAgentMessenger, messenger
class TestAgentMessage:
def test_defaults(self):
msg = AgentMessage()
assert msg.from_agent == ""
assert msg.to_agent == ""
assert msg.content == ""
assert msg.message_type == "text"
assert msg.replied is False
assert msg.id # UUID should be generated
assert msg.timestamp # timestamp should be generated
def test_custom_fields(self):
msg = AgentMessage(
from_agent="seer",
to_agent="forge",
content="hello",
message_type="command",
)
assert msg.from_agent == "seer"
assert msg.to_agent == "forge"
assert msg.content == "hello"
assert msg.message_type == "command"
class TestInterAgentMessenger:
def setup_method(self):
self.m = InterAgentMessenger(max_queue_size=100)
def test_send_and_receive(self):
msg = self.m.send("seer", "forge", "build this")
assert msg.from_agent == "seer"
assert msg.to_agent == "forge"
received = self.m.receive("forge")
assert len(received) == 1
assert received[0].content == "build this"
def test_receive_empty(self):
assert self.m.receive("nobody") == []
def test_pop(self):
self.m.send("a", "b", "first")
self.m.send("a", "b", "second")
msg = self.m.pop("b")
assert msg.content == "first"
msg2 = self.m.pop("b")
assert msg2.content == "second"
assert self.m.pop("b") is None
def test_pop_empty(self):
assert self.m.pop("nobody") is None
def test_pop_all(self):
self.m.send("a", "b", "one")
self.m.send("a", "b", "two")
msgs = self.m.pop_all("b")
assert len(msgs) == 2
assert self.m.receive("b") == []
def test_pop_all_empty(self):
assert self.m.pop_all("nobody") == []
def test_broadcast(self):
# Set up queues for agents
self.m.send("setup", "forge", "init")
self.m.send("setup", "echo", "init")
self.m.pop_all("forge")
self.m.pop_all("echo")
count = self.m.broadcast("seer", "alert")
assert count == 2
assert len(self.m.receive("forge")) == 1
assert len(self.m.receive("echo")) == 1
def test_broadcast_excludes_sender(self):
self.m.send("setup", "seer", "init")
self.m.pop_all("seer")
count = self.m.broadcast("seer", "hello")
assert count == 0 # no other agents
def test_history(self):
self.m.send("a", "b", "msg1")
self.m.send("b", "a", "msg2")
history = self.m.history(limit=50)
assert len(history) == 2
def test_history_limit(self):
for i in range(10):
self.m.send("a", "b", f"msg{i}")
assert len(self.m.history(limit=3)) == 3
def test_clear_specific_agent(self):
self.m.send("a", "b", "hello")
self.m.send("a", "c", "world")
self.m.clear("b")
assert self.m.receive("b") == []
assert len(self.m.receive("c")) == 1
def test_clear_all(self):
self.m.send("a", "b", "hello")
self.m.send("a", "c", "world")
self.m.clear()
assert self.m.receive("b") == []
assert self.m.receive("c") == []
assert self.m.history() == []
def test_module_singleton(self):
assert isinstance(messenger, InterAgentMessenger)