"""Unit tests for timmy/tools/system_tools.py. Covers: _safe_eval, calculator, consult_grok, web_fetch, create_aider_tool (AiderTool), create_code_tools, create_security_tools, create_devops_tools. """ from __future__ import annotations import ast import math import subprocess from pathlib import Path from unittest.mock import MagicMock, patch import pytest from timmy.tools.system_tools import ( _safe_eval, calculator, consult_grok, create_aider_tool, web_fetch, ) pytestmark = pytest.mark.unit # ── _safe_eval ──────────────────────────────────────────────────────────────── def _parse_eval(expr: str): allowed = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")} allowed["math"] = math allowed["abs"] = abs allowed["round"] = round allowed["min"] = min allowed["max"] = max tree = ast.parse(expr, mode="eval") return _safe_eval(tree, allowed) class TestSafeEval: @pytest.mark.unit def test_integer_constant(self): assert _parse_eval("42") == 42 @pytest.mark.unit def test_float_constant(self): assert _parse_eval("3.14") == pytest.approx(3.14) @pytest.mark.unit def test_addition(self): assert _parse_eval("1 + 2") == 3 @pytest.mark.unit def test_subtraction(self): assert _parse_eval("10 - 4") == 6 @pytest.mark.unit def test_multiplication(self): assert _parse_eval("3 * 7") == 21 @pytest.mark.unit def test_division(self): assert _parse_eval("10 / 4") == 2.5 @pytest.mark.unit def test_floor_division(self): assert _parse_eval("10 // 3") == 3 @pytest.mark.unit def test_modulo(self): assert _parse_eval("10 % 3") == 1 @pytest.mark.unit def test_power(self): assert _parse_eval("2 ** 8") == 256 @pytest.mark.unit def test_unary_minus(self): assert _parse_eval("-5") == -5 @pytest.mark.unit def test_unary_plus(self): assert _parse_eval("+5") == 5 @pytest.mark.unit def test_math_attribute(self): assert _parse_eval("math.pi") == pytest.approx(math.pi) @pytest.mark.unit def test_math_function_call(self): assert _parse_eval("math.sqrt(16)") == pytest.approx(4.0) @pytest.mark.unit def test_allowed_name_abs(self): assert _parse_eval("abs(-10)") == 10 @pytest.mark.unit def test_allowed_name_round(self): assert _parse_eval("round(3.7)") == 4 @pytest.mark.unit def test_allowed_name_min(self): assert _parse_eval("min(5, 2, 8)") == 2 @pytest.mark.unit def test_allowed_name_max(self): assert _parse_eval("max(5, 2, 8)") == 8 @pytest.mark.unit def test_string_constant_rejected(self): with pytest.raises(ValueError, match="Unsupported constant"): _parse_eval("'hello'") @pytest.mark.unit def test_unknown_name_rejected(self): with pytest.raises(ValueError, match="Unknown name"): _parse_eval("xyz") @pytest.mark.unit def test_unsupported_binary_op(self): with pytest.raises(ValueError, match="Unsupported"): _parse_eval("3 & 5") @pytest.mark.unit def test_unsupported_unary_op(self): with pytest.raises(ValueError, match="Unsupported"): _parse_eval("~5") @pytest.mark.unit def test_attribute_on_non_math_rejected(self): with pytest.raises(ValueError, match="Attribute access not allowed"): _parse_eval("abs.__class__") @pytest.mark.unit def test_invalid_math_attr_rejected(self): with pytest.raises(ValueError, match="Attribute access not allowed"): _parse_eval("math.__builtins__") @pytest.mark.unit def test_unsupported_syntax_subscript(self): with pytest.raises(ValueError, match="Unsupported syntax"): _parse_eval("[1, 2][0]") @pytest.mark.unit def test_expression_wrapper(self): """ast.Expression node is unwrapped correctly.""" allowed = {"abs": abs} tree = ast.parse("abs(-1)", mode="eval") assert isinstance(tree, ast.Expression) assert _safe_eval(tree, allowed) == 1 # ── calculator ──────────────────────────────────────────────────────────────── class TestCalculator: @pytest.mark.unit def test_basic_addition(self): assert calculator("2 + 3") == "5" @pytest.mark.unit def test_multiplication(self): assert calculator("6 * 7") == "42" @pytest.mark.unit def test_math_function(self): assert calculator("math.sqrt(9)") == "3.0" @pytest.mark.unit def test_exponent(self): assert calculator("2**10") == "1024" @pytest.mark.unit def test_error_on_syntax(self): result = calculator("2 +") assert "Error" in result @pytest.mark.unit def test_error_on_empty(self): result = calculator("") assert "Error" in result @pytest.mark.unit def test_error_on_division_by_zero(self): result = calculator("1 / 0") assert "Error" in result @pytest.mark.unit def test_error_message_contains_expression(self): result = calculator("bad expr!!!") assert "bad expr!!!" in result @pytest.mark.unit def test_injection_import(self): result = calculator("__import__('os').system('echo hi')") assert "Error" in result @pytest.mark.unit def test_injection_builtins(self): result = calculator("__builtins__") assert "Error" in result @pytest.mark.unit def test_string_literal_rejected(self): result = calculator("'hello'") assert "Error" in result # ── consult_grok ────────────────────────────────────────────────────────────── class TestConsultGrok: @pytest.mark.unit def test_grok_not_available(self): with patch("timmy.backends.grok_available", return_value=False): result = consult_grok("test query") assert "not available" in result.lower() @pytest.mark.unit def test_grok_free_mode(self): mock_backend = MagicMock() mock_backend.run.return_value = MagicMock(content="Answer text") mock_settings = MagicMock() mock_settings.grok_free = True with patch("timmy.backends.grok_available", return_value=True), \ patch("timmy.backends.get_grok_backend", return_value=mock_backend), \ patch("config.settings", mock_settings): result = consult_grok("What is 2+2?") assert result == "Answer text" mock_backend.run.assert_called_once_with("What is 2+2?") @pytest.mark.unit def test_grok_spark_logging_failure_is_silent(self): """Spark logging failure should not crash consult_grok.""" mock_backend = MagicMock() mock_backend.run.return_value = MagicMock(content="ok") mock_settings = MagicMock() mock_settings.grok_free = True with patch("timmy.backends.grok_available", return_value=True), \ patch("timmy.backends.get_grok_backend", return_value=mock_backend), \ patch("config.settings", mock_settings), \ patch.dict("sys.modules", {"spark.engine": None}): result = consult_grok("hello") assert result == "ok" @pytest.mark.unit def test_grok_paid_mode_lightning_failure(self): """When Lightning invoice creation fails, return an error message.""" mock_backend = MagicMock() mock_settings = MagicMock() mock_settings.grok_free = False mock_settings.grok_max_sats_per_query = 10 mock_settings.grok_sats_hard_cap = 100 mock_lightning = MagicMock() mock_ln_backend = MagicMock() mock_ln_backend.create_invoice.side_effect = OSError("LN down") mock_lightning.get_backend.return_value = mock_ln_backend with patch("timmy.backends.grok_available", return_value=True), \ patch("timmy.backends.get_grok_backend", return_value=mock_backend), \ patch("config.settings", mock_settings), \ patch.dict("sys.modules", {"lightning.factory": mock_lightning}): result = consult_grok("expensive query") assert "Error" in result # ── web_fetch ───────────────────────────────────────────────────────────────── class TestWebFetch: @pytest.mark.unit def test_invalid_scheme_ftp(self): result = web_fetch("ftp://example.com") assert "Error: invalid URL" in result @pytest.mark.unit def test_empty_url(self): result = web_fetch("") assert "Error: invalid URL" in result @pytest.mark.unit def test_no_scheme(self): result = web_fetch("example.com/page") assert "Error: invalid URL" in result @pytest.mark.unit def test_missing_requests_package(self): with patch.dict("sys.modules", {"requests": None}): result = web_fetch("https://example.com") assert "requests" in result and "not installed" in result @pytest.mark.unit def test_missing_trafilatura_package(self): mock_requests = MagicMock() with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": None}): result = web_fetch("https://example.com") assert "trafilatura" in result and "not installed" in result @pytest.mark.unit def test_extraction_returns_none(self): mock_requests = MagicMock() mock_trafilatura = MagicMock() mock_resp = MagicMock() mock_resp.text = "" mock_requests.get.return_value = mock_resp mock_requests.exceptions = _make_request_exceptions() mock_trafilatura.extract.return_value = None with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): result = web_fetch("https://example.com") assert "Error: could not extract" in result @pytest.mark.unit def test_truncation_applied(self): mock_requests = MagicMock() mock_trafilatura = MagicMock() long_text = "x" * 10000 mock_resp = MagicMock() mock_resp.text = "" + long_text + "" mock_requests.get.return_value = mock_resp mock_requests.exceptions = _make_request_exceptions() mock_trafilatura.extract.return_value = long_text with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): result = web_fetch("https://example.com", max_tokens=100) assert "[…truncated" in result assert len(result) < 600 @pytest.mark.unit def test_successful_fetch(self): mock_requests = MagicMock() mock_trafilatura = MagicMock() mock_resp = MagicMock() mock_resp.text = "

Hello

" mock_requests.get.return_value = mock_resp mock_requests.exceptions = _make_request_exceptions() mock_trafilatura.extract.return_value = "Hello" with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): result = web_fetch("https://example.com") assert result == "Hello" @pytest.mark.unit def test_timeout_error(self): exc_mod = _make_request_exceptions() mock_requests = MagicMock() mock_requests.exceptions = exc_mod mock_requests.get.side_effect = exc_mod.Timeout("timed out") mock_trafilatura = MagicMock() with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): result = web_fetch("https://example.com") assert "timed out" in result @pytest.mark.unit def test_http_error_404(self): exc_mod = _make_request_exceptions() mock_requests = MagicMock() mock_requests.exceptions = exc_mod mock_response = MagicMock() mock_response.status_code = 404 mock_requests.get.return_value.raise_for_status.side_effect = exc_mod.HTTPError( response=mock_response ) mock_trafilatura = MagicMock() with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): result = web_fetch("https://example.com/nope") assert "404" in result @pytest.mark.unit def test_request_exception(self): exc_mod = _make_request_exceptions() mock_requests = MagicMock() mock_requests.exceptions = exc_mod mock_requests.get.side_effect = exc_mod.RequestException("connection refused") mock_trafilatura = MagicMock() with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): result = web_fetch("https://example.com") assert "Error" in result @pytest.mark.unit def test_http_url_accepted(self): """http:// URLs should pass the scheme check.""" mock_requests = MagicMock() mock_trafilatura = MagicMock() mock_resp = MagicMock() mock_resp.text = "

content

" mock_requests.get.return_value = mock_resp mock_requests.exceptions = _make_request_exceptions() mock_trafilatura.extract.return_value = "content" with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}): result = web_fetch("http://example.com") assert result == "content" # ── create_aider_tool / AiderTool ───────────────────────────────────────────── class TestAiderTool: @pytest.mark.unit def test_factory_returns_tool(self, tmp_path): tool = create_aider_tool(tmp_path) assert hasattr(tool, "run_aider") @pytest.mark.unit def test_base_dir_set(self, tmp_path): tool = create_aider_tool(tmp_path) assert tool.base_dir == tmp_path @pytest.mark.unit @patch("subprocess.run") def test_run_aider_success(self, mock_run, tmp_path): mock_run.return_value = MagicMock(returncode=0, stdout="code generated") tool = create_aider_tool(tmp_path) result = tool.run_aider("add a function") assert result == "code generated" @pytest.mark.unit @patch("subprocess.run") def test_run_aider_success_empty_stdout(self, mock_run, tmp_path): mock_run.return_value = MagicMock(returncode=0, stdout="") tool = create_aider_tool(tmp_path) result = tool.run_aider("do something") assert "successfully" in result.lower() @pytest.mark.unit @patch("subprocess.run") def test_run_aider_failure(self, mock_run, tmp_path): mock_run.return_value = MagicMock(returncode=1, stderr="fatal error") tool = create_aider_tool(tmp_path) result = tool.run_aider("bad prompt") assert "error" in result.lower() assert "fatal error" in result @pytest.mark.unit @patch("subprocess.run") def test_run_aider_not_installed(self, mock_run, tmp_path): mock_run.side_effect = FileNotFoundError tool = create_aider_tool(tmp_path) result = tool.run_aider("task") assert "not installed" in result.lower() @pytest.mark.unit @patch("subprocess.run") def test_run_aider_timeout(self, mock_run, tmp_path): mock_run.side_effect = subprocess.TimeoutExpired(cmd="aider", timeout=120) tool = create_aider_tool(tmp_path) result = tool.run_aider("long task") assert "timed out" in result.lower() @pytest.mark.unit @patch("subprocess.run") def test_run_aider_os_error(self, mock_run, tmp_path): mock_run.side_effect = OSError("permission denied") tool = create_aider_tool(tmp_path) result = tool.run_aider("task") assert "error" in result.lower() @pytest.mark.unit @patch("subprocess.run") def test_custom_model_passed_to_subprocess(self, mock_run, tmp_path): mock_run.return_value = MagicMock(returncode=0, stdout="ok") tool = create_aider_tool(tmp_path) tool.run_aider("task", model="mistral:7b") call_args = mock_run.call_args[0][0] assert "ollama/mistral:7b" in call_args @pytest.mark.unit @patch("subprocess.run") def test_default_model_is_passed(self, mock_run, tmp_path): mock_run.return_value = MagicMock(returncode=0, stdout="ok") tool = create_aider_tool(tmp_path) tool.run_aider("task") call_args = mock_run.call_args[0][0] assert "--model" in call_args @pytest.mark.unit @patch("subprocess.run") def test_no_git_flag_present(self, mock_run, tmp_path): mock_run.return_value = MagicMock(returncode=0, stdout="ok") tool = create_aider_tool(tmp_path) tool.run_aider("task") call_args = mock_run.call_args[0][0] assert "--no-git" in call_args @pytest.mark.unit @patch("subprocess.run") def test_cwd_is_base_dir(self, mock_run, tmp_path): mock_run.return_value = MagicMock(returncode=0, stdout="ok") tool = create_aider_tool(tmp_path) tool.run_aider("task") assert mock_run.call_args[1]["cwd"] == str(tmp_path) # ── create_code_tools / create_security_tools / create_devops_tools ─────────── class TestToolkitFactories: @pytest.mark.unit def test_create_code_tools_requires_agno(self): from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE if _AGNO_TOOLS_AVAILABLE: pytest.skip("Agno is available — ImportError path not testable") from timmy.tools.system_tools import create_code_tools with pytest.raises(ImportError): create_code_tools() @pytest.mark.unit def test_create_security_tools_requires_agno(self): from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE if _AGNO_TOOLS_AVAILABLE: pytest.skip("Agno is available — ImportError path not testable") from timmy.tools.system_tools import create_security_tools with pytest.raises(ImportError): create_security_tools() @pytest.mark.unit def test_create_devops_tools_requires_agno(self): from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE if _AGNO_TOOLS_AVAILABLE: pytest.skip("Agno is available — ImportError path not testable") from timmy.tools.system_tools import create_devops_tools with pytest.raises(ImportError): create_devops_tools() @pytest.mark.unit def test_create_code_tools_with_agno(self, tmp_path): from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE if not _AGNO_TOOLS_AVAILABLE: pytest.skip("Agno not available") from timmy.tools.system_tools import create_code_tools mock_settings = MagicMock() mock_settings.repo_root = str(tmp_path) with patch("config.settings", mock_settings): toolkit = create_code_tools(base_dir=tmp_path) assert toolkit is not None assert toolkit.name == "code" @pytest.mark.unit def test_create_security_tools_with_agno(self, tmp_path): from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE if not _AGNO_TOOLS_AVAILABLE: pytest.skip("Agno not available") from timmy.tools.system_tools import create_security_tools mock_settings = MagicMock() mock_settings.repo_root = str(tmp_path) with patch("config.settings", mock_settings): toolkit = create_security_tools(base_dir=tmp_path) assert toolkit is not None assert toolkit.name == "security" @pytest.mark.unit def test_create_devops_tools_with_agno(self, tmp_path): from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE if not _AGNO_TOOLS_AVAILABLE: pytest.skip("Agno not available") from timmy.tools.system_tools import create_devops_tools mock_settings = MagicMock() mock_settings.repo_root = str(tmp_path) with patch("config.settings", mock_settings): toolkit = create_devops_tools(base_dir=tmp_path) assert toolkit is not None assert toolkit.name == "devops" # ── Helpers ─────────────────────────────────────────────────────────────────── def _make_request_exceptions(): """Create a mock requests.exceptions module with real exception classes.""" class Timeout(Exception): pass class HTTPError(Exception): def __init__(self, *args, response=None, **kwargs): super().__init__(*args, **kwargs) self.response = response class RequestException(Exception): pass mod = MagicMock() mod.Timeout = Timeout mod.HTTPError = HTTPError mod.RequestException = RequestException return mod