Skip to content

Commit 16b85ca

Browse files
committed
pass job data to executors
1 parent b39b3f7 commit 16b85ca

File tree

3 files changed

+34 -3 lines changed

jupyter_scheduler/executors.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import tarfile
66
import traceback
77
from abc import ABC, abstractmethod
8-
from typing import Dict
8+
from typing import Dict, Optional
99

1010
import fsspec
1111
import nbconvert
@@ -37,11 +37,13 @@ def __init__(
3737
db_url: str,
3838
staging_paths: Dict[str, str],
3939
database_manager_class,
40+
job_data: Optional[Dict] = None, # NEW: Optional job data for passing metadata
4041
):
4142
self.job_id = job_id
4243
self.staging_paths = staging_paths
4344
self.root_dir = root_dir
4445
self.db_url = db_url
46+
self.job_data = job_data # Store for use by subclasses
4547

4648
self.database_manager = self._create_database_manager(database_manager_class)
4749

jupyter_scheduler/job_files_manager.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ def generate_filepaths(self):
5757
"""A generator that produces filepaths"""
5858
output_formats = self.output_formats + ["input"]
5959
for output_format in output_formats:
60+
# Skip if this format is not in staging_paths (e.g., input file for CronJob jobs)
61+
if output_format not in self.staging_paths:
62+
continue
6063
input_filepath = self.staging_paths[output_format]
6164
output_filepath = os.path.join(self.output_dir, self.output_filenames[output_format])
6265
if not os.path.exists(output_filepath) or self.redownload:
@@ -79,8 +82,19 @@ def generate_filepaths(self):
7982
yield input_filepath, output_filepath
8083

8184
if self.include_staging_files:
82-
staging_dir = os.path.dirname(self.staging_paths["input"])
83-
for file_relative_path in self.output_filenames["files"]:
85+
# Handle missing "input" key gracefully - it may not exist for CronJob jobs
86+
if "input" in self.staging_paths:
87+
staging_dir = os.path.dirname(self.staging_paths["input"])
88+
elif self.staging_paths:
89+
# Fall back to any available staging path directory
90+
staging_dir = os.path.dirname(next(iter(self.staging_paths.values())))
91+
else:
92+
# No staging paths available, skip
93+
return
94+
95+
# Handle missing "files" key gracefully - it may not exist if packaged_files was empty
96+
files_list = self.output_filenames.get("files", [])
97+
for file_relative_path in files_list:
8498
input_filepath = os.path.join(staging_dir, file_relative_path)
8599
output_filepath = os.path.join(self.output_dir, file_relative_path)
86100
if not os.path.exists(output_filepath) or self.redownload:

jupyter_scheduler/scheduler.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,20 @@ def create_job(self, model: CreateJob) -> str:
489489
#
490490
# See: https://github.com/python/cpython/issues/66285
491491
# See also: https://github.com/jupyter/jupyter_core/pull/362
492+
# Serialize job data for cross-process passing
493+
job_data = {
494+
'job_id': job.job_id,
495+
'name': job.name if hasattr(job, 'name') else None,
496+
'input_filename': job.input_filename if hasattr(job, 'input_filename') else None,
497+
'runtime_environment_name': job.runtime_environment_name if hasattr(job, 'runtime_environment_name') else None,
498+
'runtime_environment_parameters': job.runtime_environment_parameters if hasattr(job, 'runtime_environment_parameters') else None,
499+
'output_formats': job.output_formats if hasattr(job, 'output_formats') else [],
500+
'parameters': job.parameters if hasattr(job, 'parameters') else None,
501+
'tags': job.tags if hasattr(job, 'tags') else [],
502+
'package_input_folder': job.package_input_folder if hasattr(job, 'package_input_folder') else False,
503+
'packaged_files': job.packaged_files if hasattr(job, 'packaged_files') else [],
504+
}
505+
492506
mp_ctx = mp.get_context("spawn")
493507
p = mp_ctx.Process(
494508
target=self.execution_manager_class(
@@ -497,6 +511,7 @@ def create_job(self, model: CreateJob) -> str:
497511
root_dir=self.root_dir,
498512
db_url=self.db_url,
499513
database_manager_class=self.database_manager_class,
514+
job_data=job_data,
500515
).process
501516
)
502517
p.start()

0 commit comments

Comments
 (0)