forked from Rockachopa/Timmy-time-dashboard
160 lines
6.1 KiB
Python
160 lines
6.1 KiB
Python
"""Tests for queue.json validation and backup in triage_score.py."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import pytest
|
|
import scripts.triage_score as ts
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
    """Point every triage_score file-path constant at a per-test temp directory."""
    retro_dir = tmp_path / "retro"
    redirected = {
        "QUEUE_FILE": tmp_path / "queue.json",
        "QUEUE_BACKUP_FILE": tmp_path / "queue.json.bak",
        "RETRO_FILE": retro_dir / "triage.jsonl",
        "QUARANTINE_FILE": tmp_path / "quarantine.json",
        "CYCLE_RETRO_FILE": retro_dir / "cycles.jsonl",
    }
    # Applied in declaration order; monkeypatch reverts each override at teardown.
    for attr_name, sandbox_path in redirected.items():
        monkeypatch.setattr(ts, attr_name, sandbox_path)
|
|
|
|
|
|
def test_backup_created_on_write(tmp_path):
    """A queue.json holding valid JSON is copied verbatim to the backup file.

    The backup step is reproduced inline (the original author's note says it
    mirrors what ``run_triage`` does); no triage_score function is called.
    """
    queue_path = ts.QUEUE_FILE
    backup_path = ts.QUEUE_BACKUP_FILE

    # Seed the queue file with an initial valid payload.
    seed_entries = [{"issue": 1, "title": "Test", "ready": True}]
    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text(json.dumps(seed_entries))

    # Overwrite it with the payload the backup is expected to capture.
    latest_entries = [{"issue": 2, "title": "New", "ready": True}]
    queue_path.write_text(json.dumps(latest_entries, indent=2) + "\n")

    # Backup step: copy only when the queue exists and parses as JSON.
    if queue_path.exists():
        try:
            queue_text = queue_path.read_text()
            json.loads(queue_text)
            backup_path.write_text(queue_text)
        except (json.JSONDecodeError, OSError):
            pass

    # The backup must exist and hold the same entries as the queue file.
    assert backup_path.exists()
    assert json.loads(backup_path.read_text()) == latest_entries
|
|
|
|
|
|
def test_corrupt_queue_restored_from_backup(tmp_path, capsys):
    """An unparseable queue.json is replaced by the contents of a valid backup.

    The validate-and-restore sequence is written out inline in this test; it
    also checks that the restore message is printed to stdout.
    """
    queue_path, backup_path = ts.QUEUE_FILE, ts.QUEUE_BACKUP_FILE
    recoverable = (json.JSONDecodeError, OSError)

    # Arrange: a good backup next to a queue file that is not JSON.
    good_entries = [{"issue": 1, "title": "Backup", "ready": True}]
    backup_path.parent.mkdir(parents=True, exist_ok=True)
    backup_path.write_text(json.dumps(good_entries, indent=2) + "\n")

    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text("not valid json {{{")

    # Act: validate the queue; on failure fall back to backup, then to "[]".
    try:
        json.loads(queue_path.read_text())
    except recoverable:
        if not backup_path.exists():
            queue_path.write_text("[]\n")
        else:
            try:
                backup_text = backup_path.read_text()
                json.loads(backup_text)  # the backup itself must parse
                queue_path.write_text(backup_text)
                print("[triage] Restored queue.json from backup")
            except recoverable:
                queue_path.write_text("[]\n")

    # Assert: queue now matches the backup and the restore was announced.
    assert json.loads(queue_path.read_text()) == good_entries
    stdout_text = capsys.readouterr().out
    assert "Restored queue.json from backup" in stdout_text
|
|
|
|
|
|
def test_corrupt_queue_no_backup_writes_empty_list(tmp_path):
    """With no backup on disk, an unparseable queue.json is reset to ``[]``.

    The validate-and-restore sequence is written out inline in this test.
    """
    queue_path, backup_path = ts.QUEUE_FILE, ts.QUEUE_BACKUP_FILE

    # Precondition: the isolated temp directory starts without a backup.
    assert not backup_path.exists()

    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text("not valid json {{{")

    # Step 1: find out whether the queue file parses.
    try:
        json.loads(queue_path.read_text())
        queue_is_valid = True
    except (json.JSONDecodeError, OSError):
        queue_is_valid = False

    # Step 2: recover — prefer a parseable backup, otherwise an empty list.
    if not queue_is_valid:
        if backup_path.exists():
            try:
                backup_text = backup_path.read_text()
                json.loads(backup_text)
                queue_path.write_text(backup_text)
            except (json.JSONDecodeError, OSError):
                queue_path.write_text("[]\n")
        else:
            queue_path.write_text("[]\n")

    # The queue must now decode to an empty list.
    assert json.loads(queue_path.read_text()) == []
|
|
|
|
|
|
def test_corrupt_backup_writes_empty_list(tmp_path):
    """When neither queue.json nor its backup parses, the queue is reset to ``[]``.

    The validate-and-restore sequence is written out inline in this test.
    """
    queue_path, backup_path = ts.QUEUE_FILE, ts.QUEUE_BACKUP_FILE
    recoverable = (json.JSONDecodeError, OSError)

    # Arrange: both files contain text that is not JSON.
    backup_path.parent.mkdir(parents=True, exist_ok=True)
    backup_path.write_text("also corrupt backup")

    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text("not valid json {{{")

    # Act: validate the queue; the backup is tried first and rejected.
    try:
        json.loads(queue_path.read_text())
    except recoverable:
        if not backup_path.exists():
            queue_path.write_text("[]\n")
        else:
            try:
                backup_text = backup_path.read_text()
                json.loads(backup_text)
                queue_path.write_text(backup_text)
            except recoverable:
                # Backup is unusable as well, so fall back to an empty queue.
                queue_path.write_text("[]\n")

    # Assert: the queue decodes to an empty list.
    assert json.loads(queue_path.read_text()) == []
|
|
|
|
|
|
def test_valid_queue_not_corrupt_no_backup_overwrite(tmp_path):
    """A corrupt queue.json must not replace an existing valid backup.

    The backup step is written out inline in this test.
    """
    queue_path, backup_path = ts.QUEUE_FILE, ts.QUEUE_BACKUP_FILE

    # Arrange: a good backup and a queue file that is not JSON.
    original_backup = [{"issue": 99, "title": "Old Backup", "ready": True}]
    backup_path.parent.mkdir(parents=True, exist_ok=True)
    backup_path.write_text(json.dumps(original_backup, indent=2) + "\n")

    queue_path.parent.mkdir(parents=True, exist_ok=True)
    queue_path.write_text("corrupt data")

    # Act: attempt the backup; validation fails before anything is copied.
    if queue_path.exists():
        try:
            queue_text = queue_path.read_text()
            json.loads(queue_text)  # raises: the content is not JSON
            backup_path.write_text(queue_text)
        except (json.JSONDecodeError, OSError):
            pass  # expected branch — the backup is left alone

    # Assert: the backup still holds its original entries.
    assert json.loads(backup_path.read_text()) == original_backup
|
|
|
|
|
|
def test_backup_path_configuration():
    """The backup path sits beside the queue file and uses the expected names."""
    backup_path = ts.QUEUE_BACKUP_FILE
    queue_path = ts.QUEUE_FILE

    # Same directory for both files.
    assert backup_path.parent == queue_path.parent
    # Fixed file names for each path.
    assert backup_path.name == "queue.json.bak"
    assert queue_path.name == "queue.json"
|