#!/usr/bin/env python3
"""Stress-test simulation for portable Hermes agent (10 concurrent tasks).

This script validates thread-safety and resource stability without needing
a real Windows environment. It mimics the agent's internal task model.
"""

import concurrent.futures
import hashlib
import os
import random
import shutil
import tempfile
import time

# Defaults reproduce the original hard-coded workload.
DEFAULT_NUM_TASKS = 10
DEFAULT_MIN_DELAY = 0.3  # seconds
DEFAULT_MAX_DELAY = 2.0  # seconds
DEFAULT_PAYLOAD_BYTES = 5 * 1024 * 1024  # 5 MB
CONFIG_FILES_PER_TASK = 3


def simulated_hermes_task(
    task_id: int,
    *,
    min_delay: float = DEFAULT_MIN_DELAY,
    max_delay: float = DEFAULT_MAX_DELAY,
    payload_bytes: int = DEFAULT_PAYLOAD_BYTES,
) -> dict:
    """Run one simulated agent task and report its outcome.

    The task writes and re-reads a few small YAML-like files in a private
    temporary directory, sleeps for a random delay (a stand-in for a network
    call), then hashes a random payload with SHA-256.

    Args:
        task_id: Identifier echoed back in the result.
        min_delay: Lower bound, in seconds, of the simulated network delay.
        max_delay: Upper bound, in seconds, of the simulated network delay.
        payload_bytes: Size of the random payload that gets hashed.

    Returns:
        On success: ``task_id``, ``success=True``, ``duration``, ``file_ops``
        and ``network_delay``. On failure: ``task_id``, ``success=False``,
        ``duration`` and ``error``. Exceptions raised by the simulated work
        are captured in the result rather than propagated.
    """
    # perf_counter is monotonic, so durations survive wall-clock adjustments.
    started = time.perf_counter()
    tmpdir = None
    try:
        # Created inside the try so a failure here is reported as a failed
        # task instead of escaping the worker thread.
        tmpdir = tempfile.mkdtemp()

        # Simulate file I/O (YAML read/write).
        for index in range(CONFIG_FILES_PER_TASK):
            fpath = os.path.join(tmpdir, f'config_{index}.yaml')
            with open(fpath, 'w', encoding='utf-8') as handle:
                handle.write(
                    f'model: hermes-4-14b\ntemp: {random.random()}\n'
                )
            with open(fpath, encoding='utf-8') as handle:
                handle.read()

        # Simulate network latency (HTTP call placeholder).
        delay = random.uniform(min_delay, max_delay)
        time.sleep(delay)

        # Simulate CPU-bound work (hashing).
        payload = os.urandom(payload_bytes)
        hashlib.sha256(payload).hexdigest()

        return {
            'task_id': task_id,
            'success': True,
            'duration': time.perf_counter() - started,
            'file_ops': 2 * CONFIG_FILES_PER_TASK,  # one write + one read each
            'network_delay': delay,
        }
    except Exception as exc:
        # Task boundary: turn any failure into a result the caller can
        # tally. The type name is included because str(exc) may be empty.
        return {
            'task_id': task_id,
            'success': False,
            'duration': time.perf_counter() - started,
            'error': f'{type(exc).__name__}: {exc}',
        }
    finally:
        # Best-effort cleanup; a leftover temp dir must not fail the task.
        if tmpdir is not None:
            shutil.rmtree(tmpdir, ignore_errors=True)


def main(
    num_tasks: int = DEFAULT_NUM_TASKS,
    *,
    min_delay: float = DEFAULT_MIN_DELAY,
    max_delay: float = DEFAULT_MAX_DELAY,
    payload_bytes: int = DEFAULT_PAYLOAD_BYTES,
) -> int:
    """Launch the simulated tasks concurrently and print a summary.

    Args:
        num_tasks: Number of tasks, and of worker threads, to run.
        min_delay: Forwarded to ``simulated_hermes_task``.
        max_delay: Forwarded to ``simulated_hermes_task``.
        payload_bytes: Forwarded to ``simulated_hermes_task``.

    Returns:
        Process exit code: 0 when every task succeeded, 1 otherwise.

    Raises:
        ValueError: If ``num_tasks`` is less than 1.
    """
    if num_tasks < 1:
        raise ValueError('num_tasks must be a positive integer')

    print(
        f'[stress-test] Launching {num_tasks} concurrent simulated '
        f'Hermes tasks...'
    )
    start_all = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_tasks) as pool:
        futures = [
            pool.submit(
                simulated_hermes_task,
                task_id,
                min_delay=min_delay,
                max_delay=max_delay,
                payload_bytes=payload_bytes,
            )
            for task_id in range(num_tasks)
        ]
        results = [
            future.result()
            for future in concurrent.futures.as_completed(futures)
        ]
    elapsed = time.perf_counter() - start_all

    durations = [r['duration'] for r in results if r['success']]
    failures = [r for r in results if not r['success']]
    passed = len(durations)

    print(
        f'[stress-test] {passed}/{num_tasks} tasks succeeded '
        f'in {elapsed:.2f}s'
    )
    if not failures:
        # num_tasks >= 1 and nothing failed, so durations is non-empty.
        mean_duration = sum(durations) / len(durations)
        print(f'[stress-test] mean task time: {mean_duration:.2f}s')
        print('[stress-test] ✅ PASS — no crashes, all tasks completed')
        return 0

    print('[stress-test] ❌ FAIL — some tasks errored:')
    # Completion order is nondeterministic; sort for a stable report.
    for result in sorted(failures, key=lambda r: r['task_id']):
        print(f'  task {result["task_id"]}: {result.get("error")}')
    return 1


if __name__ == '__main__':
    raise SystemExit(main())