Compare commits

..

1 Commits

Author SHA1 Message Date
bca52ea61e fix: add timestamp-based staleness detection for pipeline states
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 35s
Smoke Test / smoke (pull_request) Failing after 26s
Validate Config / YAML Lint (pull_request) Failing after 21s
Validate Config / JSON Validate (pull_request) Successful in 14s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 2m0s
Validate Config / Shell Script Lint (pull_request) Failing after 50s
Validate Config / Cron Syntax Check (pull_request) Successful in 13s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 20s
Validate Config / Playbook Schema Validation (pull_request) Successful in 34s
PR Checklist / pr-checklist (pull_request) Failing after 11m9s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
Closes #676, fixes #650

- is_pipeline_complete: stale if >24h old
- is_pipeline_running: stale if >6h old (likely crashed)
- reset_stale_states: purges stale entries at scheduler start
- --status shows (stale) indicator for outdated states
2026-04-15 03:27:39 +00:00
3 changed files with 90 additions and 299 deletions

View File

@@ -1,83 +0,0 @@
#!/usr/bin/env python3
"""
Config Validator -- pre-deploy YAML validation for timmy-config sidecar.
Validates YAML syntax, required keys (model.default, model.provider,
toolsets), and provider names before deploy.sh writes to ~/.hermes/.
Usage:
python3 bin/validate_config.py [path/to/config.yaml]
python3 bin/validate_config.py --strict (fail on warnings too)
"""
import json, os, sys, yaml
from pathlib import Path
REQUIRED = {
"model": {"type": dict, "keys": {"default": str, "provider": str}},
"toolsets": {"type": list},
}
ALLOWED_PROVIDERS = [
"anthropic", "openai", "nous", "ollama", "openrouter", "openai-codex"
]
def validate(path):
errors = []
try:
with open(path) as f:
data = yaml.safe_load(f)
except Exception as e:
return [f"YAML parse error: {e}"]
if not isinstance(data, dict):
return [f"Expected mapping, got {type(data).__name__}"]
for key, spec in REQUIRED.items():
if key not in data:
errors.append(f"Required key missing: {key}")
continue
if spec["type"] == dict and not isinstance(data[key], dict):
errors.append(f"{key}: expected dict")
continue
if spec["type"] == list and not isinstance(data[key], list):
errors.append(f"{key}: expected list")
continue
if "keys" in spec:
for sub, sub_type in spec["keys"].items():
if sub not in data[key]:
errors.append(f"{key}.{sub}: required")
elif not isinstance(data[key][sub], sub_type):
errors.append(f"{key}.{sub}: expected {sub_type.__name__}")
provider = data.get("model", {}).get("provider")
if provider and provider not in ALLOWED_PROVIDERS:
errors.append(f"model.provider: unknown provider '{provider}'")
# Check JSON files
for jf in ["channel_directory.json"]:
jp = Path(path).parent / jf
if jp.exists():
try:
json.loads(jp.read_text())
except Exception as e:
errors.append(f"{jf}: invalid JSON: {e}")
return errors
def main():
strict = "--strict" in sys.argv
args = [a for a in sys.argv[1:] if not a.startswith("--")]
path = args[0] if args else str(Path(__file__).parent.parent / "config.yaml")
if not os.path.exists(path):
print(f"ERROR: {path} not found")
sys.exit(1)
errs = validate(path)
if errs:
for e in errs:
print(f"ERROR: {e}")
print(f"Validation FAILED: {len(errs)} issue(s)")
sys.exit(1)
print(f"OK: {path} is valid")
if __name__ == "__main__":
main()

View File

@@ -117,14 +117,26 @@ is_pipeline_complete() {
local pipeline="$1"
python3 -c "
import json, os
from datetime import datetime, timezone, timedelta
path = '$STATE_FILE'
if not os.path.exists(path):
print('false')
else:
with open(path) as f:
d = json.load(f)
state = d.get('$pipeline', {}).get('state', 'not_started')
print('true' if state == 'complete' else 'false')
entry = d.get('$pipeline', {})
state = entry.get('state', 'not_started')
if state != 'complete':
print('false')
else:
# Check staleness: complete from a previous day is stale
updated = entry.get('updated', '')
try:
t = datetime.fromisoformat(updated.replace('Z', '+00:00'))
is_today = (datetime.now(timezone.utc) - t) < timedelta(hours=24)
print('true' if is_today else 'false')
except Exception:
print('false')
" 2>/dev/null || echo false
}
@@ -132,17 +144,64 @@ is_pipeline_running() {
local pipeline="$1"
python3 -c "
import json, os
from datetime import datetime, timezone, timedelta
path = '$STATE_FILE'
if not os.path.exists(path):
print('false')
else:
with open(path) as f:
d = json.load(f)
state = d.get('$pipeline', {}).get('state', 'not_started')
print('true' if state == 'running' else 'false')
entry = d.get('$pipeline', {})
state = entry.get('state', 'not_started')
if state != 'running':
print('false')
else:
# Running for >6 hours is likely crashed
updated = entry.get('updated', '')
try:
t = datetime.fromisoformat(updated.replace('Z', '+00:00'))
stale = (datetime.now(timezone.utc) - t) > timedelta(hours=6)
print('false' if stale else 'true')
except Exception:
print('true')
" 2>/dev/null || echo false
}
reset_stale_states() {
python3 -c "
import json, os
from datetime import datetime, timezone, timedelta
path = '\$STATE_FILE'
if not os.path.exists(path):
exit(0)
with open(path) as f:
d = json.load(f)
stale = []
for name, entry in list(d.items()):
if not isinstance(entry, dict) or 'state' not in entry:
continue
state = entry['state']
updated = entry.get('updated', '')
try:
t = datetime.fromisoformat(updated.replace('Z', '+00:00'))
except Exception:
stale.append(name)
continue
age = datetime.now(timezone.utc) - t
if state in ('complete', 'failed') and age > timedelta(hours=24):
stale.append(name)
elif state == 'running' and age > timedelta(hours=6):
stale.append(name)
for name in stale:
del d[name]
if stale:
with open(path, 'w') as f:
json.dump(d, f, indent=2)
print('Reset stale pipelines: ' + ', '.join(stale))
" 2>/dev/null
}
check_dependency() {
local dep="$1"
if [[ "$dep" == "none" ]]; then
@@ -310,22 +369,44 @@ main() {
exit 0
fi
# Reset stale pipeline states from previous days
reset_stale_states
# Check 5: Pipeline status
if [[ "$mode" == "--status" ]]; then
echo -e "${CYAN}Pipeline Status:${NC}"
echo "────────────────────────────────────────────────────"
for entry in "${PIPELINES[@]}"; do
IFS='|' read -r name script max_tokens dep <<< "$entry"
local state=$(python3 -c "
local state_info=$(python3 -c "
import json, os
from datetime import datetime, timezone, timedelta
path = '$STATE_FILE'
if not os.path.exists(path):
print('not_started')
print('not_started|')
else:
with open(path) as f:
d = json.load(f)
print(d.get('$name', {}).get('state', 'not_started'))
" 2>/dev/null || echo "not_started")
entry = d.get('$name', {})
state = entry.get('state', 'not_started')
updated = entry.get('updated', '')
stale = ''
try:
t = datetime.fromisoformat(updated.replace('Z', '+00:00'))
age = datetime.now(timezone.utc) - t
if state in ('complete', 'failed') and age > timedelta(hours=24):
stale = 'stale'
elif state == 'running' and age > timedelta(hours=6):
stale = 'stale'
except Exception:
pass
print(state + '|' + stale)
" 2>/dev/null || echo "not_started|")
local state="${state_info%%|*}"
local stale="${state_info##*|}"
local suffix=""
[[ -n "$stale" ]] && suffix=" (stale)"
local color=$NC
case "$state" in
@@ -333,7 +414,7 @@ else:
complete) color=$GREEN ;;
failed) color=$RED ;;
esac
printf " %-25s %b%s%b (max: %s tokens, dep: %s)\n" "$name" "$color" "$state" "$NC" "$max_tokens" "$dep"
printf " %-25s %b%s%s%b (max: %s tokens, dep: %s)\n" "$name" "$color" "$state" "$suffix" "$NC" "$max_tokens" "$dep"
done
echo "────────────────────────────────────────────────────"
echo " Budget: $budget / $TOKEN_DAILY_LIMIT tokens remaining"

View File

@@ -1,207 +0,0 @@
#!/usr/bin/env python3
"""
validate-sidecar-config.py — Pre-deploy validation for timmy-config sidecar.
Validates YAML syntax, required keys, value types before deploy.
Rejects bad config with clear errors.
Usage:
python3 scripts/validate-sidecar-config.py ~/.timmy/config.yaml
python3 scripts/validate-sidecar-config.py --all # Validate all config files
python3 scripts/validate-sidecar-config.py --schema # Print expected schema
"""
import argparse
import json
import os
import sys
from pathlib import Path
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
# Expected schema: field -> type
REQUIRED_SCHEMA = {
"model": str,
"provider": str,
}
OPTIONAL_SCHEMA = {
"api_base": str,
"max_tokens": int,
"temperature": (int, float),
"system_prompt": str,
"tools": list,
"memory_enabled": bool,
"session_timeout": int,
"log_level": str,
}
CONFIG_DIRS = [
Path.home() / ".timmy",
Path.home() / ".hermes",
]
def validate_yaml_syntax(filepath: Path) -> list[str]:
"""Validate YAML can be parsed."""
errors = []
try:
content = filepath.read_text(errors="ignore")
if HAS_YAML:
yaml.safe_load(content)
else:
# Fallback: check for basic JSON-in-YAML
json.loads(content)
except (yaml.YAMLError if HAS_YAML else Exception, json.JSONDecodeError) as e:
errors.append(f"YAML syntax error: {e}")
except OSError as e:
errors.append(f"Cannot read file: {e}")
return errors
def validate_schema(filepath: Path) -> list[str]:
"""Validate config against expected schema."""
errors = []
try:
content = filepath.read_text(errors="ignore")
if HAS_YAML:
config = yaml.safe_load(content) or {}
else:
config = json.loads(content)
except Exception as e:
return [f"Cannot parse: {e}"]
if not isinstance(config, dict):
return ["Config must be a YAML/JSON object (dict)"]
# Check required keys
for key, expected_type in REQUIRED_SCHEMA.items():
if key not in config:
errors.append(f"Missing required key: '{key}' (expected {expected_type.__name__})")
elif not isinstance(config[key], expected_type):
errors.append(f"Wrong type for '{key}': expected {expected_type.__name__}, got {type(config[key]).__name__}")
# Check optional keys types
for key, expected_type in OPTIONAL_SCHEMA.items():
if key in config:
if isinstance(expected_type, tuple):
if not isinstance(config[key], expected_type):
type_names = " or ".join(t.__name__ for t in expected_type)
errors.append(f"Wrong type for '{key}': expected {type_names}, got {type(config[key]).__name__}")
else:
if not isinstance(config[key], expected_type):
errors.append(f"Wrong type for '{key}': expected {expected_type.__name__}, got {type(config[key]).__name__}")
# Check for common mistakes
if "model" in config and isinstance(config["model"], str):
if config["model"].startswith("http"):
errors.append("'model' looks like a URL — did you mean 'api_base'?")
if "api_base" in config and isinstance(config["api_base"], str):
if not config["api_base"].startswith("http"):
errors.append("'api_base' should start with http:// or https://")
return errors
def validate_file(filepath: Path) -> tuple[bool, list[str]]:
"""Full validation of a config file. Returns (valid, errors)."""
errors = []
errors.extend(validate_yaml_syntax(filepath))
if not errors:
errors.extend(validate_schema(filepath))
return len(errors) == 0, errors
def find_config_files() -> list[Path]:
"""Find config files in standard locations."""
configs = []
for d in CONFIG_DIRS:
if not d.exists():
continue
for f in d.rglob("*.yaml"):
if f.name in ("config.yaml", "settings.yaml", "env.yaml", "config.yml"):
configs.append(f)
for f in d.rglob("*.yml"):
if "config" in f.name.lower():
configs.append(f)
for f in d.rglob("*.json"):
if f.name in ("config.json", "settings.json"):
configs.append(f)
return sorted(set(configs))
def cmd_validate(filepath: str) -> bool:
path = Path(filepath)
if not path.exists():
print(f"ERROR: {path} not found")
return False
valid, errors = validate_file(path)
if valid:
print(f"OK: {path}")
else:
print(f"FAIL: {path}")
for e in errors:
print(f" - {e}")
return valid
def cmd_validate_all() -> bool:
configs = find_config_files()
if not configs:
print("No config files found in standard locations.")
return True
all_valid = True
for config in configs:
valid, errors = validate_file(config)
if valid:
print(f"OK: {config}")
else:
all_valid = False
print(f"FAIL: {config}")
for e in errors:
print(f" - {e}")
print(f"\n{'All configs valid.' if all_valid else 'Validation failures found.'}")
return all_valid
def cmd_schema():
print("Required keys:")
for key, typ in REQUIRED_SCHEMA.items():
print(f" {key}: {typ.__name__}")
print("\nOptional keys:")
for key, typ in OPTIONAL_SCHEMA.items():
if isinstance(typ, tuple):
print(f" {key}: {'|'.join(t.__name__ for t in typ)}")
else:
print(f" {key}: {typ.__name__}")
def main():
parser = argparse.ArgumentParser(description="Validate sidecar config files")
parser.add_argument("file", nargs="?", help="Config file to validate")
parser.add_argument("--all", action="store_true", help="Validate all config files")
parser.add_argument("--schema", action="store_true", help="Print expected schema")
args = parser.parse_args()
if args.schema:
cmd_schema()
elif args.all:
ok = cmd_validate_all()
sys.exit(0 if ok else 1)
elif args.file:
ok = cmd_validate(args.file)
sys.exit(0 if ok else 1)
else:
parser.print_help()
if __name__ == "__main__":
main()