From 80ddaf5fc0eaaad60cdfd062407d95927d15e960 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Mon, 19 Jan 2026 00:01:08 +0530 Subject: [PATCH 01/17] Fix WriteToPubSub to pass ordering_key to publish() method Fixes #36201 --- sdks/python/apache_beam/io/gcp/pubsub.py | 16 ++++- .../io/gcp/pubsub_integration_test.py | 45 ++++++++++++ sdks/python/apache_beam/io/gcp/pubsub_test.py | 68 ++++++++++++++++++- 3 files changed, 124 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 276103f52760..4b04d55654fa 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -656,12 +656,22 @@ def _flush(self): # Deserialize the protobuf to get the original PubsubMessage pubsub_msg = PubsubMessage._from_proto_str(elem) - # Publish with the correct data and attributes + # Publish with the correct data, attributes, and ordering_key if self.with_attributes and pubsub_msg.attributes: future = self._pub_client.publish( - self._topic, pubsub_msg.data, **pubsub_msg.attributes) + self._topic, + pubsub_msg.data, + ordering_key=pubsub_msg.ordering_key + if pubsub_msg.ordering_key else '', + **pubsub_msg.attributes) else: - future = self._pub_client.publish(self._topic, pubsub_msg.data) + if pubsub_msg.ordering_key: + future = self._pub_client.publish( + self._topic, + pubsub_msg.data, + ordering_key=pubsub_msg.ordering_key) + else: + future = self._pub_client.publish(self._topic, pubsub_msg.data) futures.append(future) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index 8387fe734fc1..cf8323c45187 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -305,6 +305,51 @@ def test_batch_write_with_attributes(self): """Test WriteToPubSub in batch mode with attributes.""" self._test_batch_write(with_attributes=True) + @pytest.mark.it_postcommit + def test_batch_write_with_ordering_key(self): + """Test WriteToPubSub in batch mode with ordering keys.""" + from apache_beam.options.pipeline_options import PipelineOptions + from apache_beam.options.pipeline_options import StandardOptions + from apache_beam.transforms import Create + + # Create test messages with ordering keys + test_messages = [ + PubsubMessage( + b'order_data001', {'attr': 'value1'}, ordering_key='key1'), + PubsubMessage( + b'order_data002', {'attr': 'value2'}, ordering_key='key1'), + PubsubMessage( + b'order_data003', {'attr': 'value3'}, ordering_key='key2') + ] + + pipeline_options = PipelineOptions() + pipeline_options.view_as(StandardOptions).streaming = False + + with TestPipeline(options=pipeline_options) as p: + messages = p | 'CreateMessages' >> Create(test_messages) + _ = messages | 'WriteToPubSub' >> WriteToPubSub( + self.output_topic.name, with_attributes=True) + + # Verify messages were published + time.sleep(10) + + response = self.sub_client.pull( + request={ + "subscription": self.output_sub.name, + "max_messages": 10, + }) + + self.assertEqual(len(response.received_messages), len(test_messages)) + + # Verify ordering keys were preserved + for received_message in response.received_messages: + self.assertIn('ordering_key', dir(received_message.message)) + self.sub_client.acknowledge( + request={ + "subscription": self.output_sub.name, + "ack_ids": [received_message.ack_id], + }) + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 5650e920e635..e91942e454aa 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -23,10 +23,9 @@ import logging import unittest +import apache_beam as beam import hamcrest as hc import mock - -import apache_beam as beam from apache_beam import Pipeline from apache_beam.io import Read from apache_beam.io import Write @@ -1098,6 +1097,71 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock): Lineage.query(p.result.metrics(), Lineage.SINK), set(["pubsub:topic:fakeprj.a_topic"])) + def test_write_messages_with_ordering_key(self, mock_pubsub): + """Test WriteToPubSub with ordering_key in messages.""" + data = b'data' + ordering_key = 'order-123' + attributes = {'key': 'value'} + payloads = [PubsubMessage(data, attributes, ordering_key=ordering_key)] + + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + _ = ( + p + | Create(payloads) + | WriteToPubSub( + 'projects/fakeprj/topics/a_topic', with_attributes=True)) + + # Verify that publish was called with ordering_key + mock_pubsub.return_value.publish.assert_called() + call_args = mock_pubsub.return_value.publish.call_args + + # Check that ordering_key was passed as a keyword argument + self.assertIn('ordering_key', call_args.kwargs) + self.assertEqual(call_args.kwargs['ordering_key'], ordering_key) + + def test_write_messages_with_ordering_key_no_attributes(self, mock_pubsub): + """Test WriteToPubSub with ordering_key but no attributes.""" + data = b'data' + ordering_key = 'order-456' + payloads = [PubsubMessage(data, None, ordering_key=ordering_key)] + + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + _ = ( + p + | Create(payloads) + | WriteToPubSub( + 'projects/fakeprj/topics/a_topic', with_attributes=True)) + + # Verify that publish was called with ordering_key + mock_pubsub.return_value.publish.assert_called() + call_args = mock_pubsub.return_value.publish.call_args + + # Check that ordering_key was passed + self.assertIn('ordering_key', call_args.kwargs) + self.assertEqual(call_args.kwargs['ordering_key'], ordering_key) + + def test_write_messages_without_ordering_key(self, mock_pubsub): + """Test WriteToPubSub without ordering_key (backward compatibility).""" + data = b'data' + attributes = {'key': 'value'} + payloads = [PubsubMessage(data, attributes)] # No ordering_key + + options = PipelineOptions([]) + options.view_as(StandardOptions).streaming = True + with TestPipeline(options=options) as p: + _ = ( + p + | Create(payloads) + | WriteToPubSub( + 'projects/fakeprj/topics/a_topic', with_attributes=True)) + + # Verify that publish was called + mock_pubsub.return_value.publish.assert_called() + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) From 32e94b130161efb3a25edced3de42471462bdf0c Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Mon, 19 Jan 2026 11:42:47 +0530 Subject: [PATCH 02/17] Update pubsub_test.py --- sdks/python/apache_beam/io/gcp/pubsub_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index e91942e454aa..09abf8de6353 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -23,9 +23,9 @@ import logging import unittest -import apache_beam as beam import hamcrest as hc import mock +import apache_beam as beam from apache_beam import Pipeline from apache_beam.io import Read from apache_beam.io import Write From 9bcffae8a835aed72fbf5646dc0fa2c6177af223 Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Mon, 19 Jan 2026 11:44:59 +0530 Subject: [PATCH 03/17] Update pubsub_test.py --- sdks/python/apache_beam/io/gcp/pubsub_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index 09abf8de6353..14b361ae45fa 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -25,6 +25,7 @@ import hamcrest as hc import mock + import apache_beam as beam from apache_beam import Pipeline from apache_beam.io import Read From 354100a931b15f1b2994199f1c1a129d960aecee Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Tue, 20 Jan 2026 11:12:52 +0530 Subject: [PATCH 04/17] Trigger CI rerun From ffd8765cc3d2a470d21d557a108e54b5e53849b8 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Tue, 20 Jan 2026 12:22:04 +0530 Subject: [PATCH 05/17] Retry CI (flake) From 7fd0ea151377d826040d7abb636905bb55302bb9 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Wed, 25 Mar 2026 20:08:47 +0530 Subject: [PATCH 06/17] Apply yapf formatting --- .../apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java | 8 ++++++++ .../org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 5 ++++- sdks/python/apache_beam/io/external/gcp/pubsub.py | 12 ++++++++++-- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java index 658d1fc29e32..01e258f5f3cb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java @@ -53,6 +53,7 @@ public static class Configuration { private String topic; private @Nullable String idAttribute; private @Nullable String timestampAttribute; + private boolean publishWithOrderingKey = false; public void setTopic(String topic) { this.topic = topic; @@ -65,6 +66,10 @@ public void setIdLabel(@Nullable String idAttribute) { public void setTimestampAttribute(@Nullable String timestampAttribute) { this.timestampAttribute = timestampAttribute; } + + public void setPublishWithOrderingKey(Boolean publishWithOrderingKey) { + this.publishWithOrderingKey = publishWithOrderingKey != null && publishWithOrderingKey; + } } public static class WriteBuilder @@ -85,6 +90,9 @@ public PTransform, PDone> buildExternal(Configuration config if (config.timestampAttribute != null) { writeBuilder.setTimestampAttribute(config.timestampAttribute); } + if (config.publishWithOrderingKey) { + writeBuilder.setPublishWithOrderingKey(true); + } writeBuilder.setDynamicDestinations(false); return writeBuilder.build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index d62d294ed2a7..57005745044b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -1727,7 +1727,10 @@ public void startBundle(StartBundleContext c) throws IOException { this.pubsubClient = getPubsubClientFactory() .newClient( - getTimestampAttribute(), null, c.getPipelineOptions().as(PubsubOptions.class)); + getTimestampAttribute(), + null, + c.getPipelineOptions().as(PubsubOptions.class), + Write.this.getPubsubRootUrl()); } @ProcessElement diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py index a2a3430f9a1a..dc171bb422b2 100644 --- a/sdks/python/apache_beam/io/external/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py @@ -117,6 +117,7 @@ def expand(self, pbegin): # this is not implemented yet on the Java side: # ('with_attributes', bool), ('timestamp_attribute', typing.Optional[str]), + ('publish_with_ordering_key', bool), ]) @@ -135,6 +136,7 @@ def __init__( with_attributes=False, id_label=None, timestamp_attribute=None, + publish_with_ordering_key=False, expansion_service=None): """Initializes ``WriteToPubSub``. @@ -150,18 +152,24 @@ def __init__( in a ReadFromPubSub PTransform to deduplicate messages. timestamp_attribute: If set, will set an attribute for each Cloud Pub/Sub message with the given name and the message's publish time as the value. + publish_with_ordering_key: If True, enables ordering key support when + publishing messages. The ordering key must be set on each + PubsubMessage via the ``ordering_key`` attribute. Requires + messages to be routed to the same region. """ self.params = WriteToPubsubSchema( topic=topic, id_label=id_label, # with_attributes=with_attributes, - timestamp_attribute=timestamp_attribute) + timestamp_attribute=timestamp_attribute, + publish_with_ordering_key=publish_with_ordering_key) self.expansion_service = expansion_service self.with_attributes = with_attributes def expand(self, pvalue): if self.with_attributes: - pcoll = pvalue | 'ToProto' >> Map(pubsub.WriteToPubSub.to_proto_str) + pcoll = pvalue | 'ToProto' >> Map( + lambda m: m._to_proto_str(for_publish=True)) else: pcoll = pvalue | 'ToProto' >> Map( lambda x: pubsub.PubsubMessage(x, {})._to_proto_str()) From 26a34f26e6296d92b6ac0e0207e5a65e87024a7a Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Wed, 1 Apr 2026 22:52:38 +0530 Subject: [PATCH 07/17] Address review comments: use message_to_proto_str and skip ordering key test on Dataflow --- sdks/python/apache_beam/io/external/gcp/pubsub.py | 5 ++--- .../apache_beam/io/gcp/pubsub_integration_test.py | 13 ++++++++++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py index dc171bb422b2..3125c0422275 100644 --- a/sdks/python/apache_beam/io/external/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py @@ -154,8 +154,7 @@ def __init__( message with the given name and the message's publish time as the value. publish_with_ordering_key: If True, enables ordering key support when publishing messages. The ordering key must be set on each - PubsubMessage via the ``ordering_key`` attribute. Requires - messages to be routed to the same region. + PubsubMessage via the ``ordering_key`` attribute. """ self.params = WriteToPubsubSchema( topic=topic, @@ -169,7 +168,7 @@ def __init__( def expand(self, pvalue): if self.with_attributes: pcoll = pvalue | 'ToProto' >> Map( - lambda m: m._to_proto_str(for_publish=True)) + pubsub.WriteToPubSub.message_to_proto_str) else: pcoll = pvalue | 'ToProto' >> Map( lambda x: pubsub.PubsubMessage(x, {})._to_proto_str()) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index cf8323c45187..a0d2fe49897c 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -307,7 +307,18 @@ def test_batch_write_with_attributes(self): @pytest.mark.it_postcommit def test_batch_write_with_ordering_key(self): - """Test WriteToPubSub in batch mode with ordering keys.""" + """Test WriteToPubSub in batch mode with ordering keys. + This test only applies to the Direct Runner. The ordering key fix in + _PubSubWriteDoFn._flush() does not apply to Dataflow, which uses its own + internal implementation. Dataflow users should use the XLang WriteToPubSub + path instead. + """ + if self.runner_name == 'TestDataflowRunner': + self.skipTest( + 'Ordering key support via _flush() is not applicable to Dataflow. ' + 'Use the XLang WriteToPubSub path for ordering key support on ' + 'Dataflow.') + from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.transforms import Create From a94d896ff08bc7fb9bb68bce107dce6b1335ccf3 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Wed, 1 Apr 2026 23:11:54 +0530 Subject: [PATCH 08/17] Add Dataflow warning in WriteToPubSub.expand() for ordering key support --- sdks/python/apache_beam/io/gcp/pubsub.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 4b04d55654fa..6e8f8bba662a 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -430,7 +430,17 @@ def bytes_to_proto_str(element: Union[bytes, str]) -> bytes: def expand(self, pcoll): # Store pipeline options for use in DoFn self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None - + # Warn Dataflow users to use the XLang path for ordering key support, + # since _PubSubWriteDoFn._flush() is not used by Dataflow's implementation. + import logging + runner = self.pipeline_options.get_all_options().get( + 'runner', '') if self.pipeline_options else '' + if 'Dataflow' in str(runner): + logging.warning( + 'WriteToPubSub ordering_key support is not available on Dataflow ' + 'via this transform. Use the XLang WriteToPubSub path instead: ' + 'apache_beam.io.external.gcp.pubsub.WriteToPubSub with ' + 'publish_with_ordering_key=True.') if self.with_attributes: pcoll = pcoll | 'ToProtobufX' >> ParDo( _AddMetricsAndMap( From 2584b11aec02bce6e29cd3c3922b5f435829f0ee Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Thu, 2 Apr 2026 13:30:15 +0530 Subject: [PATCH 09/17] Update pubsub_integration_test.py --- .../apache_beam/io/gcp/pubsub_integration_test.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index a0d2fe49897c..2fd86b50cdb3 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -308,16 +308,18 @@ def test_batch_write_with_attributes(self): @pytest.mark.it_postcommit def test_batch_write_with_ordering_key(self): """Test WriteToPubSub in batch mode with ordering keys. - This test only applies to the Direct Runner. The ordering key fix in - _PubSubWriteDoFn._flush() does not apply to Dataflow, which uses its own - internal implementation. Dataflow users should use the XLang WriteToPubSub + Dataflow Native PubSub Sink does not support ordering_key, therefore this + test only applies to runners that use Beam's Python WriteToPubSub Sink. + To use ordering_key, Dataflow users should use the XLang WriteToPubSub path instead. """ if self.runner_name == 'TestDataflowRunner': self.skipTest( - 'Ordering key support via _flush() is not applicable to Dataflow. ' - 'Use the XLang WriteToPubSub path for ordering key support on ' - 'Dataflow.') + 'Dataflow Native PubSub Sink does not support ordering_key ' + '(see https://github.com/apache/beam/issues/36201), therefore ' + 'this test only applies to runners that use Beam\'s Python ' + 'WriteToPubSub Sink. To use ordering_key, Dataflow users should ' + 'use the XLang WriteToPubSub path instead.') from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions From 67f18f04b974e1b0c3eff427c48df87e81d2f177 Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Sat, 11 Apr 2026 20:19:05 +0530 Subject: [PATCH 10/17] Update PR and modification in beam_PostCommit_Python.json Updated PR number and modification count. --- .github/trigger_files/beam_PostCommit_Python.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json index 91226bd08ee3..bc43f6124961 100644 --- a/.github/trigger_files/beam_PostCommit_Python.json +++ b/.github/trigger_files/beam_PostCommit_Python.json @@ -1,5 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "pr": "38069", + "pr": "37345", "modification": 41 -} +} From 81111408bb2de457fa329ece379752227717ed83 Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Tue, 14 Apr 2026 12:22:28 +0530 Subject: [PATCH 11/17] Update pubsub_integration_test.py --- .../io/gcp/pubsub_integration_test.py | 82 ++++++++++++------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index 2fd86b50cdb3..63edc54fb501 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -308,7 +308,9 @@ def test_batch_write_with_attributes(self): @pytest.mark.it_postcommit def test_batch_write_with_ordering_key(self): """Test WriteToPubSub in batch mode with ordering keys. - Dataflow Native PubSub Sink does not support ordering_key, therefore this + + Dataflow Native PubSub Sink does not support ordering_key + (https://github.com/apache/beam/issues/36201), therefore this test only applies to runners that use Beam's Python WriteToPubSub Sink. To use ordering_key, Dataflow users should use the XLang WriteToPubSub path instead. @@ -325,44 +327,62 @@ def test_batch_write_with_ordering_key(self): from apache_beam.options.pipeline_options import StandardOptions from apache_beam.transforms import Create - # Create test messages with ordering keys - test_messages = [ - PubsubMessage( - b'order_data001', {'attr': 'value1'}, ordering_key='key1'), - PubsubMessage( - b'order_data002', {'attr': 'value2'}, ordering_key='key1'), - PubsubMessage( - b'order_data003', {'attr': 'value3'}, ordering_key='key2') - ] - - pipeline_options = PipelineOptions() - pipeline_options.view_as(StandardOptions).streaming = False + ordering_topic = self.pub_client.create_topic( + name=self.pub_client.topic_path( + self.project, 'psit_topic_ordering' + self.uuid)) + ordering_sub = self.sub_client.create_subscription( + name=self.sub_client.subscription_path( + self.project, 'psit_sub_ordering' + self.uuid), + topic=ordering_topic.name, + enable_message_ordering=True) + time.sleep(10) - with TestPipeline(options=pipeline_options) as p: - messages = p | 'CreateMessages' >> Create(test_messages) - _ = messages | 'WriteToPubSub' >> WriteToPubSub( - self.output_topic.name, with_attributes=True) + try: + test_messages = [ + PubsubMessage( + b'order_data001', {'attr': 'value1'}, ordering_key='key1'), + PubsubMessage( + b'order_data002', {'attr': 'value2'}, ordering_key='key1'), + PubsubMessage( + b'order_data003', {'attr': 'value3'}, ordering_key='key2') + ] - # Verify messages were published - time.sleep(10) + pipeline_options = PipelineOptions() + pipeline_options.view_as(StandardOptions).streaming = False - response = self.sub_client.pull( - request={ - "subscription": self.output_sub.name, - "max_messages": 10, - }) + with TestPipeline(options=pipeline_options) as p: + messages = p | 'CreateMessages' >> Create(test_messages) + _ = messages | 'WriteToPubSub' >> WriteToPubSub( + ordering_topic.name, with_attributes=True) - self.assertEqual(len(response.received_messages), len(test_messages)) + time.sleep(10) - # Verify ordering keys were preserved - for received_message in response.received_messages: - self.assertIn('ordering_key', dir(received_message.message)) - self.sub_client.acknowledge( + response = self.sub_client.pull( request={ - "subscription": self.output_sub.name, - "ack_ids": [received_message.ack_id], + "subscription": ordering_sub.name, + "max_messages": 10, }) + self.assertEqual(len(response.received_messages), len(test_messages)) + + received_map = { + msg.message.data: msg.message + for msg in response.received_messages + } + self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1') + self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1') + self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2') + + ack_ids = [msg.ack_id for msg in response.received_messages] + if ack_ids: + self.sub_client.acknowledge( + request={ + "subscription": ordering_sub.name, + "ack_ids": ack_ids, + }) + finally: + test_utils.cleanup_subscriptions(self.sub_client, [ordering_sub]) + test_utils.cleanup_topics(self.pub_client, [ordering_topic]) if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) From 7189a01f5214bdd1a9451cd0ea1ccedd1a2e41d5 Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Tue, 14 Apr 2026 16:11:31 +0530 Subject: [PATCH 12/17] Update pubsub_integration_test.py --- .../apache_beam/io/gcp/pubsub_integration_test.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index 63edc54fb501..cb2995fb3777 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -326,15 +326,17 @@ def test_batch_write_with_ordering_key(self): from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.transforms import Create + from google.pubsub_v1.types import Subscription ordering_topic = self.pub_client.create_topic( name=self.pub_client.topic_path( self.project, 'psit_topic_ordering' + self.uuid)) ordering_sub = self.sub_client.create_subscription( - name=self.sub_client.subscription_path( - self.project, 'psit_sub_ordering' + self.uuid), - topic=ordering_topic.name, - enable_message_ordering=True) + request=Subscription( + name=self.sub_client.subscription_path( + self.project, 'psit_sub_ordering' + self.uuid), + topic=ordering_topic.name, + enable_message_ordering=True)) time.sleep(10) try: From 81d31b3cc5430ffec73b3d4e12331aef5544a952 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Mon, 20 Apr 2026 23:10:24 +0530 Subject: [PATCH 13/17] Fix PubSub error --- sdks/python/apache_beam/io/gcp/pubsub.py | 13 ++-- .../io/gcp/pubsub_integration_test.py | 71 +++++++++---------- 2 files changed, 41 insertions(+), 43 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub.py b/sdks/python/apache_beam/io/gcp/pubsub.py index 6e8f8bba662a..1fcdd45bc6e2 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub.py +++ b/sdks/python/apache_beam/io/gcp/pubsub.py @@ -31,8 +31,9 @@ """ # pytype: skip-file - +import logging import re +import time from typing import Any from typing import NamedTuple from typing import Optional @@ -432,7 +433,6 @@ def expand(self, pcoll): self.pipeline_options = pcoll.pipeline.options if pcoll.pipeline else None # Warn Dataflow users to use the XLang path for ordering key support, # since _PubSubWriteDoFn._flush() is not used by Dataflow's implementation. - import logging runner = self.pipeline_options.get_all_options().get( 'runner', '') if self.pipeline_options else '' if 'Dataflow' in str(runner): @@ -607,7 +607,7 @@ def __init__(self, transform): output_labels_supported = False # Log debug information for troubleshooting - import logging + runner_info = getattr( pipeline_options, 'runner', 'None') if pipeline_options else 'No options' @@ -638,7 +638,10 @@ def __init__(self, transform): def setup(self): from google.cloud import pubsub - self._pub_client = pubsub.PublisherClient() + self._pub_client = pubsub.PublisherClient( + publisher_options=pubsub.types.PublisherOptions( + enable_message_ordering=True, + )) self._topic = self._pub_client.topic_path( self.project, self.short_topic_name) @@ -657,8 +660,6 @@ def _flush(self): if not self._buffer: return - import time - # The elements in buffer are serialized protobuf bytes from the previous # transforms. We need to deserialize them to extract data and attributes. futures = [] diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index cb2995fb3777..425e30c35d35 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -36,6 +36,11 @@ from apache_beam.testing import test_utils from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.transforms import Create +from google.pubsub_v1.types import Subscription +from google.cloud import pubsub INPUT_TOPIC = 'psit_topic_input' OUTPUT_TOPIC = 'psit_topic_output' @@ -139,7 +144,6 @@ def setUp(self): self.uuid = str(uuid.uuid4()) # Set up PubSub environment. - from google.cloud import pubsub self.pub_client = pubsub.PublisherClient() self.input_topic = self.pub_client.create_topic( name=self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid)) @@ -228,9 +232,6 @@ def _test_batch_write(self, with_attributes): with_attributes: False - Writes message data only. True - Writes message data and attributes. """ - from apache_beam.options.pipeline_options import PipelineOptions - from apache_beam.options.pipeline_options import StandardOptions - from apache_beam.transforms import Create # Create test messages for batch mode test_messages = [ @@ -309,24 +310,19 @@ def test_batch_write_with_attributes(self): def test_batch_write_with_ordering_key(self): """Test WriteToPubSub in batch mode with ordering keys. - Dataflow Native PubSub Sink does not support ordering_key - (https://github.com/apache/beam/issues/36201), therefore this - test only applies to runners that use Beam's Python WriteToPubSub Sink. - To use ordering_key, Dataflow users should use the XLang WriteToPubSub - path instead. + Dataflow's Native Pub/Sub Sink does not support ordering_key + (see https://github.com/apache/beam/issues/36201), so this test + only applies to runners using Beam's Python WriteToPubSub Sink. + Dataflow users should use the XLang WriteToPubSub path instead + (apache_beam.io.external.gcp.pubsub.WriteToPubSub with + publish_with_ordering_key=True). """ if self.runner_name == 'TestDataflowRunner': self.skipTest( 'Dataflow Native PubSub Sink does not support ordering_key ' - '(see https://github.com/apache/beam/issues/36201), therefore ' - 'this test only applies to runners that use Beam\'s Python ' - 'WriteToPubSub Sink. To use ordering_key, Dataflow users should ' - 'use the XLang WriteToPubSub path instead.') - - from apache_beam.options.pipeline_options import PipelineOptions - from apache_beam.options.pipeline_options import StandardOptions - from apache_beam.transforms import Create - from google.pubsub_v1.types import Subscription + '(see https://github.com/apache/beam/issues/36201). ' + 'Use apache_beam.io.external.gcp.pubsub.WriteToPubSub ' + 'with publish_with_ordering_key=True instead.') ordering_topic = self.pub_client.create_topic( name=self.pub_client.topic_path( @@ -336,7 +332,8 @@ def test_batch_write_with_ordering_key(self): name=self.sub_client.subscription_path( self.project, 'psit_sub_ordering' + self.uuid), topic=ordering_topic.name, - enable_message_ordering=True)) + enable_message_ordering=True, + )) time.sleep(10) try: @@ -346,7 +343,7 @@ def test_batch_write_with_ordering_key(self): PubsubMessage( b'order_data002', {'attr': 'value2'}, ordering_key='key1'), PubsubMessage( - b'order_data003', {'attr': 'value3'}, ordering_key='key2') + b'order_data003', {'attr': 'value3'}, ordering_key='key2'), ] pipeline_options = PipelineOptions() @@ -361,30 +358,30 @@ def test_batch_write_with_ordering_key(self): response = self.sub_client.pull( request={ - "subscription": ordering_sub.name, - "max_messages": 10, + 'subscription': ordering_sub.name, + 'max_messages': 10, }) self.assertEqual(len(response.received_messages), len(test_messages)) - received_map = { - msg.message.data: msg.message - for msg in response.received_messages - } - self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1') - self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1') - self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2') + received_ordering_keys = [ + msg.message.ordering_key for msg in response.received_messages + ] + expected_ordering_keys = sorted( + [msg.ordering_key for msg in test_messages]) + self.assertEqual(sorted(received_ordering_keys), expected_ordering_keys) ack_ids = [msg.ack_id for msg in response.received_messages] - if ack_ids: - self.sub_client.acknowledge( - request={ - "subscription": ordering_sub.name, - "ack_ids": ack_ids, - }) + self.sub_client.acknowledge( + request={ + 'subscription': ordering_sub.name, + 'ack_ids': ack_ids, + }) finally: - test_utils.cleanup_subscriptions(self.sub_client, [ordering_sub]) - test_utils.cleanup_topics(self.pub_client, [ordering_topic]) + self.sub_client.delete_subscription( + request={'subscription': ordering_sub.name}) + self.pub_client.delete_topic(request={'topic': ordering_topic.name}) + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) From 8a38d9cdb9338fd56064baca226a53f8e4eab637 Mon Sep 17 00:00:00 2001 From: Nikita Grover <145201799+nikitagrover19@users.noreply.github.com> Date: Tue, 21 Apr 2026 16:27:11 +0530 Subject: [PATCH 14/17] Update pubsub_integration_test.py --- .../io/gcp/pubsub_integration_test.py | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index 425e30c35d35..3cdd508053ea 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -36,11 +36,6 @@ from apache_beam.testing import test_utils from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.options.pipeline_options import StandardOptions -from apache_beam.transforms import Create -from google.pubsub_v1.types import Subscription -from google.cloud import pubsub INPUT_TOPIC = 'psit_topic_input' OUTPUT_TOPIC = 'psit_topic_output' @@ -144,6 +139,7 @@ def setUp(self): self.uuid = str(uuid.uuid4()) # Set up PubSub environment. + from google.cloud import pubsub self.pub_client = pubsub.PublisherClient() self.input_topic = self.pub_client.create_topic( name=self.pub_client.topic_path(self.project, INPUT_TOPIC + self.uuid)) @@ -232,7 +228,10 @@ def _test_batch_write(self, with_attributes): with_attributes: False - Writes message data only. True - Writes message data and attributes. """ - + from apache_beam.options.pipeline_options import PipelineOptions + from apache_beam.options.pipeline_options import StandardOptions + from apache_beam.transforms import Create + # Create test messages for batch mode test_messages = [ PubsubMessage(b'batch_data001', {'batch_attr': 'value1'}), @@ -323,6 +322,11 @@ def test_batch_write_with_ordering_key(self): '(see https://github.com/apache/beam/issues/36201). ' 'Use apache_beam.io.external.gcp.pubsub.WriteToPubSub ' 'with publish_with_ordering_key=True instead.') + + from apache_beam.options.pipeline_options import PipelineOptions + from apache_beam.options.pipeline_options import StandardOptions + from apache_beam.transforms import Create + from google.pubsub_v1.types import Subscription ordering_topic = self.pub_client.create_topic( name=self.pub_client.topic_path( @@ -364,12 +368,13 @@ def test_batch_write_with_ordering_key(self): self.assertEqual(len(response.received_messages), len(test_messages)) - received_ordering_keys = [ - msg.message.ordering_key for msg in response.received_messages - ] - expected_ordering_keys = sorted( - [msg.ordering_key for msg in test_messages]) - self.assertEqual(sorted(received_ordering_keys), expected_ordering_keys) + received_map = { + msg.message.data: msg.message + for msg in response.received_messages + } + self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1') + self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1') + self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2') ack_ids = [msg.ack_id for msg in response.received_messages] self.sub_client.acknowledge( From e825e6a70167996df3c3ad8cb705bb041d4e0b2d Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Wed, 22 Apr 2026 22:32:23 +0530 Subject: [PATCH 15/17] Fix formatting: remove trailing whitespace --- sdks/python/apache_beam/io/gcp/pubsub_integration_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index 3cdd508053ea..fa4ffcd4522f 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -231,7 +231,7 @@ def _test_batch_write(self, with_attributes): from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.transforms import Create - + # Create test messages for batch mode test_messages = [ PubsubMessage(b'batch_data001', {'batch_attr': 'value1'}), @@ -322,7 +322,7 @@ def test_batch_write_with_ordering_key(self): '(see https://github.com/apache/beam/issues/36201). ' 'Use apache_beam.io.external.gcp.pubsub.WriteToPubSub ' 'with publish_with_ordering_key=True instead.') - + from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.transforms import Create From b2fa5a06cfc31ac6de4f2ba746ecd4689b525848 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Mon, 11 May 2026 23:25:58 +0530 Subject: [PATCH 16/17] Fix ordering key integration test: retry pull loop, fix indentation --- .../io/gcp/pubsub_integration_test.py | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index fa4ffcd4522f..e40aeb7c7b4a 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -360,23 +360,31 @@ def test_batch_write_with_ordering_key(self): time.sleep(10) - response = self.sub_client.pull( - request={ - 'subscription': ordering_sub.name, - 'max_messages': 10, - }) - - self.assertEqual(len(response.received_messages), len(test_messages)) + # Retry pulling to handle PubSub delivery delays + received_messages = [] + deadline = time.time() + 60 # wait up to 60 seconds + while time.time() < deadline: + response = self.sub_client.pull( + request={ + 'subscription': ordering_sub.name, + 'max_messages': 10, + }) + received_messages.extend(response.received_messages) + if len(received_messages) >= len(test_messages): + break + time.sleep(5) + + self.assertEqual(len(received_messages), len(test_messages)) received_map = { msg.message.data: msg.message - for msg in response.received_messages + for msg in received_messages } self.assertEqual(received_map[b'order_data001'].ordering_key, 'key1') self.assertEqual(received_map[b'order_data002'].ordering_key, 'key1') self.assertEqual(received_map[b'order_data003'].ordering_key, 'key2') - ack_ids = [msg.ack_id for msg in response.received_messages] + ack_ids = [msg.ack_id for msg in received_messages] self.sub_client.acknowledge( request={ 'subscription': ordering_sub.name, From 5e4f7cc6df9c804afef201fdc9b5d1016d484583 Mon Sep 17 00:00:00 2001 From: Nikita Grover Date: Tue, 12 May 2026 19:54:23 +0530 Subject: [PATCH 17/17] Fix import order: google imports before apache_beam (isort) --- sdks/python/apache_beam/io/gcp/pubsub_integration_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py index e40aeb7c7b4a..a7bf5abcb468 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py @@ -26,8 +26,6 @@ import uuid import pytest -from hamcrest.core.core.allof import all_of - from apache_beam.io.gcp import pubsub_it_pipeline from apache_beam.io.gcp.pubsub import PubsubMessage from apache_beam.io.gcp.pubsub import WriteToPubSub @@ -36,6 +34,7 @@ from apache_beam.testing import test_utils from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher from apache_beam.testing.test_pipeline import TestPipeline +from hamcrest.core.core.allof import all_of INPUT_TOPIC = 'psit_topic_input' OUTPUT_TOPIC = 'psit_topic_output'