#!/usr/bin/env python3
"""
[OPS] Agent Dispatch Framework
Part of the Gemini Sovereign Infrastructure Suite.

Replaces ad-hoc dispatch scripts with a unified framework for tasking agents.
"""

import os
import sys
import argparse

# Put this script's directory on sys.path so the sibling module `ssh_trust`
# can be imported no matter which working directory the script is run from.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPT_DIR not in sys.path:
    sys.path.insert(0, SCRIPT_DIR)

from ssh_trust import VerifiedSSHExecutor

# --- CONFIGURATION ---
# Maps a fleet host name (as accepted on the command line) to its IP address.
FLEET = {
    "allegro": "167.99.126.228",
    "bezalel": "159.203.146.185"
}

# Working directory passed to the executor for the remote command.
REMOTE_WORKDIR = "/opt/hermes"

# Default `timeout` value handed to the executor (presumably seconds --
# confirm against VerifiedSSHExecutor.run).
DEFAULT_TIMEOUT = 30


class Dispatcher:
    """Send a task to a named agent on one of the FLEET hosts."""

    def __init__(self, executor=None):
        """Create a dispatcher.

        Args:
            executor: Object used to run the remote command. It must expose
                ``run(ip, argv, cwd=..., timeout=...)`` and return a result
                with ``returncode``, ``stdout`` and ``stderr`` attributes.
                When omitted (or falsy), a ``VerifiedSSHExecutor()`` is
                created.
        """
        self.executor = executor or VerifiedSSHExecutor()

    def log(self, message: str):
        """Print *message* to stdout with the ``[*]`` prefix."""
        print(f"[*] {message}")

    def dispatch(self, host: str, agent_name: str, task: str, *,
                 timeout: int = DEFAULT_TIMEOUT) -> bool:
        """Run ``run_agent.py`` for *agent_name* on *host* and report the result.

        The command is passed to the executor as an argument list, so *task*
        is forwarded as a single argv element.

        Args:
            host: Key into ``FLEET`` naming the target machine.
            agent_name: Value passed to the remote script's ``--agent`` option.
            task: Value passed to the remote script's ``--task`` option.
            timeout: Timeout forwarded to the executor's ``run`` call.

        Returns:
            True if the remote command finished with return code 0; False if
            it returned a non-zero code or the executor raised an exception.

        Raises:
            KeyError: If *host* is not a key of ``FLEET``.
        """
        # Resolve the host first so we never announce a dispatch that cannot
        # happen, and so the error names the valid choices.
        try:
            ip = FLEET[host]
        except KeyError:
            raise KeyError(
                f"unknown host {host!r}; expected one of {sorted(FLEET)}"
            ) from None

        self.log(f"Dispatching task to {agent_name} on {host}...")

        try:
            res = self.executor.run(
                ip,
                ['python3', 'run_agent.py', '--agent', agent_name, '--task', task],
                cwd=REMOTE_WORKDIR,
                timeout=timeout,
            )
            if res.returncode == 0:
                self.log(f"[SUCCESS] {agent_name} completed task.")
                print(res.stdout)
                return True

            self.log(f"[FAILURE] {agent_name} failed task.")
            print(res.stderr)
            return False
        except Exception as e:
            # Top-level boundary for a CLI tool: report any executor failure
            # as a failed dispatch instead of a traceback.
            self.log(f"[ERROR] Dispatch failed: {e}")
            return False


def main() -> int:
    """Parse command-line arguments and dispatch a single task.

    Returns:
        0 when the dispatch succeeded, 1 otherwise, for use as the process
        exit status.
    """
    parser = argparse.ArgumentParser(description="Gemini Agent Dispatcher")
    parser.add_argument("host", choices=list(FLEET.keys()), help="Host to dispatch to")
    parser.add_argument("agent", help="Agent name")
    parser.add_argument("task", help="Task description")

    args = parser.parse_args()

    dispatcher = Dispatcher()
    succeeded = dispatcher.dispatch(args.host, args.agent, args.task)
    return 0 if succeeded else 1


if __name__ == "__main__":
    # Propagate the dispatch outcome to the calling shell / automation.
    sys.exit(main())