forked from Rockachopa/Timmy-time-dashboard
360 lines
14 KiB
Python
360 lines
14 KiB
Python
"""Tests for infrastructure.presence — presence state serializer."""
|
|
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from infrastructure.presence import (
|
|
produce_agent_state,
|
|
produce_bark,
|
|
produce_thought,
|
|
serialize_presence,
|
|
)
|
|
|
|
|
|
class TestSerializePresence:
|
|
"""Round-trip and edge-case tests for serialize_presence()."""
|
|
|
|
@pytest.fixture()
|
|
def full_presence(self):
|
|
"""A complete ADR-023 presence dict."""
|
|
return {
|
|
"version": 1,
|
|
"liveness": "2026-03-21T12:00:00Z",
|
|
"current_focus": "writing tests",
|
|
"mood": "focused",
|
|
"energy": 0.9,
|
|
"confidence": 0.85,
|
|
"active_threads": [
|
|
{"type": "thinking", "ref": "refactor presence", "status": "active"}
|
|
],
|
|
"recent_events": ["committed code"],
|
|
"concerns": ["test coverage"],
|
|
"familiar": {"name": "Pip", "state": "alert"},
|
|
}
|
|
|
|
def test_full_round_trip(self, full_presence):
|
|
"""All ADR-023 fields map to the expected camelCase keys."""
|
|
result = serialize_presence(full_presence)
|
|
|
|
assert result["timmyState"]["mood"] == "focused"
|
|
assert result["timmyState"]["activity"] == "writing tests"
|
|
assert result["timmyState"]["energy"] == 0.9
|
|
assert result["timmyState"]["confidence"] == 0.85
|
|
assert result["familiar"] == {"name": "Pip", "state": "alert"}
|
|
assert result["activeThreads"] == full_presence["active_threads"]
|
|
assert result["recentEvents"] == ["committed code"]
|
|
assert result["concerns"] == ["test coverage"]
|
|
assert result["visitorPresent"] is False
|
|
assert result["updatedAt"] == "2026-03-21T12:00:00Z"
|
|
assert result["version"] == 1
|
|
|
|
def test_defaults_on_empty_dict(self):
|
|
"""Missing fields fall back to safe defaults."""
|
|
result = serialize_presence({})
|
|
|
|
assert result["timmyState"]["mood"] == "calm"
|
|
assert result["timmyState"]["activity"] == "idle"
|
|
assert result["timmyState"]["energy"] == 0.5
|
|
assert result["timmyState"]["confidence"] == 0.7
|
|
assert result["familiar"] is None
|
|
assert result["activeThreads"] == []
|
|
assert result["recentEvents"] == []
|
|
assert result["concerns"] == []
|
|
assert result["visitorPresent"] is False
|
|
assert result["version"] == 1
|
|
# updatedAt should be an ISO timestamp string
|
|
assert "T" in result["updatedAt"]
|
|
|
|
def test_partial_presence(self):
|
|
"""Only some fields provided — others get defaults."""
|
|
result = serialize_presence({"mood": "excited", "energy": 0.3})
|
|
|
|
assert result["timmyState"]["mood"] == "excited"
|
|
assert result["timmyState"]["energy"] == 0.3
|
|
assert result["timmyState"]["confidence"] == 0.7 # default
|
|
assert result["activeThreads"] == [] # default
|
|
|
|
def test_return_type_is_dict(self, full_presence):
|
|
"""serialize_presence always returns a plain dict."""
|
|
result = serialize_presence(full_presence)
|
|
assert isinstance(result, dict)
|
|
assert isinstance(result["timmyState"], dict)
|
|
|
|
def test_visitor_present_always_false(self, full_presence):
|
|
"""visitorPresent is always False — set by the WS layer, not here."""
|
|
assert serialize_presence(full_presence)["visitorPresent"] is False
|
|
assert serialize_presence({})["visitorPresent"] is False
|
|
|
|
|
|
class TestProduceAgentState:
|
|
"""Tests for produce_agent_state() — Matrix agent_state message producer."""
|
|
|
|
@pytest.fixture()
|
|
def full_presence(self):
|
|
"""A presence dict with all agent_state-relevant fields."""
|
|
return {
|
|
"display_name": "Timmy",
|
|
"role": "companion",
|
|
"current_focus": "thinking about tests",
|
|
"mood": "focused",
|
|
"energy": 0.9,
|
|
"bark": "Running test suite...",
|
|
}
|
|
|
|
@patch("infrastructure.presence.time")
|
|
def test_full_message_structure(self, mock_time, full_presence):
|
|
"""Returns dict with type, agent_id, data, and ts keys."""
|
|
mock_time.time.return_value = 1742529600
|
|
result = produce_agent_state("timmy", full_presence)
|
|
|
|
assert result["type"] == "agent_state"
|
|
assert result["agent_id"] == "timmy"
|
|
assert result["ts"] == 1742529600
|
|
assert isinstance(result["data"], dict)
|
|
|
|
def test_data_fields(self, full_presence):
|
|
"""data dict contains all required presence fields."""
|
|
data = produce_agent_state("timmy", full_presence)["data"]
|
|
|
|
assert data["display_name"] == "Timmy"
|
|
assert data["role"] == "companion"
|
|
assert data["status"] == "thinking"
|
|
assert data["mood"] == "focused"
|
|
assert data["energy"] == 0.9
|
|
assert data["bark"] == "Running test suite..."
|
|
|
|
def test_defaults_on_empty_presence(self):
|
|
"""Missing fields get sensible defaults."""
|
|
result = produce_agent_state("timmy", {})
|
|
data = result["data"]
|
|
|
|
assert data["display_name"] == "Timmy" # agent_id.title()
|
|
assert data["role"] == "assistant"
|
|
assert data["status"] == "idle"
|
|
assert data["mood"] == "calm"
|
|
assert data["energy"] == 0.5
|
|
assert data["bark"] == ""
|
|
|
|
def test_ts_is_unix_timestamp(self):
|
|
"""ts should be an integer Unix timestamp."""
|
|
result = produce_agent_state("timmy", {})
|
|
assert isinstance(result["ts"], int)
|
|
assert result["ts"] > 0
|
|
|
|
@pytest.mark.parametrize(
|
|
("focus", "expected_status"),
|
|
[
|
|
("thinking about code", "thinking"),
|
|
("speaking to user", "speaking"),
|
|
("talking with agent", "speaking"),
|
|
("idle", "idle"),
|
|
("", "idle"),
|
|
("writing tests", "online"),
|
|
("reviewing PR", "online"),
|
|
],
|
|
)
|
|
def test_status_derivation(self, focus, expected_status):
|
|
"""current_focus maps to the correct Matrix status."""
|
|
data = produce_agent_state("t", {"current_focus": focus})["data"]
|
|
assert data["status"] == expected_status
|
|
|
|
def test_agent_id_passed_through(self):
|
|
"""agent_id appears in the top-level message."""
|
|
result = produce_agent_state("spark", {})
|
|
assert result["agent_id"] == "spark"
|
|
|
|
def test_display_name_from_agent_id(self):
|
|
"""When display_name is missing, it's derived from agent_id.title()."""
|
|
data = produce_agent_state("spark", {})["data"]
|
|
assert data["display_name"] == "Spark"
|
|
|
|
|
|
class TestProduceBark:
|
|
"""Tests for produce_bark() — Matrix bark message producer."""
|
|
|
|
@patch("infrastructure.presence.time")
|
|
def test_full_message_structure(self, mock_time):
|
|
"""Returns dict with type, agent_id, data, and ts keys."""
|
|
mock_time.time.return_value = 1742529600
|
|
result = produce_bark("timmy", "Hello world!")
|
|
|
|
assert result["type"] == "bark"
|
|
assert result["agent_id"] == "timmy"
|
|
assert result["ts"] == 1742529600
|
|
assert isinstance(result["data"], dict)
|
|
|
|
def test_data_fields(self):
|
|
"""data dict contains text, reply_to, and style."""
|
|
result = produce_bark("timmy", "Hello world!", reply_to="msg-123", style="shout")
|
|
data = result["data"]
|
|
|
|
assert data["text"] == "Hello world!"
|
|
assert data["reply_to"] == "msg-123"
|
|
assert data["style"] == "shout"
|
|
|
|
def test_default_style_is_speech(self):
|
|
"""When style is not provided, defaults to 'speech'."""
|
|
result = produce_bark("timmy", "Hello!")
|
|
assert result["data"]["style"] == "speech"
|
|
|
|
def test_default_reply_to_is_none(self):
|
|
"""When reply_to is not provided, defaults to None."""
|
|
result = produce_bark("timmy", "Hello!")
|
|
assert result["data"]["reply_to"] is None
|
|
|
|
def test_text_truncated_to_280_chars(self):
|
|
"""Text longer than 280 chars is truncated."""
|
|
long_text = "A" * 500
|
|
result = produce_bark("timmy", long_text)
|
|
assert len(result["data"]["text"]) == 280
|
|
assert result["data"]["text"] == "A" * 280
|
|
|
|
def test_text_exactly_280_chars_not_truncated(self):
|
|
"""Text exactly 280 chars is not truncated."""
|
|
text = "B" * 280
|
|
result = produce_bark("timmy", text)
|
|
assert result["data"]["text"] == text
|
|
|
|
def test_text_shorter_than_280_not_padded(self):
|
|
"""Text shorter than 280 chars is not padded."""
|
|
result = produce_bark("timmy", "Short")
|
|
assert result["data"]["text"] == "Short"
|
|
|
|
@pytest.mark.parametrize(
|
|
("style", "expected_style"),
|
|
[
|
|
("speech", "speech"),
|
|
("thought", "thought"),
|
|
("whisper", "whisper"),
|
|
("shout", "shout"),
|
|
],
|
|
)
|
|
def test_valid_styles_preserved(self, style, expected_style):
|
|
"""Valid style values are preserved."""
|
|
result = produce_bark("timmy", "Hello!", style=style)
|
|
assert result["data"]["style"] == expected_style
|
|
|
|
@pytest.mark.parametrize(
|
|
"invalid_style",
|
|
["yell", "scream", "", "SPEECH", "Speech", None, 123],
|
|
)
|
|
def test_invalid_style_defaults_to_speech(self, invalid_style):
|
|
"""Invalid style values fall back to 'speech'."""
|
|
result = produce_bark("timmy", "Hello!", style=invalid_style)
|
|
assert result["data"]["style"] == "speech"
|
|
|
|
def test_empty_text_handled(self):
|
|
"""Empty text is handled gracefully."""
|
|
result = produce_bark("timmy", "")
|
|
assert result["data"]["text"] == ""
|
|
|
|
def test_ts_is_unix_timestamp(self):
|
|
"""ts should be an integer Unix timestamp."""
|
|
result = produce_bark("timmy", "Hello!")
|
|
assert isinstance(result["ts"], int)
|
|
assert result["ts"] > 0
|
|
|
|
def test_agent_id_passed_through(self):
|
|
"""agent_id appears in the top-level message."""
|
|
result = produce_bark("spark", "Hello!")
|
|
assert result["agent_id"] == "spark"
|
|
|
|
def test_with_all_parameters(self):
|
|
"""Full parameter set produces expected output."""
|
|
result = produce_bark(
|
|
agent_id="timmy",
|
|
text="Running test suite...",
|
|
reply_to="parent-msg-456",
|
|
style="thought",
|
|
)
|
|
|
|
assert result["type"] == "bark"
|
|
assert result["agent_id"] == "timmy"
|
|
assert result["data"]["text"] == "Running test suite..."
|
|
assert result["data"]["reply_to"] == "parent-msg-456"
|
|
assert result["data"]["style"] == "thought"
|
|
|
|
|
|
class TestProduceThought:
|
|
"""Tests for produce_thought() — Matrix thought message producer."""
|
|
|
|
@patch("infrastructure.presence.time")
|
|
def test_full_message_structure(self, mock_time):
|
|
"""Returns dict with type, agent_id, data, and ts keys."""
|
|
mock_time.time.return_value = 1742529600
|
|
result = produce_thought("timmy", "Considering the options...", 42)
|
|
|
|
assert result["type"] == "thought"
|
|
assert result["agent_id"] == "timmy"
|
|
assert result["ts"] == 1742529600
|
|
assert isinstance(result["data"], dict)
|
|
|
|
def test_data_fields(self):
|
|
"""data dict contains text, thought_id, and chain_id."""
|
|
result = produce_thought("timmy", "Considering...", 42, chain_id="chain-123")
|
|
data = result["data"]
|
|
|
|
assert data["text"] == "Considering..."
|
|
assert data["thought_id"] == 42
|
|
assert data["chain_id"] == "chain-123"
|
|
|
|
def test_default_chain_id_is_none(self):
|
|
"""When chain_id is not provided, defaults to None."""
|
|
result = produce_thought("timmy", "Thinking...", 1)
|
|
assert result["data"]["chain_id"] is None
|
|
|
|
def test_text_truncated_to_500_chars(self):
|
|
"""Text longer than 500 chars is truncated."""
|
|
long_text = "A" * 600
|
|
result = produce_thought("timmy", long_text, 1)
|
|
assert len(result["data"]["text"]) == 500
|
|
assert result["data"]["text"] == "A" * 500
|
|
|
|
def test_text_exactly_500_chars_not_truncated(self):
|
|
"""Text exactly 500 chars is not truncated."""
|
|
text = "B" * 500
|
|
result = produce_thought("timmy", text, 1)
|
|
assert result["data"]["text"] == text
|
|
|
|
def test_text_shorter_than_500_not_padded(self):
|
|
"""Text shorter than 500 chars is not padded."""
|
|
result = produce_thought("timmy", "Short thought", 1)
|
|
assert result["data"]["text"] == "Short thought"
|
|
|
|
def test_empty_text_handled(self):
|
|
"""Empty text is handled gracefully."""
|
|
result = produce_thought("timmy", "", 1)
|
|
assert result["data"]["text"] == ""
|
|
|
|
def test_ts_is_unix_timestamp(self):
|
|
"""ts should be an integer Unix timestamp."""
|
|
result = produce_thought("timmy", "Hello!", 1)
|
|
assert isinstance(result["ts"], int)
|
|
assert result["ts"] > 0
|
|
|
|
def test_agent_id_passed_through(self):
|
|
"""agent_id appears in the top-level message."""
|
|
result = produce_thought("spark", "Hello!", 1)
|
|
assert result["agent_id"] == "spark"
|
|
|
|
def test_thought_id_passed_through(self):
|
|
"""thought_id appears in the data."""
|
|
result = produce_thought("timmy", "Hello!", 999)
|
|
assert result["data"]["thought_id"] == 999
|
|
|
|
def test_with_all_parameters(self):
|
|
"""Full parameter set produces expected output."""
|
|
result = produce_thought(
|
|
agent_id="timmy",
|
|
thought_text="Analyzing the situation...",
|
|
thought_id=42,
|
|
chain_id="chain-abc",
|
|
)
|
|
|
|
assert result["type"] == "thought"
|
|
assert result["agent_id"] == "timmy"
|
|
assert result["data"]["text"] == "Analyzing the situation..."
|
|
assert result["data"]["thought_id"] == 42
|
|
assert result["data"]["chain_id"] == "chain-abc"
|