Compare commits
1 Commits
step35/443
...
fix/621
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b39aee90b4 |
@@ -235,6 +235,33 @@ class OrchestratorDB:
|
|||||||
conn = sqlite3.connect(str(self.db_path))
|
conn = sqlite3.connect(str(self.db_path))
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
return conn
|
return conn
|
||||||
|
|
||||||
|
def _job_from_row(self, row: sqlite3.Row) -> Job:
|
||||||
|
"""Hydrate a Job from a DB row, loading checkpoints from either storage path."""
|
||||||
|
checkpoint = None
|
||||||
|
if row['checkpoint']:
|
||||||
|
checkpoint = JobCheckpoint.from_dict(json.loads(row['checkpoint']))
|
||||||
|
else:
|
||||||
|
checkpoint = self.get_checkpoint(row['id'])
|
||||||
|
|
||||||
|
return Job(
|
||||||
|
id=row['id'],
|
||||||
|
pipeline=row['pipeline'],
|
||||||
|
task=json.loads(row['task']),
|
||||||
|
status=JobStatus(row['status']),
|
||||||
|
priority=JobPriority(row['priority']),
|
||||||
|
token_budget=row['token_budget'],
|
||||||
|
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
|
||||||
|
retry_count=row['retry_count'],
|
||||||
|
max_retries=row['max_retries'],
|
||||||
|
created_at=row['created_at'],
|
||||||
|
started_at=row['started_at'],
|
||||||
|
completed_at=row['completed_at'],
|
||||||
|
error=row['error'],
|
||||||
|
result=json.loads(row['result']) if row['result'] else None,
|
||||||
|
checkpoint=checkpoint,
|
||||||
|
metadata=json.loads(row['metadata']) if row['metadata'] else {}
|
||||||
|
)
|
||||||
|
|
||||||
def save_job(self, job: Job):
|
def save_job(self, job: Job):
|
||||||
"""Save or update a job."""
|
"""Save or update a job."""
|
||||||
@@ -265,24 +292,7 @@ class OrchestratorDB:
|
|||||||
if not row:
|
if not row:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return Job(
|
return self._job_from_row(row)
|
||||||
id=row['id'],
|
|
||||||
pipeline=row['pipeline'],
|
|
||||||
task=json.loads(row['task']),
|
|
||||||
status=JobStatus(row['status']),
|
|
||||||
priority=JobPriority(row['priority']),
|
|
||||||
token_budget=row['token_budget'],
|
|
||||||
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
|
|
||||||
retry_count=row['retry_count'],
|
|
||||||
max_retries=row['max_retries'],
|
|
||||||
created_at=row['created_at'],
|
|
||||||
started_at=row['started_at'],
|
|
||||||
completed_at=row['completed_at'],
|
|
||||||
error=row['error'],
|
|
||||||
result=json.loads(row['result']) if row['result'] else None,
|
|
||||||
checkpoint=JobCheckpoint.from_dict(json.loads(row['checkpoint'])) if row['checkpoint'] else None,
|
|
||||||
metadata=json.loads(row['metadata']) if row['metadata'] else {}
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_next_job(self, pipeline: Optional[str] = None) -> Optional[Job]:
|
def get_next_job(self, pipeline: Optional[str] = None) -> Optional[Job]:
|
||||||
"""Get next pending job (highest priority first)."""
|
"""Get next pending job (highest priority first)."""
|
||||||
@@ -303,24 +313,34 @@ class OrchestratorDB:
|
|||||||
if not row:
|
if not row:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return Job(
|
return self._job_from_row(row)
|
||||||
id=row['id'],
|
|
||||||
pipeline=row['pipeline'],
|
def claim_next_job(self, pipeline: Optional[str] = None) -> Optional[Job]:
|
||||||
task=json.loads(row['task']),
|
"""Atomically claim the next pending job for execution."""
|
||||||
status=JobStatus(row['status']),
|
conn = self._get_conn()
|
||||||
priority=JobPriority(row['priority']),
|
try:
|
||||||
token_budget=row['token_budget'],
|
conn.execute("BEGIN IMMEDIATE")
|
||||||
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
|
query = "SELECT id FROM jobs WHERE status = 'pending'"
|
||||||
retry_count=row['retry_count'],
|
params = []
|
||||||
max_retries=row['max_retries'],
|
if pipeline:
|
||||||
created_at=row['created_at'],
|
query += " AND pipeline = ?"
|
||||||
started_at=row['started_at'],
|
params.append(pipeline)
|
||||||
completed_at=row['completed_at'],
|
query += " ORDER BY priority DESC, created_at ASC LIMIT 1"
|
||||||
error=row['error'],
|
row = conn.execute(query, params).fetchone()
|
||||||
result=json.loads(row['result']) if row['result'] else None,
|
if not row:
|
||||||
checkpoint=JobCheckpoint.from_dict(json.loads(row['checkpoint'])) if row['checkpoint'] else None,
|
conn.commit()
|
||||||
metadata=json.loads(row['metadata']) if row['metadata'] else {}
|
return None
|
||||||
)
|
|
||||||
|
started_at = time.time()
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE jobs SET status = ?, started_at = ? WHERE id = ?",
|
||||||
|
(JobStatus.RUNNING.value, started_at, row['id'])
|
||||||
|
)
|
||||||
|
claimed = conn.execute("SELECT * FROM jobs WHERE id = ?", (row['id'],)).fetchone()
|
||||||
|
conn.commit()
|
||||||
|
return self._job_from_row(claimed)
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
def get_jobs_by_status(self, status: JobStatus, pipeline: Optional[str] = None) -> List[Job]:
|
def get_jobs_by_status(self, status: JobStatus, pipeline: Optional[str] = None) -> List[Job]:
|
||||||
"""Get all jobs with given status."""
|
"""Get all jobs with given status."""
|
||||||
@@ -338,27 +358,7 @@ class OrchestratorDB:
|
|||||||
rows = conn.execute(query, params).fetchall()
|
rows = conn.execute(query, params).fetchall()
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
return [
|
return [self._job_from_row(row) for row in rows]
|
||||||
Job(
|
|
||||||
id=row['id'],
|
|
||||||
pipeline=row['pipeline'],
|
|
||||||
task=json.loads(row['task']),
|
|
||||||
status=JobStatus(row['status']),
|
|
||||||
priority=JobPriority(row['priority']),
|
|
||||||
token_budget=row['token_budget'],
|
|
||||||
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
|
|
||||||
retry_count=row['retry_count'],
|
|
||||||
max_retries=row['max_retries'],
|
|
||||||
created_at=row['created_at'],
|
|
||||||
started_at=row['started_at'],
|
|
||||||
completed_at=row['completed_at'],
|
|
||||||
error=row['error'],
|
|
||||||
result=json.loads(row['result']) if row['result'] else None,
|
|
||||||
checkpoint=JobCheckpoint.from_dict(json.loads(row['checkpoint'])) if row['checkpoint'] else None,
|
|
||||||
metadata=json.loads(row['metadata']) if row['metadata'] else {}
|
|
||||||
)
|
|
||||||
for row in rows
|
|
||||||
]
|
|
||||||
|
|
||||||
def save_checkpoint(self, job_id: str, checkpoint: JobCheckpoint):
|
def save_checkpoint(self, job_id: str, checkpoint: JobCheckpoint):
|
||||||
"""Save a checkpoint for a job."""
|
"""Save a checkpoint for a job."""
|
||||||
@@ -612,7 +612,7 @@ class PipelineOrchestrator:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
# Get next job
|
# Get next job
|
||||||
job = self.db.get_next_job(pipeline)
|
job = self.db.claim_next_job(pipeline)
|
||||||
|
|
||||||
if not job:
|
if not job:
|
||||||
if not futures:
|
if not futures:
|
||||||
|
|||||||
Reference in New Issue
Block a user