1+ import sys
12from functools import partial
23from pathlib import Path
34
45import pytest
5- import sys
66
77from azure .ai .ml import dsl , load_component , load_job
88from azure .ai .ml .entities import PipelineJob
@@ -33,13 +33,13 @@ def test_init_finalize_job_from_yaml(self) -> None:
3333 assert pipeline_job_dict ["properties" ]["settings" ]["on_init" ] == "a"
3434 assert pipeline_job_dict ["properties" ]["settings" ]["on_finalize" ] == "c"
3535
36- @pytest .mark .skipif (
37- condition = sys .version_info >= (3 , 13 ), reason = "historical implementation doesn't support Python 3.13+"
38- )
3936 def test_init_finalize_job_from_sdk (self ) -> None :
4037 from azure .ai .ml ._internal .dsl import set_pipeline_settings
4138 from azure .ai .ml .dsl import pipeline
4239
40+ # Access class attributes directly to avoid capturing self in closures
41+ component_func = TestInitFinalizeJob .component_func
42+
4343 def assert_pipeline_job_init_finalize_job (pipeline_job : PipelineJob ):
4444 assert pipeline_job ._validate_init_finalize_job ().passed
4545 assert pipeline_job .settings .on_init == "init_job"
@@ -51,10 +51,10 @@ def assert_pipeline_job_init_finalize_job(pipeline_job: PipelineJob):
5151 # pipeline.settings.on_init/on_finalize
5252 @pipeline ()
5353 def job_settings_func ():
54- init_job = self . component_func () # noqa: F841
55- work1 = self . component_func () # noqa: F841
56- work2 = self . component_func () # noqa: F841
57- finalize_job = self . component_func () # noqa: F841
54+ init_job = component_func () # noqa: F841
55+ work1 = component_func () # noqa: F841
56+ work2 = component_func () # noqa: F841
57+ finalize_job = component_func () # noqa: F841
5858
5959 pipeline1 = job_settings_func ()
6060 pipeline1 .settings .on_init = "init_job"
@@ -64,10 +64,10 @@ def job_settings_func():
6464 # dsl.settings()
6565 @pipeline ()
6666 def dsl_settings_func ():
67- init_job = self . component_func ()
68- work1 = self . component_func () # noqa: F841
69- work2 = self . component_func () # noqa: F841
70- finalize_job = self . component_func () # noqa: F841
67+ init_job = component_func () # noqa: F841
68+ work1 = component_func () # noqa: F841
69+ work2 = component_func () # noqa: F841
70+ finalize_job = component_func () # noqa: F841
7171 # `set_pipeline_settings` can receive either `BaseNode` or str, both should work
7272 set_pipeline_settings (on_init = init_job , on_finalize = "finalize_job" )
7373
@@ -80,10 +80,10 @@ def dsl_settings_func():
8080 on_finalize = "finalize_job" ,
8181 )
8282 def in_decorator_func ():
83- init_job = self . component_func () # noqa: F841
84- work1 = self . component_func () # noqa: F841
85- work2 = self . component_func () # noqa: F841
86- finalize_job = self . component_func () # noqa: F841
83+ init_job = component_func () # noqa: F841
84+ work1 = component_func () # noqa: F841
85+ work2 = component_func () # noqa: F841
86+ finalize_job = component_func () # noqa: F841
8787
8888 pipeline3 = in_decorator_func ()
8989 assert_pipeline_job_init_finalize_job (pipeline3 )
@@ -97,14 +97,15 @@ def test_invalid_init_finalize_job_from_yaml(self) -> None:
9797 == "On_finalize job should not have connection to other execution node."
9898 )
9999
100- @pytest .mark .skipif (
101- condition = sys .version_info >= (3 , 13 ), reason = "historical implementation doesn't support Python 3.13+"
102- )
103100 def test_invalid_init_finalize_job_from_sdk (self ) -> None :
101+ # Access class attributes directly to avoid capturing self in closures
102+ component_func = TestInitFinalizeJob .component_func
103+ hello_world_func = TestInitFinalizeJob .hello_world_func
104+
104105 # invalid case: job name not exists
105106 @dsl .pipeline ()
106107 def invalid_init_finalize_job_func ():
107- self . component_func ()
108+ component_func ()
108109
109110 invalid_pipeline1 = invalid_init_finalize_job_func ()
110111 invalid_pipeline1 .settings .on_init = "init_job"
@@ -120,8 +121,8 @@ def invalid_init_finalize_job_func():
120121 # invalid case: no normal node, on_init/on_finalize job is not isolated
121122 @dsl .pipeline ()
122123 def init_finalize_with_invalid_connection_func (int_param : int , str_param : str ):
123- node1 = self . hello_world_func (component_in_number = int_param , component_in_path = str_param )
124- node2 = self . hello_world_func ( # noqa: F841
124+ node1 = hello_world_func (component_in_number = int_param , component_in_path = str_param )
125+ node2 = hello_world_func ( # noqa: F841
125126 component_in_number = int_param ,
126127 component_in_path = node1 .outputs .component_out_path ,
127128 )
@@ -152,28 +153,28 @@ def init_finalize_with_invalid_connection_func(int_param: int, str_param: str):
152153 # invalid case: set on_init for pipeline component
153154 @dsl .pipeline
154155 def subgraph_func ():
155- node = self . component_func ()
156+ node = component_func ()
156157 set_pipeline_settings (on_init = node ) # set on_init for subgraph (pipeline component)
157158
158159 @dsl .pipeline
159160 def subgraph_with_init_func ():
160161 subgraph_func ()
161- self . component_func ()
162+ component_func ()
162163
163164 with pytest .raises (UserErrorException ) as e :
164165 subgraph_with_init_func ()
165166 assert str (e .value ) == "On_init/on_finalize is not supported for pipeline component."
166167
167- @pytest .mark .skipif (
168- condition = sys .version_info >= (3 , 13 ), reason = "historical implementation doesn't support Python 3.13+"
169- )
170168 def test_init_finalize_job_with_subgraph (self ) -> None :
171169 from azure .ai .ml ._internal .dsl import set_pipeline_settings
172170
171+ # Access class attributes directly to avoid capturing self in closures
172+ component_func = TestInitFinalizeJob .component_func
173+
173174 # happy path
174175 @dsl .pipeline ()
175176 def subgraph_func ():
176- node = self . component_func ()
177+ node = component_func ()
177178 node .compute = "cpu-cluster"
178179
179180 @dsl .pipeline ()
0 commit comments