[kimi] Add system_status message producer (#681) #743
@@ -240,3 +240,94 @@ def produce_agent_state(agent_id: str, presence: dict) -> dict:
|
||||
},
|
||||
"ts": int(time.time()),
|
||||
}
|
||||
|
||||
|
||||
def produce_system_status() -> dict:
|
||||
"""Generate a system_status message for the Matrix.
|
||||
|
||||
Returns a dict with system health metrics including agent count,
|
||||
visitor count, uptime, thinking engine status, and memory count.
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
Message with keys ``type``, ``data`` (containing ``agents_online``,
|
||||
``visitors``, ``uptime_seconds``, ``thinking_active``, ``memory_count``),
|
||||
and ``ts``.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> produce_system_status()
|
||||
{
|
||||
"type": "system_status",
|
||||
"data": {
|
||||
"agents_online": 5,
|
||||
"visitors": 2,
|
||||
"uptime_seconds": 3600,
|
||||
"thinking_active": True,
|
||||
"memory_count": 150,
|
||||
},
|
||||
"ts": 1742529600,
|
||||
}
|
||||
"""
|
||||
# Count agents with status != offline
|
||||
agents_online = 0
|
||||
try:
|
||||
from timmy.agents.loader import list_agents
|
||||
|
||||
agents = list_agents()
|
||||
agents_online = sum(1 for a in agents if a.get("status", "") not in ("offline", ""))
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to count agents: %s", exc)
|
||||
|
||||
# Count visitors from WebSocket clients
|
||||
visitors = 0
|
||||
try:
|
||||
from dashboard.routes.world import _ws_clients
|
||||
|
||||
visitors = len(_ws_clients)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to count visitors: %s", exc)
|
||||
|
||||
# Calculate uptime
|
||||
uptime_seconds = 0
|
||||
try:
|
||||
from datetime import UTC
|
||||
|
||||
from config import APP_START_TIME
|
||||
|
||||
uptime_seconds = int((datetime.now(UTC) - APP_START_TIME).total_seconds())
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to calculate uptime: %s", exc)
|
||||
|
||||
# Check thinking engine status
|
||||
thinking_active = False
|
||||
try:
|
||||
from config import settings
|
||||
from timmy.thinking import thinking_engine
|
||||
|
||||
thinking_active = settings.thinking_enabled and thinking_engine is not None
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to check thinking status: %s", exc)
|
||||
|
||||
# Count memories in vector store
|
||||
memory_count = 0
|
||||
try:
|
||||
from timmy.memory_system import get_memory_stats
|
||||
|
||||
stats = get_memory_stats()
|
||||
memory_count = stats.get("total_entries", 0)
|
||||
except Exception as exc:
|
||||
logger.debug("Failed to count memories: %s", exc)
|
||||
|
||||
return {
|
||||
"type": "system_status",
|
||||
"data": {
|
||||
"agents_online": agents_online,
|
||||
"visitors": visitors,
|
||||
"uptime_seconds": uptime_seconds,
|
||||
"thinking_active": thinking_active,
|
||||
"memory_count": memory_count,
|
||||
},
|
||||
"ts": int(time.time()),
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ from infrastructure.presence import (
|
||||
_get_familiar_state,
|
||||
produce_agent_state,
|
||||
produce_bark,
|
||||
produce_system_status,
|
||||
produce_thought,
|
||||
serialize_presence,
|
||||
)
|
||||
@@ -419,3 +420,83 @@ class TestProduceThought:
|
||||
assert result["data"]["text"] == "Analyzing the situation..."
|
||||
assert result["data"]["thought_id"] == 42
|
||||
assert result["data"]["chain_id"] == "chain-abc"
|
||||
|
||||
|
||||
class TestProduceSystemStatus:
|
||||
"""Tests for produce_system_status() — Matrix system_status message producer."""
|
||||
|
||||
@patch("infrastructure.presence.time")
|
||||
def test_full_message_structure(self, mock_time):
|
||||
"""Returns dict with type, data, and ts keys."""
|
||||
mock_time.time.return_value = 1742529600
|
||||
result = produce_system_status()
|
||||
|
||||
assert result["type"] == "system_status"
|
||||
assert result["ts"] == 1742529600
|
||||
assert isinstance(result["data"], dict)
|
||||
|
||||
def test_data_has_required_fields(self):
|
||||
"""data dict contains all required system status fields."""
|
||||
result = produce_system_status()
|
||||
data = result["data"]
|
||||
|
||||
assert "agents_online" in data
|
||||
assert "visitors" in data
|
||||
assert "uptime_seconds" in data
|
||||
assert "thinking_active" in data
|
||||
assert "memory_count" in data
|
||||
|
||||
def test_data_field_types(self):
|
||||
"""All data fields have correct types."""
|
||||
result = produce_system_status()
|
||||
data = result["data"]
|
||||
|
||||
assert isinstance(data["agents_online"], int)
|
||||
assert isinstance(data["visitors"], int)
|
||||
assert isinstance(data["uptime_seconds"], int)
|
||||
assert isinstance(data["thinking_active"], bool)
|
||||
assert isinstance(data["memory_count"], int)
|
||||
|
||||
def test_agents_online_is_non_negative(self):
|
||||
"""agents_online is never negative."""
|
||||
result = produce_system_status()
|
||||
assert result["data"]["agents_online"] >= 0
|
||||
|
||||
def test_visitors_is_non_negative(self):
|
||||
"""visitors is never negative."""
|
||||
result = produce_system_status()
|
||||
assert result["data"]["visitors"] >= 0
|
||||
|
||||
def test_uptime_seconds_is_non_negative(self):
|
||||
"""uptime_seconds is never negative."""
|
||||
result = produce_system_status()
|
||||
assert result["data"]["uptime_seconds"] >= 0
|
||||
|
||||
def test_memory_count_is_non_negative(self):
|
||||
"""memory_count is never negative."""
|
||||
result = produce_system_status()
|
||||
assert result["data"]["memory_count"] >= 0
|
||||
|
||||
@patch("infrastructure.presence.time")
|
||||
def test_ts_is_unix_timestamp(self, mock_time):
|
||||
"""ts should be an integer Unix timestamp."""
|
||||
mock_time.time.return_value = 1742529600
|
||||
result = produce_system_status()
|
||||
assert isinstance(result["ts"], int)
|
||||
assert result["ts"] == 1742529600
|
||||
|
||||
@patch("infrastructure.presence.logger")
|
||||
def test_graceful_degradation_on_import_errors(self, mock_logger):
|
||||
"""Function returns valid dict even when imports fail."""
|
||||
# This test verifies the function handles failures gracefully
|
||||
# by checking it always returns the expected structure
|
||||
result = produce_system_status()
|
||||
|
||||
assert result["type"] == "system_status"
|
||||
assert isinstance(result["data"], dict)
|
||||
assert isinstance(result["ts"], int)
|
||||
|
||||
def test_returns_dict(self):
|
||||
"""produce_system_status always returns a plain dict."""
|
||||
result = produce_system_status()
|
||||
assert isinstance(result, dict)
|
||||
|
||||
Reference in New Issue
Block a user