Make process_registry checkpoint writes atomic
This commit is contained in:
@@ -34,6 +34,7 @@ import logging
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
@@ -650,7 +651,7 @@ class ProcessRegistry:
|
||||
# ----- Checkpoint (crash recovery) -----
|
||||
|
||||
def _write_checkpoint(self):
|
||||
"""Write running process metadata to checkpoint file."""
|
||||
"""Write running process metadata to checkpoint file atomically."""
|
||||
try:
|
||||
with self._lock:
|
||||
entries = []
|
||||
@@ -665,12 +666,28 @@ class ProcessRegistry:
|
||||
"task_id": s.task_id,
|
||||
"session_key": s.session_key,
|
||||
})
|
||||
|
||||
# Atomic write: temp file + os.replace to avoid corruption on crash
|
||||
CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
CHECKPOINT_PATH.write_text(
|
||||
json.dumps(entries, indent=2), encoding="utf-8"
|
||||
fd, tmp_path = tempfile.mkstemp(
|
||||
dir=str(CHECKPOINT_PATH.parent),
|
||||
prefix='.checkpoint_',
|
||||
suffix='.tmp',
|
||||
)
|
||||
except Exception:
|
||||
pass # Best-effort
|
||||
try:
|
||||
with os.fdopen(fd, 'w', encoding='utf-8') as f:
|
||||
json.dump(entries, f, indent=2, ensure_ascii=False)
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.replace(tmp_path, CHECKPOINT_PATH)
|
||||
except BaseException:
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.debug("Failed to write checkpoint file: %s", e, exc_info=True)
|
||||
|
||||
def recover_from_checkpoint(self) -> int:
|
||||
"""
|
||||
@@ -717,10 +734,28 @@ class ProcessRegistry:
|
||||
logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid)
|
||||
|
||||
# Clear the checkpoint (will be rewritten as processes finish)
|
||||
# Use atomic write to avoid corruption
|
||||
try:
|
||||
CHECKPOINT_PATH.write_text("[]", encoding="utf-8")
|
||||
CHECKPOINT_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
fd, tmp_path = tempfile.mkstemp(
|
||||
dir=str(CHECKPOINT_PATH.parent),
|
||||
prefix='.checkpoint_',
|
||||
suffix='.tmp',
|
||||
)
|
||||
try:
|
||||
with os.fdopen(fd, 'w', encoding='utf-8') as f:
|
||||
f.write("[]")
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
os.replace(tmp_path, CHECKPOINT_PATH)
|
||||
except BaseException:
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.debug("Could not write checkpoint file: %s", e)
|
||||
logger.debug("Could not clear checkpoint file: %s", e, exc_info=True)
|
||||
|
||||
return recovered
|
||||
|
||||
|
||||
Reference in New Issue
Block a user