forked from Rockachopa/Timmy-time-dashboard
617 lines
21 KiB
Python
617 lines
21 KiB
Python
"""Unit tests for timmy/tools/system_tools.py.
|
|
|
|
Covers: _safe_eval, calculator, consult_grok, web_fetch,
|
|
create_aider_tool (AiderTool), create_code_tools,
|
|
create_security_tools, create_devops_tools.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
import math
|
|
import subprocess
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from timmy.tools.system_tools import (
|
|
_safe_eval,
|
|
calculator,
|
|
consult_grok,
|
|
create_aider_tool,
|
|
web_fetch,
|
|
)
|
|
|
|
pytestmark = pytest.mark.unit
|
|
|
|
# ── _safe_eval ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _parse_eval(expr: str):
|
|
allowed = {k: getattr(math, k) for k in dir(math) if not k.startswith("_")}
|
|
allowed["math"] = math
|
|
allowed["abs"] = abs
|
|
allowed["round"] = round
|
|
allowed["min"] = min
|
|
allowed["max"] = max
|
|
tree = ast.parse(expr, mode="eval")
|
|
return _safe_eval(tree, allowed)
|
|
|
|
|
|
class TestSafeEval:
|
|
@pytest.mark.unit
|
|
def test_integer_constant(self):
|
|
assert _parse_eval("42") == 42
|
|
|
|
@pytest.mark.unit
|
|
def test_float_constant(self):
|
|
assert _parse_eval("3.14") == pytest.approx(3.14)
|
|
|
|
@pytest.mark.unit
|
|
def test_addition(self):
|
|
assert _parse_eval("1 + 2") == 3
|
|
|
|
@pytest.mark.unit
|
|
def test_subtraction(self):
|
|
assert _parse_eval("10 - 4") == 6
|
|
|
|
@pytest.mark.unit
|
|
def test_multiplication(self):
|
|
assert _parse_eval("3 * 7") == 21
|
|
|
|
@pytest.mark.unit
|
|
def test_division(self):
|
|
assert _parse_eval("10 / 4") == 2.5
|
|
|
|
@pytest.mark.unit
|
|
def test_floor_division(self):
|
|
assert _parse_eval("10 // 3") == 3
|
|
|
|
@pytest.mark.unit
|
|
def test_modulo(self):
|
|
assert _parse_eval("10 % 3") == 1
|
|
|
|
@pytest.mark.unit
|
|
def test_power(self):
|
|
assert _parse_eval("2 ** 8") == 256
|
|
|
|
@pytest.mark.unit
|
|
def test_unary_minus(self):
|
|
assert _parse_eval("-5") == -5
|
|
|
|
@pytest.mark.unit
|
|
def test_unary_plus(self):
|
|
assert _parse_eval("+5") == 5
|
|
|
|
@pytest.mark.unit
|
|
def test_math_attribute(self):
|
|
assert _parse_eval("math.pi") == pytest.approx(math.pi)
|
|
|
|
@pytest.mark.unit
|
|
def test_math_function_call(self):
|
|
assert _parse_eval("math.sqrt(16)") == pytest.approx(4.0)
|
|
|
|
@pytest.mark.unit
|
|
def test_allowed_name_abs(self):
|
|
assert _parse_eval("abs(-10)") == 10
|
|
|
|
@pytest.mark.unit
|
|
def test_allowed_name_round(self):
|
|
assert _parse_eval("round(3.7)") == 4
|
|
|
|
@pytest.mark.unit
|
|
def test_allowed_name_min(self):
|
|
assert _parse_eval("min(5, 2, 8)") == 2
|
|
|
|
@pytest.mark.unit
|
|
def test_allowed_name_max(self):
|
|
assert _parse_eval("max(5, 2, 8)") == 8
|
|
|
|
@pytest.mark.unit
|
|
def test_string_constant_rejected(self):
|
|
with pytest.raises(ValueError, match="Unsupported constant"):
|
|
_parse_eval("'hello'")
|
|
|
|
@pytest.mark.unit
|
|
def test_unknown_name_rejected(self):
|
|
with pytest.raises(ValueError, match="Unknown name"):
|
|
_parse_eval("xyz")
|
|
|
|
@pytest.mark.unit
|
|
def test_unsupported_binary_op(self):
|
|
with pytest.raises(ValueError, match="Unsupported"):
|
|
_parse_eval("3 & 5")
|
|
|
|
@pytest.mark.unit
|
|
def test_unsupported_unary_op(self):
|
|
with pytest.raises(ValueError, match="Unsupported"):
|
|
_parse_eval("~5")
|
|
|
|
@pytest.mark.unit
|
|
def test_attribute_on_non_math_rejected(self):
|
|
with pytest.raises(ValueError, match="Attribute access not allowed"):
|
|
_parse_eval("abs.__class__")
|
|
|
|
@pytest.mark.unit
|
|
def test_invalid_math_attr_rejected(self):
|
|
with pytest.raises(ValueError, match="Attribute access not allowed"):
|
|
_parse_eval("math.__builtins__")
|
|
|
|
@pytest.mark.unit
|
|
def test_unsupported_syntax_subscript(self):
|
|
with pytest.raises(ValueError, match="Unsupported syntax"):
|
|
_parse_eval("[1, 2][0]")
|
|
|
|
@pytest.mark.unit
|
|
def test_expression_wrapper(self):
|
|
"""ast.Expression node is unwrapped correctly."""
|
|
allowed = {"abs": abs}
|
|
tree = ast.parse("abs(-1)", mode="eval")
|
|
assert isinstance(tree, ast.Expression)
|
|
assert _safe_eval(tree, allowed) == 1
|
|
|
|
|
|
# ── calculator ────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestCalculator:
|
|
@pytest.mark.unit
|
|
def test_basic_addition(self):
|
|
assert calculator("2 + 3") == "5"
|
|
|
|
@pytest.mark.unit
|
|
def test_multiplication(self):
|
|
assert calculator("6 * 7") == "42"
|
|
|
|
@pytest.mark.unit
|
|
def test_math_function(self):
|
|
assert calculator("math.sqrt(9)") == "3.0"
|
|
|
|
@pytest.mark.unit
|
|
def test_exponent(self):
|
|
assert calculator("2**10") == "1024"
|
|
|
|
@pytest.mark.unit
|
|
def test_error_on_syntax(self):
|
|
result = calculator("2 +")
|
|
assert "Error" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_error_on_empty(self):
|
|
result = calculator("")
|
|
assert "Error" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_error_on_division_by_zero(self):
|
|
result = calculator("1 / 0")
|
|
assert "Error" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_error_message_contains_expression(self):
|
|
result = calculator("bad expr!!!")
|
|
assert "bad expr!!!" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_injection_import(self):
|
|
result = calculator("__import__('os').system('echo hi')")
|
|
assert "Error" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_injection_builtins(self):
|
|
result = calculator("__builtins__")
|
|
assert "Error" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_string_literal_rejected(self):
|
|
result = calculator("'hello'")
|
|
assert "Error" in result
|
|
|
|
|
|
# ── consult_grok ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestConsultGrok:
|
|
@pytest.mark.unit
|
|
def test_grok_not_available(self):
|
|
with patch("timmy.backends.grok_available", return_value=False):
|
|
result = consult_grok("test query")
|
|
assert "not available" in result.lower()
|
|
|
|
@pytest.mark.unit
|
|
def test_grok_free_mode(self):
|
|
mock_backend = MagicMock()
|
|
mock_backend.run.return_value = MagicMock(content="Answer text")
|
|
mock_settings = MagicMock()
|
|
mock_settings.grok_free = True
|
|
|
|
with patch("timmy.backends.grok_available", return_value=True), \
|
|
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
|
|
patch("config.settings", mock_settings):
|
|
result = consult_grok("What is 2+2?")
|
|
|
|
assert result == "Answer text"
|
|
mock_backend.run.assert_called_once_with("What is 2+2?")
|
|
|
|
@pytest.mark.unit
|
|
def test_grok_spark_logging_failure_is_silent(self):
|
|
"""Spark logging failure should not crash consult_grok."""
|
|
mock_backend = MagicMock()
|
|
mock_backend.run.return_value = MagicMock(content="ok")
|
|
mock_settings = MagicMock()
|
|
mock_settings.grok_free = True
|
|
|
|
with patch("timmy.backends.grok_available", return_value=True), \
|
|
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
|
|
patch("config.settings", mock_settings), \
|
|
patch.dict("sys.modules", {"spark.engine": None}):
|
|
result = consult_grok("hello")
|
|
|
|
assert result == "ok"
|
|
|
|
@pytest.mark.unit
|
|
def test_grok_paid_mode_lightning_failure(self):
|
|
"""When Lightning invoice creation fails, return an error message."""
|
|
mock_backend = MagicMock()
|
|
mock_settings = MagicMock()
|
|
mock_settings.grok_free = False
|
|
mock_settings.grok_max_sats_per_query = 10
|
|
mock_settings.grok_sats_hard_cap = 100
|
|
|
|
mock_lightning = MagicMock()
|
|
mock_ln_backend = MagicMock()
|
|
mock_ln_backend.create_invoice.side_effect = OSError("LN down")
|
|
mock_lightning.get_backend.return_value = mock_ln_backend
|
|
|
|
with patch("timmy.backends.grok_available", return_value=True), \
|
|
patch("timmy.backends.get_grok_backend", return_value=mock_backend), \
|
|
patch("config.settings", mock_settings), \
|
|
patch.dict("sys.modules", {"lightning.factory": mock_lightning}):
|
|
result = consult_grok("expensive query")
|
|
|
|
assert "Error" in result
|
|
|
|
|
|
# ── web_fetch ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestWebFetch:
|
|
@pytest.mark.unit
|
|
def test_invalid_scheme_ftp(self):
|
|
result = web_fetch("ftp://example.com")
|
|
assert "Error: invalid URL" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_empty_url(self):
|
|
result = web_fetch("")
|
|
assert "Error: invalid URL" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_no_scheme(self):
|
|
result = web_fetch("example.com/page")
|
|
assert "Error: invalid URL" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_missing_requests_package(self):
|
|
with patch.dict("sys.modules", {"requests": None}):
|
|
result = web_fetch("https://example.com")
|
|
assert "requests" in result and "not installed" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_missing_trafilatura_package(self):
|
|
mock_requests = MagicMock()
|
|
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": None}):
|
|
result = web_fetch("https://example.com")
|
|
assert "trafilatura" in result and "not installed" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_extraction_returns_none(self):
|
|
mock_requests = MagicMock()
|
|
mock_trafilatura = MagicMock()
|
|
mock_resp = MagicMock()
|
|
mock_resp.text = "<html></html>"
|
|
mock_requests.get.return_value = mock_resp
|
|
mock_requests.exceptions = _make_request_exceptions()
|
|
mock_trafilatura.extract.return_value = None
|
|
|
|
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
|
|
result = web_fetch("https://example.com")
|
|
|
|
assert "Error: could not extract" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_truncation_applied(self):
|
|
mock_requests = MagicMock()
|
|
mock_trafilatura = MagicMock()
|
|
long_text = "x" * 10000
|
|
mock_resp = MagicMock()
|
|
mock_resp.text = "<html><body>" + long_text + "</body></html>"
|
|
mock_requests.get.return_value = mock_resp
|
|
mock_requests.exceptions = _make_request_exceptions()
|
|
mock_trafilatura.extract.return_value = long_text
|
|
|
|
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
|
|
result = web_fetch("https://example.com", max_tokens=100)
|
|
|
|
assert "[…truncated" in result
|
|
assert len(result) < 600
|
|
|
|
@pytest.mark.unit
|
|
def test_successful_fetch(self):
|
|
mock_requests = MagicMock()
|
|
mock_trafilatura = MagicMock()
|
|
mock_resp = MagicMock()
|
|
mock_resp.text = "<html><body><p>Hello</p></body></html>"
|
|
mock_requests.get.return_value = mock_resp
|
|
mock_requests.exceptions = _make_request_exceptions()
|
|
mock_trafilatura.extract.return_value = "Hello"
|
|
|
|
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
|
|
result = web_fetch("https://example.com")
|
|
|
|
assert result == "Hello"
|
|
|
|
@pytest.mark.unit
|
|
def test_timeout_error(self):
|
|
exc_mod = _make_request_exceptions()
|
|
mock_requests = MagicMock()
|
|
mock_requests.exceptions = exc_mod
|
|
mock_requests.get.side_effect = exc_mod.Timeout("timed out")
|
|
mock_trafilatura = MagicMock()
|
|
|
|
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
|
|
result = web_fetch("https://example.com")
|
|
|
|
assert "timed out" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_http_error_404(self):
|
|
exc_mod = _make_request_exceptions()
|
|
mock_requests = MagicMock()
|
|
mock_requests.exceptions = exc_mod
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 404
|
|
mock_requests.get.return_value.raise_for_status.side_effect = exc_mod.HTTPError(
|
|
response=mock_response
|
|
)
|
|
mock_trafilatura = MagicMock()
|
|
|
|
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
|
|
result = web_fetch("https://example.com/nope")
|
|
|
|
assert "404" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_request_exception(self):
|
|
exc_mod = _make_request_exceptions()
|
|
mock_requests = MagicMock()
|
|
mock_requests.exceptions = exc_mod
|
|
mock_requests.get.side_effect = exc_mod.RequestException("connection refused")
|
|
mock_trafilatura = MagicMock()
|
|
|
|
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
|
|
result = web_fetch("https://example.com")
|
|
|
|
assert "Error" in result
|
|
|
|
@pytest.mark.unit
|
|
def test_http_url_accepted(self):
|
|
"""http:// URLs should pass the scheme check."""
|
|
mock_requests = MagicMock()
|
|
mock_trafilatura = MagicMock()
|
|
mock_resp = MagicMock()
|
|
mock_resp.text = "<html><body><p>content</p></body></html>"
|
|
mock_requests.get.return_value = mock_resp
|
|
mock_requests.exceptions = _make_request_exceptions()
|
|
mock_trafilatura.extract.return_value = "content"
|
|
|
|
with patch.dict("sys.modules", {"requests": mock_requests, "trafilatura": mock_trafilatura}):
|
|
result = web_fetch("http://example.com")
|
|
|
|
assert result == "content"
|
|
|
|
|
|
# ── create_aider_tool / AiderTool ─────────────────────────────────────────────
|
|
|
|
|
|
class TestAiderTool:
|
|
@pytest.mark.unit
|
|
def test_factory_returns_tool(self, tmp_path):
|
|
tool = create_aider_tool(tmp_path)
|
|
assert hasattr(tool, "run_aider")
|
|
|
|
@pytest.mark.unit
|
|
def test_base_dir_set(self, tmp_path):
|
|
tool = create_aider_tool(tmp_path)
|
|
assert tool.base_dir == tmp_path
|
|
|
|
@pytest.mark.unit
|
|
@patch("subprocess.run")
|
|
def test_run_aider_success(self, mock_run, tmp_path):
|
|
mock_run.return_value = MagicMock(returncode=0, stdout="code generated")
|
|
tool = create_aider_tool(tmp_path)
|
|
result = tool.run_aider("add a function")
|
|
assert result == "code generated"
|
|
|
|
@pytest.mark.unit
|
|
@patch("subprocess.run")
|
|
def test_run_aider_success_empty_stdout(self, mock_run, tmp_path):
|
|
mock_run.return_value = MagicMock(returncode=0, stdout="")
|
|
tool = create_aider_tool(tmp_path)
|
|
result = tool.run_aider("do something")
|
|
assert "successfully" in result.lower()
|
|
|
|
@pytest.mark.unit
|
|
@patch("subprocess.run")
|
|
def test_run_aider_failure(self, mock_run, tmp_path):
|
|
mock_run.return_value = MagicMock(returncode=1, stderr="fatal error")
|
|
tool = create_aider_tool(tmp_path)
|
|
result = tool.run_aider("bad prompt")
|
|
assert "error" in result.lower()
|
|
assert "fatal error" in result
|
|
|
|
@pytest.mark.unit
|
|
@patch("subprocess.run")
|
|
def test_run_aider_not_installed(self, mock_run, tmp_path):
|
|
mock_run.side_effect = FileNotFoundError
|
|
tool = create_aider_tool(tmp_path)
|
|
result = tool.run_aider("task")
|
|
assert "not installed" in result.lower()
|
|
|
|
@pytest.mark.unit
|
|
@patch("subprocess.run")
|
|
def test_run_aider_timeout(self, mock_run, tmp_path):
|
|
mock_run.side_effect = subprocess.TimeoutExpired(cmd="aider", timeout=120)
|
|
tool = create_aider_tool(tmp_path)
|
|
result = tool.run_aider("long task")
|
|
assert "timed out" in result.lower()
|
|
|
|
@pytest.mark.unit
|
|
@patch("subprocess.run")
|
|
def test_run_aider_os_error(self, mock_run, tmp_path):
|
|
mock_run.side_effect = OSError("permission denied")
|
|
tool = create_aider_tool(tmp_path)
|
|
result = tool.run_aider("task")
|
|
assert "error" in result.lower()
|
|
|
|
@pytest.mark.unit
|
|
@patch("subprocess.run")
|
|
def test_custom_model_passed_to_subprocess(self, mock_run, tmp_path):
|
|
mock_run.return_value = MagicMock(returncode=0, stdout="ok")
|
|
tool = create_aider_tool(tmp_path)
|
|
tool.run_aider("task", model="mistral:7b")
|
|
call_args = mock_run.call_args[0][0]
|
|
assert "ollama/mistral:7b" in call_args
|
|
|
|
@pytest.mark.unit
|
|
@patch("subprocess.run")
|
|
def test_default_model_is_passed(self, mock_run, tmp_path):
|
|
mock_run.return_value = MagicMock(returncode=0, stdout="ok")
|
|
tool = create_aider_tool(tmp_path)
|
|
tool.run_aider("task")
|
|
call_args = mock_run.call_args[0][0]
|
|
assert "--model" in call_args
|
|
|
|
@pytest.mark.unit
|
|
@patch("subprocess.run")
|
|
def test_no_git_flag_present(self, mock_run, tmp_path):
|
|
mock_run.return_value = MagicMock(returncode=0, stdout="ok")
|
|
tool = create_aider_tool(tmp_path)
|
|
tool.run_aider("task")
|
|
call_args = mock_run.call_args[0][0]
|
|
assert "--no-git" in call_args
|
|
|
|
@pytest.mark.unit
|
|
@patch("subprocess.run")
|
|
def test_cwd_is_base_dir(self, mock_run, tmp_path):
|
|
mock_run.return_value = MagicMock(returncode=0, stdout="ok")
|
|
tool = create_aider_tool(tmp_path)
|
|
tool.run_aider("task")
|
|
assert mock_run.call_args[1]["cwd"] == str(tmp_path)
|
|
|
|
|
|
# ── create_code_tools / create_security_tools / create_devops_tools ───────────
|
|
|
|
|
|
class TestToolkitFactories:
|
|
@pytest.mark.unit
|
|
def test_create_code_tools_requires_agno(self):
|
|
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
|
|
|
|
if _AGNO_TOOLS_AVAILABLE:
|
|
pytest.skip("Agno is available — ImportError path not testable")
|
|
from timmy.tools.system_tools import create_code_tools
|
|
|
|
with pytest.raises(ImportError):
|
|
create_code_tools()
|
|
|
|
@pytest.mark.unit
|
|
def test_create_security_tools_requires_agno(self):
|
|
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
|
|
|
|
if _AGNO_TOOLS_AVAILABLE:
|
|
pytest.skip("Agno is available — ImportError path not testable")
|
|
from timmy.tools.system_tools import create_security_tools
|
|
|
|
with pytest.raises(ImportError):
|
|
create_security_tools()
|
|
|
|
@pytest.mark.unit
|
|
def test_create_devops_tools_requires_agno(self):
|
|
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
|
|
|
|
if _AGNO_TOOLS_AVAILABLE:
|
|
pytest.skip("Agno is available — ImportError path not testable")
|
|
from timmy.tools.system_tools import create_devops_tools
|
|
|
|
with pytest.raises(ImportError):
|
|
create_devops_tools()
|
|
|
|
@pytest.mark.unit
|
|
def test_create_code_tools_with_agno(self, tmp_path):
|
|
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
|
|
|
|
if not _AGNO_TOOLS_AVAILABLE:
|
|
pytest.skip("Agno not available")
|
|
from timmy.tools.system_tools import create_code_tools
|
|
|
|
mock_settings = MagicMock()
|
|
mock_settings.repo_root = str(tmp_path)
|
|
with patch("config.settings", mock_settings):
|
|
toolkit = create_code_tools(base_dir=tmp_path)
|
|
assert toolkit is not None
|
|
assert toolkit.name == "code"
|
|
|
|
@pytest.mark.unit
|
|
def test_create_security_tools_with_agno(self, tmp_path):
|
|
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
|
|
|
|
if not _AGNO_TOOLS_AVAILABLE:
|
|
pytest.skip("Agno not available")
|
|
from timmy.tools.system_tools import create_security_tools
|
|
|
|
mock_settings = MagicMock()
|
|
mock_settings.repo_root = str(tmp_path)
|
|
with patch("config.settings", mock_settings):
|
|
toolkit = create_security_tools(base_dir=tmp_path)
|
|
assert toolkit is not None
|
|
assert toolkit.name == "security"
|
|
|
|
@pytest.mark.unit
|
|
def test_create_devops_tools_with_agno(self, tmp_path):
|
|
from timmy.tools.system_tools import _AGNO_TOOLS_AVAILABLE
|
|
|
|
if not _AGNO_TOOLS_AVAILABLE:
|
|
pytest.skip("Agno not available")
|
|
from timmy.tools.system_tools import create_devops_tools
|
|
|
|
mock_settings = MagicMock()
|
|
mock_settings.repo_root = str(tmp_path)
|
|
with patch("config.settings", mock_settings):
|
|
toolkit = create_devops_tools(base_dir=tmp_path)
|
|
assert toolkit is not None
|
|
assert toolkit.name == "devops"
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _make_request_exceptions():
|
|
"""Create a mock requests.exceptions module with real exception classes."""
|
|
|
|
class Timeout(Exception):
|
|
pass
|
|
|
|
class HTTPError(Exception):
|
|
def __init__(self, *args, response=None, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.response = response
|
|
|
|
class RequestException(Exception):
|
|
pass
|
|
|
|
mod = MagicMock()
|
|
mod.Timeout = Timeout
|
|
mod.HTTPError = HTTPError
|
|
mod.RequestException = RequestException
|
|
return mod
|