diff --git a/conftest.py b/conftest.py
index 5c584923..bc72aa4e 100644
--- a/conftest.py
+++ b/conftest.py
@@ -4,6 +4,7 @@
 from sqlalchemy import create_engine
 from sqlalchemy.orm import sessionmaker
 
+from jupyter_scheduler.managers import SQLAlchemyDatabaseManager
 from jupyter_scheduler.orm import Base
 from jupyter_scheduler.scheduler import Scheduler
 from jupyter_scheduler.tests.mocks import MockEnvironmentManager
@@ -59,6 +60,8 @@ def jp_scheduler(jp_scheduler_db_url, jp_scheduler_root_dir, jp_scheduler_db):
         db_url=jp_scheduler_db_url,
         root_dir=str(jp_scheduler_root_dir),
         environments_manager=MockEnvironmentManager(),
+        database_manager=SQLAlchemyDatabaseManager(),
+        database_manager_class="jupyter_scheduler.managers.SQLAlchemyDatabaseManager",
     )
 
 
diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py
index 7e1a9974..cdfd2cd5 100644
--- a/jupyter_scheduler/executors.py
+++ b/jupyter_scheduler/executors.py
@@ -1,10 +1,11 @@
+import importlib
 import io
 import os
 import shutil
 import tarfile
 import traceback
 from abc import ABC, abstractmethod
-from typing import Dict
+from typing import Dict, Optional
 
 import fsspec
 import nbconvert
@@ -29,11 +30,52 @@ class ExecutionManager(ABC):
     _model = None
     _db_session = None
 
-    def __init__(self, job_id: str, root_dir: str, db_url: str, staging_paths: Dict[str, str]):
+    def __init__(
+        self,
+        job_id: str,
+        root_dir: str,
+        db_url: str,
+        staging_paths: Dict[str, str],
+        database_manager_class,
+        job_data: Optional[Dict] = None,  # NEW: Optional job data for passing metadata
+    ):
         self.job_id = job_id
         self.staging_paths = staging_paths
         self.root_dir = root_dir
         self.db_url = db_url
+        self.job_data = job_data  # Store for use by subclasses
+
+        self.database_manager = self._create_database_manager(database_manager_class)
+
+    def _create_database_manager(self, database_manager_class):
+        """Instantiate the database manager behind this job's DB sessions.
+
+        Args:
+            database_manager_class: Either a dotted import path such as
+                "jupyter_scheduler.managers.SQLAlchemyDatabaseManager" or the
+                class object itself. SchedulerApp calls its trait value as
+                ``self.database_manager_class()`` and forwards that same value
+                to the scheduler, so a class must be accepted, not only a str.
+
+        Returns:
+            The object produced by calling the class with no arguments.
+
+        Raises:
+            ValueError: If a dotted path cannot be split, imported or resolved
+                to an attribute (the underlying error is chained as the cause).
+        """
+        if isinstance(database_manager_class, type):
+            return database_manager_class()
+
+        try:
+            module_name, class_name = database_manager_class.rsplit(".", 1)
+            module = importlib.import_module(module_name)
+            DatabaseManagerClass = getattr(module, class_name)
+            return DatabaseManagerClass()
+        except (ValueError, ImportError, AttributeError) as e:
+            raise ValueError(
+                f"Invalid database_manager_class '{database_manager_class}': {e}"
+            ) from e
 
     @property
     def model(self):
@@ -46,7 +88,7 @@ def model(self):
     @property
     def db_session(self):
         if self._db_session is None:
-            self._db_session = create_session(self.db_url)
+            self._db_session = create_session(self.db_url, self.database_manager)
 
         return self._db_session
 
diff --git a/jupyter_scheduler/extension.py b/jupyter_scheduler/extension.py
index 1a4ba373..7d0d59e5 100644
--- a/jupyter_scheduler/extension.py
+++ b/jupyter_scheduler/extension.py
@@ -45,6 +45,13 @@ class SchedulerApp(ExtensionApp):
     def _db_url_default(self):
         return f"sqlite:///{jupyter_data_dir()}/scheduler.sqlite"
 
+    database_manager_class = Type(
+        default_value="jupyter_scheduler.managers.SQLAlchemyDatabaseManager",
+        klass="jupyter_scheduler.managers.DatabaseManager",
+        config=True,
+        help=_i18n("Database manager class for custom database backends."),
+    )
+
     environment_manager_class = Type(
         default_value="jupyter_scheduler.environments.CondaEnvironmentManager",
         klass="jupyter_scheduler.environments.EnvironmentManager",
@@ -69,7 +76,8 @@ def _db_url_default(self):
     def initialize_settings(self):
         super().initialize_settings()
 
-        create_tables(self.db_url, self.drop_tables)
+        database_manager = self.database_manager_class()
+        create_tables(self.db_url, self.drop_tables, database_manager=database_manager)
 
         environments_manager = self.environment_manager_class()
 
@@ -78,6 +86,8 @@ def initialize_settings(self):
             environments_manager=environments_manager,
             db_url=self.db_url,
             config=self.config,
+            database_manager=database_manager,
+            database_manager_class=self.database_manager_class,
         )
 
         job_files_manager = self.job_files_manager_class(scheduler=scheduler)
diff --git a/jupyter_scheduler/job_files_manager.py b/jupyter_scheduler/job_files_manager.py
index 15916d45..ae149898 100644
--- 
a/jupyter_scheduler/job_files_manager.py +++ b/jupyter_scheduler/job_files_manager.py @@ -57,13 +57,44 @@ def generate_filepaths(self): """A generator that produces filepaths""" output_formats = self.output_formats + ["input"] for output_format in output_formats: + # Skip if this format is not in staging_paths (e.g., input file for CronJob jobs) + if output_format not in self.staging_paths: + continue input_filepath = self.staging_paths[output_format] output_filepath = os.path.join(self.output_dir, self.output_filenames[output_format]) if not os.path.exists(output_filepath) or self.redownload: yield input_filepath, output_filepath + + if self.staging_paths: + staging_dir = os.path.dirname(next(iter(self.staging_paths.values()))) + if os.path.exists(staging_dir): + explicit_files = set() + for output_format in output_formats: + if output_format in self.staging_paths: + explicit_files.add(os.path.basename(self.staging_paths[output_format])) + + for file_name in os.listdir(staging_dir): + file_path = os.path.join(staging_dir, file_name) + if os.path.isfile(file_path) and file_name not in explicit_files: + input_filepath = file_path + output_filepath = os.path.join(self.output_dir, file_name) + if not os.path.exists(output_filepath) or self.redownload: + yield input_filepath, output_filepath + if self.include_staging_files: - staging_dir = os.path.dirname(self.staging_paths["input"]) - for file_relative_path in self.output_filenames["files"]: + # Handle missing "input" key gracefully - it may not exist for CronJob jobs + if "input" in self.staging_paths: + staging_dir = os.path.dirname(self.staging_paths["input"]) + elif self.staging_paths: + # Fall back to any available staging path directory + staging_dir = os.path.dirname(next(iter(self.staging_paths.values()))) + else: + # No staging paths available, skip + return + + # Handle missing "files" key gracefully - it may not exist if packaged_files was empty + files_list = self.output_filenames.get("files", []) + for 
file_relative_path in files_list:
                 input_filepath = os.path.join(staging_dir, file_relative_path)
                 output_filepath = os.path.join(self.output_dir, file_relative_path)
                 if not os.path.exists(output_filepath) or self.redownload:
diff --git a/jupyter_scheduler/managers.py b/jupyter_scheduler/managers.py
new file mode 100644
index 00000000..cd83deb7
--- /dev/null
+++ b/jupyter_scheduler/managers.py
@@ -0,0 +1,70 @@
+from abc import ABC, abstractmethod
+
+from sqlalchemy import create_engine
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import sessionmaker
+
+from jupyter_scheduler.orm import Base as DefaultBase
+from jupyter_scheduler.orm import update_db_schema
+
+
+class DatabaseManager(ABC):
+    """Base class for database managers.
+
+    Database managers handle database operations for jupyter-scheduler.
+    Subclasses can implement custom storage backends (K8s, Redis, etc.)
+    while maintaining compatibility with the scheduler's session interface.
+    """
+
+    @abstractmethod
+    def create_session(self, db_url: str):
+        """Create a database session factory.
+
+        Args:
+            db_url: Database URL (e.g., "k8s://namespace", "redis://localhost")
+
+        Returns:
+            A callable session factory; calling it must yield a session
+            object compatible with the SQLAlchemy session interface.
+        """
+        pass
+
+    @abstractmethod
+    def create_tables(self, db_url: str, drop_tables: bool = False, Base=None):
+        """Create database tables/schema.
+
+        Args:
+            db_url: Database URL
+            drop_tables: Whether to drop existing tables first
+            Base: SQLAlchemy Base for custom schemas (tests)
+        """
+        pass
+
+
+class SQLAlchemyDatabaseManager(DatabaseManager):
+    """Default database manager using SQLAlchemy."""
+
+    def create_session(self, db_url: str):
+        """Return a ``sessionmaker`` bound to a new engine for ``db_url``."""
+        engine = create_engine(db_url, echo=False)
+        Session = sessionmaker(bind=engine)
+        return Session
+
+    def create_tables(self, db_url: str, drop_tables: bool = False, Base=None):
+        """Run update_db_schema, optionally drop all tables, then create them."""
+        if Base is None:
+            Base = DefaultBase
+
+        engine = create_engine(db_url)
+        update_db_schema(engine, Base)
+
+        try:
+            if drop_tables:
+                Base.metadata.drop_all(engine)
+        except OperationalError:
+            # Best effort: a failed drop must not block (re)creating tables.
+            # SQLAlchemy surfaces driver failures as sqlalchemy.exc errors,
+            # so the sqlite3 exception class would never match here.
+            pass
+        finally:
+            Base.metadata.create_all(engine)
diff --git a/jupyter_scheduler/models.py b/jupyter_scheduler/models.py
index 38e240e0..a8984430 100644
--- a/jupyter_scheduler/models.py
+++ b/jupyter_scheduler/models.py
@@ -77,6 +77,7 @@ class CreateJob(BaseModel):
     input_filename: str = None
     runtime_environment_name: str
     runtime_environment_parameters: Optional[Dict[str, EnvironmentParameterValues]]
+    environment_variables: Optional[Dict[str, str]] = None
     output_formats: Optional[List[str]] = None
     idempotency_token: Optional[str] = None
     job_definition_id: Optional[str] = None
@@ -128,6 +129,7 @@ class DescribeJob(BaseModel):
     input_filename: str = None
     runtime_environment_name: str
     runtime_environment_parameters: Optional[Dict[str, EnvironmentParameterValues]]
+    environment_variables: Optional[Dict[str, str]] = None
     output_formats: Optional[List[str]] = None
     idempotency_token: Optional[str] = None
     job_definition_id: Optional[str] = None
@@ -193,6 +195,7 @@ class UpdateJob(BaseModel):
     status: Optional[Status] = None
     name: Optional[str] = None
     compute_type: Optional[str] = None
+    environment_variables: Optional[Dict[str, str]] = None
 
 
 class DeleteJob(BaseModel):
@@ -204,6 +207,7 @@ class 
CreateJobDefinition(BaseModel): input_filename: str = None runtime_environment_name: str runtime_environment_parameters: Optional[Dict[str, EnvironmentParameterValues]] + environment_variables: Optional[Dict[str, str]] = None output_formats: Optional[List[str]] = None parameters: Optional[Dict[str, str]] = None tags: Optional[Tags] = None @@ -226,6 +230,7 @@ class DescribeJobDefinition(BaseModel): input_filename: str = None runtime_environment_name: str runtime_environment_parameters: Optional[Dict[str, EnvironmentParameterValues]] + environment_variables: Optional[Dict[str, str]] = None output_formats: Optional[List[str]] = None parameters: Optional[Dict[str, str]] = None tags: Optional[Tags] = None @@ -248,6 +253,7 @@ class Config: class UpdateJobDefinition(BaseModel): runtime_environment_name: Optional[str] runtime_environment_parameters: Optional[Dict[str, EnvironmentParameterValues]] + environment_variables: Optional[Dict[str, str]] = None output_formats: Optional[List[str]] = None parameters: Optional[Dict[str, str]] = None tags: Optional[Tags] = None diff --git a/jupyter_scheduler/orm.py b/jupyter_scheduler/orm.py index dbbbfad8..70552017 100644 --- a/jupyter_scheduler/orm.py +++ b/jupyter_scheduler/orm.py @@ -89,6 +89,7 @@ class CommonColumns: # Any default values specified for new columns will be ignored during the migration process. 
package_input_folder = Column(Boolean) packaged_files = Column(JsonType, default=[]) + environment_variables = Column(JsonType(4096)) class Job(CommonColumns, Base): @@ -146,21 +147,9 @@ def update_db_schema(engine, Base): connection.execute(alter_statement) -def create_tables(db_url, drop_tables=False, Base=Base): - engine = create_engine(db_url) - update_db_schema(engine, Base) +def create_tables(db_url, drop_tables=False, Base=Base, *, database_manager): + database_manager.create_tables(db_url, drop_tables, Base) - try: - if drop_tables: - Base.metadata.drop_all(engine) - except OperationalError: - pass - finally: - Base.metadata.create_all(engine) - -def create_session(db_url): - engine = create_engine(db_url, echo=False) - Session = sessionmaker(bind=engine) - - return Session +def create_session(db_url, database_manager): + return database_manager.create_session(db_url) diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 867034c6..b8bd06e1 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -405,19 +405,23 @@ def __init__( environments_manager: Type[EnvironmentManager], db_url: str, config=None, + database_manager=None, + database_manager_class=None, **kwargs, ): super().__init__( root_dir=root_dir, environments_manager=environments_manager, config=config, **kwargs ) self.db_url = db_url + self.database_manager = database_manager + self.database_manager_class = database_manager_class if self.task_runner_class: self.task_runner = self.task_runner_class(scheduler=self, config=config) @property def db_session(self): if not self._db_session: - self._db_session = create_session(self.db_url) + self._db_session = create_session(self.db_url, self.database_manager) return self._db_session @@ -485,6 +489,30 @@ def create_job(self, model: CreateJob) -> str: # # See: https://github.com/python/cpython/issues/66285 # See also: https://github.com/jupyter/jupyter_core/pull/362 + # Serialize job data for 
cross-process passing + job_data = { + "job_id": job.job_id, + "name": job.name if hasattr(job, "name") else None, + "input_filename": job.input_filename if hasattr(job, "input_filename") else None, + "runtime_environment_name": ( + job.runtime_environment_name + if hasattr(job, "runtime_environment_name") + else None + ), + "runtime_environment_parameters": ( + job.runtime_environment_parameters + if hasattr(job, "runtime_environment_parameters") + else None + ), + "output_formats": job.output_formats if hasattr(job, "output_formats") else [], + "parameters": job.parameters if hasattr(job, "parameters") else None, + "tags": job.tags if hasattr(job, "tags") else [], + "package_input_folder": ( + job.package_input_folder if hasattr(job, "package_input_folder") else False + ), + "packaged_files": job.packaged_files if hasattr(job, "packaged_files") else [], + } + mp_ctx = mp.get_context("spawn") p = mp_ctx.Process( target=self.execution_manager_class( @@ -492,6 +520,8 @@ def create_job(self, model: CreateJob) -> str: staging_paths=staging_paths, root_dir=self.root_dir, db_url=self.db_url, + database_manager_class=self.database_manager_class, + job_data=job_data, ).process ) p.start() diff --git a/jupyter_scheduler/tests/test_execution_manager.py b/jupyter_scheduler/tests/test_execution_manager.py index 66546be3..11e9d3c3 100644 --- a/jupyter_scheduler/tests/test_execution_manager.py +++ b/jupyter_scheduler/tests/test_execution_manager.py @@ -53,6 +53,7 @@ def test_add_side_effects_files( root_dir=jp_scheduler_root_dir, db_url=jp_scheduler_db_url, staging_paths={"input": staged_notebook_file_path}, + database_manager_class="jupyter_scheduler.managers.SQLAlchemyDatabaseManager", ) manager.add_side_effects_files(staged_notebook_dir) diff --git a/jupyter_scheduler/tests/test_orm.py b/jupyter_scheduler/tests/test_orm.py index e2aab07e..68e69aa7 100644 --- a/jupyter_scheduler/tests/test_orm.py +++ b/jupyter_scheduler/tests/test_orm.py @@ -13,7 +13,16 @@ @pytest.fixture 
-def initial_db(jp_scheduler_db_url) -> tuple[Type[DeclarativeMeta], sessionmaker, str]: +def database_manager(): + from jupyter_scheduler.managers import SQLAlchemyDatabaseManager + + return SQLAlchemyDatabaseManager() + + +@pytest.fixture +def initial_db( + jp_scheduler_db_url, database_manager +) -> tuple[Type[DeclarativeMeta], sessionmaker, str]: TestBase = declarative_base() class MockInitialJob(TestBase): @@ -24,9 +33,9 @@ class MockInitialJob(TestBase): initial_job = MockInitialJob(runtime_environment_name="abc", input_filename="input.ipynb") - create_tables(db_url=jp_scheduler_db_url, Base=TestBase) + create_tables(db_url=jp_scheduler_db_url, Base=TestBase, database_manager=database_manager) - Session = create_session(jp_scheduler_db_url) + Session = create_session(jp_scheduler_db_url, database_manager) session = Session() session.add(initial_job) @@ -52,7 +61,9 @@ class MockUpdatedJob(TestBase): return MockUpdatedJob -def test_create_tables_with_new_column(jp_scheduler_db_url, initial_db, updated_job_model): +def test_create_tables_with_new_column( + jp_scheduler_db_url, initial_db, updated_job_model, database_manager +): TestBase, Session, initial_job_id = initial_db session = Session() @@ -61,7 +72,7 @@ def test_create_tables_with_new_column(jp_scheduler_db_url, initial_db, updated_ session.close() JobModel = updated_job_model - create_tables(db_url=jp_scheduler_db_url, Base=TestBase) + create_tables(db_url=jp_scheduler_db_url, Base=TestBase, database_manager=database_manager) session = Session() updated_columns = {col["name"] for col in inspect(session.bind).get_columns("jobs")} diff --git a/src/handler.ts b/src/handler.ts index a32588ba..572bae0f 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -364,6 +364,7 @@ export namespace Scheduler { schedule?: string; timezone?: string; package_input_folder?: boolean; + environment_variables?: { [key: string]: string }; } export interface IUpdateJobDefinition { @@ -418,6 +419,7 @@ export namespace Scheduler { 
output_formats?: string[]; compute_type?: string; package_input_folder?: boolean; + environment_variables?: { [key: string]: string }; } export interface ICreateJobFromDefinition { diff --git a/src/index.tsx b/src/index.tsx index 3612e207..91157857 100644 --- a/src/index.tsx +++ b/src/index.tsx @@ -82,20 +82,24 @@ const schedulerPlugin: JupyterFrontEndPlugin = { INotebookTracker, ITranslator, ILayoutRestorer, - Scheduler.IAdvancedOptions, Scheduler.TelemetryHandler ], - optional: [ILauncher], + optional: [ + ILauncher, + Scheduler.IAdvancedOptionsOverride, + Scheduler.IAdvancedOptions + ], autoStart: true, activate: activatePlugin }; - -// Disable this plugin and replace with custom plugin to change the advanced options UI const advancedOptions: JupyterFrontEndPlugin = { id: '@jupyterlab/scheduler:IAdvancedOptions', autoStart: true, provides: Scheduler.IAdvancedOptions, activate: (app: JupyterFrontEnd) => { + console.log('🔄 DEFAULT jupyter-scheduler advanced options plugin is activating'); + console.log(' Plugin ID: @jupyterlab/scheduler:IAdvancedOptions'); + console.log(' Note: This should be disabled by K8s extension via disabledExtensions'); return AdvancedOptions; } }; @@ -190,10 +194,20 @@ function activatePlugin( notebookTracker: INotebookTracker, translator: ITranslator, restorer: ILayoutRestorer, - advancedOptions: Scheduler.IAdvancedOptions, telemetryHandler: Scheduler.TelemetryHandler, - launcher: ILauncher | null + launcher: ILauncher | null, + advancedOptionsOverride: Scheduler.IAdvancedOptions | null, + advancedOptionsDefault: Scheduler.IAdvancedOptions | null ): void { + console.log('🔍 SCHEDULER PLUGIN ADVANCED OPTIONS RESOLUTION:'); + console.log(' Override token expected:', Scheduler.IAdvancedOptionsOverride); + console.log(' Override received:', advancedOptionsOverride); + console.log(' Default received:', advancedOptionsDefault); + + // Use override if available, otherwise use default + const advancedOptions = advancedOptionsOverride || 
advancedOptionsDefault; + console.log(' Using:', advancedOptions); + const trans = translator.load('jupyterlab'); const api = new SchedulerService({}); verifyServerExtension({ api, translator }); @@ -237,7 +251,7 @@ function activatePlugin( app, translator, eventLogger, - advancedOptions: advancedOptions + advancedOptions: advancedOptions || AdvancedOptions }); // Create new main area widget mainAreaWidget = new MainAreaWidget({ @@ -372,6 +386,9 @@ function activatePlugin( } } +console.log('📦 DEFAULT jupyter-scheduler extension is being loaded...'); +console.log(' Plugins: schedulerPlugin, advancedOptions, telemetry'); + const plugins: JupyterFrontEndPlugin[] = [ schedulerPlugin, advancedOptions, diff --git a/src/mainviews/create-job.tsx b/src/mainviews/create-job.tsx index 98fee5fc..9cc9c809 100644 --- a/src/mainviews/create-job.tsx +++ b/src/mainviews/create-job.tsx @@ -322,7 +322,8 @@ export function CreateJob(props: ICreateJobProps): JSX.Element { idempotency_token: props.model.idempotencyToken, tags: props.model.tags, runtime_environment_parameters: props.model.runtimeEnvironmentParameters, - package_input_folder: props.model.packageInputFolder + package_input_folder: props.model.packageInputFolder, + environment_variables: props.model.environmentVariables }; if (props.model.parameters !== undefined) { @@ -371,7 +372,8 @@ export function CreateJob(props: ICreateJobProps): JSX.Element { runtime_environment_parameters: props.model.runtimeEnvironmentParameters, schedule: props.model.schedule, timezone: props.model.timezone, - package_input_folder: props.model.packageInputFolder + package_input_folder: props.model.packageInputFolder, + environment_variables: props.model.environmentVariables }; if (props.model.parameters !== undefined) { diff --git a/src/model.ts b/src/model.ts index 91d8f58c..42ffea84 100644 --- a/src/model.ts +++ b/src/model.ts @@ -100,6 +100,7 @@ export interface ICreateJobModel // Is the create button disabled due to a submission in progress? 
createInProgress?: boolean; packageInputFolder?: boolean; + environmentVariables?: { [key: string]: string }; } export const defaultScheduleFields: ModelWithScheduleFields = { diff --git a/src/tokens.ts b/src/tokens.ts index ee4d5f00..7aa2dffb 100644 --- a/src/tokens.ts +++ b/src/tokens.ts @@ -46,6 +46,11 @@ export namespace Scheduler { '@jupyterlab/scheduler:IAdvancedOptions' ); + // Override token for extensions that want to provide custom advanced options + export const IAdvancedOptionsOverride = new Token( + '@jupyterlab/scheduler:IAdvancedOptionsOverride' + ); + export interface IEvent { name: string; }