"""Tests for queue.json validation and backup in triage_score.py.""" from __future__ import annotations import json import pytest import scripts.triage_score as ts @pytest.fixture(autouse=True) def _isolate(tmp_path, monkeypatch): """Redirect triage_score paths to tmp_path for isolation.""" monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json") monkeypatch.setattr(ts, "QUEUE_BACKUP_FILE", tmp_path / "queue.json.bak") monkeypatch.setattr(ts, "RETRO_FILE", tmp_path / "retro" / "triage.jsonl") monkeypatch.setattr(ts, "QUARANTINE_FILE", tmp_path / "quarantine.json") monkeypatch.setattr(ts, "CYCLE_RETRO_FILE", tmp_path / "retro" / "cycles.jsonl") def test_backup_created_on_write(tmp_path): """When writing queue.json, a backup should be created from previous valid file.""" # Create initial valid queue file initial_data = [{"issue": 1, "title": "Test", "ready": True}] ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True) ts.QUEUE_FILE.write_text(json.dumps(initial_data)) # Write new data new_data = [{"issue": 2, "title": "New", "ready": True}] ts.QUEUE_FILE.write_text(json.dumps(new_data, indent=2) + "\n") # Manually run the backup logic as run_triage would if ts.QUEUE_FILE.exists(): try: json.loads(ts.QUEUE_FILE.read_text()) ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text()) except (json.JSONDecodeError, OSError): pass # Both files should exist with same content assert ts.QUEUE_BACKUP_FILE.exists() assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == new_data def test_corrupt_queue_restored_from_backup(tmp_path, capsys): """If queue.json is corrupt, it should be restored from backup.""" # Create a valid backup valid_data = [{"issue": 1, "title": "Backup", "ready": True}] ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True) ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_data, indent=2) + "\n") # Create a corrupt queue file ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True) ts.QUEUE_FILE.write_text("not valid json {{{") # Run validation and restore logic try: json.loads(ts.QUEUE_FILE.read_text()) except (json.JSONDecodeError, OSError): if ts.QUEUE_BACKUP_FILE.exists(): try: backup_data = ts.QUEUE_BACKUP_FILE.read_text() json.loads(backup_data) # Validate backup ts.QUEUE_FILE.write_text(backup_data) print("[triage] Restored queue.json from backup") except (json.JSONDecodeError, OSError): ts.QUEUE_FILE.write_text("[]\n") else: ts.QUEUE_FILE.write_text("[]\n") # Queue should be restored from backup assert json.loads(ts.QUEUE_FILE.read_text()) == valid_data captured = capsys.readouterr() assert "Restored queue.json from backup" in captured.out def test_corrupt_queue_no_backup_writes_empty_list(tmp_path): """If queue.json is corrupt and no backup exists, write empty list.""" # Ensure no backup exists assert not ts.QUEUE_BACKUP_FILE.exists() # Create a corrupt queue file ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True) ts.QUEUE_FILE.write_text("not valid json {{{") # Run validation and restore logic try: json.loads(ts.QUEUE_FILE.read_text()) except (json.JSONDecodeError, OSError): if ts.QUEUE_BACKUP_FILE.exists(): try: backup_data = ts.QUEUE_BACKUP_FILE.read_text() json.loads(backup_data) ts.QUEUE_FILE.write_text(backup_data) except (json.JSONDecodeError, OSError): ts.QUEUE_FILE.write_text("[]\n") else: ts.QUEUE_FILE.write_text("[]\n") # Should have empty list assert json.loads(ts.QUEUE_FILE.read_text()) == [] def test_corrupt_backup_writes_empty_list(tmp_path): """If both queue.json and backup are corrupt, write empty list.""" # Create a corrupt backup ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True) ts.QUEUE_BACKUP_FILE.write_text("also corrupt backup") # Create a corrupt queue file ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True) ts.QUEUE_FILE.write_text("not valid json {{{") # Run validation and restore logic try: json.loads(ts.QUEUE_FILE.read_text()) except (json.JSONDecodeError, OSError): if ts.QUEUE_BACKUP_FILE.exists(): try: backup_data = ts.QUEUE_BACKUP_FILE.read_text() json.loads(backup_data) ts.QUEUE_FILE.write_text(backup_data) except (json.JSONDecodeError, OSError): ts.QUEUE_FILE.write_text("[]\n") else: ts.QUEUE_FILE.write_text("[]\n") # Should have empty list assert json.loads(ts.QUEUE_FILE.read_text()) == [] def test_valid_queue_not_corrupt_no_backup_overwrite(tmp_path): """Don't overwrite backup if current queue.json is corrupt.""" # Create a valid backup valid_backup = [{"issue": 99, "title": "Old Backup", "ready": True}] ts.QUEUE_BACKUP_FILE.parent.mkdir(parents=True, exist_ok=True) ts.QUEUE_BACKUP_FILE.write_text(json.dumps(valid_backup, indent=2) + "\n") # Create a corrupt queue file ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True) ts.QUEUE_FILE.write_text("corrupt data") # Try to save backup (should skip because current is corrupt) if ts.QUEUE_FILE.exists(): try: json.loads(ts.QUEUE_FILE.read_text()) # This will fail ts.QUEUE_BACKUP_FILE.write_text(ts.QUEUE_FILE.read_text()) except (json.JSONDecodeError, OSError): pass # Should hit this branch # Backup should still have original valid data assert json.loads(ts.QUEUE_BACKUP_FILE.read_text()) == valid_backup def test_backup_path_configuration(): """Ensure backup file path is properly configured relative to queue file.""" assert ts.QUEUE_BACKUP_FILE.parent == ts.QUEUE_FILE.parent assert ts.QUEUE_BACKUP_FILE.name == "queue.json.bak" assert ts.QUEUE_FILE.name == "queue.json" def test_exclusions_file_path(): """Ensure exclusions file path is properly configured.""" assert ts.EXCLUSIONS_FILE.name == "queue_exclusions.json" assert ts.EXCLUSIONS_FILE.parent == ts.REPO_ROOT / ".loop" def test_load_exclusions_empty_file(tmp_path): """Loading from empty/non-existent exclusions file returns empty list.""" assert ts.load_exclusions() == [] def test_load_exclusions_with_data(tmp_path, monkeypatch): """Loading exclusions returns list of integers.""" monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json") ts.EXCLUSIONS_FILE.parent.mkdir(parents=True, exist_ok=True) ts.EXCLUSIONS_FILE.write_text("[123, 456, 789]") assert ts.load_exclusions() == [123, 456, 789] def test_load_exclusions_with_strings(tmp_path, monkeypatch): """Loading exclusions handles string numbers gracefully.""" monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json") ts.EXCLUSIONS_FILE.parent.mkdir(parents=True, exist_ok=True) ts.EXCLUSIONS_FILE.write_text('["100", 200, "invalid", 300]') assert ts.load_exclusions() == [100, 200, 300] def test_load_exclusions_corrupt_file(tmp_path, monkeypatch): """Loading from corrupt exclusions file returns empty list.""" monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json") ts.EXCLUSIONS_FILE.parent.mkdir(parents=True, exist_ok=True) ts.EXCLUSIONS_FILE.write_text("not valid json") assert ts.load_exclusions() == [] def test_save_exclusions(tmp_path, monkeypatch): """Saving exclusions writes sorted unique integers.""" monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json") ts.save_exclusions([300, 100, 200, 100]) # includes duplicate assert json.loads(ts.EXCLUSIONS_FILE.read_text()) == [100, 200, 300] def test_merge_preserves_existing_queue(tmp_path, monkeypatch): """Merge logic preserves existing queue items and only adds new ones.""" monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json") monkeypatch.setattr(ts, "QUEUE_BACKUP_FILE", tmp_path / "queue.json.bak") monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json") monkeypatch.setattr(ts, "RETRO_FILE", tmp_path / "retro" / "triage.jsonl") monkeypatch.setattr(ts, "QUARANTINE_FILE", tmp_path / "quarantine.json") monkeypatch.setattr(ts, "CYCLE_RETRO_FILE", tmp_path / "retro" / "cycles.jsonl") # Setup: existing queue with 2 items (simulating deep triage cut) existing = [ {"issue": 1, "title": "Existing A", "ready": True, "score": 8}, {"issue": 2, "title": "Existing B", "ready": True, "score": 7}, ] ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True) ts.QUEUE_FILE.write_text(json.dumps(existing)) # Simulate merge logic (extracted from run_triage) newly_ready = [ {"issue": 1, "title": "Existing A", "ready": True, "score": 8}, # duplicate {"issue": 2, "title": "Existing B", "ready": True, "score": 7}, # duplicate {"issue": 3, "title": "New C", "ready": True, "score": 9}, # new ] exclusions = [] existing_queue = json.loads(ts.QUEUE_FILE.read_text()) existing_issues = {item["issue"] for item in existing_queue} new_items = [ s for s in newly_ready if s["issue"] not in existing_issues and s["issue"] not in exclusions ] merged = existing_queue + new_items # Should preserve existing (2 items) + add new (1 item) = 3 items assert len(merged) == 3 assert merged[0]["issue"] == 1 assert merged[1]["issue"] == 2 assert merged[2]["issue"] == 3 def test_excluded_issues_not_added(tmp_path, monkeypatch): """Excluded issues are never added to the queue.""" monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json") ts.EXCLUSIONS_FILE.parent.mkdir(parents=True, exist_ok=True) ts.EXCLUSIONS_FILE.write_text("[5, 10]") exclusions = ts.load_exclusions() newly_ready = [ {"issue": 5, "title": "Excluded A", "ready": True}, {"issue": 6, "title": "New B", "ready": True}, {"issue": 10, "title": "Excluded C", "ready": True}, ] # Filter out excluded filtered = [s for s in newly_ready if s["issue"] not in exclusions] assert len(filtered) == 1 assert filtered[0]["issue"] == 6 def test_excluded_issues_removed_from_scored(tmp_path, monkeypatch): """Excluded issues are filtered out before any queue logic.""" monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json") ts.EXCLUSIONS_FILE.parent.mkdir(parents=True, exist_ok=True) ts.EXCLUSIONS_FILE.write_text("[42]") exclusions = ts.load_exclusions() scored = [ {"issue": 41, "title": "Keep", "ready": True}, {"issue": 42, "title": "Excluded", "ready": True}, {"issue": 43, "title": "Keep Too", "ready": True}, ] filtered = [s for s in scored if s["issue"] not in exclusions] assert len(filtered) == 2 assert 42 not in [s["issue"] for s in filtered] def test_empty_queue_merge_adds_all_new_items(tmp_path, monkeypatch): """When queue is empty, all new ready items are added.""" monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json") monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json") # No existing queue file assert not ts.QUEUE_FILE.exists() newly_ready = [ {"issue": 1, "title": "A", "ready": True}, {"issue": 2, "title": "B", "ready": True}, ] exclusions = ts.load_exclusions() existing_queue = [] if ts.QUEUE_FILE.exists(): existing_queue = json.loads(ts.QUEUE_FILE.read_text()) existing_issues = {item["issue"] for item in existing_queue} new_items = [ s for s in newly_ready if s["issue"] not in existing_issues and s["issue"] not in exclusions ] merged = existing_queue + new_items assert len(merged) == 2 assert merged[0]["issue"] == 1 assert merged[1]["issue"] == 2 def test_queue_preserved_when_no_new_ready_items(tmp_path, monkeypatch): """Existing queue is preserved even when no new ready items are found.""" monkeypatch.setattr(ts, "QUEUE_FILE", tmp_path / "queue.json") monkeypatch.setattr(ts, "EXCLUSIONS_FILE", tmp_path / "exclusions.json") existing = [{"issue": 1, "title": "Only Item", "ready": True}] ts.QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True) ts.QUEUE_FILE.write_text(json.dumps(existing)) newly_ready = [] # No new ready items exclusions = ts.load_exclusions() existing_queue = json.loads(ts.QUEUE_FILE.read_text()) existing_issues = {item["issue"] for item in existing_queue} new_items = [ s for s in newly_ready if s["issue"] not in existing_issues and s["issue"] not in exclusions ] merged = existing_queue + new_items assert len(merged) == 1 assert merged[0]["issue"] == 1