"""Phase 5: Real-time Multi-Agent Consensus. Implements a "Council of Timmys" for high-stakes decision making. """ import logging import asyncio from typing import List, Dict, Any from agent.gemini_adapter import GeminiAdapter logger = logging.getLogger(__name__) class ConsensusModerator: def __init__(self): self.adapter = GeminiAdapter() async def reach_consensus(self, task: str, agent_count: int = 3) -> Dict[str, Any]: """Spawns multiple agents to debate a task and reaches consensus.""" logger.info(f"Reaching consensus for task: {task} with {agent_count} agents.") # 1. Spawn agents and get their perspectives tasks = [] for i in range(agent_count): prompt = f"Provide your perspective on the following task: {task}" tasks.append(self.adapter.generate( model="gemini-3.1-pro-preview", prompt=prompt, system_instruction=f"You are Timmy Agent #{i+1}. Provide a unique perspective on the task." )) perspectives = await asyncio.gather(*tasks) # 2. Moderate the debate debate_prompt = "The following are different perspectives on the task:\n" for i, p in enumerate(perspectives): debate_prompt += f"Agent #{i+1}: {p['text']}\n" debate_prompt += "\nSynthesize these perspectives and provide a final, consensus-based decision." result = self.adapter.generate( model="gemini-3.1-pro-preview", prompt=debate_prompt, system_instruction="You are the Council Moderator. Your goal is to synthesize multiple perspectives into a single, high-fidelity decision.", thinking=True ) return { "task": task, "perspectives": [p['text'] for p in perspectives], "consensus": result["text"] }