Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 73 additions & 49 deletions sdks/python/apache_beam/yaml/examples/testing/examples_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import random
import re
import sys
import time
import unittest
from typing import Any
from typing import Callable
Expand Down Expand Up @@ -332,56 +333,79 @@ def create_test_method(
"""
@mock.patch('apache_beam.Pipeline', TestPipeline)
def test_yaml_example(self):
with open(pipeline_spec_file, encoding="utf-8") as f:
lines = f.readlines()
expected_key = '# Expected:\n'
if expected_key in lines:
expected = lines[lines.index('# Expected:\n') + 1:]
else:
raise ValueError(
f"Missing '# Expected:' tag in example file '{pipeline_spec_file}'")
for i, line in enumerate(expected):
expected[i] = line.replace('# ', '').replace('\n', '')
expected = [line for line in expected if line]

raw_spec_string = ''.join(lines)
# Filter for any jinja preprocessor - this has to be done before other
# preprocessors.
jinja_preprocessor = [
preprocessor for preprocessor in custom_preprocessors
if 'jinja_preprocessor' in preprocessor.__name__
]
if jinja_preprocessor:
jinja_preprocessor = jinja_preprocessor[0]
raw_spec_string = jinja_preprocessor(
raw_spec_string, self._testMethodName)
custom_preprocessors.remove(jinja_preprocessor)

pipeline_spec = yaml.load(
raw_spec_string, Loader=yaml_transform.SafeLineLoader)

with TestEnvironment() as env:
for fn in custom_preprocessors:
pipeline_spec = fn(pipeline_spec, expected, env)
with beam.Pipeline(options=PipelineOptions(
pickle_library='cloudpickle',
**yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
'options', {})))) as p:
actual = [
yaml_transform.expand_pipeline(
p,
pipeline_spec,
[
yaml_provider.InlineProvider(
TEST_PROVIDERS, INPUT_TRANSFORM_TEST_PROVIDERS)
])
def _retryable_cloud_failure(exn):
msg = str(exn).lower()
return (
'deadline exceeded' in msg or 'socket closed' in msg or
'statuscode.unavailable' in msg or
'statuscode.deadline_exceeded' in msg)

max_retries = 3 if '-cloud' in os.environ.get('TOX_ENV_NAME', '') else 1
for attempt in range(max_retries):
try:
with open(pipeline_spec_file, encoding="utf-8") as f:
lines = f.readlines()
expected_key = '# Expected:\n'
if expected_key in lines:
expected = lines[lines.index('# Expected:\n') + 1:]
else:
raise ValueError(
f"Missing '# Expected:' tag in example file '{pipeline_spec_file}'"
)
for i, line in enumerate(expected):
expected[i] = line.replace('# ', '').replace('\n', '')
expected = [line for line in expected if line]

raw_spec_string = ''.join(lines)
preprocessors = list(custom_preprocessors)
# Filter for any jinja preprocessor - this has to be done before other
# preprocessors.
jinja_preprocessor = [
preprocessor for preprocessor in preprocessors
if 'jinja_preprocessor' in preprocessor.__name__
]
if not actual[0]:
actual = list(p.transforms_stack[0].parts[-1].outputs.values())
for transform in p.transforms_stack[0].parts[:-1]:
if transform.transform.label == 'log_for_testing':
actual += list(transform.outputs.values())
check_output(expected)(actual)
if jinja_preprocessor:
jinja_preprocessor = jinja_preprocessor[0]
raw_spec_string = jinja_preprocessor(
raw_spec_string, self._testMethodName)
preprocessors.remove(jinja_preprocessor)

pipeline_spec = yaml.load(
raw_spec_string, Loader=yaml_transform.SafeLineLoader)

with TestEnvironment() as env:
for fn in preprocessors:
pipeline_spec = fn(pipeline_spec, expected, env)
with beam.Pipeline(options=PipelineOptions(
pickle_library='cloudpickle',
**yaml_transform.SafeLineLoader.strip_metadata(pipeline_spec.get(
'options', {})))) as p:
actual = [
yaml_transform.expand_pipeline(
p,
pipeline_spec,
[
yaml_provider.InlineProvider(
TEST_PROVIDERS, INPUT_TRANSFORM_TEST_PROVIDERS)
])
]
if not actual[0]:
actual = list(p.transforms_stack[0].parts[-1].outputs.values())
for transform in p.transforms_stack[0].parts[:-1]:
if transform.transform.label == 'log_for_testing':
actual += list(transform.outputs.values())
check_output(expected)(actual)
break
except Exception as exn:
if attempt == max_retries - 1 or not _retryable_cloud_failure(exn):
raise
_LOGGER.warning(
'Retrying transient cloud test failure for %s (attempt %s/%s): %s',
pipeline_spec_file,
attempt + 2,
max_retries,
exn)
time.sleep(5 * (attempt + 1))

def _python_deps_involved(spec_filename):
return any(
Expand Down
19 changes: 11 additions & 8 deletions sdks/python/apache_beam/yaml/yaml_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import subprocess
import sys
import tempfile
import time
import urllib.parse
import warnings
from collections.abc import Callable
Expand Down Expand Up @@ -1387,7 +1388,15 @@ def _create_venv_from_clone(
# Avoid hard dependency for environments where this is never used.
import clonevirtualenv
clonable_venv = cls._create_venv_to_clone(base_python)
clonevirtualenv.clone_virtualenv(clonable_venv, venv)
for attempt in range(3):
try:
clonevirtualenv.clone_virtualenv(clonable_venv, venv)
break
except shutil.Error:
if attempt == 2:
raise
shutil.rmtree(venv, ignore_errors=True)
time.sleep(1)
venv_pip = os.path.join(venv, 'bin', 'pip')
# Issue warning when installing packages from PyPI
_LOGGER.warning(
Expand All @@ -1412,13 +1421,7 @@ def _create_venv_from_clone(

@classmethod
def _create_venv_to_clone(cls, base_python: str) -> str:
# For '.dev', the default clone source is the venv that owns base_python.
# In CI that is often the active tox/sandbox tree; clonevirtualenv can
# race with ephemeral paths (tmp/, caches) under that tree. Use the
# scratch clonable venv in CI instead. Locally, keep cloning the dev venv
# for speed.
_ci = os.environ.get('CI', '').lower() in ('true', '1', 'yes')
if '.dev' in beam_version and not _ci:
if '.dev' in beam_version:
base_venv = os.path.dirname(os.path.dirname(base_python))
print('Cloning dev environment from', base_venv)
return base_venv
Expand Down
1 change: 0 additions & 1 deletion sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ def test_env_content_sensitive(self):
after = yaml_provider.PypiExpansionService._key('base', [pkg])
self.assertNotEqual(before, after)


class JoinUrlOrFilepathTest(unittest.TestCase):
def test_join_url_relative_path(self):
self.assertEqual(
Expand Down
Loading