diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index ef900090c393..5425a3671dfc 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -23,6 +23,7 @@ import random import re import sys +import time import unittest from typing import Any from typing import Callable @@ -332,56 +333,79 @@ def create_test_method( """ @mock.patch('apache_beam.Pipeline', TestPipeline) def test_yaml_example(self): - with open(pipeline_spec_file, encoding="utf-8") as f: - lines = f.readlines() - expected_key = '# Expected:\n' - if expected_key in lines: - expected = lines[lines.index('# Expected:\n') + 1:] - else: - raise ValueError( - f"Missing '# Expected:' tag in example file '{pipeline_spec_file}'") - for i, line in enumerate(expected): - expected[i] = line.replace('# ', '').replace('\n', '') - expected = [line for line in expected if line] - - raw_spec_string = ''.join(lines) - # Filter for any jinja preprocessor - this has to be done before other - # preprocessors. - jinja_preprocessor = [ - preprocessor for preprocessor in custom_preprocessors - if 'jinja_preprocessor' in preprocessor.__name__ - ] - if jinja_preprocessor: - jinja_preprocessor = jinja_preprocessor[0] - raw_spec_string = jinja_preprocessor( - raw_spec_string, self._testMethodName) - custom_preprocessors.remove(jinja_preprocessor) - - pipeline_spec = yaml.load( - raw_spec_string, Loader=yaml_transform.SafeLineLoader) - - with TestEnvironment() as env: - for fn in custom_preprocessors: - pipeline_spec = fn(pipeline_spec, expected, env) - with beam.Pipeline(options=PipelineOptions( - pickle_library='cloudpickle', - **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get( - 'options', {})))) as p: - actual = [ - yaml_transform.expand_pipeline( - p, - pipeline_spec, - [ - yaml_provider.InlineProvider( - TEST_PROVIDERS, INPUT_TRANSFORM_TEST_PROVIDERS) - ]) + def _retryable_cloud_failure(exn): + msg = str(exn).lower() + return ( + 'deadline exceeded' in msg or 'socket closed' in msg or + 'statuscode.unavailable' in msg or + 'statuscode.deadline_exceeded' in msg) + + max_retries = 3 if '-cloud' in os.environ.get('TOX_ENV_NAME', '') else 1 + for attempt in range(max_retries): + try: + with open(pipeline_spec_file, encoding="utf-8") as f: + lines = f.readlines() + expected_key = '# Expected:\n' + if expected_key in lines: + expected = lines[lines.index('# Expected:\n') + 1:] + else: + raise ValueError( + f"Missing '# Expected:' tag in example file '{pipeline_spec_file}'" + ) + for i, line in enumerate(expected): + expected[i] = line.replace('# ', '').replace('\n', '') + expected = [line for line in expected if line] + + raw_spec_string = ''.join(lines) + preprocessors = list(custom_preprocessors) + # Filter for any jinja preprocessor - this has to be done before other + # preprocessors. + jinja_preprocessor = [ + preprocessor for preprocessor in preprocessors + if 'jinja_preprocessor' in preprocessor.__name__ ] - if not actual[0]: - actual = list(p.transforms_stack[0].parts[-1].outputs.values()) - for transform in p.transforms_stack[0].parts[:-1]: - if transform.transform.label == 'log_for_testing': - actual += list(transform.outputs.values()) - check_output(expected)(actual) + if jinja_preprocessor: + jinja_preprocessor = jinja_preprocessor[0] + raw_spec_string = jinja_preprocessor( + raw_spec_string, self._testMethodName) + preprocessors.remove(jinja_preprocessor) + + pipeline_spec = yaml.load( + raw_spec_string, Loader=yaml_transform.SafeLineLoader) + + with TestEnvironment() as env: + for fn in preprocessors: + pipeline_spec = fn(pipeline_spec, expected, env) + with beam.Pipeline(options=PipelineOptions( + pickle_library='cloudpickle', + **yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get( + 'options', {})))) as p: + actual = [ + yaml_transform.expand_pipeline( + p, + pipeline_spec, + [ + yaml_provider.InlineProvider( + TEST_PROVIDERS, INPUT_TRANSFORM_TEST_PROVIDERS) + ]) + ] + if not actual[0]: + actual = list(p.transforms_stack[0].parts[-1].outputs.values()) + for transform in p.transforms_stack[0].parts[:-1]: + if transform.transform.label == 'log_for_testing': + actual += list(transform.outputs.values()) + check_output(expected)(actual) + break + except Exception as exn: + if attempt == max_retries - 1 or not _retryable_cloud_failure(exn): + raise + _LOGGER.warning( + 'Retrying transient cloud test failure for %s (attempt %s/%s): %s', + pipeline_spec_file, + attempt + 2, + max_retries, + exn) + time.sleep(5 * (attempt + 1)) def _python_deps_involved(spec_filename): return any( diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 093e68cb9c7d..354772c21653 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -32,6 +32,7 @@ import subprocess import sys import tempfile +import time import urllib.parse import warnings from collections.abc import Callable @@ -1387,7 +1388,15 @@ def _create_venv_from_clone( # Avoid hard dependency for environments where this is never used. import clonevirtualenv clonable_venv = cls._create_venv_to_clone(base_python) - clonevirtualenv.clone_virtualenv(clonable_venv, venv) + for attempt in range(3): + try: + clonevirtualenv.clone_virtualenv(clonable_venv, venv) + break + except shutil.Error: + if attempt == 2: + raise + shutil.rmtree(venv, ignore_errors=True) + time.sleep(1) venv_pip = os.path.join(venv, 'bin', 'pip') # Issue warning when installing packages from PyPI _LOGGER.warning( @@ -1412,13 +1421,7 @@ def _create_venv_from_clone( @classmethod def _create_venv_to_clone(cls, base_python: str) -> str: - # For '.dev', the default clone source is the venv that owns base_python. - # In CI that is often the active tox/sandbox tree; clonevirtualenv can - # race with ephemeral paths (tmp/, caches) under that tree. Use the - # scratch clonable venv in CI instead. Locally, keep cloning the dev venv - # for speed. - _ci = os.environ.get('CI', '').lower() in ('true', '1', 'yes') - if '.dev' in beam_version and not _ci: + if '.dev' in beam_version: base_venv = os.path.dirname(os.path.dirname(base_python)) print('Cloning dev environment from', base_venv) return base_venv diff --git a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py index e1e3ee847d96..931c90322acb 100644 --- a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py @@ -295,7 +295,6 @@ def test_env_content_sensitive(self): after = yaml_provider.PypiExpansionService._key('base', [pkg]) self.assertNotEqual(before, after) - class JoinUrlOrFilepathTest(unittest.TestCase): def test_join_url_relative_path(self): self.assertEqual(