541 lines
20 KiB
Python
541 lines
20 KiB
Python
"""Tests for infrastructure.presence — presence state serializer."""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from infrastructure.presence import (
|
|
DEFAULT_PIP_STATE,
|
|
_get_agents_online,
|
|
_get_familiar_state,
|
|
_get_memory_count,
|
|
_get_thinking_active,
|
|
_get_uptime_seconds,
|
|
_get_visitors,
|
|
produce_agent_state,
|
|
produce_bark,
|
|
produce_system_status,
|
|
produce_thought,
|
|
serialize_presence,
|
|
)
|
|
|
|
|
|
class TestSerializePresence:
    """Exercise serialize_presence() with complete, partial, and empty input."""

    @pytest.fixture()
    def full_presence(self):
        """Build a presence dict that populates every ADR-023 field."""
        thinking_thread = {
            "type": "thinking",
            "ref": "refactor presence",
            "status": "active",
        }
        pip = {"name": "Pip", "state": "alert"}
        return {
            "version": 1,
            "liveness": "2026-03-21T12:00:00Z",
            "current_focus": "writing tests",
            "mood": "focused",
            "energy": 0.9,
            "confidence": 0.85,
            "active_threads": [thinking_thread],
            "recent_events": ["committed code"],
            "concerns": ["test coverage"],
            "familiar": pip,
        }

    def test_full_round_trip(self, full_presence):
        """Every ADR-023 field shows up under its expected camelCase key."""
        serialized = serialize_presence(full_presence)
        timmy = serialized["timmyState"]

        # Nested Timmy state block.
        assert timmy["mood"] == "focused"
        assert timmy["activity"] == "writing tests"
        assert timmy["energy"] == 0.9
        assert timmy["confidence"] == 0.85

        # Top-level keys.
        assert serialized["familiar"] == {"name": "Pip", "state": "alert"}
        assert serialized["activeThreads"] == full_presence["active_threads"]
        assert serialized["recentEvents"] == ["committed code"]
        assert serialized["concerns"] == ["test coverage"]
        assert serialized["visitorPresent"] is False
        assert serialized["updatedAt"] == "2026-03-21T12:00:00Z"
        assert serialized["version"] == 1

    def test_defaults_on_empty_dict(self):
        """An empty input dict yields the fallback value for every field."""
        serialized = serialize_presence({})
        timmy = serialized["timmyState"]

        assert timmy["mood"] == "calm"
        assert timmy["activity"] == "idle"
        assert timmy["energy"] == 0.5
        assert timmy["confidence"] == 0.7
        assert serialized["familiar"] is None
        assert serialized["activeThreads"] == []
        assert serialized["recentEvents"] == []
        assert serialized["concerns"] == []
        assert serialized["visitorPresent"] is False
        assert serialized["version"] == 1
        # Only a loose shape check: an ISO timestamp string contains a "T".
        assert "T" in serialized["updatedAt"]

    def test_partial_presence(self):
        """Supplied fields are kept; anything absent falls back to a default."""
        partial = {"mood": "excited", "energy": 0.3}
        serialized = serialize_presence(partial)
        timmy = serialized["timmyState"]

        assert timmy["mood"] == "excited"
        assert timmy["energy"] == 0.3
        # Neither of these was supplied, so the defaults apply.
        assert timmy["confidence"] == 0.7
        assert serialized["activeThreads"] == []

    def test_return_type_is_dict(self, full_presence):
        """Both the outer result and its timmyState entry are dicts."""
        serialized = serialize_presence(full_presence)
        assert isinstance(serialized, dict)
        assert isinstance(serialized["timmyState"], dict)

    def test_visitor_present_always_false(self, full_presence):
        """visitorPresent is always False — set by the WS layer, not here."""
        from_full = serialize_presence(full_presence)
        assert from_full["visitorPresent"] is False

        from_empty = serialize_presence({})
        assert from_empty["visitorPresent"] is False
|
|
|
|
|
|
class TestProduceAgentState:
    """Checks for produce_agent_state(), the Matrix agent_state message producer."""

    @pytest.fixture()
    def full_presence(self):
        """Build a presence dict carrying every field agent_state reads here."""
        presence = {
            "display_name": "Timmy",
            "role": "companion",
            "current_focus": "thinking about tests",
            "mood": "focused",
            "energy": 0.9,
            "bark": "Running test suite...",
        }
        return presence

    def test_full_message_structure(self, full_presence):
        """The message exposes type, agent_id, data, and ts at the top level."""
        frozen_ts = 1742529600
        # Pin the clock so the ts value can be compared exactly.
        with patch("infrastructure.presence.time") as mock_time:
            mock_time.time.return_value = frozen_ts
            message = produce_agent_state("timmy", full_presence)

        assert message["type"] == "agent_state"
        assert message["agent_id"] == "timmy"
        assert message["ts"] == frozen_ts
        assert isinstance(message["data"], dict)

    def test_data_fields(self, full_presence):
        """The data payload mirrors the presence fields that were supplied."""
        message = produce_agent_state("timmy", full_presence)
        payload = message["data"]

        assert payload["display_name"] == "Timmy"
        assert payload["role"] == "companion"
        assert payload["status"] == "thinking"
        assert payload["mood"] == "focused"
        assert payload["energy"] == 0.9
        assert payload["bark"] == "Running test suite..."

    def test_defaults_on_empty_presence(self):
        """An empty presence dict produces the default value for each field."""
        payload = produce_agent_state("timmy", {})["data"]

        # The display name comes from agent_id.title() when none is given.
        assert payload["display_name"] == "Timmy"
        assert payload["role"] == "assistant"
        assert payload["status"] == "idle"
        assert payload["mood"] == "calm"
        assert payload["energy"] == 0.5
        assert payload["bark"] == ""

    def test_ts_is_unix_timestamp(self):
        """ts is a positive integer."""
        stamp = produce_agent_state("timmy", {})["ts"]
        assert isinstance(stamp, int)
        assert stamp > 0

    @pytest.mark.parametrize(
        ("focus", "expected_status"),
        [
            ("thinking about code", "thinking"),
            ("speaking to user", "speaking"),
            ("talking with agent", "speaking"),
            ("idle", "idle"),
            ("", "idle"),
            ("writing tests", "online"),
            ("reviewing PR", "online"),
        ],
    )
    def test_status_derivation(self, focus, expected_status):
        """Each current_focus value maps to the expected Matrix status."""
        presence = {"current_focus": focus}
        message = produce_agent_state("t", presence)
        assert message["data"]["status"] == expected_status

    def test_agent_id_passed_through(self):
        """The agent_id argument is echoed on the top-level message."""
        message = produce_agent_state("spark", {})
        assert message["agent_id"] == "spark"

    def test_display_name_from_agent_id(self):
        """Without a display_name, agent_id.title() is used in its place."""
        message = produce_agent_state("spark", {})
        assert message["data"]["display_name"] == "Spark"

    def test_familiar_in_data(self):
        """agent_state.data carries a familiar entry with the expected keys."""
        payload = produce_agent_state("timmy", {})["data"]

        assert "familiar" in payload
        pip = payload["familiar"]
        assert pip["name"] == "Pip"
        assert "mood" in pip
        assert "energy" in pip
        assert pip["color"] == "0x00b450"
        assert pip["trail_color"] == "0xdaa520"

    def test_familiar_has_all_required_fields(self):
        """The familiar dict holds at least the five required field names."""
        message = produce_agent_state("timmy", {})
        pip = message["data"]["familiar"]

        required_fields = {"name", "mood", "energy", "color", "trail_color"}
        # Extra keys are allowed; only the required ones must be present.
        assert required_fields.issubset(pip.keys())
|
|
|
|
|
|
class TestFamiliarState:
    """Checks for _get_familiar_state(), which retrieves Pip's familiar state."""

    def test_get_familiar_state_returns_dict(self):
        """The helper hands back a dict."""
        assert isinstance(_get_familiar_state(), dict)

    def test_get_familiar_state_has_required_fields(self):
        """The state carries name, mood, energy, color, and trail_color."""
        state = _get_familiar_state()

        assert state["name"] == "Pip"
        assert "mood" in state
        assert isinstance(state["energy"], (int, float))
        assert state["color"] == "0x00b450"
        assert state["trail_color"] == "0xdaa520"

    def test_default_pip_state_constant(self):
        """DEFAULT_PIP_STATE holds the expected value for each checked key."""
        expected = {
            "name": "Pip",
            "mood": "sleepy",
            "energy": 0.5,
            "color": "0x00b450",
            "trail_color": "0xdaa520",
        }
        for key, value in expected.items():
            assert DEFAULT_PIP_STATE[key] == value

    def test_get_familiar_state_fallback_on_exception(self):
        """A failing snapshot() yields the default state and one warning."""
        boom = RuntimeError("Pip is napping")
        with patch("infrastructure.presence.logger") as mock_logger:
            # Make the familiar's snapshot raise so the fallback path runs.
            with patch("timmy.familiar.pip_familiar.snapshot", side_effect=boom):
                state = _get_familiar_state()

        assert state["name"] == "Pip"
        assert state["mood"] == "sleepy"
        mock_logger.warning.assert_called_once()
        # The original error text must appear in the logged warning.
        assert "Pip is napping" in str(mock_logger.warning.call_args)
|
|
|
|
|
|
class TestProduceBark:
    """Checks for produce_bark(), the Matrix bark message producer."""

    def test_full_message_structure(self):
        """The message exposes type, agent_id, data, and ts at the top level."""
        frozen_ts = 1742529600
        # Pin the clock so the ts value can be compared exactly.
        with patch("infrastructure.presence.time") as mock_time:
            mock_time.time.return_value = frozen_ts
            message = produce_bark("timmy", "Hello world!")

        assert message["type"] == "bark"
        assert message["agent_id"] == "timmy"
        assert message["ts"] == frozen_ts
        assert isinstance(message["data"], dict)

    def test_data_fields(self):
        """The data payload holds text, reply_to, and style."""
        message = produce_bark(
            "timmy", "Hello world!", reply_to="msg-123", style="shout"
        )
        payload = message["data"]

        assert payload["text"] == "Hello world!"
        assert payload["reply_to"] == "msg-123"
        assert payload["style"] == "shout"

    def test_default_style_is_speech(self):
        """Omitting style results in 'speech'."""
        payload = produce_bark("timmy", "Hello!")["data"]
        assert payload["style"] == "speech"

    def test_default_reply_to_is_none(self):
        """Omitting reply_to results in None."""
        payload = produce_bark("timmy", "Hello!")["data"]
        assert payload["reply_to"] is None

    def test_text_truncated_to_280_chars(self):
        """Input beyond 280 characters is cut down to the first 280."""
        oversized = "A" * 500
        emitted = produce_bark("timmy", oversized)["data"]["text"]
        assert len(emitted) == 280
        assert emitted == "A" * 280

    def test_text_exactly_280_chars_not_truncated(self):
        """A 280-character input comes back unchanged."""
        at_limit = "B" * 280
        message = produce_bark("timmy", at_limit)
        assert message["data"]["text"] == at_limit

    def test_text_shorter_than_280_not_padded(self):
        """Short input comes back as-is, with no padding."""
        message = produce_bark("timmy", "Short")
        assert message["data"]["text"] == "Short"

    @pytest.mark.parametrize(
        ("style", "expected_style"),
        [
            ("speech", "speech"),
            ("thought", "thought"),
            ("whisper", "whisper"),
            ("shout", "shout"),
        ],
    )
    def test_valid_styles_preserved(self, style, expected_style):
        """A recognised style value is carried through unchanged."""
        message = produce_bark("timmy", "Hello!", style=style)
        assert message["data"]["style"] == expected_style

    @pytest.mark.parametrize(
        "invalid_style",
        ["yell", "scream", "", "SPEECH", "Speech", None, 123],
    )
    def test_invalid_style_defaults_to_speech(self, invalid_style):
        """An unrecognised style value is replaced by 'speech'."""
        message = produce_bark("timmy", "Hello!", style=invalid_style)
        assert message["data"]["style"] == "speech"

    def test_empty_text_handled(self):
        """An empty string is accepted and returned as an empty string."""
        message = produce_bark("timmy", "")
        assert message["data"]["text"] == ""

    def test_ts_is_unix_timestamp(self):
        """ts is a positive integer."""
        stamp = produce_bark("timmy", "Hello!")["ts"]
        assert isinstance(stamp, int)
        assert stamp > 0

    def test_agent_id_passed_through(self):
        """The agent_id argument is echoed on the top-level message."""
        message = produce_bark("spark", "Hello!")
        assert message["agent_id"] == "spark"

    def test_with_all_parameters(self):
        """Passing every argument by keyword yields the matching message."""
        message = produce_bark(
            agent_id="timmy",
            text="Running test suite...",
            reply_to="parent-msg-456",
            style="thought",
        )
        payload = message["data"]

        assert message["type"] == "bark"
        assert message["agent_id"] == "timmy"
        assert payload["text"] == "Running test suite..."
        assert payload["reply_to"] == "parent-msg-456"
        assert payload["style"] == "thought"
|
|
|
|
|
|
class TestProduceThought:
    """Checks for produce_thought(), the Matrix thought message producer."""

    def test_full_message_structure(self):
        """The message exposes type, agent_id, data, and ts at the top level."""
        frozen_ts = 1742529600
        # Pin the clock so the ts value can be compared exactly.
        with patch("infrastructure.presence.time") as mock_time:
            mock_time.time.return_value = frozen_ts
            message = produce_thought("timmy", "Considering the options...", 42)

        assert message["type"] == "thought"
        assert message["agent_id"] == "timmy"
        assert message["ts"] == frozen_ts
        assert isinstance(message["data"], dict)

    def test_data_fields(self):
        """The data payload holds text, thought_id, and chain_id."""
        message = produce_thought(
            "timmy", "Considering...", 42, chain_id="chain-123"
        )
        payload = message["data"]

        assert payload["text"] == "Considering..."
        assert payload["thought_id"] == 42
        assert payload["chain_id"] == "chain-123"

    def test_default_chain_id_is_none(self):
        """Omitting chain_id results in None."""
        payload = produce_thought("timmy", "Thinking...", 1)["data"]
        assert payload["chain_id"] is None

    def test_text_truncated_to_500_chars(self):
        """Input beyond 500 characters is cut down to the first 500."""
        oversized = "A" * 600
        emitted = produce_thought("timmy", oversized, 1)["data"]["text"]
        assert len(emitted) == 500
        assert emitted == "A" * 500

    def test_text_exactly_500_chars_not_truncated(self):
        """A 500-character input comes back unchanged."""
        at_limit = "B" * 500
        message = produce_thought("timmy", at_limit, 1)
        assert message["data"]["text"] == at_limit

    def test_text_shorter_than_500_not_padded(self):
        """Short input comes back as-is, with no padding."""
        message = produce_thought("timmy", "Short thought", 1)
        assert message["data"]["text"] == "Short thought"

    def test_empty_text_handled(self):
        """An empty string is accepted and returned as an empty string."""
        message = produce_thought("timmy", "", 1)
        assert message["data"]["text"] == ""

    def test_ts_is_unix_timestamp(self):
        """ts is a positive integer."""
        stamp = produce_thought("timmy", "Hello!", 1)["ts"]
        assert isinstance(stamp, int)
        assert stamp > 0

    def test_agent_id_passed_through(self):
        """The agent_id argument is echoed on the top-level message."""
        message = produce_thought("spark", "Hello!", 1)
        assert message["agent_id"] == "spark"

    def test_thought_id_passed_through(self):
        """The thought_id argument is echoed inside the data payload."""
        message = produce_thought("timmy", "Hello!", 999)
        assert message["data"]["thought_id"] == 999

    def test_with_all_parameters(self):
        """Passing every argument by keyword yields the matching message."""
        message = produce_thought(
            agent_id="timmy",
            thought_text="Analyzing the situation...",
            thought_id=42,
            chain_id="chain-abc",
        )
        payload = message["data"]

        assert message["type"] == "thought"
        assert message["agent_id"] == "timmy"
        assert payload["text"] == "Analyzing the situation..."
        assert payload["thought_id"] == 42
        assert payload["chain_id"] == "chain-abc"
|
|
|
|
|
|
class TestProduceSystemStatus:
    """Checks for produce_system_status(), the Matrix system_status producer."""

    def test_full_message_structure(self):
        """The message exposes type, data, and ts at the top level."""
        frozen_ts = 1742529600
        # Pin the clock so the ts value can be compared exactly.
        with patch("infrastructure.presence.time") as mock_time:
            mock_time.time.return_value = frozen_ts
            message = produce_system_status()

        assert message["type"] == "system_status"
        assert message["ts"] == frozen_ts
        assert isinstance(message["data"], dict)

    def test_data_has_required_fields(self):
        """Each required system status key is present in the data payload."""
        payload = produce_system_status()["data"]

        required = (
            "agents_online",
            "visitors",
            "uptime_seconds",
            "thinking_active",
            "memory_count",
        )
        for field in required:
            assert field in payload

    def test_data_field_types(self):
        """Each data field holds a value of its expected type."""
        payload = produce_system_status()["data"]

        expected_types = {
            "agents_online": int,
            "visitors": int,
            "uptime_seconds": int,
            "thinking_active": bool,
            "memory_count": int,
        }
        for field, kind in expected_types.items():
            assert isinstance(payload[field], kind)

    def test_agents_online_is_non_negative(self):
        """agents_online is zero or greater."""
        payload = produce_system_status()["data"]
        assert payload["agents_online"] >= 0

    def test_visitors_is_non_negative(self):
        """visitors is zero or greater."""
        payload = produce_system_status()["data"]
        assert payload["visitors"] >= 0

    def test_uptime_seconds_is_non_negative(self):
        """uptime_seconds is zero or greater."""
        payload = produce_system_status()["data"]
        assert payload["uptime_seconds"] >= 0

    def test_memory_count_is_non_negative(self):
        """memory_count is zero or greater."""
        payload = produce_system_status()["data"]
        assert payload["memory_count"] >= 0

    def test_ts_is_unix_timestamp(self):
        """ts is an int equal to the patched clock value."""
        frozen_ts = 1742529600
        with patch("infrastructure.presence.time") as mock_time:
            mock_time.time.return_value = frozen_ts
            message = produce_system_status()

        assert isinstance(message["ts"], int)
        assert message["ts"] == frozen_ts

    def test_graceful_degradation_on_import_errors(self):
        """The message keeps its expected shape while the logger is patched."""
        # NOTE(review): nothing here forces an import to fail; the test only
        # confirms the structure of the result with the module logger mocked.
        # Confirm whether a failure should be injected to match the name.
        with patch("infrastructure.presence.logger"):
            message = produce_system_status()

        assert message["type"] == "system_status"
        assert isinstance(message["data"], dict)
        assert isinstance(message["ts"], int)

    def test_returns_dict(self):
        """The return value is a dict."""
        assert isinstance(produce_system_status(), dict)
|
|
|
|
|
|
class TestSystemStatusHelpers:
    """Checks for the helper functions extracted from produce_system_status()."""

    def test_get_agents_online_returns_int(self):
        """_get_agents_online yields an int that is zero or greater."""
        count = _get_agents_online()
        assert isinstance(count, int)
        assert count >= 0

    def test_get_visitors_returns_int(self):
        """_get_visitors yields an int that is zero or greater."""
        count = _get_visitors()
        assert isinstance(count, int)
        assert count >= 0

    def test_get_uptime_seconds_returns_int(self):
        """_get_uptime_seconds yields an int that is zero or greater."""
        seconds = _get_uptime_seconds()
        assert isinstance(seconds, int)
        assert seconds >= 0

    def test_get_thinking_active_returns_bool(self):
        """_get_thinking_active yields a bool."""
        flag = _get_thinking_active()
        assert isinstance(flag, bool)

    def test_get_memory_count_returns_int(self):
        """_get_memory_count yields an int that is zero or greater."""
        count = _get_memory_count()
        assert isinstance(count, int)
        assert count >= 0
|