diff --git a/src/infrastructure/events/broadcaster.py b/src/infrastructure/events/broadcaster.py deleted file mode 100644 index 4fe5152..0000000 --- a/src/infrastructure/events/broadcaster.py +++ /dev/null @@ -1,193 +0,0 @@ -"""Event Broadcaster - bridges event_log to WebSocket clients. - -When events are logged, they are broadcast to all connected dashboard clients -via WebSocket for real-time activity feed updates. -""" - -import asyncio -import logging -from typing import Optional - -try: - from swarm.event_log import EventLogEntry -except ImportError: - EventLogEntry = None - -logger = logging.getLogger(__name__) - - -class EventBroadcaster: - """Broadcasts events to WebSocket clients. - - Usage: - from infrastructure.events.broadcaster import event_broadcaster - event_broadcaster.broadcast(event) - """ - - def __init__(self) -> None: - self._ws_manager: Optional = None - - def _get_ws_manager(self): - """Lazy import to avoid circular deps.""" - if self._ws_manager is None: - try: - from infrastructure.ws_manager.handler import ws_manager - - self._ws_manager = ws_manager - except Exception as exc: - logger.debug("WebSocket manager not available: %s", exc) - return self._ws_manager - - async def broadcast(self, event: EventLogEntry) -> int: - """Broadcast an event to all connected WebSocket clients. - - Args: - event: The event to broadcast - - Returns: - Number of clients notified - """ - ws_manager = self._get_ws_manager() - if not ws_manager: - return 0 - - # Build message payload - payload = { - "type": "event", - "payload": { - "id": event.id, - "event_type": event.event_type.value, - "source": event.source, - "task_id": event.task_id, - "agent_id": event.agent_id, - "timestamp": event.timestamp, - "data": event.data, - }, - } - - try: - # Broadcast to all connected clients - count = await ws_manager.broadcast_json(payload) - logger.debug("Broadcasted event %s to %d clients", event.id[:8], count) - return count - except Exception as exc: - logger.error("Failed to broadcast event: %s", exc) - return 0 - - def broadcast_sync(self, event: EventLogEntry) -> None: - """Synchronous wrapper for broadcast. - - Use this from synchronous code - it schedules the async broadcast - in the event loop if one is running. - """ - try: - asyncio.get_running_loop() - # Schedule in background, don't wait - asyncio.create_task(self.broadcast(event)) - except RuntimeError: - # No event loop running, skip broadcast - pass - - -# Global singleton -event_broadcaster = EventBroadcaster() - - -# Event type to icon/emoji mapping -EVENT_ICONS = { - "task.created": "📝", - "task.bidding": "âŗ", - "task.assigned": "👤", - "task.started": "â–ļī¸", - "task.completed": "✅", - "task.failed": "❌", - "agent.joined": "đŸŸĸ", - "agent.left": "🔴", - "agent.status_changed": "🔄", - "bid.submitted": "💰", - "auction.closed": "🏁", - "tool.called": "🔧", - "tool.completed": "âš™ī¸", - "tool.failed": "đŸ’Ĩ", - "system.error": "âš ī¸", - "system.warning": "đŸ”ļ", - "system.info": "â„šī¸", - "error.captured": "🐛", - "bug_report.created": "📋", -} - -EVENT_LABELS = { - "task.created": "New task", - "task.bidding": "Bidding open", - "task.assigned": "Task assigned", - "task.started": "Task started", - "task.completed": "Task completed", - "task.failed": "Task failed", - "agent.joined": "Agent joined", - "agent.left": "Agent left", - "agent.status_changed": "Status changed", - "bid.submitted": "Bid submitted", - "auction.closed": "Auction closed", - "tool.called": "Tool called", - "tool.completed": "Tool completed", - "tool.failed": "Tool failed", - "system.error": "Error", - "system.warning": "Warning", - "system.info": "Info", - "error.captured": "Error captured", - "bug_report.created": "Bug report filed", -} - - -def get_event_icon(event_type: str) -> str: - """Get emoji icon for event type.""" - return EVENT_ICONS.get(event_type, "â€ĸ") - - -def get_event_label(event_type: str) -> str: - """Get human-readable label for event type.""" - return EVENT_LABELS.get(event_type, event_type) - - -def format_event_for_display(event: EventLogEntry) -> dict: - """Format event for display in activity feed. - - Returns dict with display-friendly fields. - """ - data = event.data or {} - - # Build description based on event type - description = "" - if event.event_type.value == "task.created": - desc = data.get("description", "") - description = desc[:60] + "..." if len(desc) > 60 else desc - elif event.event_type.value == "task.assigned": - agent = event.agent_id[:8] if event.agent_id else "unknown" - bid = data.get("bid_sats", "?") - description = f"to {agent} ({bid} sats)" - elif event.event_type.value == "bid.submitted": - bid = data.get("bid_sats", "?") - description = f"{bid} sats" - elif event.event_type.value == "agent.joined": - persona = data.get("persona_id", "") - description = f"Persona: {persona}" if persona else "New agent" - else: - # Generic: use any string data - for key in ["message", "reason", "description"]: - if key in data: - val = str(data[key]) - description = val[:60] + "..." if len(val) > 60 else val - break - - return { - "id": event.id, - "icon": get_event_icon(event.event_type.value), - "label": get_event_label(event.event_type.value), - "type": event.event_type.value, - "source": event.source, - "description": description, - "timestamp": event.timestamp, - "time_short": event.timestamp[11:19] if event.timestamp else "", - "task_id": event.task_id, - "agent_id": event.agent_id, - } diff --git a/src/timmy/agent_core/ollama_adapter.py b/src/timmy/agent_core/ollama_adapter.py deleted file mode 100644 index d1c8607..0000000 --- a/src/timmy/agent_core/ollama_adapter.py +++ /dev/null @@ -1,275 +0,0 @@ -"""Ollama-based implementation of TimAgent interface. - -This adapter wraps the existing Timmy Ollama agent to conform -to the substrate-agnostic TimAgent interface. It's the bridge -between the old codebase and the new embodiment-ready architecture. - -Usage: - from timmy.agent_core import AgentIdentity, Perception - from timmy.agent_core.ollama_adapter import OllamaAgent - - identity = AgentIdentity.generate("Timmy") - agent = OllamaAgent(identity) - - perception = Perception.text("Hello!") - memory = agent.perceive(perception) - action = agent.reason("How should I respond?", [memory]) - result = agent.act(action) -""" - -from typing import Any - -from timmy.agent import _resolve_model_with_fallback, create_timmy -from timmy.agent_core.interface import ( - Action, - ActionType, - AgentCapability, - AgentEffect, - AgentIdentity, - Communication, - Memory, - Perception, - PerceptionType, - TimAgent, -) - - -class OllamaAgent(TimAgent): - """TimAgent implementation using local Ollama LLM. - - This is the production agent for Timmy Time v2. It uses - Ollama for reasoning and SQLite for memory persistence. - - Capabilities: - - REASONING: LLM-based inference - - CODING: Code generation and analysis - - WRITING: Long-form content creation - - ANALYSIS: Data processing and insights - - COMMUNICATION: Multi-agent messaging - """ - - def __init__( - self, - identity: AgentIdentity, - model: str | None = None, - effect_log: str | None = None, - require_vision: bool = False, - ) -> None: - """Initialize Ollama-based agent. - - Args: - identity: Agent identity (persistent across sessions) - model: Ollama model to use (auto-resolves with fallback) - effect_log: Path to log agent effects (optional) - require_vision: Whether to select a vision-capable model - """ - super().__init__(identity) - - # Resolve model with automatic pulling and fallback - resolved_model, is_fallback = _resolve_model_with_fallback( - requested_model=model, - require_vision=require_vision, - auto_pull=True, - ) - - if is_fallback: - import logging - - logging.getLogger(__name__).info( - "OllamaAdapter using fallback model %s", resolved_model - ) - - # Initialize underlying Ollama agent - self._timmy = create_timmy(model=resolved_model) - - # Set capabilities based on what Ollama can do - self._capabilities = { - AgentCapability.REASONING, - AgentCapability.CODING, - AgentCapability.WRITING, - AgentCapability.ANALYSIS, - AgentCapability.COMMUNICATION, - } - - # Effect logging for audit/replay - self._effect_log = AgentEffect(effect_log) if effect_log else None - - # Simple in-memory working memory (short term) - self._working_memory: list[Memory] = [] - self._max_working_memory = 10 - - def perceive(self, perception: Perception) -> Memory: - """Process perception and store in memory. - - For text perceptions, we might do light preprocessing - (summarization, keyword extraction) before storage. - """ - # Create memory from perception - memory = Memory( - id=f"mem_{len(self._working_memory)}", - content={ - "type": perception.type.name, - "data": perception.data, - "source": perception.source, - }, - created_at=perception.timestamp, - tags=self._extract_tags(perception), - ) - - # Add to working memory - self._working_memory.append(memory) - if len(self._working_memory) > self._max_working_memory: - self._working_memory.pop(0) # FIFO eviction - - # Log effect - if self._effect_log: - self._effect_log.log_perceive(perception, memory.id) - - return memory - - def reason(self, query: str, context: list[Memory]) -> Action: - """Use LLM to reason and decide on action. - - This is where the Ollama agent does its work. We construct - a prompt from the query and context, then interpret the - response as an action. - """ - # Build context string from memories - context_str = self._format_context(context) - - # Construct prompt - prompt = f"""You are {self._identity.name}, an AI assistant. - -Context from previous interactions: -{context_str} - -Current query: {query} - -Respond naturally and helpfully.""" - - # Run LLM inference - result = self._timmy.run(prompt, stream=False) - response_text = result.content if hasattr(result, "content") else str(result) - - # Create text response action - action = Action.respond(response_text, confidence=0.9) - - # Log effect - if self._effect_log: - self._effect_log.log_reason(query, action.type) - - return action - - def act(self, action: Action) -> Any: - """Execute action in the Ollama substrate. - - For text actions, the "execution" is just returning the - text (already generated during reasoning). For future - action types (MOVE, SPEAK), this would trigger the - appropriate Ollama tool calls. - """ - result = None - - if action.type == ActionType.TEXT: - result = action.payload - elif action.type == ActionType.SPEAK: - # Would call TTS here - result = {"spoken": action.payload, "tts_engine": "pyttsx3"} - elif action.type == ActionType.CALL: - # Would make API call - result = {"status": "not_implemented", "payload": action.payload} - else: - result = {"error": f"Action type {action.type} not supported by OllamaAgent"} - - # Log effect - if self._effect_log: - self._effect_log.log_act(action, result) - - return result - - def remember(self, memory: Memory) -> None: - """Store memory in working memory. - - Adds the memory to the sliding window and bumps its importance. - """ - memory.touch() - - # Deduplicate by id - self._working_memory = [m for m in self._working_memory if m.id != memory.id] - self._working_memory.append(memory) - - # Evict oldest if over capacity - if len(self._working_memory) > self._max_working_memory: - self._working_memory.pop(0) - - def recall(self, query: str, limit: int = 5) -> list[Memory]: - """Retrieve relevant memories. - - Simple keyword matching for now. Future: vector similarity. - """ - query_lower = query.lower() - scored = [] - - for memory in self._working_memory: - score = 0 - content_str = str(memory.content).lower() - - # Simple keyword overlap - query_words = set(query_lower.split()) - content_words = set(content_str.split()) - overlap = len(query_words & content_words) - score += overlap - - # Boost recent memories - score += memory.importance - - scored.append((score, memory)) - - # Sort by score descending - scored.sort(key=lambda x: x[0], reverse=True) - - # Return top N - return [m for _, m in scored[:limit]] - - def communicate(self, message: Communication) -> bool: - """Send message to another agent. - - Swarm comms removed — inter-agent communication will be handled - by the unified brain memory layer. - """ - return False - - def _extract_tags(self, perception: Perception) -> list[str]: - """Extract searchable tags from perception.""" - tags = [perception.type.name, perception.source] - - if perception.type == PerceptionType.TEXT: - # Simple keyword extraction - text = str(perception.data).lower() - keywords = ["code", "bug", "help", "question", "task"] - for kw in keywords: - if kw in text: - tags.append(kw) - - return tags - - def _format_context(self, memories: list[Memory]) -> str: - """Format memories into context string for prompt.""" - if not memories: - return "No previous context." - - parts = [] - for mem in memories[-5:]: # Last 5 memories - if isinstance(mem.content, dict): - data = mem.content.get("data", "") - parts.append(f"- {data}") - else: - parts.append(f"- {mem.content}") - - return "\n".join(parts) - - def get_effect_log(self) -> list[dict] | None: - """Export effect log if logging is enabled.""" - if self._effect_log: - return self._effect_log.export() - return None diff --git a/src/timmy_serve/inter_agent.py b/src/timmy_serve/inter_agent.py deleted file mode 100644 index 7c5308a..0000000 --- a/src/timmy_serve/inter_agent.py +++ /dev/null @@ -1,105 +0,0 @@ -"""Agent-to-agent messaging for the Timmy serve layer. - -Provides a simple message-passing interface that allows agents to -communicate with each other. Messages are routed through the swarm -comms layer when available, or stored in an in-memory queue for -single-process operation. -""" - -import logging -import uuid -from collections import deque -from dataclasses import dataclass, field -from datetime import UTC, datetime - -logger = logging.getLogger(__name__) - - -@dataclass -class AgentMessage: - id: str = field(default_factory=lambda: str(uuid.uuid4())) - from_agent: str = "" - to_agent: str = "" - content: str = "" - message_type: str = "text" # text | command | response | error - timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat()) - replied: bool = False - - -class InterAgentMessenger: - """In-memory message queue for agent-to-agent communication.""" - - def __init__(self, max_queue_size: int = 1000) -> None: - self._queues: dict[str, deque[AgentMessage]] = {} - self._max_size = max_queue_size - self._all_messages: list[AgentMessage] = [] - - def send( - self, - from_agent: str, - to_agent: str, - content: str, - message_type: str = "text", - ) -> AgentMessage: - """Send a message from one agent to another.""" - msg = AgentMessage( - from_agent=from_agent, - to_agent=to_agent, - content=content, - message_type=message_type, - ) - queue = self._queues.setdefault(to_agent, deque(maxlen=self._max_size)) - queue.append(msg) - self._all_messages.append(msg) - logger.info( - "Message %s → %s: %s (%s)", - from_agent, - to_agent, - content[:50], - message_type, - ) - return msg - - def receive(self, agent_id: str, limit: int = 10) -> list[AgentMessage]: - """Receive pending messages for an agent (FIFO, non-destructive peek).""" - queue = self._queues.get(agent_id, deque()) - return list(queue)[:limit] - - def pop(self, agent_id: str) -> AgentMessage | None: - """Pop the oldest message from an agent's queue.""" - queue = self._queues.get(agent_id, deque()) - if not queue: - return None - return queue.popleft() - - def pop_all(self, agent_id: str) -> list[AgentMessage]: - """Pop all pending messages for an agent.""" - queue = self._queues.get(agent_id, deque()) - messages = list(queue) - queue.clear() - return messages - - def broadcast(self, from_agent: str, content: str, message_type: str = "text") -> int: - """Broadcast a message to all known agents. Returns count sent.""" - count = 0 - for agent_id in list(self._queues.keys()): - if agent_id != from_agent: - self.send(from_agent, agent_id, content, message_type) - count += 1 - return count - - def history(self, limit: int = 50) -> list[AgentMessage]: - """Return recent message history across all agents.""" - return self._all_messages[-limit:] - - def clear(self, agent_id: str | None = None) -> None: - """Clear message queue(s).""" - if agent_id: - self._queues.pop(agent_id, None) - else: - self._queues.clear() - self._all_messages.clear() - - -# Module-level singleton -messenger = InterAgentMessenger() diff --git a/tests/infrastructure/test_event_broadcaster.py b/tests/infrastructure/test_event_broadcaster.py deleted file mode 100644 index 30909eb..0000000 --- a/tests/infrastructure/test_event_broadcaster.py +++ /dev/null @@ -1,191 +0,0 @@ -"""Tests for the event broadcaster (infrastructure.events.broadcaster).""" - -from dataclasses import dataclass -from enum import Enum -from unittest.mock import AsyncMock, MagicMock - -from infrastructure.events.broadcaster import ( - EVENT_ICONS, - EVENT_LABELS, - EventBroadcaster, - format_event_for_display, - get_event_icon, - get_event_label, -) - -# ── Fake EventLogEntry for testing ────────────────────────────────────────── - - -class FakeEventType(Enum): - TASK_CREATED = "task.created" - TASK_ASSIGNED = "task.assigned" - BID_SUBMITTED = "bid.submitted" - AGENT_JOINED = "agent.joined" - SYSTEM_INFO = "system.info" - - -@dataclass -class FakeEventLogEntry: - id: str = "evt-abc123" - event_type: FakeEventType = FakeEventType.TASK_CREATED - source: str = "test" - task_id: str = "task-1" - agent_id: str = "agent-1" - timestamp: str = "2026-03-06T12:00:00Z" - data: dict = None - - def __post_init__(self): - if self.data is None: - self.data = {} - - -class TestEventBroadcaster: - """Test EventBroadcaster class.""" - - def test_init(self): - b = EventBroadcaster() - assert b._ws_manager is None - - async def test_broadcast_no_ws_manager(self): - b = EventBroadcaster() - # _get_ws_manager returns None => returns 0 - count = await b.broadcast(FakeEventLogEntry()) - assert count == 0 - - async def test_broadcast_with_ws_manager(self): - b = EventBroadcaster() - mock_ws = MagicMock() - mock_ws.broadcast_json = AsyncMock(return_value=3) - b._ws_manager = mock_ws - - event = FakeEventLogEntry() - count = await b.broadcast(event) - assert count == 3 - mock_ws.broadcast_json.assert_awaited_once() - - # Verify payload structure - payload = mock_ws.broadcast_json.call_args[0][0] - assert payload["type"] == "event" - assert payload["payload"]["id"] == "evt-abc123" - assert payload["payload"]["event_type"] == "task.created" - - async def test_broadcast_ws_error_returns_zero(self): - b = EventBroadcaster() - mock_ws = MagicMock() - mock_ws.broadcast_json = AsyncMock(side_effect=RuntimeError("ws down")) - b._ws_manager = mock_ws - - count = await b.broadcast(FakeEventLogEntry()) - assert count == 0 - - def test_broadcast_sync_no_loop(self): - """broadcast_sync should not crash when no event loop is running.""" - b = EventBroadcaster() - # This should silently pass (no event loop) - b.broadcast_sync(FakeEventLogEntry()) - - -class TestEventIcons: - """Test icon/label lookup functions.""" - - def test_known_icon(self): - assert get_event_icon("task.created") == "📝" - assert get_event_icon("agent.joined") == "đŸŸĸ" - - def test_unknown_icon_returns_bullet(self): - assert get_event_icon("nonexistent") == "â€ĸ" - - def test_known_label(self): - assert get_event_label("task.created") == "New task" - assert get_event_label("task.failed") == "Task failed" - - def test_unknown_label_returns_type(self): - assert get_event_label("custom.event") == "custom.event" - - def test_all_icons_have_labels(self): - """Every icon key should also have a label.""" - for key in EVENT_ICONS: - assert key in EVENT_LABELS, f"Missing label for icon key: {key}" - - -class TestFormatEventForDisplay: - """Test format_event_for_display helper.""" - - def test_task_created_truncates_description(self): - event = FakeEventLogEntry( - event_type=FakeEventType.TASK_CREATED, - data={"description": "A" * 100}, - ) - result = format_event_for_display(event) - assert result["description"].endswith("...") - assert len(result["description"]) <= 63 - - def test_task_created_short_description(self): - event = FakeEventLogEntry( - event_type=FakeEventType.TASK_CREATED, - data={"description": "Short task"}, - ) - result = format_event_for_display(event) - assert result["description"] == "Short task" - - def test_task_assigned(self): - event = FakeEventLogEntry( - event_type=FakeEventType.TASK_ASSIGNED, - agent_id="agent-12345678-long", - data={"bid_sats": 500}, - ) - result = format_event_for_display(event) - assert "agent-12" in result["description"] - assert "500 sats" in result["description"] - - def test_bid_submitted(self): - event = FakeEventLogEntry( - event_type=FakeEventType.BID_SUBMITTED, - data={"bid_sats": 250}, - ) - result = format_event_for_display(event) - assert "250 sats" in result["description"] - - def test_agent_joined_with_persona(self): - event = FakeEventLogEntry( - event_type=FakeEventType.AGENT_JOINED, - data={"persona_id": "forge"}, - ) - result = format_event_for_display(event) - assert "forge" in result["description"] - - def test_agent_joined_no_persona(self): - event = FakeEventLogEntry( - event_type=FakeEventType.AGENT_JOINED, - data={}, - ) - result = format_event_for_display(event) - assert result["description"] == "New agent" - - def test_generic_event_with_message(self): - event = FakeEventLogEntry( - event_type=FakeEventType.SYSTEM_INFO, - data={"message": "All systems go"}, - ) - result = format_event_for_display(event) - assert result["description"] == "All systems go" - - def test_generic_event_no_data(self): - event = FakeEventLogEntry( - event_type=FakeEventType.SYSTEM_INFO, - data={}, - ) - result = format_event_for_display(event) - assert result["description"] == "" - - def test_output_structure(self): - event = FakeEventLogEntry() - result = format_event_for_display(event) - assert "id" in result - assert "icon" in result - assert "label" in result - assert "type" in result - assert "source" in result - assert "timestamp" in result - assert "time_short" in result - assert result["time_short"] == "12:00:00" diff --git a/tests/timmy/test_agent_core.py b/tests/timmy/test_agent_core.py index a2154e0..92f9d58 100644 --- a/tests/timmy/test_agent_core.py +++ b/tests/timmy/test_agent_core.py @@ -1,12 +1,10 @@ -"""Functional tests for agent_core — interface and ollama_adapter. +"""Functional tests for agent_core — interface. Covers the substrate-agnostic agent contract (data classes, enums, -factory methods, abstract enforcement) and the OllamaAgent adapter -(perceive → reason → act → remember → recall → communicate workflow). +factory methods, abstract enforcement). """ import uuid -from unittest.mock import MagicMock, patch import pytest @@ -343,160 +341,3 @@ class TestAgentEffect: assert len(log) == 3 types = [e["type"] for e in log] assert types == ["perceive", "reason", "act"] - - -# ── OllamaAgent functional tests ───────────────────────────────────────────── - - -class TestOllamaAgent: - """Functional tests for the OllamaAgent adapter. - - Uses mocked Ollama (create_timmy returns a mock) to exercise - the full perceive → reason → act → remember → recall pipeline. - """ - - @pytest.fixture - def agent(self): - with patch("timmy.agent_core.ollama_adapter.create_timmy") as mock_ct: - mock_timmy = MagicMock() - mock_run = MagicMock() - mock_run.content = "Mocked LLM response" - mock_timmy.run.return_value = mock_run - mock_ct.return_value = mock_timmy - - from timmy.agent_core.ollama_adapter import OllamaAgent - - identity = AgentIdentity.generate("TestTimmy") - return OllamaAgent(identity, effect_log="/tmp/test_effects") - - def test_capabilities_set(self, agent): - caps = agent.capabilities - assert AgentCapability.REASONING in caps - assert AgentCapability.CODING in caps - assert AgentCapability.WRITING in caps - assert AgentCapability.ANALYSIS in caps - assert AgentCapability.COMMUNICATION in caps - - def test_perceive_creates_memory(self, agent): - p = Perception.text("Hello Timmy") - mem = agent.perceive(p) - assert mem.id == "mem_0" - assert mem.content["data"] == "Hello Timmy" - assert mem.content["type"] == "TEXT" - - def test_perceive_extracts_tags(self, agent): - p = Perception.text("I need help with a bug in my code") - mem = agent.perceive(p) - assert "TEXT" in mem.tags - assert "user" in mem.tags - assert "help" in mem.tags - assert "bug" in mem.tags - assert "code" in mem.tags - - def test_perceive_fifo_eviction(self, agent): - for i in range(12): - agent.perceive(Perception.text(f"msg {i}")) - assert len(agent._working_memory) == 10 - # oldest two evicted - assert agent._working_memory[0].content["data"] == "msg 2" - - def test_reason_returns_action(self, agent): - mem = agent.perceive(Perception.text("context")) - action = agent.reason("What should I do?", [mem]) - assert action.type == ActionType.TEXT - assert action.payload == "Mocked LLM response" - assert action.confidence == 0.9 - - def test_act_text(self, agent): - action = Action.respond("Hello!") - result = agent.act(action) - assert result == "Hello!" - - def test_act_speak(self, agent): - action = Action(type=ActionType.SPEAK, payload="Speak this") - result = agent.act(action) - assert result["spoken"] == "Speak this" - assert result["tts_engine"] == "pyttsx3" - - def test_act_call(self, agent): - action = Action(type=ActionType.CALL, payload={"url": "http://example.com"}) - result = agent.act(action) - assert result["status"] == "not_implemented" - - def test_act_unsupported(self, agent): - action = Action(type=ActionType.MOVE, payload=(0, 0, 0)) - result = agent.act(action) - assert "error" in result - - def test_remember_stores_and_deduplicates(self, agent): - mem = agent.perceive(Perception.text("original")) - assert len(agent._working_memory) == 1 - agent.remember(mem) - assert len(agent._working_memory) == 1 # deduplicated - assert mem.access_count == 1 - - def test_remember_evicts_on_overflow(self, agent): - for i in range(10): - agent.perceive(Perception.text(f"fill {i}")) - extra = Memory(id="extra", content="overflow", created_at="now") - agent.remember(extra) - assert len(agent._working_memory) == 10 - # first memory evicted - assert agent._working_memory[-1].id == "extra" - - def test_recall_keyword_matching(self, agent): - agent.perceive(Perception.text("python code review")) - agent.perceive(Perception.text("weather forecast")) - agent.perceive(Perception.text("python bug fix")) - results = agent.recall("python", limit=5) - # All memories returned (recall returns up to limit) - assert len(results) == 3 - # Memories containing "python" should score higher and appear first - first_content = str(results[0].content) - assert "python" in first_content.lower() - - def test_recall_respects_limit(self, agent): - for i in range(10): - agent.perceive(Perception.text(f"memory {i}")) - results = agent.recall("memory", limit=3) - assert len(results) == 3 - - def test_communicate_returns_false_comms_removed(self, agent): - """Swarm comms removed — communicate() always returns False until brain wired.""" - msg = Communication(sender="Timmy", recipient="Echo", content="hi") - assert agent.communicate(msg) is False - - def test_effect_logging_full_workflow(self, agent): - p = Perception.text("test input") - mem = agent.perceive(p) - action = agent.reason("respond", [mem]) - agent.act(action) - log = agent.get_effect_log() - assert len(log) == 3 - assert log[0]["type"] == "perceive" - assert log[1]["type"] == "reason" - assert log[2]["type"] == "act" - - def test_no_effect_log_when_disabled(self): - with patch("timmy.agent_core.ollama_adapter.create_timmy") as mock_ct: - mock_timmy = MagicMock() - mock_ct.return_value = mock_timmy - from timmy.agent_core.ollama_adapter import OllamaAgent - - identity = AgentIdentity.generate("NoLog") - agent = OllamaAgent(identity) # no effect_log - assert agent.get_effect_log() is None - - def test_format_context_empty(self, agent): - result = agent._format_context([]) - assert result == "No previous context." - - def test_format_context_with_dict_content(self, agent): - mem = Memory(id="m", content={"data": "hello"}, created_at="now") - result = agent._format_context([mem]) - assert "hello" in result - - def test_format_context_with_string_content(self, agent): - mem = Memory(id="m", content="plain string", created_at="now") - result = agent._format_context([mem]) - assert "plain string" in result diff --git a/tests/timmy_serve/test_inter_agent.py b/tests/timmy_serve/test_inter_agent.py deleted file mode 100644 index 524042b..0000000 --- a/tests/timmy_serve/test_inter_agent.py +++ /dev/null @@ -1,112 +0,0 @@ -"""Tests for inter-agent messaging system.""" - -from timmy_serve.inter_agent import AgentMessage, InterAgentMessenger, messenger - - -class TestAgentMessage: - def test_defaults(self): - msg = AgentMessage() - assert msg.from_agent == "" - assert msg.to_agent == "" - assert msg.content == "" - assert msg.message_type == "text" - assert msg.replied is False - assert msg.id # UUID should be generated - assert msg.timestamp # timestamp should be generated - - def test_custom_fields(self): - msg = AgentMessage( - from_agent="seer", - to_agent="forge", - content="hello", - message_type="command", - ) - assert msg.from_agent == "seer" - assert msg.to_agent == "forge" - assert msg.content == "hello" - assert msg.message_type == "command" - - -class TestInterAgentMessenger: - def setup_method(self): - self.m = InterAgentMessenger(max_queue_size=100) - - def test_send_and_receive(self): - msg = self.m.send("seer", "forge", "build this") - assert msg.from_agent == "seer" - assert msg.to_agent == "forge" - received = self.m.receive("forge") - assert len(received) == 1 - assert received[0].content == "build this" - - def test_receive_empty(self): - assert self.m.receive("nobody") == [] - - def test_pop(self): - self.m.send("a", "b", "first") - self.m.send("a", "b", "second") - msg = self.m.pop("b") - assert msg.content == "first" - msg2 = self.m.pop("b") - assert msg2.content == "second" - assert self.m.pop("b") is None - - def test_pop_empty(self): - assert self.m.pop("nobody") is None - - def test_pop_all(self): - self.m.send("a", "b", "one") - self.m.send("a", "b", "two") - msgs = self.m.pop_all("b") - assert len(msgs) == 2 - assert self.m.receive("b") == [] - - def test_pop_all_empty(self): - assert self.m.pop_all("nobody") == [] - - def test_broadcast(self): - # Set up queues for agents - self.m.send("setup", "forge", "init") - self.m.send("setup", "echo", "init") - self.m.pop_all("forge") - self.m.pop_all("echo") - - count = self.m.broadcast("seer", "alert") - assert count == 2 - assert len(self.m.receive("forge")) == 1 - assert len(self.m.receive("echo")) == 1 - - def test_broadcast_excludes_sender(self): - self.m.send("setup", "seer", "init") - self.m.pop_all("seer") - count = self.m.broadcast("seer", "hello") - assert count == 0 # no other agents - - def test_history(self): - self.m.send("a", "b", "msg1") - self.m.send("b", "a", "msg2") - history = self.m.history(limit=50) - assert len(history) == 2 - - def test_history_limit(self): - for i in range(10): - self.m.send("a", "b", f"msg{i}") - assert len(self.m.history(limit=3)) == 3 - - def test_clear_specific_agent(self): - self.m.send("a", "b", "hello") - self.m.send("a", "c", "world") - self.m.clear("b") - assert self.m.receive("b") == [] - assert len(self.m.receive("c")) == 1 - - def test_clear_all(self): - self.m.send("a", "b", "hello") - self.m.send("a", "c", "world") - self.m.clear() - assert self.m.receive("b") == [] - assert self.m.receive("c") == [] - assert self.m.history() == [] - - def test_module_singleton(self): - assert isinstance(messenger, InterAgentMessenger)