Compare commits

..

2 Commits

Author SHA1 Message Date
kimi
96acac5c5f refactor: break up shell.py::run() into helpers
Extract _build_run_env() and _exec_subprocess() from the 89-line run()
method, reducing it to ~30 lines that validate and delegate.

Fixes #539

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 21:03:33 -04:00
2849dba756 [loop-cycle-948] refactor: break up _gather_system_snapshot() into helpers (#540) (#549) 2026-03-19 20:52:13 -04:00
2 changed files with 83 additions and 83 deletions

View File

@@ -144,6 +144,60 @@ class ShellHand:
return None
@staticmethod
def _build_run_env(env: dict | None) -> dict:
"""Merge *env* overrides into the current process environment."""
import os
run_env = os.environ.copy()
if env:
run_env.update(env)
return run_env
async def _exec_subprocess(
self,
command: str,
effective_timeout: int,
cwd: str | None,
run_env: dict,
start: float,
) -> ShellResult:
"""Launch *command*, enforce timeout, and return the result."""
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=run_env,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(), timeout=effective_timeout
)
except TimeoutError:
proc.kill()
await proc.wait()
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
return ShellResult(
command=command,
success=False,
exit_code=-1,
error=f"Command timed out after {effective_timeout}s",
latency_ms=(time.time() - start) * 1000,
timed_out=True,
)
exit_code = proc.returncode if proc.returncode is not None else -1
return ShellResult(
command=command,
success=exit_code == 0,
exit_code=exit_code,
stdout=stdout_bytes.decode("utf-8", errors="replace").strip(),
stderr=stderr_bytes.decode("utf-8", errors="replace").strip(),
latency_ms=(time.time() - start) * 1000,
)
async def run(
self,
command: str,
@@ -164,7 +218,6 @@ class ShellHand:
"""
start = time.time()
# Validate
validation_error = self._validate_command(command)
if validation_error:
return ShellResult(
@@ -174,64 +227,21 @@ class ShellHand:
latency_ms=(time.time() - start) * 1000,
)
effective_timeout = timeout or self._default_timeout
cwd = working_dir or self._working_dir
try:
import os
run_env = os.environ.copy()
if env:
run_env.update(env)
proc = await asyncio.create_subprocess_shell(
return await self._exec_subprocess(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=run_env,
effective_timeout=timeout or self._default_timeout,
cwd=working_dir or self._working_dir,
run_env=self._build_run_env(env),
start=start,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(), timeout=effective_timeout
)
except TimeoutError:
proc.kill()
await proc.wait()
latency = (time.time() - start) * 1000
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
return ShellResult(
command=command,
success=False,
exit_code=-1,
error=f"Command timed out after {effective_timeout}s",
latency_ms=latency,
timed_out=True,
)
latency = (time.time() - start) * 1000
exit_code = proc.returncode if proc.returncode is not None else -1
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return ShellResult(
command=command,
success=exit_code == 0,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
latency_ms=latency,
)
except Exception as exc:
latency = (time.time() - start) * 1000
logger.warning("Shell command failed: %s%s", command, exc)
return ShellResult(
command=command,
success=False,
error=str(exc),
latency_ms=latency,
latency_ms=(time.time() - start) * 1000,
)
def status(self) -> dict:

View File

@@ -772,19 +772,10 @@ class ThinkingEngine:
except Exception as exc:
logger.debug("Thought issue filing skipped: %s", exc)
# -- system-snapshot helpers ----------------------------------------
# ── System snapshot helpers ────────────────────────────────────────────
def _snapshot_time(self) -> tuple[datetime, str]:
"""Return (now, formatted-time-line) for the local clock."""
now = datetime.now().astimezone()
tz = now.strftime("%Z") or "UTC"
line = (
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
)
return now, line
def _snapshot_thought_count(self, now: datetime) -> str | None:
"""Return today's thought count as a display string, or *None*."""
def _snap_thought_count(self, now: datetime) -> str | None:
"""Return today's thought count, or *None* on failure."""
try:
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
with _get_conn(self._db_path) as conn:
@@ -797,8 +788,8 @@ class ThinkingEngine:
logger.debug("Thought count query failed: %s", exc)
return None
def _snapshot_chat_activity(self) -> list[str]:
"""Return chat-activity lines (may be empty)."""
def _snap_chat_activity(self) -> list[str]:
"""Return chat-activity lines (in-memory, no I/O)."""
try:
from infrastructure.chat_store import message_log
@@ -814,16 +805,14 @@ class ThinkingEngine:
logger.debug("Chat activity query failed: %s", exc)
return []
def _snapshot_task_queue(self) -> str | None:
"""Return a one-line task-queue summary, or *None*."""
def _snap_task_queue(self) -> str | None:
"""Return a one-line task queue summary, or *None*."""
try:
from swarm.task_queue.models import get_task_summary_for_briefing
summary = get_task_summary_for_briefing()
running = summary.get("running", 0)
pending = summary.get("pending_approval", 0)
done = summary.get("completed", 0)
failed = summary.get("failed", 0)
s = get_task_summary_for_briefing()
running, pending = s.get("running", 0), s.get("pending_approval", 0)
done, failed = s.get("completed", 0), s.get("failed", 0)
if running or pending or done or failed:
return (
f"Tasks: {running} running, {pending} pending, "
@@ -833,21 +822,20 @@ class ThinkingEngine:
logger.debug("Task queue query failed: %s", exc)
return None
def _snapshot_workspace(self) -> list[str]:
"""Return workspace-update lines (may be empty)."""
def _snap_workspace(self) -> list[str]:
"""Return workspace-update lines (file-based Hermes comms)."""
try:
from timmy.workspace import workspace_monitor
updates = workspace_monitor.get_pending_updates()
lines: list[str] = []
new_corr = updates.get("new_correspondence")
new_inbox = updates.get("new_inbox_files", [])
if new_corr:
line_count = len([ln for ln in new_corr.splitlines() if ln.strip()])
lines.append(
f"Workspace: {line_count} new correspondence entries (latest from: Hermes)"
)
new_inbox = updates.get("new_inbox_files", [])
if new_inbox:
files_str = ", ".join(new_inbox[:5])
if len(new_inbox) > 5:
@@ -858,8 +846,6 @@ class ThinkingEngine:
logger.debug("Workspace check failed: %s", exc)
return []
# -- public snapshot entry point ------------------------------------
def _gather_system_snapshot(self) -> str:
"""Gather lightweight real system state for grounding thoughts in reality.
@@ -867,20 +853,24 @@ class ThinkingEngine:
recent chat activity, and task queue status. Never crashes — every
section is independently try/excepted.
"""
now, time_line = self._snapshot_time()
parts: list[str] = [time_line]
now = datetime.now().astimezone()
tz = now.strftime("%Z") or "UTC"
thought_line = self._snapshot_thought_count(now)
parts: list[str] = [
f"Local time: {now.strftime('%I:%M %p').lstrip('0')} {tz}, {now.strftime('%A %B %d')}"
]
thought_line = self._snap_thought_count(now)
if thought_line:
parts.append(thought_line)
parts.extend(self._snapshot_chat_activity())
parts.extend(self._snap_chat_activity())
task_line = self._snapshot_task_queue()
task_line = self._snap_task_queue()
if task_line:
parts.append(task_line)
parts.extend(self._snapshot_workspace())
parts.extend(self._snap_workspace())
return "\n".join(parts) if parts else ""