Compare commits

..

1 Commits

Author SHA1 Message Date
223e1e5543 feat: add training data quality filter script (#687)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 10s
Smoke Test / smoke (pull_request) Failing after 19s
PR Checklist / pr-checklist (pull_request) Failing after 4m23s
Validate Config / YAML Lint (pull_request) Failing after 14s
Validate Config / JSON Validate (pull_request) Successful in 21s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m20s
Validate Config / Shell Script Lint (pull_request) Failing after 1m8s
Validate Config / Cron Syntax Check (pull_request) Successful in 10s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 15s
Validate Config / Playbook Schema Validation (pull_request) Successful in 22s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
2026-04-15 03:15:22 +00:00
2 changed files with 266 additions and 150 deletions

View File

@@ -1,150 +0,0 @@
#!/usr/bin/env python3
"""
sidecar_validator.py - Pre-deploy validation for timmy-config sidecar configs.
Validates YAML/JSON configs against expected schemas before deploy.
Usage:
python3 scripts/sidecar_validator.py [config_path ...]
python3 scripts/sidecar_validator.py --all
python3 scripts/sidecar_validator.py --pre-deploy
"""
import json, os, sys
from pathlib import Path
try:
import yaml
except ImportError:
print("ERROR: PyYAML not installed.", file=sys.stderr); sys.exit(2)
SCHEMAS = {
"wizard_config": {
"description": "Wizard agent configuration",
"required": {"model": dict},
"optional": {"toolsets": list, "fallback_providers": list, "agent": dict, "providers": dict, "terminal": dict, "browser": dict, "compression": dict, "auxiliary": dict},
"nested_required": {"model": {"default": str, "provider": str}},
"nested_optional": {"model": {"fallback": str}, "agent": {"max_turns": (int, float), "reasoning_effort": str, "verbose": bool}},
},
"sidecar_config": {
"description": "Sidecar agent configuration",
"required": {"name": str, "role": str},
"optional": {"capabilities": list, "instructions": str, "model": str, "provider": str, "toolsets": list},
},
"main_config": {
"description": "Main hermes configuration",
"required": {"model": dict},
"optional": {"toolsets": list, "agent": dict, "terminal": dict, "browser": dict, "checkpoints": dict, "compression": dict, "auxiliary": dict, "fallback_providers": list, "providers": dict},
"nested_required": {"model": {"default": str, "provider": str}},
},
"cron_pipeline": {
"description": "Cron/pipeline schedule",
"required": {"name": str},
"optional": {"schedule": str, "cron": str, "tasks": list, "steps": list, "prompt": str, "model": dict},
},
"playbook": {
"description": "Agent playbook",
"required": {"name": str},
"optional": {"description": str, "model": str, "steps": list, "prompt": str},
},
}
def classify_config(filepath):
parts, name = filepath.parts, filepath.name
if "wizards" in parts and "-sidecar." in name: return "sidecar_config"
if "wizards" in parts and name in ("config.yaml", "config.yml"): return "wizard_config"
if name == "config.yaml" and len(parts) <= 2: return "main_config"
if "cron" in parts and name.endswith((".yml", ".yaml")): return "cron_pipeline"
if "playbooks" in parts and name.endswith((".yaml", ".yml")): return "playbook"
return None
def type_name(t):
if isinstance(t, tuple): return " or ".join(tt.__name__ for tt in t)
return t.__name__
def validate_config(data, schema_name, schema):
errors = []
for key, expected_type in schema["required"].items():
if key not in data:
errors.append(f"missing required key: '{key}' (expected {type_name(expected_type)})")
elif not isinstance(data[key], expected_type):
errors.append(f"'{key}' wrong type: got {type(data[key]).__name__}, expected {type_name(expected_type)}")
for pk, cs in schema.get("nested_required", {}).items():
if pk in data and isinstance(data[pk], dict):
for ck, et in cs.items():
if ck not in data[pk]:
errors.append(f"'{pk}' missing key: '{ck}'")
elif not isinstance(data[pk][ck], et):
errors.append(f"'{pk}.{ck}' wrong type: {type(data[pk][ck]).__name__}")
for pk, cs in schema.get("nested_optional", {}).items():
if pk in data and isinstance(data[pk], dict):
for ck, et in cs.items():
if ck in data[pk] and not isinstance(data[pk][ck], et):
errors.append(f"'{pk}.{ck}' wrong type: {type(data[pk][ck]).__name__}")
if schema_name == "wizard_config" and "fallback_providers" in data and isinstance(data["fallback_providers"], list):
for i, fb in enumerate(data["fallback_providers"]):
if not isinstance(fb, dict):
errors.append(f"fallback_providers[{i}]: expected dict")
elif "provider" not in fb: errors.append(f"fallback_providers[{i}]: missing 'provider'")
elif "model" not in fb: errors.append(f"fallback_providers[{i}]: missing 'model'")
if schema_name == "sidecar_config" and "capabilities" in data:
if not isinstance(data["capabilities"], list):
errors.append(f"'capabilities' must be a list")
if schema_name == "cron_pipeline":
s = data.get("schedule") or data.get("cron", "")
if isinstance(s, str) and s.strip() and len(s.strip().split()) != 5:
errors.append(f"schedule has {len(s.strip().split())} fields, expected 5")
return errors
def validate_file(filepath):
schema_name = classify_config(filepath)
if schema_name is None: return True, []
schema = SCHEMAS[schema_name]
try: text = filepath.read_text(encoding="utf-8", errors="replace")
except Exception as e: return False, [f"cannot read: {e}"]
try:
data = json.loads(text) if filepath.suffix == ".json" else yaml.safe_load(text)
except Exception as e: return False, [f"parse error: {e}"]
if not isinstance(data, dict): return False, [f"expected mapping, got {type(data).__name__}"]
errors = validate_config(data, schema_name, schema)
return len(errors) == 0, errors
def find_deploy_targets(root):
targets = []
for p in ["config.yaml", "wizards/*/config.yaml", "wizards/*-sidecar.json"]:
targets.extend(root.glob(p))
return sorted(targets)
def find_all_configs(root):
skip = {".git", "node_modules", "venv", "__pycache__"}
results = []
for dp, dns, fns in os.walk(root):
dns[:] = [d for d in dns if d not in skip]
for fn in fns:
if fn.endswith((".yaml", ".yml", ".json")):
fp = Path(dp) / fn
if classify_config(fp) is not None: results.append(fp)
return sorted(results)
def main():
root = Path(__file__).resolve().parent.parent
args = sys.argv[1:]
if not args or args == ["--help"]: print(__doc__); sys.exit(2)
if "--all" in args: files = find_all_configs(root); mode = "all configs"
elif "--pre-deploy" in args: files = find_deploy_targets(root); mode = "deploy targets"
else: files = [Path(a) for a in args if not a.startswith("-")]; mode = "specified files"
if not files: print(f"No files found: {mode}"); sys.exit(0)
print(f"Sidecar Validator - {mode}"); print("=" * 60)
total = 0; failed = []
for fp in files:
rel = fp.relative_to(root) if fp.is_absolute() else fp
sn = classify_config(fp) or "unknown"
ok, errs = validate_file(fp)
if ok: print(f"PASS {rel} [{sn}]")
else: failed.append((rel, sn, errs)); total += len(errs); print(f"FAIL {rel} [{sn}]")
print(); print("=" * 60)
print(f"Results: {len(files)-len(failed)}/{len(files)} valid")
if failed:
print(f"\n{total} error(s) in {len(failed)} file(s):\n")
for rel, sn, errs in failed:
print(f" {rel} ({sn}):")
for e in errs: print(f" - {e}")
print(); sys.exit(1)
else: print("\nAll configs valid!"); sys.exit(0)
if __name__ == "__main__": main()

View File

@@ -0,0 +1,266 @@
#!/usr/bin/env python3
"""
[QUALITY] Training Data Quality Filter
Part of the Timmy Foundation tooling.
Scores and filters JSONL training pairs on specificity, length ratio,
and code correctness. Removes low-quality pairs and reports results.
Usage:
python3 scripts/training_quality_filter.py input.jsonl -o filtered.jsonl
python3 scripts/training_quality_filter.py input.jsonl --threshold 0.4
cat input.jsonl | python3 scripts/training_quality_filter.py -
"""
import sys
import json
import argparse
import re
from typing import Dict, Any, Tuple
DEFAULT_THRESHOLD = 0.35
MIN_TERSE_LEN = 3
MIN_RICH_LEN = 10
def score_specificity(terse: str, rich: str) -> float:
"""Score how specific the rich response is vs the terse prompt.
Higher score = more specific, actionable detail in the rich version.
"""
if not terse or not rich:
return 0.0
# Ratio of unique words (higher = more varied/specific language)
rich_words = rich.lower().split()
terse_words = terse.lower().split()
if len(rich_words) < 3:
return 0.1
unique_ratio = len(set(rich_words)) / len(rich_words)
# Check for concrete details: numbers, file paths, commands, code refs
concrete_patterns = [
r"\b\d+\b", # numbers
r"[/\\]\w+", # file paths
r"`[^`]+`", # inline code
r"\b(fix|add|remove|update|create|delete|check|run|use)\b", # action verbs
]
concrete_count = sum(
len(re.findall(p, rich, re.IGNORECASE)) for p in concrete_patterns
)
concrete_score = min(concrete_count / 5.0, 1.0)
# Length expansion ratio (rich should be meaningfully longer than terse)
expansion = len(rich_words) / max(len(terse_words), 1)
expansion_score = min(expansion / 5.0, 1.0)
return round(0.3 * unique_ratio + 0.4 * concrete_score + 0.3 * expansion_score, 3)
def score_length_ratio(terse: str, rich: str) -> float:
"""Score the length ratio between terse and rich.
Too short rich = low quality. Too long = possibly padded.
Sweet spot: 3-15x expansion.
"""
if not terse or not rich:
return 0.0
t_len = len(terse.split())
r_len = len(rich.split())
if t_len < MIN_TERSE_LEN or r_len < MIN_RICH_LEN:
return 0.1
ratio = r_len / max(t_len, 1)
if ratio < 1.5:
return 0.2 # barely expanded
elif ratio < 3.0:
return 0.5 # some expansion
elif ratio <= 15.0:
return 1.0 # good expansion
elif ratio <= 30.0:
return 0.7 # possibly padded
else:
return 0.4 # very padded
def score_code_correctness(terse: str, rich: str) -> float:
"""Score code blocks in the rich response for basic correctness.
Checks for matching brackets, valid-looking syntax patterns.
"""
if not rich:
return 0.5 # no code = neutral
code_blocks = re.findall(r"```(?:\w*)\n(.*?)```", rich, re.DOTALL)
if not code_blocks:
return 0.5 # no code blocks = neutral
scores = []
for block in code_blocks:
block_score = 1.0
# Check bracket balance
for open_c, close_c in [("(", ")"), ("[", "]"), ("{", "}")]:
if block.count(open_c) != block.count(close_c):
block_score -= 0.3
# Check for common syntax errors
if re.search(r"def \w+[^:]*\n(?!\s)", block):
block_score -= 0.2 # missing colon or body
# Minimum viable code length
if len(block.strip()) < 10:
block_score -= 0.3
scores.append(max(block_score, 0.0))
return round(sum(scores) / len(scores), 3) if scores else 0.5
def score_pair(pair: Dict[str, Any]) -> Tuple[float, Dict[str, float]]:
"""Score a single training pair. Returns (total_score, breakdown)."""
terse = pair.get("terse", "") or pair.get("prompt", "") or ""
rich = pair.get("rich", "") or pair.get("response", "") or ""
spec = score_specificity(terse, rich)
length = score_length_ratio(terse, rich)
code = score_code_correctness(terse, rich)
# Weighted total
total = round(0.4 * spec + 0.3 * length + 0.3 * code, 3)
return total, {"specificity": spec, "length_ratio": length, "code_correctness": code}
def filter_pairs(input_path: str, output_path: str, threshold: float,
report: bool = False) -> Dict[str, Any]:
"""Filter JSONL training pairs by quality score."""
kept = []
removed = []
errors = 0
source = sys.stdin if input_path == "-" else open(input_path, "r")
try:
for line_num, line in enumerate(source, 1):
line = line.strip()
if not line:
continue
try:
pair = json.loads(line)
except json.JSONDecodeError:
errors += 1
continue
score, breakdown = score_pair(pair)
entry = {**pair, "_quality_score": score, "_quality_breakdown": breakdown}
if score >= threshold:
kept.append(entry)
else:
removed.append(entry)
finally:
if source is not sys.stdin:
source.close()
# Write filtered output
if output_path:
out = sys.stdout if output_path == "-" else open(output_path, "w")
try:
for pair in kept:
# Strip internal scoring fields before output
clean = {k: v for k, v in pair.items() if not k.startswith("_quality")}
out.write(json.dumps(clean, ensure_ascii=False) + "\n")
finally:
if out is not sys.stdin:
out.close()
result = {
"total": len(kept) + len(removed),
"kept": len(kept),
"filtered_out": len(removed),
"errors": errors,
"threshold": threshold,
"filter_rate": round(len(removed) / max(len(kept) + len(removed), 1) * 100, 1),
}
if report and removed:
# Show worst offenders
removed_sorted = sorted(removed, key=lambda x: x["_quality_score"])
result["worst_5"] = [
{
"score": e["_quality_score"],
"terse": (e.get("terse", "") or e.get("prompt", ""))[:80],
"breakdown": e["_quality_breakdown"],
}
for e in removed_sorted[:5]
]
return result
def main():
parser = argparse.ArgumentParser(description="Filter training data pairs by quality")
parser.add_argument("input", help="Input JSONL file (use - for stdin)")
parser.add_argument("-o", "--output", default="-", help="Output JSONL file (default: stdout)")
parser.add_argument("-t", "--threshold", type=float, default=DEFAULT_THRESHOLD,
help=f"Quality threshold (0.0-1.0, default: {DEFAULT_THRESHOLD})")
parser.add_argument("--report", action="store_true", help="Show quality report")
parser.add_argument("--dry-run", action="store_true", help="Score only, dont filter")
args = parser.parse_args()
if args.dry_run:
# Just score and report, no filtering
source = sys.stdin if args.input == "-" else open(args.input, "r")
scores = []
try:
for line in source:
line = line.strip()
if not line:
continue
try:
pair = json.loads(line)
except json.JSONDecodeError:
continue
score, breakdown = score_pair(pair)
scores.append(score)
finally:
if source is not sys.stdin:
source.close()
if scores:
avg = sum(scores) / len(scores)
below = sum(1 for s in scores if s < args.threshold)
print(f"Total pairs: {len(scores)}")
print(f"Average score: {avg:.3f}")
print(f"Below threshold ({args.threshold}): {below} ({below/len(scores)*100:.1f}%)")
print(f"Min: {min(scores):.3f} Max: {max(scores):.3f} Median: {sorted(scores)[len(scores)//2]:.3f}")
return
result = filter_pairs(args.input, args.output, args.threshold, report=args.report)
print(f"Training Data Quality Filter", file=sys.stderr)
print(f"{'='*40}", file=sys.stderr)
print(f"Total pairs: {result['total']}", file=sys.stderr)
print(f"Kept: {result['kept']}", file=sys.stderr)
print(f"Filtered out: {result['filtered_out']} ({result['filter_rate']}%)", file=sys.stderr)
print(f"Errors: {result['errors']}", file=sys.stderr)
print(f"Threshold: {result['threshold']}", file=sys.stderr)
if args.report and "worst_5" in result:
print(f"\nWorst 5 pairs:", file=sys.stderr)
for w in result["worst_5"]:
terse_preview = w["terse"][:60]
print(f" [{w['score']:.3f}] {terse_preview}...", file=sys.stderr)
bd = w["breakdown"]
print(f" spec={bd['specificity']} length={bd['length_ratio']} code={bd['code_correctness']}", file=sys.stderr)
if __name__ == "__main__":
main()