Compare commits
10 Commits
claude/iss
...
feat/mnemo
| Author | SHA1 | Date | |
|---|---|---|---|
| bd69018f4a | |||
| 66c0433fc3 | |||
| 332789199f | |||
| 2e0853510d | |||
| 31fdd01931 | |||
| a0964a2fbf | |||
| 1e7bb2a453 | |||
| 847c4d50d4 | |||
|
|
220f20c794 | ||
| e85cefd9c0 |
116
app.js
116
app.js
@@ -44,6 +44,7 @@ let particles, dustParticles;
|
||||
let debugOverlay;
|
||||
let frameCount = 0, lastFPSTime = 0, fps = 0;
|
||||
let chatOpen = true;
|
||||
let memoryFeedEntries = []; // Mnemosyne: recent memory events for feed panel
|
||||
let loadProgress = 0;
|
||||
let performanceTier = 'high';
|
||||
|
||||
@@ -2041,6 +2042,14 @@ function connectHermes() {
|
||||
addChatMessage('system', 'Hermes link established.');
|
||||
updateWsHudStatus(true);
|
||||
refreshWorkshopPanel();
|
||||
|
||||
// Mnemosyne: request memory sync from Hermes
|
||||
try {
|
||||
hermesWs.send(JSON.stringify({ type: 'memory', action: 'sync_request' }));
|
||||
console.info('[Mnemosyne] Sent sync_request to Hermes');
|
||||
} catch (e) {
|
||||
console.warn('[Mnemosyne] Failed to send sync_request:', e);
|
||||
}
|
||||
};
|
||||
|
||||
// Initialize MemPalace
|
||||
@@ -2091,6 +2100,8 @@ function handleHermesMessage(data) {
|
||||
recentToolOutputs.push({ type: 'result', agent: data.agent || 'SYSTEM', content });
|
||||
addToolMessage(data.agent || 'SYSTEM', 'result', content);
|
||||
refreshWorkshopPanel();
|
||||
} else if (data.type === 'memory') {
|
||||
handleMemoryMessage(data);
|
||||
} else if (data.type === 'history') {
|
||||
const container = document.getElementById('chat-messages');
|
||||
container.innerHTML = '';
|
||||
@@ -2102,6 +2113,111 @@ function handleHermesMessage(data) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ═══════════════════════════════════════════
|
||||
// MNEMOSYNE — LIVE MEMORY BRIDGE
|
||||
// ═══════════════════════════════════════════
|
||||
|
||||
/**
|
||||
* Handle incoming memory messages from Hermes WS.
|
||||
* Actions: place, remove, update, sync_response
|
||||
*/
|
||||
|
||||
/**
|
||||
* Clear all entries from the memory feed.
|
||||
*/
|
||||
function clearMemoryFeed() {
|
||||
memoryFeedEntries = [];
|
||||
renderMemoryFeed();
|
||||
console.info('[Mnemosyne] Memory feed cleared');
|
||||
}
|
||||
|
||||
function handleMemoryMessage(data) {
|
||||
const action = data.action;
|
||||
const memory = data.memory;
|
||||
const memories = data.memories;
|
||||
|
||||
if (action === 'place' && memory) {
|
||||
const placed = SpatialMemory.placeMemory(memory);
|
||||
if (placed) {
|
||||
addMemoryFeedEntry('place', memory);
|
||||
console.info('[Mnemosyne] Memory placed via WS:', memory.id);
|
||||
}
|
||||
} else if (action === 'remove' && memory) {
|
||||
SpatialMemory.removeMemory(memory.id);
|
||||
addMemoryFeedEntry('remove', memory);
|
||||
console.info('[Mnemosyne] Memory removed via WS:', memory.id);
|
||||
} else if (action === 'update' && memory) {
|
||||
SpatialMemory.updateMemory(memory.id, memory);
|
||||
addMemoryFeedEntry('update', memory);
|
||||
console.info('[Mnemosyne] Memory updated via WS:', memory.id);
|
||||
} else if (action === 'sync_response' && Array.isArray(memories)) {
|
||||
const count = SpatialMemory.importMemories(memories);
|
||||
addMemoryFeedEntry('sync', { content: count + ' memories synced', id: 'sync' });
|
||||
console.info('[Mnemosyne] Synced', count, 'memories from Hermes');
|
||||
} else {
|
||||
console.warn('[Mnemosyne] Unknown memory action:', action);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an entry to the memory activity feed panel.
|
||||
*/
|
||||
function addMemoryFeedEntry(action, memory) {
|
||||
const entry = {
|
||||
action,
|
||||
content: memory.content || memory.id || '(unknown)',
|
||||
category: memory.category || 'working',
|
||||
timestamp: new Date().toISOString()
|
||||
};
|
||||
|
||||
memoryFeedEntries.unshift(entry);
|
||||
if (memoryFeedEntries.length > 5) memoryFeedEntries.pop();
|
||||
|
||||
renderMemoryFeed();
|
||||
|
||||
// Auto-dismiss entries older than 5 minutes
|
||||
setTimeout(() => {
|
||||
const idx = memoryFeedEntries.indexOf(entry);
|
||||
if (idx > -1) {
|
||||
memoryFeedEntries.splice(idx, 1);
|
||||
renderMemoryFeed();
|
||||
}
|
||||
}, 300000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Render the memory feed panel.
|
||||
*/
|
||||
function renderMemoryFeed() {
|
||||
const container = document.getElementById('memory-feed-list');
|
||||
if (!container) return;
|
||||
|
||||
container.innerHTML = '';
|
||||
memoryFeedEntries.forEach(entry => {
|
||||
const el = document.createElement('div');
|
||||
el.className = 'memory-feed-entry memory-feed-' + entry.action;
|
||||
|
||||
const regionDef = SpatialMemory.REGIONS[entry.category] || SpatialMemory.REGIONS.working;
|
||||
const dotColor = '#' + regionDef.color.toString(16).padStart(6, '0');
|
||||
const time = new Date(entry.timestamp).toLocaleTimeString();
|
||||
const truncated = entry.content.length > 40 ? entry.content.slice(0, 40) + '\u2026' : entry.content;
|
||||
const actionIcon = { place: '\u2795', remove: '\u2796', update: '\u270F', sync: '\u21C4' }[entry.action] || '\u2022';
|
||||
|
||||
el.innerHTML = '<span class="memory-feed-dot" style="background:' + dotColor + '"></span>' +
|
||||
'<span class="memory-feed-action">' + actionIcon + '</span>' +
|
||||
'<span class="memory-feed-content">' + truncated + '</span>' +
|
||||
'<span class="memory-feed-time">' + time + '</span>';
|
||||
|
||||
container.appendChild(el);
|
||||
});
|
||||
|
||||
// Show feed if there are entries
|
||||
const panel = document.getElementById('memory-feed');
|
||||
if (panel) panel.style.display = memoryFeedEntries.length > 0 ? 'block' : 'none';
|
||||
}
|
||||
|
||||
|
||||
function updateWsHudStatus(connected) {
|
||||
// Update MemPalace status alongside regular WS status
|
||||
updateMemPalaceStatus();
|
||||
|
||||
46
docker-compose.desktop.yml
Normal file
46
docker-compose.desktop.yml
Normal file
@@ -0,0 +1,46 @@
|
||||
version: "3.9"
|
||||
|
||||
# Sandboxed desktop environment for Hermes computer-use primitives.
|
||||
# Provides Xvfb (virtual framebuffer) + noVNC (browser-accessible VNC).
|
||||
#
|
||||
# Usage:
|
||||
# docker compose -f docker-compose.desktop.yml up -d
|
||||
# # Visit http://localhost:6080 to see the virtual desktop
|
||||
#
|
||||
# docker compose -f docker-compose.desktop.yml run hermes-desktop \
|
||||
# python -m nexus.computer_use_demo
|
||||
#
|
||||
# docker compose -f docker-compose.desktop.yml down
|
||||
|
||||
services:
|
||||
hermes-desktop:
|
||||
image: dorowu/ubuntu-desktop-lxde-vnc:focal
|
||||
environment:
|
||||
# Resolution for the virtual display
|
||||
RESOLUTION: "1280x800"
|
||||
# VNC password (change in production)
|
||||
VNC_PASSWORD: "hermes"
|
||||
# Disable HTTP password for development convenience
|
||||
HTTP_PASSWORD: ""
|
||||
ports:
|
||||
# noVNC web interface
|
||||
- "6080:80"
|
||||
# Raw VNC port (optional)
|
||||
- "5900:5900"
|
||||
volumes:
|
||||
# Mount repo into container so scripts are available
|
||||
- .:/workspace
|
||||
# Persist nexus runtime data (heartbeats, logs, evidence)
|
||||
- nexus_data:/root/.nexus
|
||||
working_dir: /workspace
|
||||
shm_size: "256mb"
|
||||
# Install Python deps on startup then keep container alive
|
||||
command: >
|
||||
bash -c "
|
||||
pip install --quiet pyautogui Pillow &&
|
||||
/startup.sh
|
||||
"
|
||||
|
||||
volumes:
|
||||
nexus_data:
|
||||
driver: local
|
||||
174
docs/computer-use.md
Normal file
174
docs/computer-use.md
Normal file
@@ -0,0 +1,174 @@
|
||||
# Computer Use — Desktop Automation Primitives for Hermes
|
||||
|
||||
Issue: [#1125](https://forge.alexanderwhitestone.com/Timmy_Foundation/the-nexus/issues/1125)
|
||||
|
||||
## Overview
|
||||
|
||||
`nexus/computer_use.py` adds desktop automation primitives to the Hermes fleet. Agents can take screenshots, click, type, and scroll — enough to drive a browser, validate a UI, or diagnose a failed workflow page visually.
|
||||
|
||||
All actions are logged to a JSONL audit trail at `~/.nexus/computer_use_actions.jsonl`.
|
||||
|
||||
---
|
||||
|
||||
## Quick Start
|
||||
|
||||
### Local (requires a real display or Xvfb)
|
||||
|
||||
```bash
|
||||
# Install dependencies
|
||||
pip install pyautogui Pillow
|
||||
|
||||
# Run the Phase 1 demo
|
||||
python -m nexus.computer_use_demo
|
||||
```
|
||||
|
||||
### Sandboxed (Docker + Xvfb + noVNC)
|
||||
|
||||
```bash
|
||||
docker compose -f docker-compose.desktop.yml up -d
|
||||
# Visit http://localhost:6080 in your browser to see the virtual desktop
|
||||
|
||||
docker compose -f docker-compose.desktop.yml run hermes-desktop \
|
||||
python -m nexus.computer_use_demo
|
||||
|
||||
docker compose -f docker-compose.desktop.yml down
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## API Reference
|
||||
|
||||
### `computer_screenshot(save_path=None, log_path=...)`
|
||||
|
||||
Capture the current desktop.
|
||||
|
||||
| Param | Type | Description |
|
||||
|-------|------|-------------|
|
||||
| `save_path` | `str \| None` | Path to save PNG. If `None`, returns base64 string. |
|
||||
| `log_path` | `Path` | Audit log file. |
|
||||
|
||||
**Returns** `dict`:
|
||||
```json
|
||||
{
|
||||
"ok": true,
|
||||
"image_b64": "<base64 PNG or null>",
|
||||
"saved_to": "<path or null>",
|
||||
"error": null
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### `computer_click(x, y, button="left", confirm=False, log_path=...)`
|
||||
|
||||
Click the mouse at screen coordinates.
|
||||
|
||||
| Param | Type | Description |
|
||||
|-------|------|-------------|
|
||||
| `x` | `int` | Horizontal coordinate |
|
||||
| `y` | `int` | Vertical coordinate |
|
||||
| `button` | `str` | `"left"` \| `"right"` \| `"middle"` |
|
||||
| `confirm` | `bool` | Required `True` for `right` / `middle` (poka-yoke) |
|
||||
|
||||
**Returns** `dict`:
|
||||
```json
|
||||
{"ok": true, "error": null}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### `computer_type(text, confirm=False, interval=0.02, log_path=...)`
|
||||
|
||||
Type text using the keyboard.
|
||||
|
||||
| Param | Type | Description |
|
||||
|-------|------|-------------|
|
||||
| `text` | `str` | Text to type |
|
||||
| `confirm` | `bool` | Required `True` when text contains a sensitive keyword |
|
||||
| `interval` | `float` | Delay between keystrokes (seconds) |
|
||||
|
||||
**Sensitive keywords** (require `confirm=True`): `password`, `passwd`, `secret`, `token`, `api_key`, `apikey`, `key`, `auth`
|
||||
|
||||
> Note: the actual `text` value is never written to the audit log — only its length and whether it was flagged as sensitive.
|
||||
|
||||
**Returns** `dict`:
|
||||
```json
|
||||
{"ok": true, "error": null}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### `computer_scroll(x, y, amount=3, log_path=...)`
|
||||
|
||||
Scroll the mouse wheel at screen coordinates.
|
||||
|
||||
| Param | Type | Description |
|
||||
|-------|------|-------------|
|
||||
| `x` | `int` | Horizontal coordinate |
|
||||
| `y` | `int` | Vertical coordinate |
|
||||
| `amount` | `int` | Scroll units. Positive = up, negative = down. |
|
||||
|
||||
**Returns** `dict`:
|
||||
```json
|
||||
{"ok": true, "error": null}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### `read_action_log(n=20, log_path=...)`
|
||||
|
||||
Return the most recent `n` audit log entries, newest first.
|
||||
|
||||
```python
|
||||
from nexus.computer_use import read_action_log
|
||||
|
||||
for entry in read_action_log(n=5):
|
||||
print(entry["ts"], entry["action"], entry["result"]["ok"])
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Safety Model
|
||||
|
||||
| Action | Safety gate |
|
||||
|--------|-------------|
|
||||
| `computer_click(button="right")` | Requires `confirm=True` |
|
||||
| `computer_click(button="middle")` | Requires `confirm=True` |
|
||||
| `computer_type` with sensitive text | Requires `confirm=True` |
|
||||
| Mouse to top-left corner | pyautogui FAILSAFE — aborts immediately |
|
||||
| All actions | Written to JSONL audit log with timestamp |
|
||||
| Headless environment | All tools degrade gracefully — return `ok=False` with error message |
|
||||
|
||||
---
|
||||
|
||||
## Phase Roadmap
|
||||
|
||||
### Phase 1 — Environment & Primitives ✅
|
||||
- Sandboxed desktop via Xvfb + noVNC (`docker-compose.desktop.yml`)
|
||||
- `computer_screenshot`, `computer_click`, `computer_type`, `computer_scroll`
|
||||
- Poka-yoke safety checks on all destructive actions
|
||||
- JSONL audit log for all actions
|
||||
- Demo: baseline screenshot → open browser → navigate to Gitea → evidence screenshot
|
||||
- 32 unit tests, fully headless (pyautogui mocked)
|
||||
|
||||
### Phase 2 — Tool Integration (planned)
|
||||
- Register tools in the Hermes tool registry
|
||||
- LLM-based planner loop using screenshots as context
|
||||
- Destructive action confirmation UI
|
||||
|
||||
### Phase 3 — Use-Case Pilots (planned)
|
||||
- Pilot 1: Automated visual regression test for fleet dashboard
|
||||
- Pilot 2: Screenshot-based diagnosis of failed CI workflow page
|
||||
|
||||
---
|
||||
|
||||
## File Locations
|
||||
|
||||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `nexus/computer_use.py` | Core tool primitives |
|
||||
| `nexus/computer_use_demo.py` | Phase 1 end-to-end demo |
|
||||
| `tests/test_computer_use.py` | 32 unit tests |
|
||||
| `docker-compose.desktop.yml` | Sandboxed desktop container |
|
||||
| `~/.nexus/computer_use_actions.jsonl` | Runtime audit log |
|
||||
| `~/.nexus/computer_use_evidence/` | Screenshot evidence (demo output) |
|
||||
10
index.html
10
index.html
@@ -439,5 +439,15 @@ index.html
|
||||
setInterval(poll, INTERVAL);
|
||||
})();
|
||||
</script>
|
||||
|
||||
<!-- Memory Activity Feed (Mnemosyne) -->
|
||||
<div id="memory-feed" class="memory-feed" style="display:none;">
|
||||
<div class="memory-feed-header">
|
||||
<span class="memory-feed-title">✨ Memory Feed</span>
|
||||
<div class="memory-feed-actions"><button class="memory-feed-clear" onclick="clearMemoryFeed()">Clear</button><button class="memory-feed-toggle" onclick="document.getElementById('memory-feed').style.display='none'">✕</button></div>
|
||||
</div>
|
||||
<div id="memory-feed-list" class="memory-feed-list"></div>
|
||||
</div>
|
||||
|
||||
</body>
|
||||
</html>
|
||||
|
||||
@@ -76,6 +76,12 @@ const SpatialMemory = (() => {
|
||||
}
|
||||
};
|
||||
|
||||
// ─── PERSISTENCE CONFIG ──────────────────────────────
|
||||
const STORAGE_KEY = 'mnemosyne_spatial_memory';
|
||||
const STORAGE_VERSION = 1;
|
||||
let _dirty = false;
|
||||
let _lastSavedHash = '';
|
||||
|
||||
// ─── STATE ────────────────────────────────────────────
|
||||
let _scene = null;
|
||||
let _regionMarkers = {};
|
||||
@@ -183,6 +189,8 @@ const SpatialMemory = (() => {
|
||||
_drawConnections(mem.id, mem.connections);
|
||||
}
|
||||
|
||||
_dirty = true;
|
||||
saveToStorage();
|
||||
console.info('[Mnemosyne] Spatial memory placed:', mem.id, 'in', region.label);
|
||||
return crystal;
|
||||
}
|
||||
@@ -247,6 +255,8 @@ const SpatialMemory = (() => {
|
||||
}
|
||||
|
||||
delete _memoryObjects[memId];
|
||||
_dirty = true;
|
||||
saveToStorage();
|
||||
}
|
||||
|
||||
// ─── ANIMATE ─────────────────────────────────────────
|
||||
@@ -286,7 +296,9 @@ const SpatialMemory = (() => {
|
||||
_regionMarkers[key] = createRegionMarker(key, region);
|
||||
});
|
||||
|
||||
console.info('[Mnemosyne] Spatial Memory Schema initialized —', Object.keys(REGIONS).length, 'regions');
|
||||
// Restore persisted memories
|
||||
const restored = loadFromStorage();
|
||||
console.info('[Mnemosyne] Spatial Memory Schema initialized —', Object.keys(REGIONS).length, 'regions,', restored, 'memories restored');
|
||||
return REGIONS;
|
||||
}
|
||||
|
||||
@@ -320,6 +332,99 @@ const SpatialMemory = (() => {
|
||||
return Object.values(_memoryObjects).map(o => o.data);
|
||||
}
|
||||
|
||||
// ─── LOCALSTORAGE PERSISTENCE ────────────────────────
|
||||
function _indexHash(index) {
|
||||
// Simple hash of memory IDs + count to detect changes
|
||||
const ids = (index.memories || []).map(m => m.id).sort().join(',');
|
||||
return index.memories.length + ':' + ids;
|
||||
}
|
||||
|
||||
function saveToStorage() {
|
||||
if (typeof localStorage === 'undefined') {
|
||||
console.warn('[Mnemosyne] localStorage unavailable — skipping save');
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
const index = exportIndex();
|
||||
const hash = _indexHash(index);
|
||||
if (hash === _lastSavedHash) return false; // no change
|
||||
|
||||
const payload = JSON.stringify(index);
|
||||
localStorage.setItem(STORAGE_KEY, payload);
|
||||
_lastSavedHash = hash;
|
||||
_dirty = false;
|
||||
console.info('[Mnemosyne] Saved', index.memories.length, 'memories to localStorage');
|
||||
return true;
|
||||
} catch (e) {
|
||||
if (e.name === 'QuotaExceededError' || e.code === 22) {
|
||||
console.warn('[Mnemosyne] localStorage quota exceeded — pruning archive memories');
|
||||
_pruneArchiveMemories();
|
||||
try {
|
||||
const index = exportIndex();
|
||||
localStorage.setItem(STORAGE_KEY, JSON.stringify(index));
|
||||
_lastSavedHash = _indexHash(index);
|
||||
console.info('[Mnemosyne] Saved after prune:', index.memories.length, 'memories');
|
||||
return true;
|
||||
} catch (e2) {
|
||||
console.error('[Mnemosyne] Save failed even after prune:', e2);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
console.error('[Mnemosyne] Save failed:', e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function loadFromStorage() {
|
||||
if (typeof localStorage === 'undefined') {
|
||||
console.warn('[Mnemosyne] localStorage unavailable — starting empty');
|
||||
return 0;
|
||||
}
|
||||
try {
|
||||
const raw = localStorage.getItem(STORAGE_KEY);
|
||||
if (!raw) {
|
||||
console.info('[Mnemosyne] No saved state found — starting fresh');
|
||||
return 0;
|
||||
}
|
||||
const index = JSON.parse(raw);
|
||||
if (index.version !== STORAGE_VERSION) {
|
||||
console.warn('[Mnemosyne] Saved version mismatch (got', index.version, 'expected', + STORAGE_VERSION + ') — starting fresh');
|
||||
return 0;
|
||||
}
|
||||
const count = importIndex(index);
|
||||
_lastSavedHash = _indexHash(index);
|
||||
return count;
|
||||
} catch (e) {
|
||||
console.error('[Mnemosyne] Load failed:', e);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
function _pruneArchiveMemories() {
|
||||
// Remove oldest archive-region memories first
|
||||
const archive = getMemoriesInRegion('archive');
|
||||
const working = Object.values(_memoryObjects).filter(o => o.region !== 'archive');
|
||||
// Sort archive by timestamp ascending (oldest first)
|
||||
archive.sort((a, b) => {
|
||||
const ta = a.data.timestamp || a.mesh.userData.createdAt || '';
|
||||
const tb = b.data.timestamp || b.mesh.userData.createdAt || '';
|
||||
return ta.localeCompare(tb);
|
||||
});
|
||||
const toRemove = Math.max(1, Math.ceil(archive.length * 0.25));
|
||||
for (let i = 0; i < toRemove && i < archive.length; i++) {
|
||||
removeMemory(archive[i].data.id);
|
||||
}
|
||||
console.info('[Mnemosyne] Pruned', toRemove, 'archive memories');
|
||||
}
|
||||
|
||||
function clearStorage() {
|
||||
if (typeof localStorage !== 'undefined') {
|
||||
localStorage.removeItem(STORAGE_KEY);
|
||||
_lastSavedHash = '';
|
||||
console.info('[Mnemosyne] Cleared localStorage');
|
||||
}
|
||||
}
|
||||
|
||||
// ─── PERSISTENCE ─────────────────────────────────────
|
||||
function exportIndex() {
|
||||
return {
|
||||
@@ -366,10 +471,86 @@ const SpatialMemory = (() => {
|
||||
return results.slice(0, maxResults);
|
||||
}
|
||||
|
||||
// ─── BULK IMPORT (WebSocket sync) ───────────────────
|
||||
/**
|
||||
* Import an array of memories in batch — for WebSocket sync.
|
||||
* Skips duplicates (same id). Returns count of newly placed.
|
||||
* @param {Array} memories - Array of memory objects { id, content, category, ... }
|
||||
* @returns {number} Count of newly placed memories
|
||||
*/
|
||||
function importMemories(memories) {
|
||||
if (!Array.isArray(memories) || memories.length === 0) return 0;
|
||||
let count = 0;
|
||||
memories.forEach(mem => {
|
||||
if (mem.id && !_memoryObjects[mem.id]) {
|
||||
placeMemory(mem);
|
||||
count++;
|
||||
}
|
||||
});
|
||||
if (count > 0) {
|
||||
_dirty = true;
|
||||
saveToStorage();
|
||||
console.info('[Mnemosyne] Bulk imported', count, 'new memories (total:', Object.keys(_memoryObjects).length, ')');
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
// ─── UPDATE MEMORY ──────────────────────────────────
|
||||
/**
|
||||
* Update an existing memory's visual properties (strength, connections).
|
||||
* Does not move the crystal — only updates metadata and re-renders.
|
||||
* @param {string} memId - Memory ID to update
|
||||
* @param {object} updates - Fields to update: { strength, connections, content }
|
||||
* @returns {boolean} True if updated
|
||||
*/
|
||||
function updateMemory(memId, updates) {
|
||||
const obj = _memoryObjects[memId];
|
||||
if (!obj) return false;
|
||||
|
||||
if (updates.strength != null) {
|
||||
const strength = Math.max(0.05, Math.min(1, updates.strength));
|
||||
obj.mesh.userData.strength = strength;
|
||||
obj.mesh.material.emissiveIntensity = 1.5 * strength;
|
||||
obj.mesh.material.opacity = 0.5 + strength * 0.4;
|
||||
}
|
||||
if (updates.content != null) {
|
||||
obj.data.content = updates.content;
|
||||
}
|
||||
if (updates.connections != null) {
|
||||
obj.data.connections = updates.connections;
|
||||
// Rebuild connection lines
|
||||
_rebuildConnections(memId);
|
||||
}
|
||||
_dirty = true;
|
||||
saveToStorage();
|
||||
return true;
|
||||
}
|
||||
|
||||
function _rebuildConnections(memId) {
|
||||
// Remove existing lines for this memory
|
||||
for (let i = _connectionLines.length - 1; i >= 0; i--) {
|
||||
const line = _connectionLines[i];
|
||||
if (line.userData.from === memId || line.userData.to === memId) {
|
||||
if (line.parent) line.parent.remove(line);
|
||||
line.geometry.dispose();
|
||||
line.material.dispose();
|
||||
_connectionLines.splice(i, 1);
|
||||
}
|
||||
}
|
||||
// Recreate lines for current connections
|
||||
const obj = _memoryObjects[memId];
|
||||
if (!obj || !obj.data.connections) return;
|
||||
obj.data.connections.forEach(targetId => {
|
||||
const target = _memoryObjects[targetId];
|
||||
if (target) _createConnectionLine(obj, target);
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
init, placeMemory, removeMemory, update,
|
||||
init, placeMemory, removeMemory, update, importMemories, updateMemory,
|
||||
getMemoryAtPosition, getRegionAtPosition, getMemoriesInRegion, getAllMemories,
|
||||
exportIndex, importIndex, searchNearby, REGIONS
|
||||
exportIndex, importIndex, searchNearby, REGIONS,
|
||||
saveToStorage, loadFromStorage, clearStorage
|
||||
};
|
||||
})();
|
||||
|
||||
|
||||
313
nexus/computer_use.py
Normal file
313
nexus/computer_use.py
Normal file
@@ -0,0 +1,313 @@
|
||||
"""
|
||||
Hermes Desktop Automation Primitives — Computer Use (#1125)
|
||||
|
||||
Provides sandboxed desktop control tools for Hermes agents:
|
||||
- computer_screenshot() — capture current desktop
|
||||
- computer_click() — mouse click with poka-yoke on non-primary buttons
|
||||
- computer_type() — keyboard input with poka-yoke on sensitive text
|
||||
- computer_scroll() — scroll wheel action
|
||||
- read_action_log() — inspect recent action audit trail
|
||||
|
||||
All actions are logged to a JSONL audit file.
|
||||
pyautogui.FAILSAFE is enabled globally — move mouse to top-left corner to abort.
|
||||
|
||||
Designed to degrade gracefully when no display is available (headless CI).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Safety globals
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Poka-yoke: require confirmation for dangerous inputs
|
||||
_SENSITIVE_KEYWORDS = frozenset(
|
||||
["password", "passwd", "secret", "token", "api_key", "apikey", "key", "auth"]
|
||||
)
|
||||
|
||||
# Destructive mouse buttons (non-primary)
|
||||
_DANGEROUS_BUTTONS = frozenset(["right", "middle"])
|
||||
|
||||
# Default log location
|
||||
DEFAULT_ACTION_LOG = Path.home() / ".nexus" / "computer_use_actions.jsonl"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Lazy pyautogui import — fails gracefully in headless environments
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_PYAUTOGUI_AVAILABLE = False
|
||||
_pyautogui = None
|
||||
|
||||
|
||||
def _get_pyautogui():
|
||||
"""Return pyautogui, enabling FAILSAFE. Returns None if unavailable."""
|
||||
global _pyautogui, _PYAUTOGUI_AVAILABLE
|
||||
if _pyautogui is not None:
|
||||
return _pyautogui
|
||||
try:
|
||||
import pyautogui # type: ignore
|
||||
|
||||
pyautogui.FAILSAFE = True
|
||||
pyautogui.PAUSE = 0.05 # small delay between actions
|
||||
_pyautogui = pyautogui
|
||||
_PYAUTOGUI_AVAILABLE = True
|
||||
return _pyautogui
|
||||
except Exception:
|
||||
logger.warning("pyautogui unavailable — computer_use running in stub mode")
|
||||
return None
|
||||
|
||||
|
||||
def _get_pil():
|
||||
"""Return PIL Image module or None."""
|
||||
try:
|
||||
from PIL import Image # type: ignore
|
||||
|
||||
return Image
|
||||
except ImportError:
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Audit log
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _log_action(action: str, params: dict, result: dict, log_path: Path = DEFAULT_ACTION_LOG):
|
||||
"""Append one action record to the JSONL audit log."""
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
record = {
|
||||
"ts": time.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
"action": action,
|
||||
"params": params,
|
||||
"result": result,
|
||||
}
|
||||
with open(log_path, "a") as fh:
|
||||
fh.write(json.dumps(record) + "\n")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public tool API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def computer_screenshot(
|
||||
save_path: Optional[str] = None,
|
||||
log_path: Path = DEFAULT_ACTION_LOG,
|
||||
) -> dict:
|
||||
"""Capture a screenshot of the current desktop.
|
||||
|
||||
Args:
|
||||
save_path: Optional file path to save the PNG. If omitted the image
|
||||
is returned as a base64-encoded string.
|
||||
log_path: Audit log file (default ~/.nexus/computer_use_actions.jsonl).
|
||||
|
||||
Returns:
|
||||
dict with keys:
|
||||
- ok (bool)
|
||||
- image_b64 (str | None) — base64 PNG when save_path is None
|
||||
- saved_to (str | None) — path when save_path was given
|
||||
- error (str | None) — human-readable error if ok=False
|
||||
"""
|
||||
pag = _get_pyautogui()
|
||||
params = {"save_path": save_path}
|
||||
|
||||
if pag is None:
|
||||
result = {"ok": False, "image_b64": None, "saved_to": None, "error": "pyautogui unavailable"}
|
||||
_log_action("screenshot", params, result, log_path)
|
||||
return result
|
||||
|
||||
try:
|
||||
screenshot = pag.screenshot()
|
||||
if save_path:
|
||||
screenshot.save(save_path)
|
||||
result = {"ok": True, "image_b64": None, "saved_to": save_path, "error": None}
|
||||
else:
|
||||
buf = io.BytesIO()
|
||||
screenshot.save(buf, format="PNG")
|
||||
b64 = base64.b64encode(buf.getvalue()).decode()
|
||||
result = {"ok": True, "image_b64": b64, "saved_to": None, "error": None}
|
||||
except Exception as exc:
|
||||
result = {"ok": False, "image_b64": None, "saved_to": None, "error": str(exc)}
|
||||
|
||||
_log_action("screenshot", params, {k: v for k, v in result.items() if k != "image_b64"}, log_path)
|
||||
return result
|
||||
|
||||
|
||||
def computer_click(
|
||||
x: int,
|
||||
y: int,
|
||||
button: str = "left",
|
||||
confirm: bool = False,
|
||||
log_path: Path = DEFAULT_ACTION_LOG,
|
||||
) -> dict:
|
||||
"""Click the mouse at screen coordinates (x, y).
|
||||
|
||||
Poka-yoke: right/middle clicks require confirm=True.
|
||||
|
||||
Args:
|
||||
x: Horizontal screen coordinate.
|
||||
y: Vertical screen coordinate.
|
||||
button: "left" | "right" | "middle"
|
||||
confirm: Must be True for non-left buttons.
|
||||
log_path: Audit log file.
|
||||
|
||||
Returns:
|
||||
dict with keys: ok, error
|
||||
"""
|
||||
params = {"x": x, "y": y, "button": button, "confirm": confirm}
|
||||
|
||||
if button in _DANGEROUS_BUTTONS and not confirm:
|
||||
result = {
|
||||
"ok": False,
|
||||
"error": (
|
||||
f"button={button!r} requires confirm=True (poka-yoke). "
|
||||
"Pass confirm=True only after verifying this action is intentional."
|
||||
),
|
||||
}
|
||||
_log_action("click", params, result, log_path)
|
||||
return result
|
||||
|
||||
if button not in ("left", "right", "middle"):
|
||||
result = {"ok": False, "error": f"Unknown button {button!r}. Use 'left', 'right', or 'middle'."}
|
||||
_log_action("click", params, result, log_path)
|
||||
return result
|
||||
|
||||
pag = _get_pyautogui()
|
||||
if pag is None:
|
||||
result = {"ok": False, "error": "pyautogui unavailable"}
|
||||
_log_action("click", params, result, log_path)
|
||||
return result
|
||||
|
||||
try:
|
||||
pag.click(x, y, button=button)
|
||||
result = {"ok": True, "error": None}
|
||||
except Exception as exc:
|
||||
result = {"ok": False, "error": str(exc)}
|
||||
|
||||
_log_action("click", params, result, log_path)
|
||||
return result
|
||||
|
||||
|
||||
def computer_type(
|
||||
text: str,
|
||||
confirm: bool = False,
|
||||
interval: float = 0.02,
|
||||
log_path: Path = DEFAULT_ACTION_LOG,
|
||||
) -> dict:
|
||||
"""Type text using the keyboard.
|
||||
|
||||
Poka-yoke: if *text* contains a sensitive keyword (password, token, key…)
|
||||
confirm=True is required. The actual text value is never written to the
|
||||
audit log.
|
||||
|
||||
Args:
|
||||
text: The string to type.
|
||||
confirm: Must be True when the text looks sensitive.
|
||||
interval: Delay between keystrokes (seconds).
|
||||
log_path: Audit log file.
|
||||
|
||||
Returns:
|
||||
dict with keys: ok, error
|
||||
"""
|
||||
lower = text.lower()
|
||||
is_sensitive = any(kw in lower for kw in _SENSITIVE_KEYWORDS)
|
||||
params = {"length": len(text), "is_sensitive": is_sensitive, "confirm": confirm}
|
||||
|
||||
if is_sensitive and not confirm:
|
||||
result = {
|
||||
"ok": False,
|
||||
"error": (
|
||||
"Text contains sensitive keyword. Pass confirm=True to proceed. "
|
||||
"Ensure no secrets are being typed into unintended windows."
|
||||
),
|
||||
}
|
||||
_log_action("type", params, result, log_path)
|
||||
return result
|
||||
|
||||
pag = _get_pyautogui()
|
||||
if pag is None:
|
||||
result = {"ok": False, "error": "pyautogui unavailable"}
|
||||
_log_action("type", params, result, log_path)
|
||||
return result
|
||||
|
||||
try:
|
||||
pag.typewrite(text, interval=interval)
|
||||
result = {"ok": True, "error": None}
|
||||
except Exception as exc:
|
||||
result = {"ok": False, "error": str(exc)}
|
||||
|
||||
_log_action("type", params, result, log_path)
|
||||
return result
|
||||
|
||||
|
||||
def computer_scroll(
|
||||
x: int,
|
||||
y: int,
|
||||
amount: int = 3,
|
||||
log_path: Path = DEFAULT_ACTION_LOG,
|
||||
) -> dict:
|
||||
"""Scroll the mouse wheel at screen coordinates (x, y).
|
||||
|
||||
Args:
|
||||
x: Horizontal screen coordinate.
|
||||
y: Vertical screen coordinate.
|
||||
amount: Number of scroll units. Positive = scroll up, negative = down.
|
||||
log_path: Audit log file.
|
||||
|
||||
Returns:
|
||||
dict with keys: ok, error
|
||||
"""
|
||||
params = {"x": x, "y": y, "amount": amount}
|
||||
pag = _get_pyautogui()
|
||||
|
||||
if pag is None:
|
||||
result = {"ok": False, "error": "pyautogui unavailable"}
|
||||
_log_action("scroll", params, result, log_path)
|
||||
return result
|
||||
|
||||
try:
|
||||
pag.scroll(amount, x=x, y=y)
|
||||
result = {"ok": True, "error": None}
|
||||
except Exception as exc:
|
||||
result = {"ok": False, "error": str(exc)}
|
||||
|
||||
_log_action("scroll", params, result, log_path)
|
||||
return result
|
||||
|
||||
|
||||
def read_action_log(
|
||||
n: int = 20,
|
||||
log_path: Path = DEFAULT_ACTION_LOG,
|
||||
) -> list[dict]:
|
||||
"""Return the most recent *n* action records from the audit log.
|
||||
|
||||
Args:
|
||||
n: Maximum number of records to return.
|
||||
log_path: Audit log file.
|
||||
|
||||
Returns:
|
||||
List of action dicts, newest first.
|
||||
"""
|
||||
if not log_path.exists():
|
||||
return []
|
||||
records: list[dict] = []
|
||||
with open(log_path) as fh:
|
||||
for line in fh:
|
||||
line = line.strip()
|
||||
if line:
|
||||
try:
|
||||
records.append(json.loads(line))
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
return list(reversed(records[-n:]))
|
||||
118
nexus/computer_use_demo.py
Normal file
118
nexus/computer_use_demo.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""
|
||||
Phase 1 Demo — Desktop Automation via Hermes (#1125)
|
||||
|
||||
Demonstrates the computer_use primitives end-to-end:
|
||||
1. Take a baseline screenshot
|
||||
2. Open a browser and navigate to the Gitea forge
|
||||
3. Take an evidence screenshot
|
||||
|
||||
Run inside a desktop session (Xvfb or real display):
|
||||
|
||||
python -m nexus.computer_use_demo
|
||||
|
||||
Or via Docker:
|
||||
|
||||
docker compose -f docker-compose.desktop.yml run hermes-desktop \
|
||||
python -m nexus.computer_use_demo
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from nexus.computer_use import (
|
||||
computer_click,
|
||||
computer_screenshot,
|
||||
computer_type,
|
||||
read_action_log,
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
GITEA_URL = "https://forge.alexanderwhitestone.com"
|
||||
EVIDENCE_DIR = Path.home() / ".nexus" / "computer_use_evidence"
|
||||
|
||||
|
||||
def run_demo() -> bool:
|
||||
"""Execute the Phase 1 demo. Returns True on success."""
|
||||
EVIDENCE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
log.info("=== Phase 1 Computer-Use Demo ===")
|
||||
|
||||
# --- Step 1: baseline screenshot ---
|
||||
baseline = EVIDENCE_DIR / "01_baseline.png"
|
||||
log.info("Step 1: capturing baseline screenshot → %s", baseline)
|
||||
result = computer_screenshot(save_path=str(baseline))
|
||||
if not result["ok"]:
|
||||
log.error("Baseline screenshot failed: %s", result["error"])
|
||||
return False
|
||||
log.info(" ✓ baseline saved")
|
||||
|
||||
# --- Step 2: open browser ---
|
||||
log.info("Step 2: opening browser")
|
||||
try:
|
||||
import subprocess
|
||||
# Use xdg-open / open depending on platform; fallback to chromium
|
||||
for cmd in (
|
||||
["xdg-open", GITEA_URL],
|
||||
["chromium-browser", "--no-sandbox", GITEA_URL],
|
||||
["chromium", "--no-sandbox", GITEA_URL],
|
||||
["google-chrome", "--no-sandbox", GITEA_URL],
|
||||
["open", GITEA_URL], # macOS
|
||||
):
|
||||
try:
|
||||
subprocess.Popen(cmd, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)
|
||||
log.info(" ✓ browser opened with: %s", cmd[0])
|
||||
break
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
else:
|
||||
log.warning(" ⚠ no browser found — skipping open step")
|
||||
except Exception as exc:
|
||||
log.warning(" ⚠ could not open browser: %s", exc)
|
||||
|
||||
# Give the browser time to load
|
||||
time.sleep(3)
|
||||
|
||||
# --- Step 3: click address bar and navigate (best-effort) ---
|
||||
log.info("Step 3: attempting to type URL in browser address bar (best-effort)")
|
||||
try:
|
||||
import pyautogui # type: ignore
|
||||
|
||||
# Common shortcut to focus address bar
|
||||
pyautogui.hotkey("ctrl", "l")
|
||||
time.sleep(0.3)
|
||||
result_type = computer_type(GITEA_URL)
|
||||
if result_type["ok"]:
|
||||
pyautogui.press("enter")
|
||||
time.sleep(2)
|
||||
log.info(" ✓ URL typed")
|
||||
else:
|
||||
log.warning(" ⚠ type failed: %s", result_type["error"])
|
||||
except ImportError:
|
||||
log.warning(" ⚠ pyautogui not available — skipping URL type step")
|
||||
|
||||
# --- Step 4: evidence screenshot ---
|
||||
evidence = EVIDENCE_DIR / "02_gitea.png"
|
||||
log.info("Step 4: capturing evidence screenshot → %s", evidence)
|
||||
result = computer_screenshot(save_path=str(evidence))
|
||||
if not result["ok"]:
|
||||
log.error("Evidence screenshot failed: %s", result["error"])
|
||||
return False
|
||||
log.info(" ✓ evidence saved")
|
||||
|
||||
# --- Step 5: summary ---
|
||||
log.info("Step 5: recent action log")
|
||||
for entry in read_action_log(n=10):
|
||||
log.info(" %s %s ok=%s", entry["ts"], entry["action"], entry["result"].get("ok"))
|
||||
|
||||
log.info("=== Demo complete — evidence in %s ===", EVIDENCE_DIR)
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = run_demo()
|
||||
sys.exit(0 if success else 1)
|
||||
121
style.css
121
style.css
@@ -1223,3 +1223,124 @@ canvas#nexus-canvas {
|
||||
.l402-msg { color: #fff; }
|
||||
|
||||
.pse-status { color: #4af0c0; font-weight: 600; }
|
||||
|
||||
/* ═══════════════════════════════════════════
|
||||
MNEMOSYNE — Memory Activity Feed
|
||||
═══════════════════════════════════════════ */
|
||||
|
||||
.memory-feed {
|
||||
position: fixed;
|
||||
bottom: 20px;
|
||||
left: 20px;
|
||||
width: 320px;
|
||||
background: rgba(10, 15, 40, 0.92);
|
||||
border: 1px solid rgba(74, 240, 192, 0.25);
|
||||
border-radius: 10px;
|
||||
backdrop-filter: blur(12px);
|
||||
-webkit-backdrop-filter: blur(12px);
|
||||
z-index: 900;
|
||||
font-family: 'JetBrains Mono', monospace;
|
||||
animation: memoryFeedIn 0.3s ease-out;
|
||||
}
|
||||
|
||||
@keyframes memoryFeedIn {
|
||||
from { opacity: 0; transform: translateY(10px); }
|
||||
to { opacity: 1; transform: translateY(0); }
|
||||
}
|
||||
|
||||
.memory-feed-header {
|
||||
display: flex;
|
||||
justify-content: space-between;
|
||||
align-items: center;
|
||||
padding: 8px 12px;
|
||||
border-bottom: 1px solid rgba(74, 240, 192, 0.15);
|
||||
}
|
||||
|
||||
.memory-feed-title {
|
||||
color: #4af0c0;
|
||||
font-size: 12px;
|
||||
font-weight: 600;
|
||||
letter-spacing: 0.5px;
|
||||
}
|
||||
|
||||
|
||||
.memory-feed-actions {
|
||||
display: flex;
|
||||
gap: 8px;
|
||||
align-items: center;
|
||||
}
|
||||
|
||||
.memory-feed-clear {
|
||||
background: rgba(74, 240, 192, 0.1);
|
||||
border: 1px solid rgba(74, 240, 192, 0.3);
|
||||
color: #4af0c0;
|
||||
font-size: 10px;
|
||||
padding: 2px 6px;
|
||||
border-radius: 4px;
|
||||
cursor: pointer;
|
||||
transition: all 0.2s;
|
||||
font-family: inherit;
|
||||
}
|
||||
.memory-feed-clear:hover {
|
||||
background: rgba(74, 240, 192, 0.2);
|
||||
border-color: #4af0c0;
|
||||
}
|
||||
|
||||
.memory-feed-toggle {
|
||||
background: none;
|
||||
border: none;
|
||||
color: rgba(255, 255, 255, 0.4);
|
||||
cursor: pointer;
|
||||
font-size: 14px;
|
||||
padding: 0 4px;
|
||||
}
|
||||
.memory-feed-toggle:hover { color: #fff; }
|
||||
|
||||
.memory-feed-list {
|
||||
max-height: 200px;
|
||||
overflow-y: auto;
|
||||
padding: 6px 0;
|
||||
}
|
||||
|
||||
.memory-feed-entry {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
gap: 8px;
|
||||
padding: 6px 12px;
|
||||
font-size: 11px;
|
||||
color: rgba(255, 255, 255, 0.75);
|
||||
transition: background 0.2s;
|
||||
}
|
||||
.memory-feed-entry:hover {
|
||||
background: rgba(255, 255, 255, 0.05);
|
||||
}
|
||||
|
||||
.memory-feed-dot {
|
||||
width: 8px;
|
||||
height: 8px;
|
||||
border-radius: 50%;
|
||||
flex-shrink: 0;
|
||||
}
|
||||
|
||||
.memory-feed-action {
|
||||
flex-shrink: 0;
|
||||
font-size: 12px;
|
||||
}
|
||||
|
||||
.memory-feed-content {
|
||||
flex: 1;
|
||||
overflow: hidden;
|
||||
text-overflow: ellipsis;
|
||||
white-space: nowrap;
|
||||
}
|
||||
|
||||
.memory-feed-time {
|
||||
flex-shrink: 0;
|
||||
color: rgba(255, 255, 255, 0.3);
|
||||
font-size: 10px;
|
||||
}
|
||||
|
||||
.memory-feed-place { border-left: 2px solid #4af0c0; }
|
||||
.memory-feed-remove { border-left: 2px solid #ff4466; }
|
||||
.memory-feed-update { border-left: 2px solid #ffd700; }
|
||||
.memory-feed-sync { border-left: 2px solid #7b5cff; }
|
||||
|
||||
362
tests/test_computer_use.py
Normal file
362
tests/test_computer_use.py
Normal file
@@ -0,0 +1,362 @@
|
||||
"""
|
||||
Tests for nexus.computer_use — Desktop Automation Primitives (#1125)
|
||||
|
||||
All tests run fully headless: pyautogui is mocked throughout.
|
||||
No display is required.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch, call
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from nexus.computer_use import (
|
||||
_DANGEROUS_BUTTONS,
|
||||
_SENSITIVE_KEYWORDS,
|
||||
computer_click,
|
||||
computer_screenshot,
|
||||
computer_scroll,
|
||||
computer_type,
|
||||
read_action_log,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers / fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tmp_log(tmp_path):
|
||||
"""Return a temporary JSONL audit log path."""
|
||||
return tmp_path / "actions.jsonl"
|
||||
|
||||
|
||||
def _last_log_entry(log_path: Path) -> dict:
|
||||
lines = [l.strip() for l in log_path.read_text().splitlines() if l.strip()]
|
||||
return json.loads(lines[-1])
|
||||
|
||||
|
||||
def _make_mock_pag(screenshot_raises=None):
|
||||
"""Build a minimal pyautogui mock."""
|
||||
mock = MagicMock()
|
||||
mock.FAILSAFE = True
|
||||
mock.PAUSE = 0.05
|
||||
if screenshot_raises:
|
||||
mock.screenshot.side_effect = screenshot_raises
|
||||
else:
|
||||
img_mock = MagicMock()
|
||||
img_mock.save = MagicMock()
|
||||
mock.screenshot.return_value = img_mock
|
||||
return mock
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# computer_screenshot
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestComputerScreenshot:
|
||||
def test_returns_b64_when_no_save_path(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
# Make save() write fake PNG bytes
|
||||
import io
|
||||
buf = io.BytesIO(b"\x89PNG\r\n\x1a\n" + b"\x00" * 20)
|
||||
|
||||
def fake_save(obj, format=None):
|
||||
obj.write(buf.getvalue())
|
||||
|
||||
mock_pag.screenshot.return_value.save = MagicMock(side_effect=fake_save)
|
||||
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_screenshot(log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is True
|
||||
assert result["image_b64"] is not None
|
||||
assert result["saved_to"] is None
|
||||
assert result["error"] is None
|
||||
|
||||
def test_saves_to_path(self, tmp_log, tmp_path):
|
||||
mock_pag = _make_mock_pag()
|
||||
out_png = tmp_path / "shot.png"
|
||||
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_screenshot(save_path=str(out_png), log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is True
|
||||
assert result["saved_to"] == str(out_png)
|
||||
assert result["image_b64"] is None
|
||||
mock_pag.screenshot.return_value.save.assert_called_once_with(str(out_png))
|
||||
|
||||
def test_logs_action(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
computer_screenshot(log_path=tmp_log)
|
||||
|
||||
entry = _last_log_entry(tmp_log)
|
||||
assert entry["action"] == "screenshot"
|
||||
assert "ts" in entry
|
||||
|
||||
def test_returns_error_when_headless(self, tmp_log):
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=None):
|
||||
result = computer_screenshot(log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
assert "unavailable" in result["error"]
|
||||
|
||||
def test_handles_screenshot_exception(self, tmp_log):
|
||||
mock_pag = _make_mock_pag(screenshot_raises=RuntimeError("display error"))
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_screenshot(log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
assert "display error" in result["error"]
|
||||
|
||||
def test_image_b64_not_written_to_log(self, tmp_log):
|
||||
"""The (potentially huge) base64 blob must NOT appear in the audit log."""
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
computer_screenshot(log_path=tmp_log)
|
||||
|
||||
raw = tmp_log.read_text()
|
||||
assert "image_b64" not in raw
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# computer_click
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestComputerClick:
|
||||
def test_left_click_succeeds(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_click(100, 200, log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is True
|
||||
mock_pag.click.assert_called_once_with(100, 200, button="left")
|
||||
|
||||
def test_right_click_blocked_without_confirm(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_click(100, 200, button="right", log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
assert "confirm=True" in result["error"]
|
||||
mock_pag.click.assert_not_called()
|
||||
|
||||
def test_right_click_allowed_with_confirm(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_click(100, 200, button="right", confirm=True, log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is True
|
||||
mock_pag.click.assert_called_once_with(100, 200, button="right")
|
||||
|
||||
def test_middle_click_blocked_without_confirm(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_click(50, 50, button="middle", log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
|
||||
def test_middle_click_allowed_with_confirm(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_click(50, 50, button="middle", confirm=True, log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is True
|
||||
|
||||
def test_unknown_button_rejected(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_click(0, 0, button="turbo", log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
assert "Unknown button" in result["error"]
|
||||
|
||||
def test_logs_click_action(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
computer_click(10, 20, log_path=tmp_log)
|
||||
|
||||
entry = _last_log_entry(tmp_log)
|
||||
assert entry["action"] == "click"
|
||||
assert entry["params"]["x"] == 10
|
||||
assert entry["params"]["y"] == 20
|
||||
|
||||
def test_returns_error_when_headless(self, tmp_log):
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=None):
|
||||
result = computer_click(0, 0, log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
|
||||
def test_handles_click_exception(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
mock_pag.click.side_effect = Exception("out of bounds")
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_click(99999, 99999, log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
assert "out of bounds" in result["error"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# computer_type
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestComputerType:
|
||||
def test_plain_text_succeeds(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_type("hello world", log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is True
|
||||
mock_pag.typewrite.assert_called_once_with("hello world", interval=0.02)
|
||||
|
||||
def test_sensitive_text_blocked_without_confirm(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_type("mypassword123", log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
assert "confirm=True" in result["error"]
|
||||
mock_pag.typewrite.assert_not_called()
|
||||
|
||||
def test_sensitive_text_allowed_with_confirm(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_type("mypassword123", confirm=True, log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is True
|
||||
|
||||
def test_sensitive_keywords_all_blocked(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
for keyword in _SENSITIVE_KEYWORDS:
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_type(f"my{keyword}value", log_path=tmp_log)
|
||||
assert result["ok"] is False, f"keyword {keyword!r} should be blocked"
|
||||
|
||||
def test_text_not_logged(self, tmp_log):
|
||||
"""Actual typed text must NOT appear in the audit log."""
|
||||
mock_pag = _make_mock_pag()
|
||||
secret = "super_secret_value_xyz"
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
computer_type(secret, confirm=True, log_path=tmp_log)
|
||||
|
||||
raw = tmp_log.read_text()
|
||||
assert secret not in raw
|
||||
|
||||
def test_logs_length_not_content(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
computer_type("hello", log_path=tmp_log)
|
||||
|
||||
entry = _last_log_entry(tmp_log)
|
||||
assert entry["params"]["length"] == 5
|
||||
|
||||
def test_returns_error_when_headless(self, tmp_log):
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=None):
|
||||
result = computer_type("abc", log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
|
||||
def test_handles_type_exception(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
mock_pag.typewrite.side_effect = Exception("keyboard error")
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_type("hello", log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
assert "keyboard error" in result["error"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# computer_scroll
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestComputerScroll:
|
||||
def test_scroll_up(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_scroll(400, 300, amount=5, log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is True
|
||||
mock_pag.scroll.assert_called_once_with(5, x=400, y=300)
|
||||
|
||||
def test_scroll_down_negative(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_scroll(400, 300, amount=-3, log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is True
|
||||
mock_pag.scroll.assert_called_once_with(-3, x=400, y=300)
|
||||
|
||||
def test_logs_scroll_action(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
computer_scroll(10, 20, amount=2, log_path=tmp_log)
|
||||
|
||||
entry = _last_log_entry(tmp_log)
|
||||
assert entry["action"] == "scroll"
|
||||
assert entry["params"]["amount"] == 2
|
||||
|
||||
def test_returns_error_when_headless(self, tmp_log):
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=None):
|
||||
result = computer_scroll(0, 0, log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
|
||||
def test_handles_scroll_exception(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
mock_pag.scroll.side_effect = Exception("scroll error")
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
result = computer_scroll(0, 0, log_path=tmp_log)
|
||||
|
||||
assert result["ok"] is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# read_action_log
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestReadActionLog:
|
||||
def test_returns_empty_list_when_no_log(self, tmp_path):
|
||||
missing = tmp_path / "nonexistent.jsonl"
|
||||
assert read_action_log(log_path=missing) == []
|
||||
|
||||
def test_returns_recent_entries(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
computer_click(1, 1, log_path=tmp_log)
|
||||
computer_click(2, 2, log_path=tmp_log)
|
||||
computer_click(3, 3, log_path=tmp_log)
|
||||
|
||||
entries = read_action_log(n=2, log_path=tmp_log)
|
||||
assert len(entries) == 2
|
||||
|
||||
def test_newest_first(self, tmp_log):
|
||||
mock_pag = _make_mock_pag()
|
||||
with patch("nexus.computer_use._get_pyautogui", return_value=mock_pag):
|
||||
computer_click(1, 1, log_path=tmp_log)
|
||||
computer_scroll(5, 5, log_path=tmp_log)
|
||||
|
||||
entries = read_action_log(log_path=tmp_log)
|
||||
# Most recent action (scroll) should be first
|
||||
assert entries[0]["action"] == "scroll"
|
||||
assert entries[1]["action"] == "click"
|
||||
|
||||
def test_skips_malformed_lines(self, tmp_log):
|
||||
tmp_log.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp_log.write_text('{"action": "click", "ts": "2026-01-01", "params": {}, "result": {}}\nNOT JSON\n')
|
||||
entries = read_action_log(log_path=tmp_log)
|
||||
assert len(entries) == 1
|
||||
Reference in New Issue
Block a user