test: strengthen assertions across 7 test files (batch 1)

Replaced weak 'is not None' / '> 0' / 'len >= 1' assertions with
concrete value checks across the most flagged test files:

gateway/test_pairing.py (11 weak → 0):
  - Code assertions verify isinstance + len == CODE_LENGTH
  - Approval results verify dict structure + specific user_id/user_name
  - Added code2 != code1 check in rate_limit_expires

test_hermes_state.py (6 weak → 0):
  - ended_at verified as float timestamp
  - Search result counts exact (== 2, not >= 1)
  - Context verified as non-empty list
  - Export verified as dict, session ID verified

test_cli_init.py (4 weak → 0):
  - max_turns asserts exact value (60)
  - model asserts string with provider/name format

gateway/test_hooks.py (2 zero-assert tests → fixed):
  - test_no_handlers_for_event: verifies no handler registered
  - test_handler_error_does_not_propagate: verifies handler count + return

gateway/test_platform_base.py (9 weak image tests → fixed):
  - extract_images tests now verify actual URL and alt_text
  - truncate_message verifies content preservation after splitting

cron/test_scheduler.py (1 weak → 0):
  - resolve_origin verifies dict equality, not just existence

cron/test_jobs.py (2 weak → 0 + 4 new tests):
  - Schedule parsing verifies ISO timestamp type
  - Cron expression verifies result is valid datetime string
  - NEW: 4 tests for update_job() (was completely untested)
This commit is contained in:
teknium1
2026-03-05 18:39:37 -08:00
parent e9f05b3524
commit a44e041acf
7 changed files with 115 additions and 28 deletions

View File

@@ -15,6 +15,7 @@ from cron.jobs import (
save_jobs,
get_job,
list_jobs,
update_job,
remove_job,
mark_job_run,
get_due_jobs,
@@ -70,8 +71,10 @@ class TestParseSchedule:
result = parse_schedule("30m")
assert result["kind"] == "once"
assert "run_at" in result
# run_at should be ~30 minutes from now
run_at = datetime.fromisoformat(result["run_at"])
# run_at should be a valid ISO timestamp string ~30 minutes from now
run_at_str = result["run_at"]
assert isinstance(run_at_str, str)
run_at = datetime.fromisoformat(run_at_str)
assert run_at > datetime.now()
assert run_at < datetime.now() + timedelta(minutes=31)
@@ -140,8 +143,10 @@ class TestComputeNextRun:
pytest.importorskip("croniter")
schedule = {"kind": "cron", "expr": "* * * * *"} # every minute
result = compute_next_run(schedule)
assert result is not None
assert isinstance(result, str), f"Expected ISO timestamp string, got {type(result)}"
assert len(result) > 0
next_dt = datetime.fromisoformat(result)
assert isinstance(next_dt, datetime)
assert next_dt > datetime.now()
def test_unknown_kind_returns_none(self):
@@ -207,6 +212,48 @@ class TestJobCRUD:
assert job["deliver"] == "local"
class TestUpdateJob:
def test_update_name(self, tmp_cron_dir):
job = create_job(prompt="Check server status", schedule="every 1h", name="Old Name")
assert job["name"] == "Old Name"
updated = update_job(job["id"], {"name": "New Name"})
assert updated is not None
assert isinstance(updated, dict)
assert updated["name"] == "New Name"
# Verify other fields are preserved
assert updated["prompt"] == "Check server status"
assert updated["id"] == job["id"]
assert updated["schedule"] == job["schedule"]
# Verify persisted to disk
fetched = get_job(job["id"])
assert fetched["name"] == "New Name"
def test_update_schedule(self, tmp_cron_dir):
job = create_job(prompt="Daily report", schedule="every 1h")
assert job["schedule"]["kind"] == "interval"
assert job["schedule"]["minutes"] == 60
new_schedule = parse_schedule("every 2h")
updated = update_job(job["id"], {"schedule": new_schedule})
assert updated is not None
assert updated["schedule"]["kind"] == "interval"
assert updated["schedule"]["minutes"] == 120
# Verify persisted to disk
fetched = get_job(job["id"])
assert fetched["schedule"]["minutes"] == 120
def test_update_enable_disable(self, tmp_cron_dir):
job = create_job(prompt="Toggle me", schedule="every 1h")
assert job["enabled"] is True
updated = update_job(job["id"], {"enabled": False})
assert updated["enabled"] is False
fetched = get_job(job["id"])
assert fetched["enabled"] is False
def test_update_nonexistent_returns_none(self, tmp_cron_dir):
result = update_job("nonexistent_id", {"name": "X"})
assert result is None
class TestMarkJobRun:
def test_increments_completed(self, tmp_cron_dir):
job = create_job(prompt="Test", schedule="every 1h")

View File

@@ -15,9 +15,11 @@ class TestResolveOrigin:
}
}
result = _resolve_origin(job)
assert result is not None
assert isinstance(result, dict)
assert result == job["origin"]
assert result["platform"] == "telegram"
assert result["chat_id"] == "123456"
assert result["chat_name"] == "Test Chat"
def test_no_origin(self):
assert _resolve_origin({}) is None

View File

@@ -177,8 +177,10 @@ class TestEmit:
@pytest.mark.asyncio
async def test_no_handlers_for_event(self, tmp_path):
reg = HookRegistry()
# Should not raise
await reg.emit("unknown:event", {})
# Should not raise and should have no handlers registered
result = await reg.emit("unknown:event", {})
assert result is None
assert not reg._handlers.get("unknown:event")
@pytest.mark.asyncio
async def test_handler_error_does_not_propagate(self, tmp_path):
@@ -190,8 +192,10 @@ class TestEmit:
with patch("gateway.hooks.HOOKS_DIR", tmp_path):
reg.discover_and_load()
assert len(reg._handlers.get("agent:start", [])) == 1
# Should not raise even though handler throws
await reg.emit("agent:start", {})
result = await reg.emit("agent:start", {})
assert result is None
@pytest.mark.asyncio
async def test_emit_default_context(self, tmp_path):

View File

@@ -54,7 +54,7 @@ class TestCodeGeneration:
with patch("gateway.pairing.PAIRING_DIR", tmp_path):
store = PairingStore()
code = store.generate_code("telegram", "user1", "Alice")
assert code is not None
assert isinstance(code, str) and len(code) == CODE_LENGTH
assert len(code) == CODE_LENGTH
assert all(c in ALPHABET for c in code)
@@ -65,7 +65,7 @@ class TestCodeGeneration:
codes = set()
for i in range(3):
code = store.generate_code("telegram", f"user{i}")
assert code is not None
assert isinstance(code, str) and len(code) == CODE_LENGTH
codes.add(code)
assert len(codes) == 3
@@ -91,7 +91,7 @@ class TestRateLimiting:
store = PairingStore()
code1 = store.generate_code("telegram", "user1")
code2 = store.generate_code("telegram", "user1")
assert code1 is not None
assert isinstance(code1, str) and len(code1) == CODE_LENGTH
assert code2 is None # rate limited
def test_different_users_not_rate_limited(self, tmp_path):
@@ -99,14 +99,14 @@ class TestRateLimiting:
store = PairingStore()
code1 = store.generate_code("telegram", "user1")
code2 = store.generate_code("telegram", "user2")
assert code1 is not None
assert code2 is not None
assert isinstance(code1, str) and len(code1) == CODE_LENGTH
assert isinstance(code2, str) and len(code2) == CODE_LENGTH
def test_rate_limit_expires(self, tmp_path):
with patch("gateway.pairing.PAIRING_DIR", tmp_path):
store = PairingStore()
code1 = store.generate_code("telegram", "user1")
assert code1 is not None
assert isinstance(code1, str) and len(code1) == CODE_LENGTH
# Simulate rate limit expiry
limits = store._load_json(store._rate_limit_path())
@@ -114,7 +114,8 @@ class TestRateLimiting:
store._save_json(store._rate_limit_path(), limits)
code2 = store.generate_code("telegram", "user1")
assert code2 is not None
assert isinstance(code2, str) and len(code2) == CODE_LENGTH
assert code2 != code1
# ---------------------------------------------------------------------------
@@ -132,7 +133,7 @@ class TestMaxPending:
codes.append(code)
# First MAX_PENDING_PER_PLATFORM should succeed
assert all(c is not None for c in codes[:MAX_PENDING_PER_PLATFORM])
assert all(isinstance(c, str) and len(c) == CODE_LENGTH for c in codes[:MAX_PENDING_PER_PLATFORM])
# Next one should be blocked
assert codes[MAX_PENDING_PER_PLATFORM] is None
@@ -143,7 +144,7 @@ class TestMaxPending:
store.generate_code("telegram", f"user{i}")
# Different platform should still work
code = store.generate_code("discord", "user0")
assert code is not None
assert isinstance(code, str) and len(code) == CODE_LENGTH
# ---------------------------------------------------------------------------
@@ -158,7 +159,9 @@ class TestApprovalFlow:
code = store.generate_code("telegram", "user1", "Alice")
result = store.approve_code("telegram", code)
assert result is not None
assert isinstance(result, dict)
assert "user_id" in result
assert "user_name" in result
assert result["user_id"] == "user1"
assert result["user_name"] == "Alice"
@@ -187,14 +190,18 @@ class TestApprovalFlow:
store = PairingStore()
code = store.generate_code("telegram", "user1", "Alice")
result = store.approve_code("telegram", code.lower())
assert result is not None
assert isinstance(result, dict)
assert result["user_id"] == "user1"
assert result["user_name"] == "Alice"
def test_approve_strips_whitespace(self, tmp_path):
with patch("gateway.pairing.PAIRING_DIR", tmp_path):
store = PairingStore()
code = store.generate_code("telegram", "user1", "Alice")
result = store.approve_code("telegram", f" {code} ")
assert result is not None
assert isinstance(result, dict)
assert result["user_id"] == "user1"
assert result["user_name"] == "Alice"
def test_invalid_code_returns_none(self, tmp_path):
with patch("gateway.pairing.PAIRING_DIR", tmp_path):

View File

@@ -92,36 +92,50 @@ class TestExtractImages:
content = "![photo](https://example.com/photo.jpg)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.jpg"
assert images[0][1] == "photo"
def test_markdown_image_jpeg(self):
content = "![](https://example.com/photo.jpeg)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.jpeg"
assert images[0][1] == ""
def test_markdown_image_gif(self):
content = "![anim](https://example.com/anim.gif)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/anim.gif"
assert images[0][1] == "anim"
def test_markdown_image_webp(self):
content = "![](https://example.com/img.webp)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/img.webp"
assert images[0][1] == ""
def test_fal_media_cdn(self):
content = "![gen](https://fal.media/files/abc123/output.png)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://fal.media/files/abc123/output.png"
assert images[0][1] == "gen"
def test_fal_cdn_url(self):
content = "![](https://fal-cdn.example.com/result)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://fal-cdn.example.com/result"
assert images[0][1] == ""
def test_replicate_delivery(self):
content = "![](https://replicate.delivery/pbxt/abc/output)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://replicate.delivery/pbxt/abc/output"
assert images[0][1] == ""
def test_non_image_ext_not_extracted(self):
"""Markdown image with non-image extension should not be extracted."""
@@ -142,11 +156,15 @@ class TestExtractImages:
content = '<img src="https://example.com/photo.png"/>'
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.png"
assert images[0][1] == ""
def test_html_img_with_closing_tag(self):
content = '<img src="https://example.com/photo.png"></img>'
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.png"
assert images[0][1] == ""
def test_multiple_images(self):
content = "![a](https://example.com/a.png)\n![b](https://example.com/b.jpg)"
@@ -267,6 +285,11 @@ class TestTruncateMessage:
msg = "word " * 200 # ~1000 chars
chunks = adapter.truncate_message(msg, max_length=200)
assert len(chunks) > 1
# Verify all original content is preserved across chunks
reassembled = "".join(chunks)
# Strip chunk indicators like (1/N) to get raw content
for word in msg.strip().split():
assert word in reassembled, f"Word '{word}' lost during truncation"
def test_chunks_have_indicators(self):
adapter = self._adapter()

View File

@@ -23,7 +23,7 @@ class TestMaxTurnsResolution:
def test_default_max_turns_is_integer(self):
cli = _make_cli()
assert isinstance(cli.max_turns, int)
assert cli.max_turns > 0
assert cli.max_turns == 60
def test_explicit_max_turns_honored(self):
cli = _make_cli(max_turns=25)
@@ -32,7 +32,7 @@ class TestMaxTurnsResolution:
def test_none_max_turns_gets_default(self):
cli = _make_cli(max_turns=None)
assert isinstance(cli.max_turns, int)
assert cli.max_turns > 0
assert cli.max_turns == 60
def test_env_var_max_turns(self, monkeypatch):
"""Env var is used when config file doesn't set max_turns."""
@@ -54,7 +54,7 @@ class TestMaxTurnsResolution:
def test_max_turns_never_none_for_agent(self):
"""The value passed to AIAgent must never be None (causes TypeError in run_conversation)."""
cli = _make_cli()
assert cli.max_turns is not None
assert isinstance(cli.max_turns, int) and cli.max_turns == 60
class TestVerboseAndToolProgress:
@@ -81,4 +81,4 @@ class TestProviderResolution:
def test_model_is_string(self):
cli = _make_cli()
assert isinstance(cli.model, str)
assert len(cli.model) > 0
assert isinstance(cli.model, str) and '/' in cli.model

View File

@@ -43,7 +43,7 @@ class TestSessionLifecycle:
db.end_session("s1", end_reason="user_exit")
session = db.get_session("s1")
assert session["ended_at"] is not None
assert isinstance(session["ended_at"], float)
assert session["end_reason"] == "user_exit"
def test_update_system_prompt(self, db):
@@ -138,7 +138,7 @@ class TestFTS5Search:
db.append_message("s1", role="assistant", content="Use docker compose up.")
results = db.search_messages("docker")
assert len(results) >= 1
assert len(results) == 2
# At least one result should mention docker
snippets = [r.get("snippet", "") for r in results]
assert any("docker" in s.lower() or "Docker" in s for s in snippets)
@@ -174,8 +174,10 @@ class TestFTS5Search:
db.append_message("s1", role="assistant", content="Kubernetes is an orchestrator.")
results = db.search_messages("Kubernetes")
assert len(results) >= 1
assert len(results) == 2
assert "context" in results[0]
assert isinstance(results[0]["context"], list)
assert len(results[0]["context"]) > 0
# =========================================================================
@@ -266,7 +268,7 @@ class TestDeleteAndExport:
db.append_message("s1", role="assistant", content="Hi")
export = db.export_session("s1")
assert export is not None
assert isinstance(export, dict)
assert export["source"] == "cli"
assert len(export["messages"]) == 2
@@ -312,7 +314,9 @@ class TestPruneSessions:
pruned = db.prune_sessions(older_than_days=90)
assert pruned == 1
assert db.get_session("old") is None
assert db.get_session("new") is not None
session = db.get_session("new")
assert session is not None
assert session["id"] == "new"
def test_prune_skips_active_sessions(self, db):
db.create_session(session_id="active", source="cli")