Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def main(argv):
with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
known_args.fully_qualified_name_glob):

address = '0.0.0.0:{}'.format(known_args.port)
# Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback
# connections on dual-stack (IPv4/IPv6) systems.
address = 'localhost:{}'.format(known_args.port)
server = grpc.server(thread_pool_executor.shared_unbounded_instance())
if known_args.serve_loopback_worker:
beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
Expand All @@ -71,9 +73,15 @@ def main(argv):
artifact_service.ArtifactRetrievalService(
artifact_service.BeamFilesystemHandler(None).file_reader),
server)
server.add_insecure_port(address)
# Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
# add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
# allowing the parent process to detect it and fail fast rather than hanging.
bound_port = server.add_insecure_port(address)
if not bound_port:
raise RuntimeError(
"Failed to bind expansion service to {}".format(address))
server.start()
_LOGGER.info('Listening for expansion requests at %d', known_args.port)
_LOGGER.info('Listening for expansion requests at %d', bound_port)

def cleanup(unused_signum, unused_frame):
_LOGGER.info('Shutting down expansion service.')
Expand Down
86 changes: 47 additions & 39 deletions sdks/python/apache_beam/utils/subprocess_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,45 +186,53 @@ def __exit__(self, *unused_args):
self.stop()

def start(self):
try:
process, endpoint = self.start_process()
wait_secs = .1
channel_options = [
("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1),
# Default: 20000ms (20s), increased to 10 minutes for stability
("grpc.keepalive_timeout_ms", 600_000),
# Default: 2, set to 0 to allow unlimited pings without data
("grpc.http2.max_pings_without_data", 0),
# Default: False, set to True to allow keepalive pings when no calls
("grpc.keepalive_permit_without_calls", True),
# Default: 2, set to 0 to allow unlimited ping strikes
("grpc.http2.max_ping_strikes", 0),
# Default: 0 (disabled), enable socket reuse for better handling
("grpc.so_reuseport", 1),
]
self._grpc_channel = grpc.insecure_channel(
endpoint, options=channel_options)
channel_ready = grpc.channel_ready_future(self._grpc_channel)
while True:
if process is not None and process.poll() is not None:
_LOGGER.error("Started job service with %s", process.args)
raise RuntimeError(
'Service failed to start up with error %s' % process.poll())
try:
channel_ready.result(timeout=wait_secs)
break
except (grpc.FutureTimeoutError, grpc.RpcError):
wait_secs *= 1.2
logging.log(
logging.WARNING if wait_secs > 1 else logging.DEBUG,
'Waiting for grpc channel to be ready at %s.',
endpoint)
return self._stub_class(self._grpc_channel)
except: # pylint: disable=bare-except
_LOGGER.exception("Error bringing up service")
self.stop()
raise
max_attempts = 3
for attempt in range(max_attempts):
try:
process, endpoint = self.start_process()
wait_secs = .1
channel_options = [
("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1),
# Default: 20000ms (20s), increased to 10 minutes for stability
("grpc.keepalive_timeout_ms", 600_000),
# Default: 2, set to 0 to allow unlimited pings without data
("grpc.http2.max_pings_without_data", 0),
# Default: False, set to True to allow keepalive pings when no calls
("grpc.keepalive_permit_without_calls", True),
# Default: 2, set to 0 to allow unlimited ping strikes
("grpc.http2.max_ping_strikes", 0),
# Default: 0 (disabled), enable socket reuse for better handling
("grpc.so_reuseport", 1),
]
self._grpc_channel = grpc.insecure_channel(
endpoint, options=channel_options)
channel_ready = grpc.channel_ready_future(self._grpc_channel)
while True:
if process is not None and process.poll() is not None:
_LOGGER.error("Started job service with %s", process.args)
raise RuntimeError(
'Service failed to start up with error %s' % process.poll())
try:
channel_ready.result(timeout=wait_secs)
break
except (grpc.FutureTimeoutError, grpc.RpcError):
wait_secs *= 1.2
logging.log(
logging.WARNING if wait_secs > 1 else logging.DEBUG,
'Waiting for grpc channel to be ready at %s.',
endpoint)
return self._stub_class(self._grpc_channel)
except Exception as e:
_LOGGER.warning(
"Error bringing up service on attempt %d: %s",
attempt + 1,
e,
exc_info=True)
self.stop()
if attempt == max_attempts - 1:
raise
Comment thread
shunping marked this conversation as resolved.
time.sleep(1)

def start_process(self):
if self._owner_id is not None:
Expand Down
Loading