From b12e29b92e4de51bd1a666e17cd800ac8c07a4e6 Mon Sep 17 00:00:00 2001 From: Kimi Agent Date: Sat, 14 Mar 2026 20:04:18 -0400 Subject: [PATCH] fix: dedup memory consolidation with existing memory search (#105) _maybe_consolidate() now checks get_memories(subject=agent_id) before storing. Skips if a memory of the same type (pattern/anomaly) was created within the last hour. Prevents duplicate consolidation entries on repeated task completion/failure events. Also restructured branching: neutral success rates (0.3-0.8) now return early instead of falling through. 9 new tests. 1465 total passing. --- src/spark/engine.py | 31 ++- tests/spark/test_consolidation_dedup.py | 259 ++++++++++++++++++++++++ 2 files changed, 289 insertions(+), 1 deletion(-) create mode 100644 tests/spark/test_consolidation_dedup.py diff --git a/src/spark/engine.py b/src/spark/engine.py index 89d59d10..0fdc46fe 100644 --- a/src/spark/engine.py +++ b/src/spark/engine.py @@ -273,6 +273,8 @@ class SparkEngine: def _maybe_consolidate(self, agent_id: str) -> None: """Consolidate events into memories when enough data exists.""" + from datetime import UTC, datetime, timedelta + agent_events = spark_memory.get_events(agent_id=agent_id, limit=50) if len(agent_events) < 5: return @@ -286,7 +288,34 @@ class SparkEngine: success_rate = len(completions) / total if total else 0 + # Determine target memory type based on success rate if success_rate >= 0.8: + target_memory_type = "pattern" + elif success_rate <= 0.3: + target_memory_type = "anomaly" + else: + return # No consolidation needed for neutral success rates + + # Check for recent memories of the same type for this agent + existing_memories = spark_memory.get_memories(subject=agent_id, limit=5) + now = datetime.now(UTC) + one_hour_ago = now - timedelta(hours=1) + + for memory in existing_memories: + if memory.memory_type == target_memory_type: + try: + created_at = datetime.fromisoformat(memory.created_at) + if created_at >= one_hour_ago: + logger.info( + "Consolidation: skipping — recent memory exists for %s", + agent_id[:8], + ) + return + except (ValueError, TypeError): + continue + + # Store the new memory + if target_memory_type == "pattern": spark_memory.store_memory( memory_type="pattern", subject=agent_id, @@ -295,7 +324,7 @@ class SparkEngine: confidence=min(0.95, 0.6 + total * 0.05), source_events=total, ) - elif success_rate <= 0.3: + else: # anomaly spark_memory.store_memory( memory_type="anomaly", subject=agent_id, diff --git a/tests/spark/test_consolidation_dedup.py b/tests/spark/test_consolidation_dedup.py new file mode 100644 index 00000000..cbec8da0 --- /dev/null +++ b/tests/spark/test_consolidation_dedup.py @@ -0,0 +1,259 @@ +"""Tests for memory consolidation deduplication (issue #105). + +Verifies that _maybe_consolidate() skips creating duplicate memories +when a recent memory of the same type already exists for the agent. +""" + +from datetime import UTC, datetime, timedelta +from unittest.mock import MagicMock, patch + + +class TestConsolidationDedup: + """Test consolidation deduplication behavior.""" + + def _make_event(self, event_type, agent_id="agent-test"): + """Create a mock SparkEvent.""" + event = MagicMock() + event.event_type = event_type + event.agent_id = agent_id + return event + + def _make_memory(self, memory_type, subject, created_at): + """Create a mock SparkMemory.""" + memory = MagicMock() + memory.memory_type = memory_type + memory.subject = subject + memory.created_at = created_at + return memory + + def _get_enough_events_for_consolidation(self, num_completions=4, num_failures=0): + """Return enough events to trigger consolidation (5+ events, 3+ outcomes).""" + events = [] + # Add some non-outcome events first + for _ in range(2): + events.append(self._make_event("task_posted")) + # Add completion/failure events to trigger pattern (>=0.8 success rate) + for _ in range(num_completions): + events.append(self._make_event("task_completed")) + for _ in range(num_failures): + events.append(self._make_event("task_failed")) + return events + + @patch("spark.engine.spark_memory") + def test_creates_memory_when_none_exists(self, mock_spark_memory): + """Test that consolidation creates a memory when none exists.""" + from spark.engine import SparkEngine + + # Setup: enough events to trigger pattern memory, no existing memories + mock_spark_memory.get_events.return_value = self._get_enough_events_for_consolidation( + num_completions=4 + ) + mock_spark_memory.get_memories.return_value = [] # No existing memories + + engine = SparkEngine(enabled=True) + engine._maybe_consolidate("agent-test") + + # Should have called store_memory once + mock_spark_memory.store_memory.assert_called_once() + call_kwargs = mock_spark_memory.store_memory.call_args.kwargs + assert call_kwargs["memory_type"] == "pattern" + assert call_kwargs["subject"] == "agent-test" + + @patch("spark.engine.spark_memory") + def test_skips_when_recent_memory_exists(self, mock_spark_memory): + """Test that consolidation SKIPS when a recent memory (< 1 hour) exists.""" + from spark.engine import SparkEngine + + # Setup: enough events to trigger pattern memory + mock_spark_memory.get_events.return_value = self._get_enough_events_for_consolidation( + num_completions=4 + ) + + # Existing recent pattern memory (30 minutes ago) + recent_time = (datetime.now(UTC) - timedelta(minutes=30)).isoformat() + mock_spark_memory.get_memories.return_value = [ + self._make_memory("pattern", "agent-test", recent_time) + ] + + engine = SparkEngine(enabled=True) + engine._maybe_consolidate("agent-test") + + # Should NOT have called store_memory + mock_spark_memory.store_memory.assert_not_called() + + @patch("spark.engine.spark_memory") + def test_creates_when_existing_memory_is_old(self, mock_spark_memory): + """Test that consolidation creates new memory when existing is old (> 1 hour).""" + from spark.engine import SparkEngine + + # Setup: enough events to trigger pattern memory + mock_spark_memory.get_events.return_value = self._get_enough_events_for_consolidation( + num_completions=4 + ) + + # Existing old pattern memory (2 hours ago) + old_time = (datetime.now(UTC) - timedelta(hours=2)).isoformat() + mock_spark_memory.get_memories.return_value = [ + self._make_memory("pattern", "agent-test", old_time) + ] + + engine = SparkEngine(enabled=True) + engine._maybe_consolidate("agent-test") + + # Should have called store_memory (old memory doesn't block) + mock_spark_memory.store_memory.assert_called_once() + call_kwargs = mock_spark_memory.store_memory.call_args.kwargs + assert call_kwargs["memory_type"] == "pattern" + + @patch("spark.engine.spark_memory") + def test_pattern_vs_anomaly_type_distinction(self, mock_spark_memory): + """Test that pattern vs anomaly type distinction works correctly.""" + from spark.engine import SparkEngine + + # Setup: events that trigger anomaly (low success rate: 1/4 = 0.25) + events = [] + events.append(self._make_event("task_posted")) + events.append(self._make_event("task_posted")) + events.append(self._make_event("task_completed")) # 1 success + events.append(self._make_event("task_failed")) + events.append(self._make_event("task_failed")) + events.append(self._make_event("task_failed")) # 3 failures + + mock_spark_memory.get_events.return_value = events + mock_spark_memory.get_memories.return_value = [] + + engine = SparkEngine(enabled=True) + engine._maybe_consolidate("agent-test") + + # Should create an anomaly memory + mock_spark_memory.store_memory.assert_called_once() + call_kwargs = mock_spark_memory.store_memory.call_args.kwargs + assert call_kwargs["memory_type"] == "anomaly" + assert "struggling" in call_kwargs["content"] + + @patch("spark.engine.spark_memory") + def test_anomaly_skips_when_recent_anomaly_exists(self, mock_spark_memory): + """Test that anomaly consolidation skips when recent anomaly exists.""" + from spark.engine import SparkEngine + + # Setup: events that trigger anomaly + events = [] + events.append(self._make_event("task_posted")) + events.append(self._make_event("task_posted")) + events.append(self._make_event("task_completed")) + events.append(self._make_event("task_failed")) + events.append(self._make_event("task_failed")) + events.append(self._make_event("task_failed")) + + mock_spark_memory.get_events.return_value = events + + # Existing recent anomaly memory + recent_time = (datetime.now(UTC) - timedelta(minutes=30)).isoformat() + mock_spark_memory.get_memories.return_value = [ + self._make_memory("anomaly", "agent-test", recent_time) + ] + + engine = SparkEngine(enabled=True) + engine._maybe_consolidate("agent-test") + + # Should NOT have called store_memory + mock_spark_memory.store_memory.assert_not_called() + + @patch("spark.engine.spark_memory") + def test_pattern_does_not_skip_on_recent_anomaly(self, mock_spark_memory): + """Test that pattern consolidation still runs when recent anomaly exists.""" + from spark.engine import SparkEngine + + # Setup: events that trigger pattern (high success rate) + mock_spark_memory.get_events.return_value = self._get_enough_events_for_consolidation( + num_completions=4 + ) + + # Existing recent anomaly memory (different type) + recent_time = (datetime.now(UTC) - timedelta(minutes=30)).isoformat() + mock_spark_memory.get_memories.return_value = [ + self._make_memory("anomaly", "agent-test", recent_time) + ] + + engine = SparkEngine(enabled=True) + engine._maybe_consolidate("agent-test") + + # Should create pattern memory (different type, so no dedup) + mock_spark_memory.store_memory.assert_called_once() + call_kwargs = mock_spark_memory.store_memory.call_args.kwargs + assert call_kwargs["memory_type"] == "pattern" + + @patch("spark.engine.spark_memory") + def test_anomaly_does_not_skip_on_recent_pattern(self, mock_spark_memory): + """Test that anomaly consolidation still runs when recent pattern exists.""" + from spark.engine import SparkEngine + + # Setup: events that trigger anomaly + events = [] + events.append(self._make_event("task_posted")) + events.append(self._make_event("task_posted")) + events.append(self._make_event("task_completed")) + events.append(self._make_event("task_failed")) + events.append(self._make_event("task_failed")) + events.append(self._make_event("task_failed")) + + mock_spark_memory.get_events.return_value = events + + # Existing recent pattern memory (different type) + recent_time = (datetime.now(UTC) - timedelta(minutes=30)).isoformat() + mock_spark_memory.get_memories.return_value = [ + self._make_memory("pattern", "agent-test", recent_time) + ] + + engine = SparkEngine(enabled=True) + engine._maybe_consolidate("agent-test") + + # Should create anomaly memory (different type, so no dedup) + mock_spark_memory.store_memory.assert_called_once() + call_kwargs = mock_spark_memory.store_memory.call_args.kwargs + assert call_kwargs["memory_type"] == "anomaly" + + @patch("spark.engine.spark_memory") + def test_no_consolidation_for_neutral_success_rate(self, mock_spark_memory): + """Test that neutral success rates (0.3 < rate < 0.8) don't create memories.""" + from spark.engine import SparkEngine + + # Setup: events that result in neutral success rate (2/4 = 0.5) + events = [] + events.append(self._make_event("task_posted")) + events.append(self._make_event("task_posted")) + events.append(self._make_event("task_completed")) + events.append(self._make_event("task_completed")) + events.append(self._make_event("task_failed")) + events.append(self._make_event("task_failed")) + + mock_spark_memory.get_events.return_value = events + + engine = SparkEngine(enabled=True) + engine._maybe_consolidate("agent-test") + + # Should NOT have called store_memory or get_memories + mock_spark_memory.get_memories.assert_not_called() + mock_spark_memory.store_memory.assert_not_called() + + @patch("spark.engine.spark_memory") + def test_handles_invalid_created_at_gracefully(self, mock_spark_memory): + """Test that invalid created_at timestamps don't crash dedup.""" + from spark.engine import SparkEngine + + # Setup: enough events to trigger pattern memory + mock_spark_memory.get_events.return_value = self._get_enough_events_for_consolidation( + num_completions=4 + ) + + # Existing memory with invalid created_at + mock_spark_memory.get_memories.return_value = [ + self._make_memory("pattern", "agent-test", "invalid-timestamp") + ] + + engine = SparkEngine(enabled=True) + # Should not raise an exception + engine._maybe_consolidate("agent-test") + + # Should still create new memory (invalid timestamp ignored) + mock_spark_memory.store_memory.assert_called_once()