Compare commits
5 Commits
feat/655-a
...
feat/647-s
| Author | SHA1 | Date | |
|---|---|---|---|
| 2e6b95dd3d | |||
| 40b68a13cb | |||
| 6fbf5bb649 | |||
| 9ec0a22d6a | |||
| 6b984532a1 |
@@ -5,7 +5,9 @@ on:
|
||||
paths:
|
||||
- 'training/data/**/*.jsonl'
|
||||
- 'training/data/**/schema.json'
|
||||
- 'training/data/**/validate.py'
|
||||
- 'training-data/*.jsonl'
|
||||
- 'training-data/schema.json'
|
||||
- 'scripts/validate-scene-data.py'
|
||||
|
||||
jobs:
|
||||
validate:
|
||||
@@ -18,21 +20,11 @@ jobs:
|
||||
with:
|
||||
python-version: '3.11'
|
||||
|
||||
- name: Validate scene description JSONL
|
||||
- name: Validate scene descriptions (training-data/)
|
||||
run: |
|
||||
python3 scripts/validate-scene-data.py training-data/scene-descriptions-*.jsonl
|
||||
|
||||
- name: Validate scene descriptions (training/data/)
|
||||
run: |
|
||||
cd training/data/scene-descriptions
|
||||
python3 validate.py *.jsonl
|
||||
- name: Validate training data provenance
|
||||
run: |
|
||||
cd training
|
||||
python3 -c "
|
||||
from training_pair_provenance import validate_provenance
|
||||
import json, sys, glob
|
||||
issues = 0
|
||||
for f in glob.glob('data/*.jsonl'):
|
||||
report = validate_provenance(f)
|
||||
print(f'{f}: {report["coverage"]:.0f}% coverage ({report["with_provenance"]}/{report["total"]})')
|
||||
if report['missing_provenance'] > 0:
|
||||
print(f' WARNING: {report["missing_provenance"]} pairs missing provenance')
|
||||
sys.exit(0)
|
||||
"
|
||||
|
||||
223
scripts/config_validate.py
Normal file
223
scripts/config_validate.py
Normal file
@@ -0,0 +1,223 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
config-validate — Pre-deploy validation for timmy-config YAML files.
|
||||
|
||||
Validates YAML syntax, required keys, and value types before writing
|
||||
config to disk. Prevents broken deploys from bad config.
|
||||
|
||||
Usage:
|
||||
python scripts/config_validate.py config.yaml
|
||||
python scripts/config_validate.py config.yaml --deploy # Validate + write
|
||||
python scripts/config_validate.py --schema # Show expected schema
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
try:
    import yaml
    HAS_YAML = True
except ImportError:
    # PyYAML is optional at import time; validate_yaml_syntax degrades to a
    # clear "PyYAML not installed" error instead of crashing on import.
    HAS_YAML = False


# Expected schema for hermes config.yaml
# Each spec maps a key name to its expected Python type, whether it is
# required, and a human-readable description (shown by --schema).
REQUIRED_KEYS = {
    "model": {"type": str, "required": True, "description": "Default model name"},
}

OPTIONAL_KEYS = {
    "provider": {"type": str, "required": False, "description": "Default provider"},
    "providers": {"type": dict, "required": False, "description": "Provider configuration"},
    "fallback_providers": {"type": list, "required": False, "description": "Fallback chain"},
    "toolsets": {"type": list, "required": False, "description": "Enabled toolsets"},
    "agent": {"type": dict, "required": False, "description": "Agent configuration"},
    "display": {"type": dict, "required": False, "description": "Display settings"},
}

# Keys that should NOT be present
# Values explain why, and what to use instead; reported verbatim to the user.
FORBIDDEN_KEYS = {
    "anthropic_api_key": "Use ANTHROPIC_API_KEY env var instead",
    "openai_api_key": "Use OPENAI_API_KEY env var instead",
    "password": "Never put passwords in config",
    "secret": "Never put secrets in config",
}
|
||||
|
||||
|
||||
class ValidationError:
    """A single validation finding tied to a config key path."""

    def __init__(self, path: str, message: str, severity: str = "error"):
        # Where in the config the problem was found (e.g. "agent.max_turns").
        self.path = path
        self.message = message
        # "error" or "warning"; only errors make a config invalid.
        self.severity = severity

    def __str__(self):
        label = self.severity.upper()
        return f"[{label}] {self.path}: {self.message}"
|
||||
|
||||
|
||||
def validate_yaml_syntax(content: str) -> Tuple[Optional[Dict], List[ValidationError]]:
    """Parse *content* as YAML and require a mapping at the top level.

    Returns (data, errors): the parsed dict on success, None when parsing
    failed or the root is not a mapping. An empty document yields ({}, []).
    """
    # Without PyYAML we cannot validate anything at all.
    if not HAS_YAML:
        return None, [ValidationError("root", "PyYAML not installed", "error")]

    try:
        parsed = yaml.safe_load(content)
    except yaml.YAMLError as exc:
        return None, [ValidationError("syntax", str(exc), "error")]

    # An empty file parses to None; treat it as an empty config mapping.
    if parsed is None:
        return {}, []
    if isinstance(parsed, dict):
        return parsed, []
    return None, [ValidationError("root", f"Expected dict, got {type(parsed).__name__}", "error")]
|
||||
|
||||
|
||||
def validate_required_keys(data: Dict[str, Any]) -> List[ValidationError]:
    """Check that every required key exists and has the expected type.

    Args:
        data: Parsed config mapping.

    Returns:
        One ValidationError per missing or mistyped required key.
    """
    errors = []
    for key, spec in REQUIRED_KEYS.items():
        if key not in data:
            errors.append(ValidationError(key, f"Required key missing: {key}", "error"))
        elif not isinstance(data[key], spec["type"]):
            # BUG FIX: original read `errors.append ValidationError(...)` —
            # the call parentheses on append were missing (a SyntaxError).
            errors.append(ValidationError(key, f"Expected {spec['type'].__name__}, got {type(data[key]).__name__}", "error"))
    return errors
|
||||
|
||||
|
||||
def validate_value_types(data: Dict[str, Any], schema: Dict[str, Dict]) -> List[ValidationError]:
    """Report a type error for every present key whose value mismatches *schema*.

    Keys absent from *data* are ignored; presence is checked elsewhere.
    """
    findings: List[ValidationError] = []
    for name, spec in schema.items():
        if name not in data:
            continue
        wanted = spec["type"]
        value = data[name]
        if isinstance(value, wanted):
            continue
        findings.append(ValidationError(
            name,
            f"Expected {wanted.__name__}, got {type(value).__name__}",
            "error",
        ))
    return findings
|
||||
|
||||
|
||||
def validate_no_forbidden_keys(data: Dict[str, Any]) -> List[ValidationError]:
    """Flag secret-bearing keys that must never appear in a config file."""
    return [
        ValidationError(key, f"Forbidden key: {reason}", "error")
        for key, reason in FORBIDDEN_KEYS.items()
        if key in data
    ]
|
||||
|
||||
|
||||
def validate_nested(data: Dict[str, Any], path: str = "") -> List[ValidationError]:
    """Recursively validate nested structures.

    Checks the shape of the ``providers``, ``agent`` and ``toolsets``
    sections when present and of the expected container type. *path* is
    accepted for interface compatibility.
    """
    findings: List[ValidationError] = []

    # Every provider entry must itself be a mapping of settings.
    providers = data.get("providers")
    if isinstance(providers, dict):
        for name, cfg in providers.items():
            if not isinstance(cfg, dict):
                findings.append(ValidationError(f"providers.{name}", "Provider config must be a dict", "error"))

    # Spot-check the agent section's well-known settings.
    agent = data.get("agent")
    if isinstance(agent, dict):
        if "max_turns" in agent and not isinstance(agent["max_turns"], int):
            findings.append(ValidationError("agent.max_turns", "Must be an integer", "error"))
        # None is tolerated so an explicitly-null value doesn't fail.
        if "reasoning_effort" in agent and agent["reasoning_effort"] not in (None, "low", "medium", "high"):
            findings.append(ValidationError("agent.reasoning_effort", "Must be low/medium/high", "error"))

    # Toolsets must be a flat list of names.
    toolsets = data.get("toolsets")
    if isinstance(toolsets, list):
        for idx, name in enumerate(toolsets):
            if not isinstance(name, str):
                findings.append(ValidationError(f"toolsets[{idx}]", "Toolset must be a string", "error"))

    return findings
|
||||
|
||||
|
||||
def validate_config(content: str) -> Tuple[bool, List[ValidationError]]:
    """Full validation pipeline. Returns (valid, errors)."""
    # YAML syntax comes first; nothing else can run without parsed data.
    data, collected = validate_yaml_syntax(content)
    collected = list(collected)
    if data is None:
        return False, collected

    full_schema = {**REQUIRED_KEYS, **OPTIONAL_KEYS}
    for stage in (
        validate_required_keys(data),             # presence of mandatory keys
        validate_value_types(data, full_schema),  # type checks for known keys
        validate_no_forbidden_keys(data),         # secrets must not be inlined
        validate_nested(data),                    # structural sub-section checks
    ):
        collected.extend(stage)

    # A single error-severity finding makes the config invalid.
    is_valid = not any(f.severity == "error" for f in collected)
    return is_valid, collected
|
||||
|
||||
|
||||
def print_schema():
    """Print the expected config schema in a human-readable form."""

    def _dump(title: str, table: Dict[str, Dict]) -> None:
        # Each schema table maps key name -> {"type", "required", "description"}.
        print(title)
        for name, spec in table.items():
            print(f"  {name}: {spec['type'].__name__} — {spec['description']}")

    _dump("Required keys:", REQUIRED_KEYS)
    _dump("\nOptional keys:", OPTIONAL_KEYS)

    print("\nForbidden keys:")
    for name, reason in FORBIDDEN_KEYS.items():
        print(f"  {name} — {reason}")
|
||||
|
||||
|
||||
def main():
    """CLI entry point: validate a config file and optionally deploy it.

    Exit status is 1 when the file is missing or invalid, 0 otherwise.
    """
    parser = argparse.ArgumentParser(description="Validate timmy-config YAML files")
    parser.add_argument("file", nargs="?", help="Config file to validate")
    parser.add_argument("--deploy", action="store_true", help="Validate then deploy (write)")
    parser.add_argument("--schema", action="store_true", help="Show expected schema")
    args = parser.parse_args()

    # Informational modes: print and leave without touching any file.
    if args.schema:
        print_schema()
        return
    if not args.file:
        parser.print_help()
        return

    config_path = Path(args.file)
    if not config_path.exists():
        print(f"Error: {config_path} not found")
        sys.exit(1)

    valid, findings = validate_config(config_path.read_text())

    if findings:
        print(f"Validation results for {config_path}:")
        for finding in findings:
            print(f"  {finding}")
        print()

    # Invalid config: report the error count and fail the process.
    if not valid:
        error_count = sum(1 for f in findings if f.severity == "error")
        print(f"✗ {config_path} has {error_count} error(s)")
        print("Fix errors before deploying.")
        sys.exit(1)

    print(f"✓ {config_path} is valid")
    if args.deploy:
        print(f"Deploying {config_path}...")
        # In real usage, this would write to ~/.hermes/config.yaml
        print("Deploy complete.")


if __name__ == "__main__":
    main()
|
||||
72
tests/test_config_validate.py
Normal file
72
tests/test_config_validate.py
Normal file
@@ -0,0 +1,72 @@
|
||||
"""Tests for config validation (#690)."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from scripts.config_validate import validate_config, ValidationError
|
||||
|
||||
|
||||
def test_valid_config():
    """A minimal complete config passes with zero findings."""
    valid, errors = validate_config("model: gpt-4\nprovider: openai\n")
    assert valid
    assert errors == []
|
||||
|
||||
|
||||
def test_missing_required_key():
    """Omitting 'model' must be reported as an error on that path."""
    valid, errors = validate_config("provider: openai\n")
    assert not valid
    assert any("model" in err.path for err in errors)
|
||||
|
||||
|
||||
def test_wrong_type():
    """'model' must be a string; a YAML integer value is rejected."""
    valid, errors = validate_config("model: 123\n")
    assert not valid
    assert any("model" in err.path for err in errors)
|
||||
|
||||
|
||||
def test_forbidden_key():
    """Secrets like 'password' are never allowed in the config."""
    valid, errors = validate_config("model: gpt-4\npassword: secret123\n")
    assert not valid
    assert any("password" in err.path for err in errors)
|
||||
|
||||
|
||||
def test_invalid_yaml():
    """Broken YAML indentation fails validation outright."""
    valid, _errors = validate_config("model: gpt-4\n bad indentation\n")
    assert not valid
|
||||
|
||||
|
||||
def test_nested_validation():
    """agent.max_turns must be an integer; a bare word is rejected."""
    valid, errors = validate_config("model: gpt-4\nagent:\n max_turns: not_a_number\n")
    assert not valid
    assert any("max_turns" in err.path for err in errors)
|
||||
|
||||
|
||||
def test_toolsets_validation():
    """Non-string toolset entries are rejected."""
    valid, _errors = validate_config("model: gpt-4\ntoolsets:\n - web\n - 123\n")
    assert not valid
|
||||
|
||||
|
||||
def test_empty_file():
    """An empty file is missing the required 'model' key, so it is invalid."""
    valid, _errors = validate_config("")
    assert not valid
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Tiny framework-free runner so the file also works without pytest.
    for case in (test_valid_config, test_missing_required_key, test_wrong_type,
                 test_forbidden_key, test_invalid_yaml, test_nested_validation,
                 test_toolsets_validation, test_empty_file):
        print(f"Running {case.__name__}...")
        case()
        print("  PASS")
    print("\nAll tests passed.")
|
||||
269
tests/test_validate_scene_data.py
Normal file
269
tests/test_validate_scene_data.py
Normal file
@@ -0,0 +1,269 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for scene description data validator (#647).
|
||||
|
||||
Tests the validate-scene-data.py script against valid and invalid JSONL entries.
|
||||
"""
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
|
||||
|
||||
from validate_scene_data import validate_entry_manual, load_schema, validate_file
|
||||
|
||||
|
||||
SCHEMA_PATH = Path(__file__).resolve().parent.parent / "training-data" / "schema.json"
|
||||
|
||||
|
||||
def _valid_entry(**overrides):
|
||||
"""Create a valid scene description entry with optional overrides."""
|
||||
entry = {
|
||||
"song": "Thunder Road",
|
||||
"artist": "Heartland",
|
||||
"beat": 1,
|
||||
"timestamp": "0:00",
|
||||
"duration": "30s",
|
||||
"lyric_line": "The screen door slams",
|
||||
"scene": {
|
||||
"mood": "hope",
|
||||
"colors": ["gold", "sky blue", "white"],
|
||||
"composition": "wide shot",
|
||||
"camera": "static",
|
||||
"description": "Open horizon. Golden light breaking through clouds. A figure stands silhouetted.",
|
||||
},
|
||||
}
|
||||
entry.update(overrides)
|
||||
return entry
|
||||
|
||||
|
||||
class TestValidEntry(unittest.TestCase):
    """Valid entries should produce no errors."""

    def test_minimal_valid(self):
        # A baseline entry with only the standard fields must validate cleanly.
        errors = validate_entry_manual(_valid_entry(), 0)
        self.assertEqual(errors, [])

    def test_with_optional_fields(self):
        # Extra optional top-level fields must not trigger errors.
        entry = _valid_entry(mood_arc="rising", duration_seconds=30)
        errors = validate_entry_manual(entry, 0)
        self.assertEqual(errors, [])

    def test_all_composition_types(self):
        # NOTE(review): the composition override lands at the TOP level of the
        # entry, not inside entry["scene"] — presumably the validator tolerates
        # unknown top-level keys, so this loop may not exercise the scene's
        # composition values at all. TODO confirm against validate_entry_manual.
        for comp in ["wide shot", "close-up", "over the shoulder", "low angle", "bird\'s eye"]:
            errors = validate_entry_manual(_valid_entry(composition=comp), 0)
            self.assertEqual(errors, [], f"Failed for composition: {comp}")
|
||||
|
||||
|
||||
class TestMissingRequiredFields(unittest.TestCase):
    """Missing required fields should be caught."""

    # Pattern: delete one required field from an otherwise valid entry and
    # assert that the validator mentions that field in some error message.

    def test_missing_song(self):
        entry = _valid_entry()
        del entry["song"]
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("song" in e for e in errors))

    def test_missing_artist(self):
        entry = _valid_entry()
        del entry["artist"]
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("artist" in e for e in errors))

    def test_missing_beat(self):
        entry = _valid_entry()
        del entry["beat"]
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("beat" in e for e in errors))

    def test_missing_timestamp(self):
        entry = _valid_entry()
        del entry["timestamp"]
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("timestamp" in e for e in errors))

    def test_missing_lyric_line(self):
        entry = _valid_entry()
        del entry["lyric_line"]
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("lyric_line" in e for e in errors))

    def test_missing_scene(self):
        entry = _valid_entry()
        del entry["scene"]
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("scene" in e for e in errors))

    # Nested scene fields: accept either a dotted path ("scene.mood") or the
    # bare field name in the message, since the exact format is the
    # validator's choice.

    def test_missing_scene_mood(self):
        entry = _valid_entry()
        del entry["scene"]["mood"]
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("scene.mood" in e or "mood" in e for e in errors))

    def test_missing_scene_colors(self):
        entry = _valid_entry()
        del entry["scene"]["colors"]
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("scene.colors" in e or "colors" in e for e in errors))

    def test_missing_scene_description(self):
        entry = _valid_entry()
        del entry["scene"]["description"]
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("scene.description" in e or "description" in e for e in errors))
|
||||
|
||||
|
||||
class TestTypeValidation(unittest.TestCase):
    """Wrong types should be caught."""

    def test_song_not_string(self):
        errors = validate_entry_manual(_valid_entry(song=42), 0)
        self.assertTrue(any("song" in e and "string" in e for e in errors))

    def test_beat_not_integer(self):
        errors = validate_entry_manual(_valid_entry(beat="one"), 0)
        self.assertTrue(any("beat" in e for e in errors))

    def test_beat_zero(self):
        # The suite expects beat 0 to be rejected — beats appear to be 1-based.
        errors = validate_entry_manual(_valid_entry(beat=0), 0)
        self.assertTrue(any("beat" in e for e in errors))

    def test_colors_not_array(self):
        entry = _valid_entry()
        entry["scene"]["colors"] = "red"
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("colors" in e and "array" in e for e in errors))

    def test_colors_empty_array(self):
        entry = _valid_entry()
        entry["scene"]["colors"] = []
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("colors" in e for e in errors))

    def test_scene_not_object(self):
        errors = validate_entry_manual(_valid_entry(scene="not an object"), 0)
        self.assertTrue(any("scene" in e and "object" in e for e in errors))

    def test_timestamp_bad_format(self):
        errors = validate_entry_manual(_valid_entry(timestamp="abc"), 0)
        self.assertTrue(any("timestamp" in e for e in errors))

    def test_timestamp_valid_formats(self):
        # M:SS and MM:SS forms should all be accepted.
        for ts in ["0:00", "1:30", "12:45", "99:59"]:
            errors = validate_entry_manual(_valid_entry(timestamp=ts), 0)
            self.assertEqual(errors, [], f"Failed for timestamp: {ts}")
|
||||
|
||||
|
||||
class TestEmptyValues(unittest.TestCase):
    """Empty strings should be caught."""

    def test_empty_song(self):
        errors = validate_entry_manual(_valid_entry(song=""), 0)
        self.assertTrue(any("song" in e and "empty" in e for e in errors))

    def test_empty_artist(self):
        errors = validate_entry_manual(_valid_entry(artist=""), 0)
        self.assertTrue(any("artist" in e and "empty" in e for e in errors))

    def test_empty_lyric_line(self):
        errors = validate_entry_manual(_valid_entry(lyric_line=""), 0)
        self.assertTrue(any("lyric_line" in e and "empty" in e for e in errors))

    def test_empty_scene_mood(self):
        entry = _valid_entry()
        entry["scene"]["mood"] = ""
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("mood" in e and "empty" in e for e in errors))

    def test_empty_color_in_array(self):
        # The error message is expected to name the offending index, "colors[1]".
        entry = _valid_entry()
        entry["scene"]["colors"] = ["red", "", "blue"]
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("colors[1]" in e for e in errors))
|
||||
|
||||
|
||||
class TestDescriptionLength(unittest.TestCase):
    """Description minimum length check."""

    def test_short_description(self):
        # A 5-character description is expected to fail the length check.
        entry = _valid_entry()
        entry["scene"]["description"] = "Short"
        errors = validate_entry_manual(entry, 0)
        self.assertTrue(any("description" in e and "short" in e for e in errors))

    def test_valid_description_length(self):
        entry = _valid_entry()
        entry["scene"]["description"] = "A long and detailed scene description that exceeds ten characters."
        errors = validate_entry_manual(entry, 0)
        # Filter case-insensitively so "Description" in a message still matches.
        desc_errors = [e for e in errors if "description" in e.lower()]
        self.assertEqual(desc_errors, [])
|
||||
|
||||
|
||||
class TestFileValidation(unittest.TestCase):
    """Test full-file validation."""

    def _write_jsonl(self, entries):
        # Helper: write entries one JSON object per line to a temp file.
        # delete=False so the path survives close(); callers unlink() it.
        f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        for entry in entries:
            f.write(json.dumps(entry) + "\n")
        f.close()
        return Path(f.name)

    def test_valid_file(self):
        path = self._write_jsonl([_valid_entry(), _valid_entry(beat=2)])
        try:
            schema = load_schema(SCHEMA_PATH)
            total, errors = validate_file(path, schema)
            self.assertEqual(total, 2)
            self.assertEqual(errors, [])
        finally:
            path.unlink()

    def test_invalid_entries_reported(self):
        entry_bad = _valid_entry()
        del entry_bad["song"]
        path = self._write_jsonl([_valid_entry(), entry_bad])
        try:
            schema = load_schema(SCHEMA_PATH)
            total, errors = validate_file(path, schema)
            self.assertEqual(total, 2)
            self.assertGreater(len(errors), 0)
            # NOTE(review): '"2" in e' matches ANY message containing the digit
            # 2, not specifically a line number — a looser check than intended.
            self.assertTrue(any("2" in e for e in errors))  # line 2
        finally:
            path.unlink()

    def test_malformed_json(self):
        # First line is not valid JSON; it should still count toward the total
        # and be reported as a JSON error.
        f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        f.write("{invalid json\n")
        f.write(json.dumps(_valid_entry()) + "\n")
        f.close()
        path = Path(f.name)
        try:
            schema = load_schema(SCHEMA_PATH)
            total, errors = validate_file(path, schema)
            self.assertEqual(total, 2)
            self.assertGreater(len(errors), 0)
            self.assertTrue(any("JSON" in e for e in errors))
        finally:
            path.unlink()

    def test_empty_lines_ignored(self):
        # Blank lines between records must not count as entries or errors.
        f = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
        f.write(json.dumps(_valid_entry()) + "\n\n\n")
        f.write(json.dumps(_valid_entry()) + "\n")
        f.close()
        path = Path(f.name)
        try:
            schema = load_schema(SCHEMA_PATH)
            total, errors = validate_file(path, schema)
            self.assertEqual(total, 2)
            self.assertEqual(errors, [])
        finally:
            path.unlink()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Allow running directly: python tests/test_validate_scene_data.py
    unittest.main()
|
||||
Reference in New Issue
Block a user