diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py index 269d02b3efbd..f2d03e0e898c 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py @@ -55,7 +55,9 @@ def main(argv): with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter( known_args.fully_qualified_name_glob): - address = '0.0.0.0:{}'.format(known_args.port) + # Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback + # connections on dual-stack (IPv4/IPv6) systems. + address = 'localhost:{}'.format(known_args.port) server = grpc.server(thread_pool_executor.shared_unbounded_instance()) if known_args.serve_loopback_worker: beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( @@ -71,9 +73,15 @@ def main(argv): artifact_service.ArtifactRetrievalService( artifact_service.BeamFilesystemHandler(None).file_reader), server) - server.add_insecure_port(address) + # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision), + # add_insecure_port returns 0. We raise an error to crash the subprocess immediately, + # allowing the parent process to detect it and fail fast rather than hanging. + bound_port = server.add_insecure_port(address) + if not bound_port: + raise RuntimeError( + "Failed to bind expansion service to {}".format(address)) server.start() - _LOGGER.info('Listening for expansion requests at %d', known_args.port) + _LOGGER.info('Listening for expansion requests at %d', bound_port) def cleanup(unused_signum, unused_frame): _LOGGER.info('Shutting down expansion service.') diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index d21cb486b8f4..b22e6badb5e7 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -186,45 +186,53 @@ def __exit__(self, *unused_args): self.stop() def start(self): - try: - process, endpoint = self.start_process() - wait_secs = .1 - channel_options = [ - ("grpc.max_receive_message_length", -1), - ("grpc.max_send_message_length", -1), - # Default: 20000ms (20s), increased to 10 minutes for stability - ("grpc.keepalive_timeout_ms", 600_000), - # Default: 2, set to 0 to allow unlimited pings without data - ("grpc.http2.max_pings_without_data", 0), - # Default: False, set to True to allow keepalive pings when no calls - ("grpc.keepalive_permit_without_calls", True), - # Default: 2, set to 0 to allow unlimited ping strikes - ("grpc.http2.max_ping_strikes", 0), - # Default: 0 (disabled), enable socket reuse for better handling - ("grpc.so_reuseport", 1), - ] - self._grpc_channel = grpc.insecure_channel( - endpoint, options=channel_options) - channel_ready = grpc.channel_ready_future(self._grpc_channel) - while True: - if process is not None and process.poll() is not None: - _LOGGER.error("Started job service with %s", process.args) - raise RuntimeError( - 'Service failed to start up with error %s' % process.poll()) - try: - channel_ready.result(timeout=wait_secs) - break - except (grpc.FutureTimeoutError, grpc.RpcError): - wait_secs *= 1.2 - logging.log( - logging.WARNING if wait_secs > 1 else logging.DEBUG, - 'Waiting for grpc channel to be ready at %s.', - endpoint) - return self._stub_class(self._grpc_channel) - except: # pylint: disable=bare-except - _LOGGER.exception("Error bringing up service") - self.stop() - raise + max_attempts = 3 + for attempt in range(max_attempts): + try: + process, endpoint = self.start_process() + wait_secs = .1 + channel_options = [ + ("grpc.max_receive_message_length", -1), + ("grpc.max_send_message_length", -1), + # Default: 20000ms (20s), increased to 10 minutes for stability + ("grpc.keepalive_timeout_ms", 600_000), + # Default: 2, set to 0 to allow unlimited pings without data + ("grpc.http2.max_pings_without_data", 0), + # Default: False, set to True to allow keepalive pings when no calls + ("grpc.keepalive_permit_without_calls", True), + # Default: 2, set to 0 to allow unlimited ping strikes + ("grpc.http2.max_ping_strikes", 0), + # Default: 0 (disabled), enable socket reuse for better handling + ("grpc.so_reuseport", 1), + ] + self._grpc_channel = grpc.insecure_channel( + endpoint, options=channel_options) + channel_ready = grpc.channel_ready_future(self._grpc_channel) + while True: + if process is not None and process.poll() is not None: + _LOGGER.error("Started job service with %s", process.args) + raise RuntimeError( + 'Service failed to start up with error %s' % process.poll()) + try: + channel_ready.result(timeout=wait_secs) + break + except (grpc.FutureTimeoutError, grpc.RpcError): + wait_secs *= 1.2 + logging.log( + logging.WARNING if wait_secs > 1 else logging.DEBUG, + 'Waiting for grpc channel to be ready at %s.', + endpoint) + return self._stub_class(self._grpc_channel) + except Exception as e: + _LOGGER.warning( + "Error bringing up service on attempt %d: %s", + attempt + 1, + e, + exc_info=True) + self.stop() + if attempt == max_attempts - 1: + raise + time.sleep(1) def start_process(self): if self._owner_id is not None: