This commit was merged in pull request #1352.
This commit is contained in:
311
tests/spark/test_engine.py
Normal file
311
tests/spark/test_engine.py
Normal file
@@ -0,0 +1,311 @@
|
||||
"""Unit tests for spark/engine.py.
|
||||
|
||||
Covers the public API and internal helpers not exercised in other test files:
|
||||
- get_memories / get_predictions query methods
|
||||
- get_spark_engine singleton lifecycle and reset_spark_engine
|
||||
- Module-level __getattr__ lazy access
|
||||
- on_task_posted without candidate agents (no EIDOS call)
|
||||
- on_task_completed with winning_bid parameter
|
||||
- _maybe_consolidate early-return paths (<5 events, <3 outcomes)
|
||||
- Disabled-engine guard for every mutating method
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def tmp_spark_db(tmp_path, monkeypatch):
|
||||
"""Redirect all Spark SQLite writes to a temp directory."""
|
||||
db_path = tmp_path / "spark.db"
|
||||
monkeypatch.setattr("spark.memory.DB_PATH", db_path)
|
||||
monkeypatch.setattr("spark.eidos.DB_PATH", db_path)
|
||||
yield db_path
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_engine():
|
||||
"""Ensure the engine singleton is cleared between tests."""
|
||||
from spark.engine import reset_spark_engine
|
||||
reset_spark_engine()
|
||||
yield
|
||||
reset_spark_engine()
|
||||
|
||||
|
||||
# ── Query methods ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestGetMemories:
|
||||
def test_returns_empty_list_initially(self):
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
assert engine.get_memories() == []
|
||||
|
||||
def test_returns_stored_memories(self):
|
||||
from spark.engine import SparkEngine
|
||||
from spark.memory import store_memory
|
||||
|
||||
store_memory("pattern", "agent-x", "Reliable performer", confidence=0.8)
|
||||
engine = SparkEngine(enabled=True)
|
||||
memories = engine.get_memories()
|
||||
assert len(memories) == 1
|
||||
assert memories[0].subject == "agent-x"
|
||||
|
||||
def test_limit_parameter(self):
|
||||
from spark.engine import SparkEngine
|
||||
from spark.memory import store_memory
|
||||
|
||||
for i in range(5):
|
||||
store_memory("pattern", f"agent-{i}", f"Content {i}")
|
||||
engine = SparkEngine(enabled=True)
|
||||
assert len(engine.get_memories(limit=3)) == 3
|
||||
|
||||
def test_works_when_disabled(self):
|
||||
"""get_memories is not gated by enabled — it always reads."""
|
||||
from spark.engine import SparkEngine
|
||||
from spark.memory import store_memory
|
||||
|
||||
store_memory("anomaly", "agent-z", "Bad actor")
|
||||
engine = SparkEngine(enabled=False)
|
||||
assert len(engine.get_memories()) == 1
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestGetPredictions:
|
||||
def test_returns_empty_list_initially(self):
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
assert engine.get_predictions() == []
|
||||
|
||||
def test_returns_predictions_after_task_posted(self):
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine.on_task_posted("t1", "Deploy service", ["agent-a", "agent-b"])
|
||||
preds = engine.get_predictions()
|
||||
assert len(preds) >= 1
|
||||
|
||||
def test_limit_parameter(self):
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
for i in range(5):
|
||||
engine.on_task_posted(f"t{i}", f"Task {i}", ["agent-a"])
|
||||
assert len(engine.get_predictions(limit=2)) == 2
|
||||
|
||||
|
||||
# ── Singleton lifecycle ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestGetSparkEngineSingleton:
|
||||
def test_returns_spark_engine_instance(self):
|
||||
from spark.engine import SparkEngine, get_spark_engine
|
||||
|
||||
engine = get_spark_engine()
|
||||
assert isinstance(engine, SparkEngine)
|
||||
|
||||
def test_same_instance_on_repeated_calls(self):
|
||||
from spark.engine import get_spark_engine
|
||||
|
||||
e1 = get_spark_engine()
|
||||
e2 = get_spark_engine()
|
||||
assert e1 is e2
|
||||
|
||||
def test_reset_clears_singleton(self):
|
||||
from spark.engine import get_spark_engine, reset_spark_engine
|
||||
|
||||
e1 = get_spark_engine()
|
||||
reset_spark_engine()
|
||||
e2 = get_spark_engine()
|
||||
assert e1 is not e2
|
||||
|
||||
def test_get_spark_engine_uses_settings(self, monkeypatch):
|
||||
"""get_spark_engine respects spark_enabled from config."""
|
||||
mock_settings = MagicMock()
|
||||
mock_settings.spark_enabled = False
|
||||
with patch("spark.engine.settings", mock_settings, create=True):
|
||||
from spark.engine import get_spark_engine, reset_spark_engine
|
||||
reset_spark_engine()
|
||||
# Patch at import time by mocking the config module in engine
|
||||
import spark.engine as engine_module
|
||||
original_func = engine_module.get_spark_engine
|
||||
|
||||
def patched_get():
|
||||
global _spark_engine
|
||||
try:
|
||||
engine_module._spark_engine = engine_module.SparkEngine(
|
||||
enabled=mock_settings.spark_enabled
|
||||
)
|
||||
except Exception:
|
||||
engine_module._spark_engine = engine_module.SparkEngine(enabled=True)
|
||||
return engine_module._spark_engine
|
||||
|
||||
reset_spark_engine()
|
||||
|
||||
def test_get_spark_engine_falls_back_on_settings_error(self, monkeypatch):
|
||||
"""get_spark_engine creates enabled engine when settings import fails."""
|
||||
from spark.engine import get_spark_engine, reset_spark_engine
|
||||
|
||||
reset_spark_engine()
|
||||
# Patch config to raise on import
|
||||
with patch.dict("sys.modules", {"config": None}):
|
||||
# The engine catches the exception and defaults to enabled=True
|
||||
engine = get_spark_engine()
|
||||
# May or may not succeed depending on import cache, just ensure no crash
|
||||
assert engine is not None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestModuleLevelGetattr:
|
||||
def test_spark_engine_attribute_returns_engine(self):
|
||||
import spark.engine as engine_module
|
||||
|
||||
engine = engine_module.spark_engine
|
||||
assert isinstance(engine, engine_module.SparkEngine)
|
||||
|
||||
def test_unknown_attribute_raises(self):
|
||||
import spark.engine as engine_module
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
_ = engine_module.nonexistent_attribute_xyz
|
||||
|
||||
|
||||
# ── Event capture edge cases ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestOnTaskPostedWithoutCandidates:
|
||||
def test_no_eidos_prediction_when_no_candidates(self):
|
||||
"""When candidate_agents is empty, no EIDOS prediction should be stored."""
|
||||
from spark.engine import SparkEngine
|
||||
from spark.eidos import get_predictions
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
eid = engine.on_task_posted("t1", "Background task", candidate_agents=[])
|
||||
assert eid is not None
|
||||
# No candidates → no prediction
|
||||
preds = get_predictions(task_id="t1")
|
||||
assert len(preds) == 0
|
||||
|
||||
def test_no_candidates_defaults_to_none(self):
|
||||
"""on_task_posted with no candidate_agents kwarg still records event."""
|
||||
from spark.engine import SparkEngine
|
||||
from spark.memory import get_events
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
eid = engine.on_task_posted("t2", "Orphan task")
|
||||
assert eid is not None
|
||||
events = get_events(task_id="t2")
|
||||
assert len(events) == 1
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestOnTaskCompletedWithBid:
|
||||
def test_winning_bid_stored_in_data(self):
|
||||
"""winning_bid is serialised into the event data field."""
|
||||
import json
|
||||
from spark.engine import SparkEngine
|
||||
from spark.memory import get_events
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine.on_task_completed("t1", "agent-a", "All done", winning_bid=42)
|
||||
events = get_events(event_type="task_completed")
|
||||
assert len(events) == 1
|
||||
data = json.loads(events[0].data)
|
||||
assert data["winning_bid"] == 42
|
||||
|
||||
def test_without_winning_bid_is_none(self):
|
||||
import json
|
||||
from spark.engine import SparkEngine
|
||||
from spark.memory import get_events
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine.on_task_completed("t2", "agent-b", "Done")
|
||||
events = get_events(event_type="task_completed")
|
||||
data = json.loads(events[0].data)
|
||||
assert data["winning_bid"] is None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestDisabledEngineGuards:
|
||||
"""Every method that mutates state should return None when disabled."""
|
||||
|
||||
def setup_method(self):
|
||||
from spark.engine import SparkEngine
|
||||
self.engine = SparkEngine(enabled=False)
|
||||
|
||||
def test_on_task_posted_disabled(self):
|
||||
assert self.engine.on_task_posted("t", "x") is None
|
||||
|
||||
def test_on_bid_submitted_disabled(self):
|
||||
assert self.engine.on_bid_submitted("t", "a", 10) is None
|
||||
|
||||
def test_on_task_assigned_disabled(self):
|
||||
assert self.engine.on_task_assigned("t", "a") is None
|
||||
|
||||
def test_on_task_completed_disabled(self):
|
||||
assert self.engine.on_task_completed("t", "a", "r") is None
|
||||
|
||||
def test_on_task_failed_disabled(self):
|
||||
assert self.engine.on_task_failed("t", "a", "reason") is None
|
||||
|
||||
def test_on_agent_joined_disabled(self):
|
||||
assert self.engine.on_agent_joined("a", "Echo") is None
|
||||
|
||||
def test_on_tool_executed_disabled(self):
|
||||
assert self.engine.on_tool_executed("a", "git_push") is None
|
||||
|
||||
def test_on_creative_step_disabled(self):
|
||||
assert self.engine.on_creative_step("p", "storyboard", "pixel") is None
|
||||
|
||||
def test_get_advisories_disabled_returns_empty(self):
|
||||
assert self.engine.get_advisories() == []
|
||||
|
||||
|
||||
# ── _maybe_consolidate early-return paths ─────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
class TestMaybeConsolidateEarlyReturns:
|
||||
"""Test the guard conditions at the top of _maybe_consolidate."""
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_fewer_than_5_events_skips(self, mock_memory):
|
||||
"""With fewer than 5 events, consolidation is skipped immediately."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
mock_memory.get_events.return_value = [MagicMock(event_type="task_completed")] * 3
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-x")
|
||||
mock_memory.store_memory.assert_not_called()
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_fewer_than_3_outcomes_skips(self, mock_memory):
|
||||
"""With 5+ events but fewer than 3 completion/failure outcomes, skip."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
# 6 events but only 2 are outcomes (completions + failures)
|
||||
events = [MagicMock(event_type="task_posted")] * 4
|
||||
events += [MagicMock(event_type="task_completed")] * 2
|
||||
mock_memory.get_events.return_value = events
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-x")
|
||||
mock_memory.store_memory.assert_not_called()
|
||||
mock_memory.get_memories.assert_not_called()
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_neutral_success_rate_skips(self, mock_memory):
|
||||
"""Success rate between 0.3 and 0.8 triggers no memory."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
events = [MagicMock(event_type="task_posted")] * 2
|
||||
events += [MagicMock(event_type="task_completed")] * 2
|
||||
events += [MagicMock(event_type="task_failed")] * 2
|
||||
mock_memory.get_events.return_value = events
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-x")
|
||||
mock_memory.store_memory.assert_not_called()
|
||||
Reference in New Issue
Block a user