forked from Rockachopa/Timmy-time-dashboard
fix: dedup memory consolidation with existing memory search (#105)
_maybe_consolidate() now checks get_memories(subject=agent_id) before storing. Skips if a memory of the same type (pattern/anomaly) was created within the last hour. Prevents duplicate consolidation entries on repeated task completion/failure events. Also restructured branching: neutral success rates (0.3-0.8) now return early instead of falling through. 9 new tests. 1465 total passing.
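A minimal sketch of the dedup window check described above, pulled out as a standalone helper for illustration (the real logic lives inline in _maybe_consolidate, as the diff below shows; the helper name _has_recent_memory and the plain-dict memory shape here are illustrative, not part of the codebase):

from datetime import UTC, datetime, timedelta


def _has_recent_memory(memories, target_memory_type, window=timedelta(hours=1)):
    """Illustrative helper: True if a memory of target_memory_type was created inside the window."""
    cutoff = datetime.now(UTC) - window
    for memory in memories:
        if memory["memory_type"] != target_memory_type:
            continue  # a recent memory of a different type does not block consolidation
        try:
            created_at = datetime.fromisoformat(memory["created_at"])
        except (ValueError, TypeError):
            continue  # malformed timestamps are skipped rather than crashing
        if created_at >= cutoff:
            return True
    return False


# A "pattern" memory stored 30 minutes ago blocks another "pattern" consolidation,
# but not an "anomaly" one.
recent = (datetime.now(UTC) - timedelta(minutes=30)).isoformat()
memories = [{"memory_type": "pattern", "created_at": recent}]
assert _has_recent_memory(memories, "pattern")
assert not _has_recent_memory(memories, "anomaly")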
@@ -273,6 +273,8 @@ class SparkEngine:

    def _maybe_consolidate(self, agent_id: str) -> None:
        """Consolidate events into memories when enough data exists."""
        from datetime import UTC, datetime, timedelta

        agent_events = spark_memory.get_events(agent_id=agent_id, limit=50)
        if len(agent_events) < 5:
            return
@@ -286,7 +288,34 @@ class SparkEngine:

        success_rate = len(completions) / total if total else 0

        # Determine target memory type based on success rate
        if success_rate >= 0.8:
            target_memory_type = "pattern"
        elif success_rate <= 0.3:
            target_memory_type = "anomaly"
        else:
            return  # No consolidation needed for neutral success rates

        # Check for recent memories of the same type for this agent
        existing_memories = spark_memory.get_memories(subject=agent_id, limit=5)
        now = datetime.now(UTC)
        one_hour_ago = now - timedelta(hours=1)

        for memory in existing_memories:
            if memory.memory_type == target_memory_type:
                try:
                    created_at = datetime.fromisoformat(memory.created_at)
                    if created_at >= one_hour_ago:
                        logger.info(
                            "Consolidation: skipping — recent memory exists for %s",
                            agent_id[:8],
                        )
                        return
                except (ValueError, TypeError):
                    continue

        # Store the new memory
        if target_memory_type == "pattern":
            spark_memory.store_memory(
                memory_type="pattern",
                subject=agent_id,
@@ -295,7 +324,7 @@ class SparkEngine:
                confidence=min(0.95, 0.6 + total * 0.05),
                source_events=total,
            )
-        elif success_rate <= 0.3:
+        else:  # anomaly
            spark_memory.store_memory(
                memory_type="anomaly",
                subject=agent_id,
259  tests/spark/test_consolidation_dedup.py  Normal file
@@ -0,0 +1,259 @@
"""Tests for memory consolidation deduplication (issue #105).
|
||||
|
||||
Verifies that _maybe_consolidate() skips creating duplicate memories
|
||||
when a recent memory of the same type already exists for the agent.
|
||||
"""
|
||||
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
class TestConsolidationDedup:
|
||||
"""Test consolidation deduplication behavior."""
|
||||
|
||||
def _make_event(self, event_type, agent_id="agent-test"):
|
||||
"""Create a mock SparkEvent."""
|
||||
event = MagicMock()
|
||||
event.event_type = event_type
|
||||
event.agent_id = agent_id
|
||||
return event
|
||||
|
||||
def _make_memory(self, memory_type, subject, created_at):
|
||||
"""Create a mock SparkMemory."""
|
||||
memory = MagicMock()
|
||||
memory.memory_type = memory_type
|
||||
memory.subject = subject
|
||||
memory.created_at = created_at
|
||||
return memory
|
||||
|
||||
def _get_enough_events_for_consolidation(self, num_completions=4, num_failures=0):
|
||||
"""Return enough events to trigger consolidation (5+ events, 3+ outcomes)."""
|
||||
events = []
|
||||
# Add some non-outcome events first
|
||||
for _ in range(2):
|
||||
events.append(self._make_event("task_posted"))
|
||||
# Add completion/failure events to trigger pattern (>=0.8 success rate)
|
||||
for _ in range(num_completions):
|
||||
events.append(self._make_event("task_completed"))
|
||||
for _ in range(num_failures):
|
||||
events.append(self._make_event("task_failed"))
|
||||
return events
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_creates_memory_when_none_exists(self, mock_spark_memory):
|
||||
"""Test that consolidation creates a memory when none exists."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
# Setup: enough events to trigger pattern memory, no existing memories
|
||||
mock_spark_memory.get_events.return_value = self._get_enough_events_for_consolidation(
|
||||
num_completions=4
|
||||
)
|
||||
mock_spark_memory.get_memories.return_value = [] # No existing memories
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-test")
|
||||
|
||||
# Should have called store_memory once
|
||||
mock_spark_memory.store_memory.assert_called_once()
|
||||
call_kwargs = mock_spark_memory.store_memory.call_args.kwargs
|
||||
assert call_kwargs["memory_type"] == "pattern"
|
||||
assert call_kwargs["subject"] == "agent-test"
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_skips_when_recent_memory_exists(self, mock_spark_memory):
|
||||
"""Test that consolidation SKIPS when a recent memory (< 1 hour) exists."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
# Setup: enough events to trigger pattern memory
|
||||
mock_spark_memory.get_events.return_value = self._get_enough_events_for_consolidation(
|
||||
num_completions=4
|
||||
)
|
||||
|
||||
# Existing recent pattern memory (30 minutes ago)
|
||||
recent_time = (datetime.now(UTC) - timedelta(minutes=30)).isoformat()
|
||||
mock_spark_memory.get_memories.return_value = [
|
||||
self._make_memory("pattern", "agent-test", recent_time)
|
||||
]
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-test")
|
||||
|
||||
# Should NOT have called store_memory
|
||||
mock_spark_memory.store_memory.assert_not_called()
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_creates_when_existing_memory_is_old(self, mock_spark_memory):
|
||||
"""Test that consolidation creates new memory when existing is old (> 1 hour)."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
# Setup: enough events to trigger pattern memory
|
||||
mock_spark_memory.get_events.return_value = self._get_enough_events_for_consolidation(
|
||||
num_completions=4
|
||||
)
|
||||
|
||||
# Existing old pattern memory (2 hours ago)
|
||||
old_time = (datetime.now(UTC) - timedelta(hours=2)).isoformat()
|
||||
mock_spark_memory.get_memories.return_value = [
|
||||
self._make_memory("pattern", "agent-test", old_time)
|
||||
]
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-test")
|
||||
|
||||
# Should have called store_memory (old memory doesn't block)
|
||||
mock_spark_memory.store_memory.assert_called_once()
|
||||
call_kwargs = mock_spark_memory.store_memory.call_args.kwargs
|
||||
assert call_kwargs["memory_type"] == "pattern"
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_pattern_vs_anomaly_type_distinction(self, mock_spark_memory):
|
||||
"""Test that pattern vs anomaly type distinction works correctly."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
# Setup: events that trigger anomaly (low success rate: 1/4 = 0.25)
|
||||
events = []
|
||||
events.append(self._make_event("task_posted"))
|
||||
events.append(self._make_event("task_posted"))
|
||||
events.append(self._make_event("task_completed")) # 1 success
|
||||
events.append(self._make_event("task_failed"))
|
||||
events.append(self._make_event("task_failed"))
|
||||
events.append(self._make_event("task_failed")) # 3 failures
|
||||
|
||||
mock_spark_memory.get_events.return_value = events
|
||||
mock_spark_memory.get_memories.return_value = []
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-test")
|
||||
|
||||
# Should create an anomaly memory
|
||||
mock_spark_memory.store_memory.assert_called_once()
|
||||
call_kwargs = mock_spark_memory.store_memory.call_args.kwargs
|
||||
assert call_kwargs["memory_type"] == "anomaly"
|
||||
assert "struggling" in call_kwargs["content"]
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_anomaly_skips_when_recent_anomaly_exists(self, mock_spark_memory):
|
||||
"""Test that anomaly consolidation skips when recent anomaly exists."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
# Setup: events that trigger anomaly
|
||||
events = []
|
||||
events.append(self._make_event("task_posted"))
|
||||
events.append(self._make_event("task_posted"))
|
||||
events.append(self._make_event("task_completed"))
|
||||
events.append(self._make_event("task_failed"))
|
||||
events.append(self._make_event("task_failed"))
|
||||
events.append(self._make_event("task_failed"))
|
||||
|
||||
mock_spark_memory.get_events.return_value = events
|
||||
|
||||
# Existing recent anomaly memory
|
||||
recent_time = (datetime.now(UTC) - timedelta(minutes=30)).isoformat()
|
||||
mock_spark_memory.get_memories.return_value = [
|
||||
self._make_memory("anomaly", "agent-test", recent_time)
|
||||
]
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-test")
|
||||
|
||||
# Should NOT have called store_memory
|
||||
mock_spark_memory.store_memory.assert_not_called()
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_pattern_does_not_skip_on_recent_anomaly(self, mock_spark_memory):
|
||||
"""Test that pattern consolidation still runs when recent anomaly exists."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
# Setup: events that trigger pattern (high success rate)
|
||||
mock_spark_memory.get_events.return_value = self._get_enough_events_for_consolidation(
|
||||
num_completions=4
|
||||
)
|
||||
|
||||
# Existing recent anomaly memory (different type)
|
||||
recent_time = (datetime.now(UTC) - timedelta(minutes=30)).isoformat()
|
||||
mock_spark_memory.get_memories.return_value = [
|
||||
self._make_memory("anomaly", "agent-test", recent_time)
|
||||
]
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-test")
|
||||
|
||||
# Should create pattern memory (different type, so no dedup)
|
||||
mock_spark_memory.store_memory.assert_called_once()
|
||||
call_kwargs = mock_spark_memory.store_memory.call_args.kwargs
|
||||
assert call_kwargs["memory_type"] == "pattern"
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_anomaly_does_not_skip_on_recent_pattern(self, mock_spark_memory):
|
||||
"""Test that anomaly consolidation still runs when recent pattern exists."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
# Setup: events that trigger anomaly
|
||||
events = []
|
||||
events.append(self._make_event("task_posted"))
|
||||
events.append(self._make_event("task_posted"))
|
||||
events.append(self._make_event("task_completed"))
|
||||
events.append(self._make_event("task_failed"))
|
||||
events.append(self._make_event("task_failed"))
|
||||
events.append(self._make_event("task_failed"))
|
||||
|
||||
mock_spark_memory.get_events.return_value = events
|
||||
|
||||
# Existing recent pattern memory (different type)
|
||||
recent_time = (datetime.now(UTC) - timedelta(minutes=30)).isoformat()
|
||||
mock_spark_memory.get_memories.return_value = [
|
||||
self._make_memory("pattern", "agent-test", recent_time)
|
||||
]
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-test")
|
||||
|
||||
# Should create anomaly memory (different type, so no dedup)
|
||||
mock_spark_memory.store_memory.assert_called_once()
|
||||
call_kwargs = mock_spark_memory.store_memory.call_args.kwargs
|
||||
assert call_kwargs["memory_type"] == "anomaly"
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_no_consolidation_for_neutral_success_rate(self, mock_spark_memory):
|
||||
"""Test that neutral success rates (0.3 < rate < 0.8) don't create memories."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
# Setup: events that result in neutral success rate (2/4 = 0.5)
|
||||
events = []
|
||||
events.append(self._make_event("task_posted"))
|
||||
events.append(self._make_event("task_posted"))
|
||||
events.append(self._make_event("task_completed"))
|
||||
events.append(self._make_event("task_completed"))
|
||||
events.append(self._make_event("task_failed"))
|
||||
events.append(self._make_event("task_failed"))
|
||||
|
||||
mock_spark_memory.get_events.return_value = events
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
engine._maybe_consolidate("agent-test")
|
||||
|
||||
# Should NOT have called store_memory or get_memories
|
||||
mock_spark_memory.get_memories.assert_not_called()
|
||||
mock_spark_memory.store_memory.assert_not_called()
|
||||
|
||||
@patch("spark.engine.spark_memory")
|
||||
def test_handles_invalid_created_at_gracefully(self, mock_spark_memory):
|
||||
"""Test that invalid created_at timestamps don't crash dedup."""
|
||||
from spark.engine import SparkEngine
|
||||
|
||||
# Setup: enough events to trigger pattern memory
|
||||
mock_spark_memory.get_events.return_value = self._get_enough_events_for_consolidation(
|
||||
num_completions=4
|
||||
)
|
||||
|
||||
# Existing memory with invalid created_at
|
||||
mock_spark_memory.get_memories.return_value = [
|
||||
self._make_memory("pattern", "agent-test", "invalid-timestamp")
|
||||
]
|
||||
|
||||
engine = SparkEngine(enabled=True)
|
||||
# Should not raise an exception
|
||||
engine._maybe_consolidate("agent-test")
|
||||
|
||||
# Should still create new memory (invalid timestamp ignored)
|
||||
mock_spark_memory.store_memory.assert_called_once()
|
||||