From 47b019ce8178fd0be533e019208f1b4a87a78b5a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 25 May 2026 20:36:09 -0400 Subject: [PATCH 1/6] Add fallback platform for stager. --- .../apache_beam/runners/portability/stager.py | 67 ++++++++++++++----- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 136c320da009..729ed00899eb 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -88,6 +88,14 @@ # One of the choices for user to use for requirements cache during staging SKIP_REQUIREMENTS_CACHE = 'skip' +# Ordered list of manylinux tags from newest (strictest) to oldest (most compatible) +# used for cross-platform binary dependency downloads. +_MANYLINUX_PLATFORMS = [ + 'manylinux_2_28_x86_64', + 'manylinux2014_x86_64', # equivalent to manylinux_2_17 + 'manylinux2010_x86_64', # equivalent to manylinux_2_12 +] + _LOGGER = logging.getLogger(__name__) @@ -762,15 +770,13 @@ def _populate_requirements_cache( # Download to a temporary directory first, then copy to cache. # This allows us to track exactly which packages are needed for this # requirements file. - download_dir = tempfile.mkdtemp(dir=temp_directory) + download_dir = None cmd_args = [ Stager._get_python_executable(), '-m', 'pip', 'download', - '--dest', - download_dir, '--find-links', cache_dir, '-r', @@ -781,23 +787,54 @@ def _populate_requirements_cache( ] if populate_cache_with_sdists: - cmd_args.extend(['--no-binary', ':all:']) + download_dir = tempfile.mkdtemp(dir=temp_directory) + cmd_args.extend(['--dest', download_dir, '--no-binary', ':all:']) + _LOGGER.info('Executing command: %s', cmd_args) + processes.check_output(cmd_args, stderr=processes.STDOUT) else: language_implementation_tag = 'cp' abi_suffix = 'm' if sys.version_info < (3, 8) else '' abi_tag = 'cp%d%d%s' % ( sys.version_info[0], sys.version_info[1], abi_suffix) - platform_tag = Stager._get_platform_for_default_sdk_container() - cmd_args.extend([ - '--implementation', - language_implementation_tag, - '--abi', - abi_tag, - '--platform', - platform_tag - ]) - _LOGGER.info('Executing command: %s', cmd_args) - processes.check_output(cmd_args, stderr=processes.STDOUT) + preferred_platform = Stager._get_platform_for_default_sdk_container() + + # Fallback platform tags in case the preferred modern tag is too strict + # for some dependencies on PyPI. + try: + start_idx = _MANYLINUX_PLATFORMS.index(preferred_platform) + platforms = _MANYLINUX_PLATFORMS[start_idx:] + except ValueError: + platforms = [preferred_platform] + + last_exception = None + for platform in platforms: + attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) + attempt_cmd_args = cmd_args + [ + '--dest', + attempt_download_dir, + '--implementation', + language_implementation_tag, + '--abi', + abi_tag, + '--platform', + platform + ] + _LOGGER.info('Executing command: %s', attempt_cmd_args) + try: + processes.check_output(attempt_cmd_args, stderr=processes.STDOUT) + download_dir = attempt_download_dir + last_exception = None + break + except Exception as e: + _LOGGER.warning( + 'Pip download failed with platform %s, trying fallback: %s', + platform, + e) + shutil.rmtree(attempt_download_dir) + last_exception = e + + if last_exception: + raise last_exception # Get list of downloaded packages and copy them to the cache downloaded_packages = set() From 9e422d196965bcd658706fdbf4ac6f32751dc084 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 May 2026 10:20:38 -0400 Subject: [PATCH 2/6] Force wheel if we have other platform to try --- sdks/python/apache_beam/runners/portability/stager.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 729ed00899eb..b587d6a8a1d1 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -807,7 +807,7 @@ def _populate_requirements_cache( platforms = [preferred_platform] last_exception = None - for platform in platforms: + for idx, platform in enumerate(platforms): attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) attempt_cmd_args = cmd_args + [ '--dest', @@ -819,6 +819,10 @@ def _populate_requirements_cache( '--platform', platform ] + # Force binary wheel only if we have more platform fallbacks to try + if idx < len(platforms) - 1: + attempt_cmd_args.extend(['--only-binary', ':all:']) + _LOGGER.info('Executing command: %s', attempt_cmd_args) try: processes.check_output(attempt_cmd_args, stderr=processes.STDOUT) From bb20201d69601aaa2d6cef92cd4b48e6352e5109 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 May 2026 10:45:25 -0400 Subject: [PATCH 3/6] More comments. --- sdks/python/apache_beam/runners/portability/stager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index b587d6a8a1d1..1cbde2d72f80 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -819,7 +819,9 @@ def _populate_requirements_cache( '--platform', platform ] - # Force binary wheel only if we have more platform fallbacks to try + # Force binary wheel only if we have more platform fallbacks to try. + # For the last platform, we omit this flag so it can natively fall back + # to downloading a source distribution (sdist) if no matching wheel is found. if idx < len(platforms) - 1: attempt_cmd_args.extend(['--only-binary', ':all:']) From 09c1dfd49ed7bd64b2142da9171c3a1eaa4bb690 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 May 2026 11:22:28 -0400 Subject: [PATCH 4/6] Trigger PostCommit Python --- .github/trigger_files/beam_PostCommit_Python.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 91226bd08ee3..931ae76b69d5 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", "pr": "38069", - "modification": 41 + "modification": 42 } From 5ecec1bfc966d56edda46cf45268c091c6590504 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 May 2026 15:40:18 -0400 Subject: [PATCH 5/6] Refactor code. --- .../apache_beam/runners/portability/stager.py | 48 +++++-------------- 1 file changed, 11 insertions(+), 37 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 1cbde2d72f80..846e7fc0b20e 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -89,11 +89,13 @@ SKIP_REQUIREMENTS_CACHE = 'skip' # Ordered list of manylinux tags from newest (strictest) to oldest (most compatible) -# used for cross-platform binary dependency downloads. +# paired with the minimum pip version required to support the tag. +# See https://github.com/pypa/manylinux. _MANYLINUX_PLATFORMS = [ - 'manylinux_2_28_x86_64', - 'manylinux2014_x86_64', # equivalent to manylinux_2_17 - 'manylinux2010_x86_64', # equivalent to manylinux_2_12 + ('manylinux_2_28_x86_64', '20.3'), + ('manylinux2014_x86_64', '19.3'), # equivalent to manylinux_2_17 + ('manylinux2010_x86_64', + '0.0'), # equivalent to manylinux_2_12, the fallback if pip is too old ] _LOGGER = logging.getLogger(__name__) @@ -725,30 +727,6 @@ def _extract_local_packages(requirements_file): else: return [], requirements_file - @staticmethod - def _get_platform_for_default_sdk_container(): - """ - Get the platform for apache beam SDK container based on Pip version. - - Note: pip is still expected to download compatible wheel of a package - with platform tag manylinux1 if the package on PyPI doesn't - have (manylinux2014) or (manylinux2010) wheels. - Reference: https://www.python.org/dev/peps/pep-0599/#id21 - """ - - # TODO(anandinguva): When https://github.com/pypa/pip/issues/10760 is - # addressed, download wheel based on glibc version in Beam's Python - # Base image - pip_version = distribution('pip').version - # See more information about manylinux at - # https://github.com/pypa/manylinux - if version.parse(pip_version) >= version.parse('20.3'): - return 'manylinux_2_28_x86_64' - elif version.parse(pip_version) >= version.parse('19.3'): - return 'manylinux2014_x86_64' - else: - return 'manylinux2010_x86_64' - @staticmethod @retry.with_exponential_backoff( num_retries=4, retry_filter=retry_on_non_zero_exit) @@ -796,15 +774,11 @@ def _populate_requirements_cache( abi_suffix = 'm' if sys.version_info < (3, 8) else '' abi_tag = 'cp%d%d%s' % ( sys.version_info[0], sys.version_info[1], abi_suffix) - preferred_platform = Stager._get_platform_for_default_sdk_container() - - # Fallback platform tags in case the preferred modern tag is too strict - # for some dependencies on PyPI. - try: - start_idx = _MANYLINUX_PLATFORMS.index(preferred_platform) - platforms = _MANYLINUX_PLATFORMS[start_idx:] - except ValueError: - platforms = [preferred_platform] + pip_version = distribution('pip').version + platforms = [ + platform for platform, min_pip_version in _MANYLINUX_PLATFORMS + if version.parse(pip_version) >= version.parse(min_pip_version) + ] last_exception = None for idx, platform in enumerate(platforms): From a435fbd81daef2d2c598e7a22887f07b1d9d3e52 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Tue, 26 May 2026 15:49:16 -0400 Subject: [PATCH 6/6] Refactor more. --- .../apache_beam/runners/portability/stager.py | 93 ++++++++++--------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 846e7fc0b20e..ade7cd81e113 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -770,51 +770,7 @@ def _populate_requirements_cache( _LOGGER.info('Executing command: %s', cmd_args) processes.check_output(cmd_args, stderr=processes.STDOUT) else: - language_implementation_tag = 'cp' - abi_suffix = 'm' if sys.version_info < (3, 8) else '' - abi_tag = 'cp%d%d%s' % ( - sys.version_info[0], sys.version_info[1], abi_suffix) - pip_version = distribution('pip').version - platforms = [ - platform for platform, min_pip_version in _MANYLINUX_PLATFORMS - if version.parse(pip_version) >= version.parse(min_pip_version) - ] - - last_exception = None - for idx, platform in enumerate(platforms): - attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) - attempt_cmd_args = cmd_args + [ - '--dest', - attempt_download_dir, - '--implementation', - language_implementation_tag, - '--abi', - abi_tag, - '--platform', - platform - ] - # Force binary wheel only if we have more platform fallbacks to try. - # For the last platform, we omit this flag so it can natively fall back - # to downloading a source distribution (sdist) if no matching wheel is found. - if idx < len(platforms) - 1: - attempt_cmd_args.extend(['--only-binary', ':all:']) - - _LOGGER.info('Executing command: %s', attempt_cmd_args) - try: - processes.check_output(attempt_cmd_args, stderr=processes.STDOUT) - download_dir = attempt_download_dir - last_exception = None - break - except Exception as e: - _LOGGER.warning( - 'Pip download failed with platform %s, trying fallback: %s', - platform, - e) - shutil.rmtree(attempt_download_dir) - last_exception = e - - if last_exception: - raise last_exception + download_dir = Stager._download_pypi_packages(cmd_args, temp_directory) # Get list of downloaded packages and copy them to the cache downloaded_packages = set() @@ -828,6 +784,53 @@ def _populate_requirements_cache( return downloaded_packages + @staticmethod + def _download_pypi_packages(cmd_args, temp_directory): + language_implementation_tag = 'cp' + abi_suffix = 'm' if sys.version_info < (3, 8) else '' + abi_tag = 'cp%d%d%s' % ( + sys.version_info[0], sys.version_info[1], abi_suffix) + pip_version = distribution('pip').version + platforms = [ + platform for platform, min_pip_version in _MANYLINUX_PLATFORMS + if version.parse(pip_version) >= version.parse(min_pip_version) + ] + + last_exception = None + for idx, platform in enumerate(platforms): + attempt_download_dir = tempfile.mkdtemp(dir=temp_directory) + attempt_cmd_args = cmd_args + [ + '--dest', + attempt_download_dir, + '--implementation', + language_implementation_tag, + '--abi', + abi_tag, + '--platform', + platform + ] + # Force binary wheel only if we have more platform fallbacks to try. + # For the last platform, we omit this flag so it can natively fall back + # to downloading a source distribution (sdist) if no matching wheel is found. + if idx < len(platforms) - 1: + attempt_cmd_args.extend(['--only-binary', ':all:']) + + _LOGGER.info('Executing command: %s', attempt_cmd_args) + try: + processes.check_output(attempt_cmd_args, stderr=processes.STDOUT) + last_exception = None + return attempt_download_dir + except Exception as e: + _LOGGER.warning( + 'Pip download failed with platform %s, trying fallback: %s', + platform, + e) + shutil.rmtree(attempt_download_dir) + last_exception = e + + if last_exception: + raise last_exception + @staticmethod def _build_setup_package( setup_file: str,