"""Workspace monitor — tracks file-based communication between Hermes and Timmy. The workspace/ directory provides file-based communication: - workspace/correspondence.md — append-only journal - workspace/inbox/ — files from Hermes to Timmy - workspace/outbox/ — files from Timmy to Hermes This module tracks what Timmy has seen and detects new content. """ import json import logging from pathlib import Path from config import settings logger = logging.getLogger(__name__) _DEFAULT_STATE_PATH = Path("data/workspace_state.json") class WorkspaceMonitor: """Monitors workspace/ directory for new correspondence and inbox files.""" def __init__(self, state_path: Path = _DEFAULT_STATE_PATH) -> None: self._state_path = state_path self._state: dict = {"last_correspondence_line": 0, "seen_inbox_files": []} self._load_state() def _get_workspace_path(self) -> Path: """Get the workspace directory path.""" return Path(settings.repo_root) / "workspace" def _load_state(self) -> None: """Load persisted state from JSON file.""" try: if self._state_path.exists(): with open(self._state_path, encoding="utf-8") as f: loaded = json.load(f) self._state = { "last_correspondence_line": loaded.get("last_correspondence_line", 0), "seen_inbox_files": loaded.get("seen_inbox_files", []), } except Exception as exc: logger.debug("Failed to load workspace state: %s", exc) self._state = {"last_correspondence_line": 0, "seen_inbox_files": []} def _save_state(self) -> None: """Persist state to JSON file.""" try: self._state_path.parent.mkdir(parents=True, exist_ok=True) with open(self._state_path, "w", encoding="utf-8") as f: json.dump(self._state, f, indent=2) except Exception as exc: logger.debug("Failed to save workspace state: %s", exc) def check_correspondence(self) -> str | None: """Read workspace/correspondence.md and return new entries. Returns everything after the last seen line, or None if no new content. """ try: workspace = self._get_workspace_path() correspondence_file = workspace / "correspondence.md" if not correspondence_file.exists(): return None content = correspondence_file.read_text(encoding="utf-8") lines = content.splitlines() last_seen = self._state.get("last_correspondence_line", 0) if len(lines) <= last_seen: return None new_lines = lines[last_seen:] return "\n".join(new_lines) except Exception as exc: logger.debug("Failed to check correspondence: %s", exc) return None def check_inbox(self) -> list[str]: """List workspace/inbox/ files and return any not in seen list. Returns a list of filenames that are new. """ try: workspace = self._get_workspace_path() inbox_dir = workspace / "inbox" if not inbox_dir.exists(): return [] seen = set(self._state.get("seen_inbox_files", [])) current_files = {f.name for f in inbox_dir.iterdir() if f.is_file()} new_files = sorted(current_files - seen) return new_files except Exception as exc: logger.debug("Failed to check inbox: %s", exc) return [] def get_pending_updates(self) -> dict: """Get all pending workspace updates. Returns a dict with keys: - 'new_correspondence': str or None — new entries from correspondence.md - 'new_inbox_files': list[str] — new files in inbox/ """ return { "new_correspondence": self.check_correspondence(), "new_inbox_files": self.check_inbox(), } def mark_seen(self) -> None: """Update state file after processing current content.""" try: workspace = self._get_workspace_path() # Update correspondence line count correspondence_file = workspace / "correspondence.md" if correspondence_file.exists(): content = correspondence_file.read_text(encoding="utf-8") self._state["last_correspondence_line"] = len(content.splitlines()) # Update inbox seen list inbox_dir = workspace / "inbox" if inbox_dir.exists(): current_files = [f.name for f in inbox_dir.iterdir() if f.is_file()] self._state["seen_inbox_files"] = sorted(current_files) else: self._state["seen_inbox_files"] = [] self._save_state() except Exception as exc: logger.debug("Failed to mark workspace as seen: %s", exc) # Module-level singleton workspace_monitor = WorkspaceMonitor()