From 36f3f1b3a792c5095fcc5c3d4a3e076df7089d91 Mon Sep 17 00:00:00 2001 From: "Claude (Opus 4.6)" Date: Tue, 24 Mar 2026 02:56:35 +0000 Subject: [PATCH] [claude] Add unit tests for tools/system_tools.py (#1345) (#1354) --- tests/timmy/test_system_tools.py | 617 +++++++++++++++++++++++++++++++ 1 file changed, 617 insertions(+) create mode 100644 tests/timmy/test_system_tools.py diff --git a/tests/timmy/test_system_tools.py b/tests/timmy/test_system_tools.py new file mode 100644 index 00000000..b8293546 --- /dev/null +++ b/tests/timmy/test_system_tools.py @@ -0,0 +1,617 @@ +"""Unit tests for timmy/tools/system_tools.py. + +Covers: _safe_eval, calculator, consult_grok, web_fetch, +create_aider_tool (AiderTool), create_code_tools, +create_security_tools, create_devops_tools. +""" + +from __future__ import annotations + +import ast +import math +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from timmy.tools.system_tools import ( + _safe_eval, + calculator, + consult_grok, + create_aider_tool, + web_fetch, +) + +pytestmark = pytest.mark.unit + +# ── _safe_eval ──────────────────────────────────────────────────────────────── + + +def _parse_eval(expr: str): + allowed = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")} + allowed["math"] = math + allowed["abs"] = abs + allowed["round"] = round + allowed["min"] = min + allowed["max"] = max + tree = ast.parse(expr, mode="eval") + return _safe_eval(tree, allowed) + + +class TestSafeEval: + @pytest.mark.unit + def test_integer_constant(self): + assert _parse_eval("42") == 42 + + @pytest.mark.unit + def test_float_constant(self): + assert _parse_eval("3.14") == pytest.approx(3.14) + + @pytest.mark.unit + def test_addition(self): + assert _parse_eval("1 + 2") == 3 + + @pytest.mark.unit + def test_subtraction(self): + assert _parse_eval("10 - 4") == 6 + + @pytest.mark.unit + def test_multiplication(self): + assert _parse_eval("3 * 7") == 21 + + @pytest.mark.unit + def test_division(self): + assert _parse_eval("10 / 4") == 2.5 + + @pytest.mark.unit + def test_floor_division(self): + assert _parse_eval("10 // 3") == 3 + + @pytest.mark.unit + def test_modulo(self): + assert _parse_eval("10 % 3") == 1 + + @pytest.mark.unit + def test_power(self): + assert _parse_eval("2 ** 8") == 256 + + @pytest.mark.unit + def test_unary_minus(self): + assert _parse_eval("-5") == -5 + + @pytest.mark.unit + def test_unary_plus(self): + assert _parse_eval("+5") == 5 + + @pytest.mark.unit + def test_math_attribute(self): + assert _parse_eval("math.pi") == pytest.approx(math.pi) + + @pytest.mark.unit + def test_math_function_call(self): + assert _parse_eval("math.sqrt(16)") == pytest.approx(4.0) + + @pytest.mark.unit + def test_allowed_name_abs(self): + assert _parse_eval("abs(-10)") == 10 + + @pytest.mark.unit + def test_allowed_name_round(self): + assert _parse_eval("round(3.7)") == 4 + + @pytest.mark.unit + def test_allowed_name_min(self): + assert _parse_eval("min(5, 2, 8)") == 2 + + @pytest.mark.unit + def test_allowed_name_max(self): + assert _parse_eval("max(5, 2, 8)") == 8 + + @pytest.mark.unit + def test_string_constant_rejected(self): + with pytest.raises(ValueError, match="Unsupported constant"): + _parse_eval("'hello'") + + @pytest.mark.unit + def test_unknown_name_rejected(self): + with pytest.raises(ValueError, match="Unknown name"): + _parse_eval("xyz") + + @pytest.mark.unit + def test_unsupported_binary_op(self): + with pytest.raises(ValueError, match="Unsupported"): + _parse_eval("3 & 5") + + @pytest.mark.unit + def test_unsupported_unary_op(self): + with pytest.raises(ValueError, match="Unsupported"): + _parse_eval("~5") + + @pytest.mark.unit + def test_attribute_on_non_math_rejected(self): + with pytest.raises(ValueError, match="Attribute access not allowed"): + _parse_eval("abs.__class__") + + @pytest.mark.unit + def test_invalid_math_attr_rejected(self): + with pytest.raises(ValueError, match="Attribute access not allowed"): + _parse_eval("math.__builtins__") + + @pytest.mark.unit + def test_unsupported_syntax_subscript(self): + with pytest.raises(ValueError, match="Unsupported syntax"): + _parse_eval("[1, 2][0]") + + @pytest.mark.unit + def test_expression_wrapper(self): + """ast.Expression node is unwrapped correctly.""" + allowed = {"abs": abs} + tree = ast.parse("abs(-1)", mode="eval") + assert isinstance(tree, ast.Expression) + assert _safe_eval(tree, allowed) == 1 + + +# ── calculator ──────────────────────────────────────────────────────────────── + + +class TestCalculator: + @pytest.mark.unit + def test_basic_addition(self): + assert calculator("2 + 3") == "5" + + @pytest.mark.unit + def test_multiplication(self): + assert calculator("6 * 7") == "42" + + @pytest.mark.unit + def test_math_function(self): + assert calculator("math.sqrt(9)") == "3.0" + + @pytest.mark.unit + def test_exponent(self): + assert calculator("2**10") == "1024" + + @pytest.mark.unit + def test_error_on_syntax(self): + result = calculator("2 +") + assert "Error" in result + + @pytest.mark.unit + def test_error_on_empty(self): + result = calculator("") + assert "Error" in result + + @pytest.mark.unit + def test_error_on_division_by_zero(self): + result = calculator("1 / 0") + assert "Error" in result + + @pytest.mark.unit + def test_error_message_contains_expression(self): + result = calculator("bad expr!!!") + assert "bad expr!!!" in result + + @pytest.mark.unit + def test_injection_import(self): + result = calculator("__import__('os').system('echo hi')") + assert "Error" in result + + @pytest.mark.unit + def test_injection_builtins(self): + result = calculator("__builtins__") + assert "Error" in result + + @pytest.mark.unit + def test_string_literal_rejected(self): + result = calculator("'hello'") + assert "Error" in result + + +# ── consult_grok ────────────────────────────────────────────────────────────── + + +class TestConsultGrok: + @pytest.mark.unit + def test_grok_not_available(self): + with patch("timmy.backends.grok_available", return_value=False): + result = consult_grok("test query") + assert "not available" in result.lower() + + @pytest.mark.unit + def test_grok_free_mode(self): + mock_backend = MagicMock() + mock_backend.run.return_value = MagicMock(content="Answer text") + mock_settings = MagicMock() + mock_settings.grok_free = True + + with patch("timmy.backends.grok_available", return_value=True), \ + patch("timmy.backends.get_grok_backend", return_value=mock_backend), \ + patch("config.settings", mock_settings): + result = consult_grok("What is 2+2?") + + assert result == "Answer text" + mock_backend.run.assert_called_once_with("What is 2+2?") + + @pytest.mark.unit + def test_grok_spark_logging_failure_is_silent(self): + """Spark logging failure should not crash consult_grok.""" + mock_backend = MagicMock() + mock_backend.run.return_value = MagicMock(content="ok") + mock_settings = MagicMock() + mock_settings.grok_free = True + + with patch("timmy.backends.grok_available", return_value=True), \ + patch("timmy.backends.get_grok_backend", return_value=mock_backend), \ + patch("config.settings", mock_settings), \ + patch.dict("sys.modules", {"spark.engine": None}): + result = consult_grok("hello") + + assert result == "ok" + + @pytest.mark.unit + def test_grok_paid_mode_lightning_failure(self): + """When Lightning invoice creation fails, return an error message.""" + mock_backend = MagicMock() + mock_settings = MagicMock() + mock_settings.grok_free = False + mock_settings.grok_max_sats_per_query = 10 + mock_settings.grok_sats_hard_cap = 100 + + mock_lightning = MagicMock() + mock_ln_backend = MagicMock() + mock_ln_backend.create_invoice.side_effect = OSError("LN down") + mock_lightning.get_backend.return_value = mock_ln_backend + + with patch("timmy.backends.grok_available", return_value=True), \ + patch("timmy.backends.get_grok_backend", return_value=mock_backend), \ + patch("config.settings", mock_settings), \ + patch.dict("sys.modules", {"lightning.factory": mock_lightning}): + result = consult_grok("expensive query") + + assert "Error" in result + + +# ── web_fetch ───────────────────────────────────────────────────────────────── + + +class TestWebFetch: + @pytest.mark.unit + def test_invalid_scheme_ftp(self): + result = web_fetch("ftp://example.com") + assert "Error: invalid URL" in result + + @pytest.mark.unit + def test_empty_url(self): + result = web_fetch("") + assert "Error: invalid URL" in result + + @pytest.mark.unit + def test_no_scheme(self): + result = web_fetch("example.com/page") + assert "Error: invalid URL" in result + + @pytest.mark.unit + def test_missing_requests_package(self): + with patch.dict("sys.modules", {"requests": None}): + result = web_fetch("https://example.com") + assert "requests" in result and "not installed" in result + + @pytest.mark.unit + def test_missing_trafilatura_package(self): + mock_requests = MagicMock() + with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": None}): + result = web_fetch("https://example.com") + assert "trafilatura" in result and "not installed" in result + + @pytest.mark.unit + def test_extraction_returns_none(self): + mock_requests = MagicMock() + mock_trafilatura = MagicMock() + mock_resp = MagicMock() + mock_resp.text = "" + mock_requests.get.return_value = mock_resp + mock_requests.exceptions = _make_request_exceptions() + mock_trafilatura.extract.return_value = None + + with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): + result = web_fetch("https://example.com") + + assert "Error: could not extract" in result + + @pytest.mark.unit + def test_truncation_applied(self): + mock_requests = MagicMock() + mock_trafilatura = MagicMock() + long_text = "x" * 10000 + mock_resp = MagicMock() + mock_resp.text = "" + long_text + "" + mock_requests.get.return_value = mock_resp + mock_requests.exceptions = _make_request_exceptions() + mock_trafilatura.extract.return_value = long_text + + with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): + result = web_fetch("https://example.com", max_tokens=100) + + assert "[…truncated" in result + assert len(result) < 600 + + @pytest.mark.unit + def test_successful_fetch(self): + mock_requests = MagicMock() + mock_trafilatura = MagicMock() + mock_resp = MagicMock() + mock_resp.text = "

Hello

" + mock_requests.get.return_value = mock_resp + mock_requests.exceptions = _make_request_exceptions() + mock_trafilatura.extract.return_value = "Hello" + + with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): + result = web_fetch("https://example.com") + + assert result == "Hello" + + @pytest.mark.unit + def test_timeout_error(self): + exc_mod = _make_request_exceptions() + mock_requests = MagicMock() + mock_requests.exceptions = exc_mod + mock_requests.get.side_effect = exc_mod.Timeout("timed out") + mock_trafilatura = MagicMock() + + with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): + result = web_fetch("https://example.com") + + assert "timed out" in result + + @pytest.mark.unit + def test_http_error_404(self): + exc_mod = _make_request_exceptions() + mock_requests = MagicMock() + mock_requests.exceptions = exc_mod + mock_response = MagicMock() + mock_response.status_code = 404 + mock_requests.get.return_value.raise_for_status.side_effect = exc_mod.HTTPError( + response=mock_response + ) + mock_trafilatura = MagicMock() + + with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): + result = web_fetch("https://example.com/nope") + + assert "404" in result + + @pytest.mark.unit + def test_request_exception(self): + exc_mod = _make_request_exceptions() + mock_requests = MagicMock() + mock_requests.exceptions = exc_mod + mock_requests.get.side_effect = exc_mod.RequestException("connection refused") + mock_trafilatura = MagicMock() + + with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): + result = web_fetch("https://example.com") + + assert "Error" in result + + @pytest.mark.unit + def test_http_url_accepted(self): + """http:// URLs should pass the scheme check.""" + mock_requests = MagicMock() + mock_trafilatura = MagicMock() + mock_resp = MagicMock() + mock_resp.text = "

content

" + mock_requests.get.return_value = mock_resp + mock_requests.exceptions = _make_request_exceptions() + mock_trafilatura.extract.return_value = "content" + + with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): + result = web_fetch("http://example.com") + + assert result == "content" + + +# ── create_aider_tool / AiderTool ───────────────────────────────────────────── + + +class TestAiderTool: + @pytest.mark.unit + def test_factory_returns_tool(self, tmp_path): + tool = create_aider_tool(tmp_path) + assert hasattr(tool, "run_aider") + + @pytest.mark.unit + def test_base_dir_set(self, tmp_path): + tool = create_aider_tool(tmp_path) + assert tool.base_dir == tmp_path + + @pytest.mark.unit + @patch("subprocess.run") + def test_run_aider_success(self, mock_run, tmp_path): + mock_run.return_value = MagicMock(returncode=0, stdout="code generated") + tool = create_aider_tool(tmp_path) + result = tool.run_aider("add a function") + assert result == "code generated" + + @pytest.mark.unit + @patch("subprocess.run") + def test_run_aider_success_empty_stdout(self, mock_run, tmp_path): + mock_run.return_value = MagicMock(returncode=0, stdout="") + tool = create_aider_tool(tmp_path) + result = tool.run_aider("do something") + assert "successfully" in result.lower() + + @pytest.mark.unit + @patch("subprocess.run") + def test_run_aider_failure(self, mock_run, tmp_path): + mock_run.return_value = MagicMock(returncode=1, stderr="fatal error") + tool = create_aider_tool(tmp_path) + result = tool.run_aider("bad prompt") + assert "error" in result.lower() + assert "fatal error" in result + + @pytest.mark.unit + @patch("subprocess.run") + def test_run_aider_not_installed(self, mock_run, tmp_path): + mock_run.side_effect = FileNotFoundError + tool = create_aider_tool(tmp_path) + result = tool.run_aider("task") + assert "not installed" in result.lower() + + @pytest.mark.unit + @patch("subprocess.run") + def test_run_aider_timeout(self, mock_run, tmp_path): + mock_run.side_effect = subprocess.TimeoutExpired(cmd="aider", timeout=120) + tool = create_aider_tool(tmp_path) + result = tool.run_aider("long task") + assert "timed out" in result.lower() + + @pytest.mark.unit + @patch("subprocess.run") + def test_run_aider_os_error(self, mock_run, tmp_path): + mock_run.side_effect = OSError("permission denied") + tool = create_aider_tool(tmp_path) + result = tool.run_aider("task") + assert "error" in result.lower() + + @pytest.mark.unit + @patch("subprocess.run") + def test_custom_model_passed_to_subprocess(self, mock_run, tmp_path): + mock_run.return_value = MagicMock(returncode=0, stdout="ok") + tool = create_aider_tool(tmp_path) + tool.run_aider("task", model="mistral:7b") + call_args = mock_run.call_args[0][0] + assert "ollama/mistral:7b" in call_args + + @pytest.mark.unit + @patch("subprocess.run") + def test_default_model_is_passed(self, mock_run, tmp_path): + mock_run.return_value = MagicMock(returncode=0, stdout="ok") + tool = create_aider_tool(tmp_path) + tool.run_aider("task") + call_args = mock_run.call_args[0][0] + assert "--model" in call_args + + @pytest.mark.unit + @patch("subprocess.run") + def test_no_git_flag_present(self, mock_run, tmp_path): + mock_run.return_value = MagicMock(returncode=0, stdout="ok") + tool = create_aider_tool(tmp_path) + tool.run_aider("task") + call_args = mock_run.call_args[0][0] + assert "--no-git" in call_args + + @pytest.mark.unit + @patch("subprocess.run") + def test_cwd_is_base_dir(self, mock_run, tmp_path): + mock_run.return_value = MagicMock(returncode=0, stdout="ok") + tool = create_aider_tool(tmp_path) + tool.run_aider("task") + assert mock_run.call_args[1]["cwd"] == str(tmp_path) + + +# ── create_code_tools / create_security_tools / create_devops_tools ─────────── + + +class TestToolkitFactories: + @pytest.mark.unit + def test_create_code_tools_requires_agno(self): + from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE + + if _AGNO_TOOLS_AVAILABLE: + pytest.skip("Agno is available — ImportError path not testable") + from timmy.tools.system_tools import create_code_tools + + with pytest.raises(ImportError): + create_code_tools() + + @pytest.mark.unit + def test_create_security_tools_requires_agno(self): + from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE + + if _AGNO_TOOLS_AVAILABLE: + pytest.skip("Agno is available — ImportError path not testable") + from timmy.tools.system_tools import create_security_tools + + with pytest.raises(ImportError): + create_security_tools() + + @pytest.mark.unit + def test_create_devops_tools_requires_agno(self): + from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE + + if _AGNO_TOOLS_AVAILABLE: + pytest.skip("Agno is available — ImportError path not testable") + from timmy.tools.system_tools import create_devops_tools + + with pytest.raises(ImportError): + create_devops_tools() + + @pytest.mark.unit + def test_create_code_tools_with_agno(self, tmp_path): + from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE + + if not _AGNO_TOOLS_AVAILABLE: + pytest.skip("Agno not available") + from timmy.tools.system_tools import create_code_tools + + mock_settings = MagicMock() + mock_settings.repo_root = str(tmp_path) + with patch("config.settings", mock_settings): + toolkit = create_code_tools(base_dir=tmp_path) + assert toolkit is not None + assert toolkit.name == "code" + + @pytest.mark.unit + def test_create_security_tools_with_agno(self, tmp_path): + from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE + + if not _AGNO_TOOLS_AVAILABLE: + pytest.skip("Agno not available") + from timmy.tools.system_tools import create_security_tools + + mock_settings = MagicMock() + mock_settings.repo_root = str(tmp_path) + with patch("config.settings", mock_settings): + toolkit = create_security_tools(base_dir=tmp_path) + assert toolkit is not None + assert toolkit.name == "security" + + @pytest.mark.unit + def test_create_devops_tools_with_agno(self, tmp_path): + from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE + + if not _AGNO_TOOLS_AVAILABLE: + pytest.skip("Agno not available") + from timmy.tools.system_tools import create_devops_tools + + mock_settings = MagicMock() + mock_settings.repo_root = str(tmp_path) + with patch("config.settings", mock_settings): + toolkit = create_devops_tools(base_dir=tmp_path) + assert toolkit is not None + assert toolkit.name == "devops" + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + + +def _make_request_exceptions(): + """Create a mock requests.exceptions module with real exception classes.""" + + class Timeout(Exception): + pass + + class HTTPError(Exception): + def __init__(self, *args, response=None, **kwargs): + super().__init__(*args, **kwargs) + self.response = response + + class RequestException(Exception): + pass + + mod = MagicMock() + mod.Timeout = Timeout + mod.HTTPError = HTTPError + mod.RequestException = RequestException + return mod