Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| fe22dbfcc9 |
274
orchestration.py
274
orchestration.py
@@ -1,274 +1,6 @@
|
||||
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew.
|
||||
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""
|
||||
|
||||
Pipeline tasks automatically track token usage via token_budget.py.
|
||||
After each task completes, the Huey signal records usage for the pipeline.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from huey import SqliteHuey, crontab
|
||||
from pathlib import Path
|
||||
|
||||
from huey import SqliteHuey, crontab, signals
|
||||
|
||||
# --- Setup ---
|
||||
HERMES_HOME = Path.home() / ".hermes"
|
||||
huey = SqliteHuey(filename=str(HERMES_HOME / "orchestration.db"))
|
||||
|
||||
# Token budget integration
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
try:
|
||||
from token_budget import record_usage, get_remaining, can_afford, get_report
|
||||
HAS_BUDGET = True
|
||||
except ImportError:
|
||||
HAS_BUDGET = False
|
||||
|
||||
|
||||
# --- Pipeline definitions ---
|
||||
PIPELINES = {
|
||||
"playground-factory": {
|
||||
"script": "scripts/pipeline_playground_factory.sh",
|
||||
"max_tokens": 100_000,
|
||||
"dependencies": [],
|
||||
},
|
||||
"training-factory": {
|
||||
"script": "scripts/pipeline_training_factory.sh",
|
||||
"max_tokens": 150_000,
|
||||
"dependencies": [],
|
||||
},
|
||||
"knowledge-mine": {
|
||||
"script": "scripts/pipeline_knowledge_mine.sh",
|
||||
"max_tokens": 80_000,
|
||||
"dependencies": ["training-factory"],
|
||||
},
|
||||
"adversary": {
|
||||
"script": "scripts/pipeline_adversary.sh",
|
||||
"max_tokens": 50_000,
|
||||
"dependencies": ["knowledge-mine"],
|
||||
},
|
||||
"codebase-genome": {
|
||||
"script": "scripts/pipeline_codebase_genome.sh",
|
||||
"max_tokens": 120_000,
|
||||
"dependencies": [],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# --- Token tracking signal ---
|
||||
@huey.signal()
|
||||
def track_tokens(signal, task, task_value=None, **kwargs):
|
||||
"""Automatically log token usage after each pipeline task completes.
|
||||
|
||||
Hooks into Huey's signal system. Fires on task execution.
|
||||
Extracts token counts from the task result and records them.
|
||||
"""
|
||||
if not HAS_BUDGET:
|
||||
return
|
||||
|
||||
# Only track pipeline tasks
|
||||
task_name = getattr(task, "name", "") or ""
|
||||
if not task_name.startswith("pipeline."):
|
||||
return
|
||||
|
||||
pipeline = task_name.replace("pipeline.", "")
|
||||
|
||||
# Extract token counts from result
|
||||
result = task_value or {}
|
||||
if isinstance(result, dict):
|
||||
input_tokens = result.get("input_tokens", 0)
|
||||
output_tokens = result.get("output_tokens", 0)
|
||||
if input_tokens or output_tokens:
|
||||
record_usage(pipeline, input_tokens, output_tokens)
|
||||
|
||||
|
||||
# --- Pipeline tasks ---
|
||||
@huey.task()
|
||||
def pipeline_task(name: str, max_tokens: int = None):
|
||||
"""Run a single pipeline and return token usage stats."""
|
||||
spec = PIPELINES.get(name)
|
||||
if not spec:
|
||||
return {"error": f"Unknown pipeline: {name}", "input_tokens": 0, "output_tokens": 0}
|
||||
|
||||
script = spec["script"]
|
||||
budget = max_tokens or spec["max_tokens"]
|
||||
|
||||
# Check budget before running
|
||||
if HAS_BUDGET and not can_afford(budget):
|
||||
return {
|
||||
"error": f"Insufficient budget for {name} (need {budget}, have {get_remaining()})",
|
||||
"input_tokens": 0,
|
||||
"output_tokens": 0,
|
||||
}
|
||||
|
||||
# Check dependencies
|
||||
for dep in spec.get("dependencies", []):
|
||||
dep_state = _get_pipeline_state(dep)
|
||||
if dep_state not in ("running", "complete"):
|
||||
return {
|
||||
"error": f"Dependency {dep} not met (state: {dep_state})",
|
||||
"input_tokens": 0,
|
||||
"output_tokens": 0,
|
||||
}
|
||||
|
||||
# Run the pipeline script
|
||||
script_path = Path.home() / "timmy-config" / script
|
||||
if not script_path.exists():
|
||||
return {"error": f"Script not found: {script_path}", "input_tokens": 0, "output_tokens": 0}
|
||||
|
||||
_set_pipeline_state(name, "running")
|
||||
log_path = HERMES_HOME / "logs" / f"pipeline-{name}.log"
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["bash", str(script_path), "--max-tokens", str(budget)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=3600, # 1 hour max
|
||||
)
|
||||
|
||||
# Parse token usage from stdout (if script reports it)
|
||||
input_tokens = 0
|
||||
output_tokens = 0
|
||||
for line in result.stdout.splitlines():
|
||||
if "input_tokens=" in line:
|
||||
try:
|
||||
input_tokens = int(line.split("input_tokens=")[1].split()[0])
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
if "output_tokens=" in line:
|
||||
try:
|
||||
output_tokens = int(line.split("output_tokens=")[1].split()[0])
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
||||
# If script didn't report tokens, estimate from output
|
||||
if not input_tokens and not output_tokens:
|
||||
output_tokens = len(result.stdout) // 4 # rough estimate
|
||||
|
||||
# Log output
|
||||
with open(log_path, "a") as f:
|
||||
f.write(f"\n--- {datetime.now(timezone.utc).isoformat()} ---\n")
|
||||
f.write(result.stdout)
|
||||
if result.stderr:
|
||||
f.write(f"\nSTDERR:\n{result.stderr}")
|
||||
|
||||
if result.returncode == 0:
|
||||
_set_pipeline_state(name, "complete")
|
||||
return {
|
||||
"pipeline": name,
|
||||
"status": "complete",
|
||||
"input_tokens": input_tokens,
|
||||
"output_tokens": output_tokens,
|
||||
}
|
||||
else:
|
||||
_set_pipeline_state(name, "failed")
|
||||
return {
|
||||
"pipeline": name,
|
||||
"status": "failed",
|
||||
"error": result.stderr[:500],
|
||||
"input_tokens": input_tokens,
|
||||
"output_tokens": output_tokens,
|
||||
}
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
_set_pipeline_state(name, "failed")
|
||||
return {"pipeline": name, "status": "timeout", "input_tokens": 0, "output_tokens": 0}
|
||||
except Exception as e:
|
||||
_set_pipeline_state(name, "failed")
|
||||
return {"pipeline": name, "status": "error", "error": str(e), "input_tokens": 0, "output_tokens": 0}
|
||||
|
||||
|
||||
@huey.periodic_task(crontab(hour="*/6"))
|
||||
def pipeline_scheduler():
|
||||
"""Check pipeline state and start the next eligible pipeline."""
|
||||
report_lines = ["=== Pipeline Scheduler ==="]
|
||||
|
||||
# Check budget
|
||||
if HAS_BUDGET:
|
||||
remaining = get_remaining()
|
||||
report_lines.append(f"Budget remaining: {remaining:,} tokens")
|
||||
if remaining <= 0:
|
||||
report_lines.append("Budget exhausted. Skipping.")
|
||||
return "\n".join(report_lines)
|
||||
|
||||
# Find next eligible pipeline
|
||||
started = False
|
||||
for name, spec in PIPELINES.items():
|
||||
state = _get_pipeline_state(name)
|
||||
|
||||
if state in ("running", "complete"):
|
||||
report_lines.append(f"SKIP {name}: {state}")
|
||||
continue
|
||||
|
||||
# Check dependencies
|
||||
deps_ok = True
|
||||
for dep in spec.get("dependencies", []):
|
||||
dep_state = _get_pipeline_state(dep)
|
||||
if dep_state not in ("running", "complete"):
|
||||
report_lines.append(f"SKIP {name}: dependency {dep} not met")
|
||||
deps_ok = False
|
||||
break
|
||||
|
||||
if not deps_ok:
|
||||
continue
|
||||
|
||||
# Start pipeline
|
||||
report_lines.append(f"START {name}")
|
||||
pipeline_task(name)
|
||||
started = True
|
||||
break # One pipeline per scheduler tick
|
||||
|
||||
if not started:
|
||||
report_lines.append("No pipelines to start")
|
||||
|
||||
return "\n".join(report_lines)
|
||||
|
||||
|
||||
# --- State management ---
|
||||
STATE_FILE = HERMES_HOME / "pipeline_state.json"
|
||||
|
||||
|
||||
def _get_pipeline_state(name: str) -> str:
|
||||
if STATE_FILE.exists():
|
||||
try:
|
||||
data = json.loads(STATE_FILE.read_text())
|
||||
return data.get(name, {}).get("state", "not_started")
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
return "not_started"
|
||||
|
||||
|
||||
def _set_pipeline_state(name: str, state: str):
|
||||
data = {}
|
||||
if STATE_FILE.exists():
|
||||
try:
|
||||
data = json.loads(STATE_FILE.read_text())
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
data[name] = {"state": state, "updated": datetime.now(timezone.utc).isoformat()}
|
||||
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
STATE_FILE.write_text(json.dumps(data, indent=2))
|
||||
|
||||
|
||||
# --- CLI ---
|
||||
if __name__ == "__main__":
|
||||
if "--budget" in sys.argv or "--report" in sys.argv:
|
||||
if HAS_BUDGET:
|
||||
print(get_report())
|
||||
else:
|
||||
print("token_budget module not available")
|
||||
elif "--status" in sys.argv:
|
||||
for name in PIPELINES:
|
||||
state = _get_pipeline_state(name)
|
||||
print(f" {name}: {state}")
|
||||
elif "--run" in sys.argv:
|
||||
idx = sys.argv.index("--run")
|
||||
name = sys.argv[idx + 1]
|
||||
print(f"Enqueuing pipeline: {name}")
|
||||
result = pipeline_task(name)
|
||||
print(f"Task enqueued")
|
||||
else:
|
||||
print("Usage: orchestration.py [--budget|--status|--run PIPELINE]")
|
||||
huey = SqliteHuey(filename=str(Path.home() / ".hermes" / "orchestration.db"))
|
||||
|
||||
@@ -1,142 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
token_budget.py — Daily token budget tracker for pipeline orchestration.
|
||||
|
||||
Tracks token usage per pipeline per day, enforces daily limits,
|
||||
and provides a query interface for the orchestrator.
|
||||
|
||||
Data: ~/.hermes/pipeline_budget.json
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
BUDGET_FILE = Path.home() / ".hermes" / "pipeline_budget.json"
|
||||
DEFAULT_DAILY_LIMIT = 500_000
|
||||
|
||||
|
||||
def _load() -> dict:
|
||||
if BUDGET_FILE.exists():
|
||||
try:
|
||||
return json.loads(BUDGET_FILE.read_text())
|
||||
except (json.JSONDecodeError, OSError):
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
def _save(data: dict):
|
||||
BUDGET_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
BUDGET_FILE.write_text(json.dumps(data, indent=2))
|
||||
|
||||
|
||||
def today_key() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||
|
||||
|
||||
def get_daily_usage(pipeline: str = None) -> dict:
|
||||
"""Get token usage for today. If pipeline specified, return just that pipeline."""
|
||||
data = _load()
|
||||
day = data.get("daily", {}).get(today_key(), {"tokens_used": 0, "pipelines": {}})
|
||||
if pipeline:
|
||||
return {
|
||||
"pipeline": pipeline,
|
||||
"tokens_used": day.get("pipelines", {}).get(pipeline, 0),
|
||||
"daily_total": day.get("tokens_used", 0),
|
||||
}
|
||||
return day
|
||||
|
||||
|
||||
def get_remaining(limit: int = DEFAULT_DAILY_LIMIT) -> int:
|
||||
"""Get remaining token budget for today."""
|
||||
usage = get_daily_usage()
|
||||
return max(0, limit - usage.get("tokens_used", 0))
|
||||
|
||||
|
||||
def can_afford(tokens: int, limit: int = DEFAULT_DAILY_LIMIT) -> bool:
|
||||
"""Check if we have budget for a token spend."""
|
||||
return get_remaining(limit) >= tokens
|
||||
|
||||
|
||||
def record_usage(pipeline: str, input_tokens: int, output_tokens: int) -> dict:
|
||||
"""
|
||||
Record token usage for a pipeline task.
|
||||
|
||||
Called automatically by the orchestrator after each pipeline task completes.
|
||||
Returns the updated daily state.
|
||||
"""
|
||||
total = input_tokens + output_tokens
|
||||
data = _load()
|
||||
today = today_key()
|
||||
|
||||
daily = data.setdefault("daily", {})
|
||||
day = daily.setdefault(today, {"tokens_used": 0, "pipelines": {}})
|
||||
|
||||
day["tokens_used"] = day.get("tokens_used", 0) + total
|
||||
pipes = day.setdefault("pipelines", {})
|
||||
pipes[pipeline] = pipes.get(pipeline, 0) + total
|
||||
|
||||
# Track breakdown
|
||||
breakdown = day.setdefault("breakdown", {})
|
||||
pb = breakdown.setdefault(pipeline, {"input": 0, "output": 0, "calls": 0})
|
||||
pb["input"] += input_tokens
|
||||
pb["output"] += output_tokens
|
||||
pb["calls"] += 1
|
||||
|
||||
# Track lifetime stats
|
||||
lifetime = data.setdefault("lifetime", {"total_tokens": 0, "total_days": 0})
|
||||
lifetime["total_tokens"] = lifetime.get("total_tokens", 0) + total
|
||||
|
||||
_save(data)
|
||||
|
||||
return {
|
||||
"pipeline": pipeline,
|
||||
"input_tokens": input_tokens,
|
||||
"output_tokens": output_tokens,
|
||||
"total": total,
|
||||
"daily_used": day["tokens_used"],
|
||||
"daily_remaining": get_remaining(),
|
||||
}
|
||||
|
||||
|
||||
def get_report() -> str:
|
||||
"""Generate a human-readable budget report."""
|
||||
data = _load()
|
||||
today = today_key()
|
||||
day = data.get("daily", {}).get(today, {"tokens_used": 0, "pipelines": {}})
|
||||
|
||||
lines = []
|
||||
lines.append(f"Token Budget — {today}")
|
||||
lines.append(f" Daily usage: {day.get('tokens_used', 0):,} / {DEFAULT_DAILY_LIMIT:,}")
|
||||
lines.append(f" Remaining: {get_remaining():,}")
|
||||
lines.append("")
|
||||
lines.append(" Pipelines:")
|
||||
|
||||
breakdown = day.get("breakdown", {})
|
||||
for name, stats in sorted(breakdown.items(), key=lambda x: -x[1]["output"]):
|
||||
total = stats["input"] + stats["output"]
|
||||
lines.append(f" {name}: {total:,} tokens ({stats['calls']} calls)")
|
||||
|
||||
if not breakdown:
|
||||
lines.append(" (no pipelines run today)")
|
||||
|
||||
lifetime = data.get("lifetime", {})
|
||||
lines.append("")
|
||||
lines.append(f" Lifetime: {lifetime.get('total_tokens', 0):,} total tokens")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
if "--report" in sys.argv:
|
||||
print(get_report())
|
||||
elif "--remaining" in sys.argv:
|
||||
print(get_remaining())
|
||||
elif "--can-afford" in sys.argv:
|
||||
idx = sys.argv.index("--can-afford")
|
||||
tokens = int(sys.argv[idx + 1])
|
||||
print("yes" if can_afford(tokens) else "no")
|
||||
else:
|
||||
print(get_report())
|
||||
165
tests/test_quality_gates.py
Normal file
165
tests/test_quality_gates.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""Tests for CI Automation Gate and Task Gate.
|
||||
|
||||
Tests the quality gate infrastructure:
|
||||
- ci_automation_gate.py: function length, linting, trailing whitespace
|
||||
- task_gate.py: pre/post task validation
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
|
||||
|
||||
from ci_automation_gate import QualityGate
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# QualityGate — ci_automation_gate.py
|
||||
# =========================================================================
|
||||
|
||||
class TestQualityGateLinting:
|
||||
"""Test trailing whitespace and final newline checks."""
|
||||
|
||||
def test_clean_file_passes(self, tmp_path):
|
||||
f = tmp_path / "clean.py"
|
||||
f.write_text("def foo():\n return 1\n")
|
||||
gate = QualityGate(fix=False)
|
||||
gate.check_file(f)
|
||||
assert gate.failures == 0
|
||||
assert gate.warnings == 0
|
||||
|
||||
def test_trailing_whitespace_warns(self, tmp_path):
|
||||
f = tmp_path / "messy.py"
|
||||
f.write_text("def foo(): \n return 1\n")
|
||||
gate = QualityGate(fix=False)
|
||||
gate.check_file(f)
|
||||
assert gate.warnings >= 1
|
||||
assert gate.failures == 0
|
||||
|
||||
def test_missing_final_newline_warns(self, tmp_path):
|
||||
f = tmp_path / "no_newline.py"
|
||||
f.write_text("def foo():\n return 1")
|
||||
gate = QualityGate(fix=False)
|
||||
gate.check_file(f)
|
||||
assert gate.warnings >= 1
|
||||
|
||||
def test_fix_mode_cleans_whitespace(self, tmp_path):
|
||||
f = tmp_path / "messy.py"
|
||||
f.write_text("def foo(): \n return 1\n")
|
||||
gate = QualityGate(fix=True)
|
||||
gate.check_file(f)
|
||||
fixed = f.read_text()
|
||||
assert " \n" not in fixed # trailing spaces removed
|
||||
assert fixed.endswith("\n")
|
||||
|
||||
def test_fix_mode_adds_final_newline(self, tmp_path):
|
||||
f = tmp_path / "no_newline.py"
|
||||
f.write_text("def foo():\n return 1")
|
||||
gate = QualityGate(fix=True)
|
||||
gate.check_file(f)
|
||||
fixed = f.read_text()
|
||||
assert fixed.endswith("\n")
|
||||
|
||||
|
||||
class TestQualityGateFunctionLength:
|
||||
"""Test function length detection for JS/TS files."""
|
||||
|
||||
def test_short_function_passes(self, tmp_path):
|
||||
f = tmp_path / "short.js"
|
||||
f.write_text("function foo() {\n return 1;\n}\n")
|
||||
gate = QualityGate(fix=False)
|
||||
gate.check_file(f)
|
||||
assert gate.failures == 0
|
||||
|
||||
def test_long_function_warns(self, tmp_path):
|
||||
body = "\n".join(f" console.log({i});" for i in range(25))
|
||||
f = tmp_path / "long.js"
|
||||
f.write_text(f"function foo() {{\n{body}\n}}\n")
|
||||
gate = QualityGate(fix=False)
|
||||
gate.check_file(f)
|
||||
assert gate.warnings >= 1
|
||||
|
||||
def test_very_long_function_fails(self, tmp_path):
|
||||
body = "\n".join(f" console.log({i});" for i in range(55))
|
||||
f = tmp_path / "huge.js"
|
||||
f.write_text(f"function foo() {{\n{body}\n}}\n")
|
||||
gate = QualityGate(fix=False)
|
||||
gate.check_file(f)
|
||||
assert gate.failures >= 1
|
||||
|
||||
def test_python_files_skip_length_check(self, tmp_path):
|
||||
"""Python files should not trigger JS function length regex."""
|
||||
body = "\n".join(f" x = {i}" for i in range(60))
|
||||
f = tmp_path / "long.py"
|
||||
f.write_text(f"def foo():\n{body}\n return x\n")
|
||||
gate = QualityGate(fix=False)
|
||||
gate.check_file(f)
|
||||
assert gate.failures == 0 # JS regex won't match Python
|
||||
|
||||
def test_non_code_files_skipped(self, tmp_path):
|
||||
f = tmp_path / "readme.md"
|
||||
f.write_text("# Hello \n\nSome text")
|
||||
gate = QualityGate(fix=False)
|
||||
gate.check_file(f)
|
||||
# .md files should be skipped entirely
|
||||
assert gate.failures == 0
|
||||
assert gate.warnings == 0
|
||||
|
||||
|
||||
class TestQualityGateRun:
|
||||
"""Test the full directory scan."""
|
||||
|
||||
def test_run_exits_1_on_failure(self, tmp_path):
|
||||
body = "\n".join(f" console.log({i});" for i in range(55))
|
||||
f = tmp_path / "huge.js"
|
||||
f.write_text(f"function foo() {{\n{body}\n}}\n")
|
||||
gate = QualityGate(fix=False)
|
||||
with pytest.raises(SystemExit) as exc:
|
||||
gate.run(str(tmp_path))
|
||||
assert exc.value.code == 1
|
||||
|
||||
def test_run_exits_0_on_clean(self, tmp_path):
|
||||
f = tmp_path / "clean.py"
|
||||
f.write_text("x = 1\n")
|
||||
gate = QualityGate(fix=False)
|
||||
gate.run(str(tmp_path)) # should not raise
|
||||
assert gate.failures == 0
|
||||
|
||||
def test_run_skips_node_modules(self, tmp_path):
|
||||
nm = tmp_path / "node_modules"
|
||||
nm.mkdir()
|
||||
bad = nm / "huge.js"
|
||||
body = "\n".join(f" console.log({i});" for i in range(55))
|
||||
bad.write_text(f"function foo() {{\n{body}\n}}\n")
|
||||
gate = QualityGate(fix=False)
|
||||
gate.run(str(tmp_path))
|
||||
assert gate.failures == 0 # node_modules skipped
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Task Gate — task_gate.py (integration-level tests)
|
||||
# =========================================================================
|
||||
|
||||
class TestTaskGateImports:
|
||||
"""Verify task_gate module is importable."""
|
||||
|
||||
def test_import_task_gate(self):
|
||||
from task_gate import FILTER_TAGS, AGENT_USERNAMES
|
||||
assert isinstance(FILTER_TAGS, list)
|
||||
assert len(FILTER_TAGS) > 0
|
||||
assert isinstance(AGENT_USERNAMES, set)
|
||||
assert "timmy" in AGENT_USERNAMES
|
||||
|
||||
def test_filter_tags_contain_epic(self):
|
||||
from task_gate import FILTER_TAGS
|
||||
assert any("EPIC" in tag for tag in FILTER_TAGS)
|
||||
|
||||
def test_filter_tags_contain_permanent(self):
|
||||
from task_gate import FILTER_TAGS
|
||||
assert any("PERMANENT" in tag for tag in FILTER_TAGS)
|
||||
Reference in New Issue
Block a user