fix(cron): Thread-safe save_jobs with file lock
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Docs Site Checks / docs-site-checks (pull_request) Failing after 2m14s
Nix / nix (ubuntu-latest) (pull_request) Failing after 2s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Failing after 18s
Tests / test (pull_request) Failing after 17m3s
Tests / e2e (pull_request) Successful in 1m21s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Docs Site Checks / docs-site-checks (pull_request) Failing after 2m14s
Nix / nix (ubuntu-latest) (pull_request) Failing after 2s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Failing after 18s
Tests / test (pull_request) Failing after 17m3s
Tests / e2e (pull_request) Successful in 1m21s
Nix / nix (macos-latest) (pull_request) Has been cancelled
Prevents race condition when parallel tick workers call mark_job_run() concurrently. Uses fcntl.flock() to serialize jobs.json writes. Without this fix, 6 parallel workers could read-modify-write jobs.json simultaneously, causing lost status updates.
This commit is contained in:
32
cron/jobs.py
32
cron/jobs.py
@@ -347,22 +347,26 @@ def load_jobs() -> List[Dict[str, Any]]:
|
||||
|
||||
|
||||
def save_jobs(jobs: List[Dict[str, Any]]):
|
||||
"""Save all jobs to storage."""
|
||||
"""Save all jobs to storage. Thread-safe via file lock."""
|
||||
ensure_dirs()
|
||||
fd, tmp_path = tempfile.mkstemp(dir=str(JOBS_FILE.parent), suffix='.tmp', prefix='.jobs_')
|
||||
try:
|
||||
with os.fdopen(fd, 'w', encoding='utf-8') as f:
|
||||
json.dump({"jobs": jobs, "updated_at": _hermes_now().isoformat()}, f, indent=2)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.replace(tmp_path, JOBS_FILE)
|
||||
_secure_file(JOBS_FILE)
|
||||
except BaseException:
|
||||
import fcntl
|
||||
lock_path = JOBS_FILE.parent / ".jobs.lock"
|
||||
with open(lock_path, "w") as lock_fd:
|
||||
fcntl.flock(lock_fd, fcntl.LOCK_EX)
|
||||
fd, tmp_path = tempfile.mkstemp(dir=str(JOBS_FILE.parent), suffix='.tmp', prefix='.jobs_')
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
with os.fdopen(fd, 'w', encoding='utf-8') as f:
|
||||
json.dump({"jobs": jobs, "updated_at": _hermes_now().isoformat()}, f, indent=2)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.replace(tmp_path, JOBS_FILE)
|
||||
_secure_file(JOBS_FILE)
|
||||
except BaseException:
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
|
||||
|
||||
def create_job(
|
||||
|
||||
Reference in New Issue
Block a user