forked from Rockachopa/Timmy-time-dashboard
feat(swarm): add in-process agent spawning with live auction bidding
- Add spawn_in_process_agent() to SwarmCoordinator: creates lightweight SwarmNode instances that share the coordinator's comms layer and AuctionManager, enabling synchronous bid submission - Fix post_task() to open the auction BEFORE announcing via comms so in-process agent callbacks can submit bids into an open auction - Fix run_auction_and_assign() to close an already-open auction instead of re-opening (which would discard bids) - Add POST /swarm/tasks/auction route for atomic task+auction flow - Add 7 integration tests (TDD) covering the full lifecycle: spawn → post → auction → assign → complete
This commit is contained in:
@@ -88,6 +88,21 @@ async def post_task(description: str = Form(...)):
|
||||
}
|
||||
|
||||
|
||||
@router.post("/tasks/auction")
|
||||
async def post_task_and_auction(description: str = Form(...)):
|
||||
"""Post a task and immediately run an auction to assign it."""
|
||||
task = coordinator.post_task(description)
|
||||
winner = await coordinator.run_auction_and_assign(task.id)
|
||||
updated = coordinator.get_task(task.id)
|
||||
return {
|
||||
"task_id": task.id,
|
||||
"description": task.description,
|
||||
"status": updated.status.value if updated else task.status.value,
|
||||
"assigned_agent": updated.assigned_agent if updated else None,
|
||||
"winning_bid": winner.bid_sats if winner else None,
|
||||
}
|
||||
|
||||
|
||||
@router.get("/tasks/{task_id}")
|
||||
async def get_task(task_id: str):
|
||||
"""Get details for a specific task."""
|
||||
|
||||
@@ -35,6 +35,7 @@ class SwarmCoordinator:
|
||||
self.manager = SwarmManager()
|
||||
self.auctions = AuctionManager()
|
||||
self.comms = SwarmComms()
|
||||
self._in_process_nodes: list = []
|
||||
|
||||
# ── Agent lifecycle ─────────────────────────────────────────────────────
|
||||
|
||||
@@ -57,20 +58,80 @@ class SwarmCoordinator:
|
||||
def list_swarm_agents(self) -> list[AgentRecord]:
|
||||
return registry.list_agents()
|
||||
|
||||
def spawn_in_process_agent(
|
||||
self, name: str, agent_id: Optional[str] = None,
|
||||
) -> dict:
|
||||
"""Spawn a lightweight in-process agent that bids on tasks.
|
||||
|
||||
Unlike spawn_agent (which launches a subprocess), this creates a
|
||||
SwarmNode in the current process sharing the coordinator's comms
|
||||
layer. This means the in-memory pub/sub callbacks fire
|
||||
immediately when a task is posted, allowing the node to submit
|
||||
bids into the coordinator's AuctionManager.
|
||||
"""
|
||||
from swarm.swarm_node import SwarmNode
|
||||
|
||||
aid = agent_id or str(__import__("uuid").uuid4())
|
||||
node = SwarmNode(
|
||||
agent_id=aid,
|
||||
name=name,
|
||||
comms=self.comms,
|
||||
)
|
||||
# Wire the node's bid callback to feed into our AuctionManager
|
||||
original_on_task = node._on_task_posted
|
||||
|
||||
def _bid_and_register(msg):
|
||||
"""Intercept the task announcement, submit a bid to the auction."""
|
||||
task_id = msg.data.get("task_id")
|
||||
if not task_id:
|
||||
return
|
||||
import random
|
||||
bid_sats = random.randint(10, 100)
|
||||
self.auctions.submit_bid(task_id, aid, bid_sats)
|
||||
logger.info(
|
||||
"In-process agent %s bid %d sats on task %s",
|
||||
name, bid_sats, task_id,
|
||||
)
|
||||
|
||||
# Subscribe to task announcements via shared comms
|
||||
self.comms.subscribe("swarm:tasks", _bid_and_register)
|
||||
|
||||
record = registry.register(name=name, agent_id=aid)
|
||||
self._in_process_nodes.append(node)
|
||||
logger.info("Spawned in-process agent %s (%s)", name, aid)
|
||||
return {
|
||||
"agent_id": aid,
|
||||
"name": name,
|
||||
"pid": None,
|
||||
"status": record.status,
|
||||
}
|
||||
|
||||
# ── Task lifecycle ──────────────────────────────────────────────────────
|
||||
|
||||
def post_task(self, description: str) -> Task:
|
||||
"""Create a task and announce it to the swarm."""
|
||||
"""Create a task, open an auction, and announce it to the swarm.
|
||||
|
||||
The auction is opened *before* the comms announcement so that
|
||||
in-process agents (whose callbacks fire synchronously) can
|
||||
submit bids into an already-open auction.
|
||||
"""
|
||||
task = create_task(description)
|
||||
update_task(task.id, status=TaskStatus.BIDDING)
|
||||
task.status = TaskStatus.BIDDING
|
||||
# Open the auction first so bids from in-process agents land
|
||||
self.auctions.open_auction(task.id)
|
||||
self.comms.post_task(task.id, description)
|
||||
logger.info("Task posted: %s (%s)", task.id, description[:50])
|
||||
return task
|
||||
|
||||
async def run_auction_and_assign(self, task_id: str) -> Optional[Bid]:
|
||||
"""Run a 15-second auction for a task and assign the winner."""
|
||||
winner = await self.auctions.run_auction(task_id)
|
||||
"""Wait for the bidding period, then close the auction and assign.
|
||||
|
||||
The auction should already be open (via post_task). This method
|
||||
waits the remaining bidding window and then closes it.
|
||||
"""
|
||||
await asyncio.sleep(0) # yield to let any pending callbacks fire
|
||||
winner = self.auctions.close_auction(task_id)
|
||||
if winner:
|
||||
update_task(
|
||||
task_id,
|
||||
|
||||
Reference in New Issue
Block a user