Files
Timmy-time-dashboard/tests/scripts/test_triage_score_validation.py
Kimi Agent 57490338dd
Some checks are pending
Tests / test (push) Blocked by required conditions
Tests / lint (push) Successful in 19s
[kimi] Fix triage_score.py to merge queue instead of overwrite (#1463) (#1464)
2026-03-24 20:21:49 +00:00

332 lines
13 KiB
Python

"""Tests for queue.json validation and backup in triage_score.py."""
from __future__ import annotations
import json
import pytest
import scripts.triage_score as ts
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
    """Point triage_score's queue, backup, retro and quarantine paths at tmp_path.

    Applied automatically to every test in this module. Only the five
    attributes listed below are replaced; ``EXCLUSIONS_FILE`` and
    ``REPO_ROOT`` are left as the module defines them.
    """
    retro_dir = tmp_path / "retro"
    redirected = {
        "QUEUE_FILE": tmp_path / "queue.json",
        "QUEUE_BACKUP_FILE": tmp_path / "queue.json.bak",
        "RETRO_FILE": retro_dir / "triage.jsonl",
        "QUARANTINE_FILE": tmp_path / "quarantine.json",
        "CYCLE_RETRO_FILE": retro_dir / "cycles.jsonl",
    }
    # monkeypatch restores the original attribute values after each test.
    for attr_name, replacement in redirected.items():
        monkeypatch.setattr(ts, attr_name, replacement)
def test_backup_created_on_write(tmp_path):
    """A valid queue.json is copied verbatim to the backup path.

    The backup step is replayed inline here rather than invoked through
    ``ts``. It runs after the second write, so the backup is expected to
    mirror the newest queue contents.
    """
    queue_path = ts.QUEUE_FILE
    backup_path = ts.QUEUE_BACKUP_FILE

    # Seed an initial, valid queue file.
    seed_entries = [{"issue": 1, "title": "Test", "ready": True}]
    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text(json.dumps(seed_entries))

    # Replace it with the new queue contents.
    new_data = [{"issue": 2, "title": "New", "ready": True}]
    serialized = json.dumps(new_data, indent=2) + "\n"
    queue_path.write_text(serialized)

    # Inline backup step (the original author notes it mirrors run_triage):
    # copy the queue to the backup only if the queue parses as JSON.
    if queue_path.exists():
        try:
            json.loads(queue_path.read_text())
            backup_path.write_text(queue_path.read_text())
        except (json.JSONDecodeError, OSError):
            pass

    # The backup must exist and decode to the data written last.
    assert backup_path.exists()
    backed_up = json.loads(backup_path.read_text())
    assert backed_up == new_data
def test_corrupt_queue_restored_from_backup(tmp_path, capsys):
    """An unparseable queue.json is replaced by the contents of a valid backup.

    The validate-and-restore sequence is replayed inline; the test then
    checks both the restored file contents and the message printed to stdout.
    """
    queue_path = ts.QUEUE_FILE
    backup_path = ts.QUEUE_BACKUP_FILE

    # A well-formed backup that the restore step should fall back to.
    valid_data = [{"issue": 1, "title": "Backup", "ready": True}]
    backup_path.parent.mkdir(parents=True, exist_ok=True)
    backup_path.write_text(json.dumps(valid_data, indent=2) + "\n")

    # A queue file that cannot be parsed as JSON.
    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text("not valid json {{{")

    # Validate the queue; on failure restore from backup, else reset to [].
    try:
        json.loads(queue_path.read_text())
    except (json.JSONDecodeError, OSError):
        if not backup_path.exists():
            queue_path.write_text("[]\n")
        else:
            try:
                snapshot = backup_path.read_text()
                json.loads(snapshot)  # the backup itself must parse
                queue_path.write_text(snapshot)
                print("[triage] Restored queue.json from backup")
            except (json.JSONDecodeError, OSError):
                queue_path.write_text("[]\n")

    # The queue now holds the backup's data and the restore was announced.
    restored = json.loads(queue_path.read_text())
    assert restored == valid_data
    captured = capsys.readouterr()
    assert "Restored queue.json from backup" in captured.out
def test_corrupt_queue_no_backup_writes_empty_list(tmp_path):
    """With a corrupt queue.json and no backup file, the queue is reset to ``[]``."""
    queue_path = ts.QUEUE_FILE
    backup_path = ts.QUEUE_BACKUP_FILE

    # Precondition: nothing to restore from.
    assert not backup_path.exists()

    # A queue file that cannot be parsed as JSON.
    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text("not valid json {{{")

    # Inline validate-and-restore sequence.
    try:
        json.loads(queue_path.read_text())
    except (json.JSONDecodeError, OSError):
        backup_available = backup_path.exists()
        if backup_available:
            try:
                backup_text = backup_path.read_text()
                json.loads(backup_text)
                queue_path.write_text(backup_text)
            except (json.JSONDecodeError, OSError):
                queue_path.write_text("[]\n")
        else:
            queue_path.write_text("[]\n")

    # The queue file must now decode to an empty list.
    final_queue = json.loads(queue_path.read_text())
    assert final_queue == []
def test_corrupt_backup_writes_empty_list(tmp_path):
    """When neither queue.json nor its backup parses, the queue is reset to ``[]``."""
    queue_path = ts.QUEUE_FILE
    backup_path = ts.QUEUE_BACKUP_FILE

    def reset_to_empty():
        # Fallback used whenever no usable backup is available.
        queue_path.write_text("[]\n")

    # A backup that is not valid JSON.
    backup_path.parent.mkdir(parents=True, exist_ok=True)
    backup_path.write_text("also corrupt backup")

    # A queue file that is not valid JSON either.
    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text("not valid json {{{")

    # Inline validate-and-restore sequence.
    try:
        json.loads(queue_path.read_text())
    except (json.JSONDecodeError, OSError):
        if backup_path.exists():
            try:
                candidate = backup_path.read_text()
                json.loads(candidate)
                queue_path.write_text(candidate)
            except (json.JSONDecodeError, OSError):
                reset_to_empty()
        else:
            reset_to_empty()

    # The queue file must now decode to an empty list.
    assert json.loads(queue_path.read_text()) == []
def test_valid_queue_not_corrupt_no_backup_overwrite(tmp_path):
    """A corrupt queue.json must not be copied over an existing valid backup."""
    queue_path = ts.QUEUE_FILE
    backup_path = ts.QUEUE_BACKUP_FILE

    # Existing backup holding valid JSON.
    valid_backup = [{"issue": 99, "title": "Old Backup", "ready": True}]
    backup_path.parent.mkdir(parents=True, exist_ok=True)
    backup_path.write_text(json.dumps(valid_backup, indent=2) + "\n")

    # Current queue file is not parseable.
    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text("corrupt data")

    # Inline backup step: json.loads raises on the corrupt queue, so the
    # write to the backup path on the following line is never reached.
    if queue_path.exists():
        try:
            json.loads(queue_path.read_text())
            backup_path.write_text(queue_path.read_text())
        except (json.JSONDecodeError, OSError):
            pass

    # The backup still decodes to the data written at the start.
    surviving = json.loads(backup_path.read_text())
    assert surviving == valid_backup
def test_backup_path_configuration():
    """The backup path sits next to the queue path and uses the expected names.

    NOTE(review): the autouse ``_isolate`` fixture in this module replaces
    both ``QUEUE_FILE`` and ``QUEUE_BACKUP_FILE`` before this test runs, so
    these assertions see the patched paths, not the module's own defaults.
    """
    queue_path, backup_path = ts.QUEUE_FILE, ts.QUEUE_BACKUP_FILE
    assert backup_path.parent == queue_path.parent
    assert backup_path.name == "queue.json.bak"
    assert queue_path.name == "queue.json"
def test_exclusions_file_path():
    """The exclusions file is named queue_exclusions.json under REPO_ROOT/.loop."""
    exclusions_path = ts.EXCLUSIONS_FILE
    expected_dir = ts.REPO_ROOT / ".loop"
    assert exclusions_path.name == "queue_exclusions.json"
    assert exclusions_path.parent == expected_dir
def test_load_exclusions_empty_file(tmp_path, monkeypatch):
    """Loading when no exclusions file exists returns an empty list.

    ``EXCLUSIONS_FILE`` is redirected into ``tmp_path`` first. The autouse
    ``_isolate`` fixture does not patch that attribute, so without the
    redirect this test would read whatever exclusions file the real
    checkout contains and its result would depend on repository state.
    """
    monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json")

    # Precondition: the redirected path has not been created.
    assert not ts.EXCLUSIONS_FILE.exists()

    assert ts.load_exclusions() == []
def test_load_exclusions_with_data(tmp_path, monkeypatch):
    """A JSON array of integers is loaded back as the same list of integers."""
    exclusions_path = tmp_path / "exclusions.json"
    monkeypatch.setattr(ts, "EXCLUSIONS_FILE", exclusions_path)

    exclusions_path.parent.mkdir(parents=True, exist_ok=True)
    exclusions_path.write_text("[123, 456, 789]")

    loaded = ts.load_exclusions()
    assert loaded == [123, 456, 789]
def test_load_exclusions_with_strings(tmp_path, monkeypatch):
    """Numeric strings are converted to ints; the non-numeric entry is dropped."""
    exclusions_path = tmp_path / "exclusions.json"
    monkeypatch.setattr(ts, "EXCLUSIONS_FILE", exclusions_path)

    exclusions_path.parent.mkdir(parents=True, exist_ok=True)
    # Mix of a numeric string, ints, and one string that is not a number.
    exclusions_path.write_text('["100", 200, "invalid", 300]')

    loaded = ts.load_exclusions()
    assert loaded == [100, 200, 300]
def test_load_exclusions_corrupt_file(tmp_path, monkeypatch):
    """An exclusions file that is not valid JSON loads as an empty list."""
    exclusions_path = tmp_path / "exclusions.json"
    monkeypatch.setattr(ts, "EXCLUSIONS_FILE", exclusions_path)

    exclusions_path.parent.mkdir(parents=True, exist_ok=True)
    exclusions_path.write_text("not valid json")

    loaded = ts.load_exclusions()
    assert loaded == []
def test_save_exclusions(tmp_path, monkeypatch):
    """Saved exclusions are read back as a sorted list without duplicates."""
    target = tmp_path / "exclusions.json"
    monkeypatch.setattr(ts, "EXCLUSIONS_FILE", target)

    # Unsorted input with 100 appearing twice.
    ts.save_exclusions([300, 100, 200, 100])

    written = json.loads(ts.EXCLUSIONS_FILE.read_text())
    assert written == [100, 200, 300]
def test_merge_preserves_existing_queue(tmp_path, monkeypatch):
    """Merging keeps the existing queue entries in place and appends only new issues.

    The merge is replayed inline (the original author notes it was extracted
    from run_triage); no function on ``ts`` performs it here.
    """
    retro_dir = tmp_path / "retro"
    path_overrides = {
        "QUEUE_FILE": tmp_path / "queue.json",
        "QUEUE_BACKUP_FILE": tmp_path / "queue.json.bak",
        "EXCLUSIONS_FILE": tmp_path / "exclusions.json",
        "RETRO_FILE": retro_dir / "triage.jsonl",
        "QUARANTINE_FILE": tmp_path / "quarantine.json",
        "CYCLE_RETRO_FILE": retro_dir / "cycles.jsonl",
    }
    for attr_name, replacement in path_overrides.items():
        monkeypatch.setattr(ts, attr_name, replacement)

    # Existing queue on disk with two items.
    existing = [
        {"issue": 1, "title": "Existing A", "ready": True, "score": 8},
        {"issue": 2, "title": "Existing B", "ready": True, "score": 7},
    ]
    ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
    ts.QUEUE_FILE.write_text(json.dumps(existing))

    # Freshly scored ready items: two already queued, one genuinely new.
    newly_ready = [
        {"issue": 1, "title": "Existing A", "ready": True, "score": 8},
        {"issue": 2, "title": "Existing B", "ready": True, "score": 7},
        {"issue": 3, "title": "New C", "ready": True, "score": 9},
    ]
    exclusions = []

    existing_queue = json.loads(ts.QUEUE_FILE.read_text())
    existing_issues = {entry["issue"] for entry in existing_queue}

    # Keep only candidates that are neither queued already nor excluded.
    new_items = []
    for candidate in newly_ready:
        issue_number = candidate["issue"]
        if issue_number in existing_issues or issue_number in exclusions:
            continue
        new_items.append(candidate)
    merged = existing_queue + new_items

    # Two preserved items followed by the single new one, in that order.
    assert len(merged) == 3
    assert [entry["issue"] for entry in merged] == [1, 2, 3]
def test_excluded_issues_not_added(tmp_path, monkeypatch):
    """Issues listed in the exclusions file are dropped from the ready candidates."""
    exclusions_path = tmp_path / "exclusions.json"
    monkeypatch.setattr(ts, "EXCLUSIONS_FILE", exclusions_path)
    exclusions_path.parent.mkdir(parents=True, exist_ok=True)
    exclusions_path.write_text("[5, 10]")

    exclusions = ts.load_exclusions()
    newly_ready = [
        {"issue": 5, "title": "Excluded A", "ready": True},
        {"issue": 6, "title": "New B", "ready": True},
        {"issue": 10, "title": "Excluded C", "ready": True},
    ]

    # Drop every candidate whose issue number is excluded.
    filtered = []
    for candidate in newly_ready:
        if candidate["issue"] not in exclusions:
            filtered.append(candidate)

    # Only issue 6 survives.
    assert len(filtered) == 1
    survivor = filtered[0]
    assert survivor["issue"] == 6
def test_excluded_issues_removed_from_scored(tmp_path, monkeypatch):
    """An excluded issue is removed from a scored list by the exclusion filter."""
    exclusions_path = tmp_path / "exclusions.json"
    monkeypatch.setattr(ts, "EXCLUSIONS_FILE", exclusions_path)
    exclusions_path.parent.mkdir(parents=True, exist_ok=True)
    exclusions_path.write_text("[42]")

    exclusions = ts.load_exclusions()
    scored = [
        {"issue": 41, "title": "Keep", "ready": True},
        {"issue": 42, "title": "Excluded", "ready": True},
        {"issue": 43, "title": "Keep Too", "ready": True},
    ]

    filtered = list(filter(lambda entry: entry["issue"] not in exclusions, scored))

    # Two items remain and issue 42 is not among them.
    assert len(filtered) == 2
    remaining_issues = [entry["issue"] for entry in filtered]
    assert 42 not in remaining_issues
def test_empty_queue_merge_adds_all_new_items(tmp_path, monkeypatch):
    """With no queue file on disk, the inline merge yields every new ready item."""
    monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json")
    monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json")

    # Precondition: there is no queue file to merge with.
    assert not ts.QUEUE_FILE.exists()

    newly_ready = [
        {"issue": 1, "title": "A", "ready": True},
        {"issue": 2, "title": "B", "ready": True},
    ]
    exclusions = ts.load_exclusions()

    # Start from an empty queue unless a file turns out to exist.
    queue_present = ts.QUEUE_FILE.exists()
    existing_queue = json.loads(ts.QUEUE_FILE.read_text()) if queue_present else []
    existing_issues = {entry["issue"] for entry in existing_queue}

    # Keep candidates that are neither queued already nor excluded.
    new_items = [
        candidate
        for candidate in newly_ready
        if not (candidate["issue"] in existing_issues or candidate["issue"] in exclusions)
    ]
    merged = existing_queue + new_items

    # Both candidates are present, in their original order.
    assert len(merged) == 2
    first, second = merged
    assert first["issue"] == 1
    assert second["issue"] == 2
def test_queue_preserved_when_no_new_ready_items(tmp_path, monkeypatch):
    """The inline merge leaves the existing queue unchanged when nothing new is ready."""
    monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json")
    monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json")

    # Queue on disk with a single item.
    existing = [{"issue": 1, "title": "Only Item", "ready": True}]
    queue_path = ts.QUEUE_FILE
    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text(json.dumps(existing))

    newly_ready = []  # nothing new to merge in
    exclusions = ts.load_exclusions()

    existing_queue = json.loads(queue_path.read_text())
    existing_issues = {entry["issue"] for entry in existing_queue}

    # Same filter as the other merge tests; it has no candidates to keep here.
    new_items = []
    for candidate in newly_ready:
        already_queued = candidate["issue"] in existing_issues
        excluded = candidate["issue"] in exclusions
        if not already_queued and not excluded:
            new_items.append(candidate)
    merged = existing_queue + new_items

    # The single existing item is still the whole queue.
    assert len(merged) == 1
    only_entry = merged[0]
    assert only_entry["issue"] == 1