"""Spark Intelligence engine — the top-level API for Spark integration. The engine is the single entry point used by the swarm coordinator and dashboard routes. It wires together memory capture, EIDOS predictions, memory consolidation, and the advisory system. Usage ----- from spark.engine import get_spark_engine spark_engine = get_spark_engine() # Capture a swarm event spark_engine.on_task_posted(task_id, description) spark_engine.on_bid_submitted(task_id, agent_id, bid_sats) spark_engine.on_task_completed(task_id, agent_id, result) spark_engine.on_task_failed(task_id, agent_id, reason) # Query Spark intelligence spark_engine.status() spark_engine.get_advisories() spark_engine.get_timeline() """ import json import logging from spark import advisor as spark_advisor from spark import eidos as spark_eidos from spark import memory as spark_memory from spark.advisor import Advisory from spark.memory import SparkEvent, SparkMemory logger = logging.getLogger(__name__) class SparkEngine: """Top-level Spark Intelligence controller.""" def __init__(self, enabled: bool = True) -> None: self._enabled = enabled if enabled: logger.info("Spark Intelligence engine initialised") @property def enabled(self) -> bool: return self._enabled # ── Event capture (called by coordinator) ──────────────────────────────── def on_task_posted( self, task_id: str, description: str, candidate_agents: list[str] | None = None, ) -> str | None: """Capture a task-posted event and generate a prediction.""" if not self._enabled: return None event_id = spark_memory.record_event( event_type="task_posted", description=description, task_id=task_id, data=json.dumps({"candidates": candidate_agents or []}), ) # Generate EIDOS prediction if candidate_agents: spark_eidos.predict_task_outcome( task_id=task_id, task_description=description, candidate_agents=candidate_agents, ) logger.debug("Spark: captured task_posted %s", task_id[:8]) return event_id def on_bid_submitted( self, task_id: str, agent_id: str, bid_sats: int, ) -> str | None: """Capture a bid event.""" if not self._enabled: return None event_id = spark_memory.record_event( event_type="bid_submitted", description=f"Agent {agent_id[:8]} bid {bid_sats} sats", agent_id=agent_id, task_id=task_id, data=json.dumps({"bid_sats": bid_sats}), ) logger.debug("Spark: captured bid %s→%s (%d sats)", agent_id[:8], task_id[:8], bid_sats) return event_id def on_task_assigned( self, task_id: str, agent_id: str, ) -> str | None: """Capture a task-assigned event.""" if not self._enabled: return None event_id = spark_memory.record_event( event_type="task_assigned", description=f"Task assigned to {agent_id[:8]}", agent_id=agent_id, task_id=task_id, ) logger.debug("Spark: captured assignment %s→%s", task_id[:8], agent_id[:8]) return event_id def on_task_completed( self, task_id: str, agent_id: str, result: str, winning_bid: int | None = None, ) -> str | None: """Capture a task-completed event and evaluate EIDOS prediction.""" if not self._enabled: return None event_id = spark_memory.record_event( event_type="task_completed", description=f"Task completed by {agent_id[:8]}", agent_id=agent_id, task_id=task_id, data=json.dumps( { "result_length": len(result), "winning_bid": winning_bid, } ), ) # Evaluate EIDOS prediction evaluation = spark_eidos.evaluate_prediction( task_id=task_id, actual_winner=agent_id, task_succeeded=True, winning_bid=winning_bid, ) if evaluation: accuracy = evaluation["accuracy"] spark_memory.record_event( event_type="prediction_result", description=f"Prediction accuracy: {accuracy:.0%}", task_id=task_id, data=json.dumps(evaluation, default=str), importance=0.7, ) # Consolidate memory if enough events for this agent self._maybe_consolidate(agent_id) logger.debug("Spark: captured completion %s by %s", task_id[:8], agent_id[:8]) return event_id def on_task_failed( self, task_id: str, agent_id: str, reason: str, ) -> str | None: """Capture a task-failed event and evaluate EIDOS prediction.""" if not self._enabled: return None event_id = spark_memory.record_event( event_type="task_failed", description=f"Task failed by {agent_id[:8]}: {reason[:80]}", agent_id=agent_id, task_id=task_id, data=json.dumps({"reason": reason}), ) # Evaluate EIDOS prediction spark_eidos.evaluate_prediction( task_id=task_id, actual_winner=agent_id, task_succeeded=False, ) # Failures always worth consolidating self._maybe_consolidate(agent_id) logger.debug("Spark: captured failure %s by %s", task_id[:8], agent_id[:8]) return event_id def on_agent_joined(self, agent_id: str, name: str) -> str | None: """Capture an agent-joined event.""" if not self._enabled: return None return spark_memory.record_event( event_type="agent_joined", description=f"Agent {name} ({agent_id[:8]}) joined the swarm", agent_id=agent_id, ) # ── Tool-level event capture ───────────────────────────────────────────── def on_tool_executed( self, agent_id: str, tool_name: str, task_id: str | None = None, success: bool = True, duration_ms: int | None = None, ) -> str | None: """Capture an individual tool invocation. Tracks which tools each agent uses, success rates, and latency so Spark can generate tool-specific advisories. """ if not self._enabled: return None data = {"tool": tool_name, "success": success} if duration_ms is not None: data["duration_ms"] = duration_ms return spark_memory.record_event( event_type="tool_executed", description=f"Agent {agent_id[:8]} used {tool_name} ({'ok' if success else 'FAIL'})", agent_id=agent_id, task_id=task_id, data=json.dumps(data), importance=0.3 if success else 0.6, ) # ── Creative pipeline event capture ────────────────────────────────────── def on_creative_step( self, project_id: str, step_name: str, agent_id: str, output_path: str | None = None, success: bool = True, ) -> str | None: """Capture a creative pipeline step (storyboard, music, video, assembly). Tracks pipeline progress and creative output quality metrics for Spark advisory generation. """ if not self._enabled: return None data = { "project_id": project_id, "step": step_name, "success": success, } if output_path: data["output_path"] = output_path return spark_memory.record_event( event_type="creative_step", description=f"Creative pipeline: {step_name} by {agent_id[:8]} ({'ok' if success else 'FAIL'})", agent_id=agent_id, data=json.dumps(data), importance=0.5, ) # ── Memory consolidation ──────────────────────────────────────────────── def _maybe_consolidate(self, agent_id: str) -> None: """Consolidate events into memories when enough data exists.""" from datetime import UTC, datetime, timedelta agent_events = spark_memory.get_events(agent_id=agent_id, limit=50) if len(agent_events) < 5: return completions = [e for e in agent_events if e.event_type == "task_completed"] failures = [e for e in agent_events if e.event_type == "task_failed"] total = len(completions) + len(failures) if total < 3: return success_rate = len(completions) / total if total else 0 # Determine target memory type based on success rate if success_rate >= 0.8: target_memory_type = "pattern" elif success_rate <= 0.3: target_memory_type = "anomaly" else: return # No consolidation needed for neutral success rates # Check for recent memories of the same type for this agent existing_memories = spark_memory.get_memories(subject=agent_id, limit=5) now = datetime.now(UTC) one_hour_ago = now - timedelta(hours=1) for memory in existing_memories: if memory.memory_type == target_memory_type: try: created_at = datetime.fromisoformat(memory.created_at) if created_at >= one_hour_ago: logger.info( "Consolidation: skipping — recent memory exists for %s", agent_id[:8], ) return except (ValueError, TypeError): continue # Store the new memory if target_memory_type == "pattern": spark_memory.store_memory( memory_type="pattern", subject=agent_id, content=f"Agent {agent_id[:8]} has a strong track record: " f"{len(completions)}/{total} tasks completed successfully.", confidence=min(0.95, 0.6 + total * 0.05), source_events=total, ) else: # anomaly spark_memory.store_memory( memory_type="anomaly", subject=agent_id, content=f"Agent {agent_id[:8]} is struggling: only " f"{len(completions)}/{total} tasks completed.", confidence=min(0.95, 0.6 + total * 0.05), source_events=total, ) # ── Query API ──────────────────────────────────────────────────────────── def status(self) -> dict: """Return a summary of Spark Intelligence state.""" eidos_stats = spark_eidos.get_accuracy_stats() return { "enabled": self._enabled, "events_captured": spark_memory.count_events(), "memories_stored": spark_memory.count_memories(), "predictions": eidos_stats, "event_types": { "task_posted": spark_memory.count_events("task_posted"), "bid_submitted": spark_memory.count_events("bid_submitted"), "task_assigned": spark_memory.count_events("task_assigned"), "task_completed": spark_memory.count_events("task_completed"), "task_failed": spark_memory.count_events("task_failed"), "agent_joined": spark_memory.count_events("agent_joined"), "tool_executed": spark_memory.count_events("tool_executed"), "creative_step": spark_memory.count_events("creative_step"), }, } def get_advisories(self) -> list[Advisory]: """Generate current advisories based on accumulated intelligence.""" if not self._enabled: return [] return spark_advisor.generate_advisories() def get_timeline(self, limit: int = 50) -> list[SparkEvent]: """Return recent events as a timeline.""" return spark_memory.get_events(limit=limit) def get_memories(self, limit: int = 50) -> list[SparkMemory]: """Return consolidated memories.""" return spark_memory.get_memories(limit=limit) def get_predictions(self, limit: int = 20) -> list: """Return recent EIDOS predictions.""" return spark_eidos.get_predictions(limit=limit) # ── Lazy singleton ──────────────────────────────────────────────────────────── _spark_engine: SparkEngine | None = None def get_spark_engine() -> SparkEngine: """Return the module-level SparkEngine, creating it on first access.""" global _spark_engine if _spark_engine is None: try: from config import settings _spark_engine = SparkEngine(enabled=settings.spark_enabled) except Exception as exc: logger.debug("Spark engine settings load error: %s", exc) _spark_engine = SparkEngine(enabled=True) return _spark_engine def reset_spark_engine() -> None: """Reset the singleton for test isolation.""" global _spark_engine _spark_engine = None def __getattr__(name: str): """Module-level __getattr__ for lazy backward-compatible access to spark_engine.""" if name == "spark_engine": return get_spark_engine() raise AttributeError(f"module {__name__!r} has no attribute {name!r}")