Skip to content

Commit e399d43

Browse files
authored
feat(pubsub/v2): add missing ingestion samples (#5406)
* feat(pubsub/v2): add missing ingestion samples * move old sample region tag * use proper name * fix region tags
1 parent dceef4d commit e399d43

7 files changed

+268
-6
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package topics
16+
17+
// [START pubsub_create_topic_with_aws_msk_ingestion]
18+
import (
19+
"context"
20+
"fmt"
21+
"io"
22+
23+
"cloud.google.com/go/pubsub/v2"
24+
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
25+
)
26+
27+
func createTopicWithAWSMSKIngestion(w io.Writer, projectID, topicID, clusterARN, mskTopic, awsRoleARN, gcpSA string) error {
28+
// projectID := "my-project-id"
29+
// topicID := "my-topic"
30+
31+
// // AWS MSK ingestion settings.
32+
// clusterARN := "cluster-arn"
33+
// mskTopic := "msk-topic"
34+
// awsRoleARN := "aws-role-arn"
35+
// gcpSA := "gcp-service-account"
36+
37+
ctx := context.Background()
38+
client, err := pubsub.NewClient(ctx, projectID)
39+
if err != nil {
40+
return fmt.Errorf("pubsub.NewClient: %w", err)
41+
}
42+
defer client.Close()
43+
44+
topicpb := &pubsubpb.Topic{
45+
Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
46+
IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
47+
Source: &pubsubpb.IngestionDataSourceSettings_AwsMsk_{
48+
AwsMsk: &pubsubpb.IngestionDataSourceSettings_AwsMsk{
49+
ClusterArn: clusterARN,
50+
Topic: mskTopic,
51+
AwsRoleArn: awsRoleARN,
52+
GcpServiceAccount: gcpSA,
53+
},
54+
},
55+
},
56+
}
57+
topic, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
58+
if err != nil {
59+
return fmt.Errorf("CreateTopic: %w", err)
60+
}
61+
fmt.Fprintf(w, "Created topic with AWS MSK ingestion settings: %v\n", topic)
62+
return nil
63+
}
64+
65+
// [END pubsub_create_topic_with_aws_msk_ingestion]
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package topics
16+
17+
// [START pubsub_create_topic_with_azure_event_hubs_ingestion]
18+
import (
19+
"context"
20+
"fmt"
21+
"io"
22+
23+
"cloud.google.com/go/pubsub/v2"
24+
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
25+
)
26+
27+
func createTopicWithAzureEventHubsIngestion(w io.Writer, projectID, topicID, resourceGroup, namespace, eventHub, clientID, tenantID, subID, gcpSA string) error {
28+
// projectID := "my-project-id"
29+
// topicID := "my-topic"
30+
31+
// // Azure Event Hubs ingestion settings.
32+
// resourceGroup := "resource-group"
33+
// namespace := "namespace"
34+
// eventHub := "event-hub"
35+
// clientID := "client-id"
36+
// tenantID := "tenant-id"
37+
// subID := "subscription-id"
38+
// gcpSA := "gcp-service-account"
39+
40+
ctx := context.Background()
41+
client, err := pubsub.NewClient(ctx, projectID)
42+
if err != nil {
43+
return fmt.Errorf("pubsub.NewClient: %w", err)
44+
}
45+
defer client.Close()
46+
47+
topicpb := &pubsubpb.Topic{
48+
Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
49+
IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
50+
Source: &pubsubpb.IngestionDataSourceSettings_AzureEventHubs_{
51+
AzureEventHubs: &pubsubpb.IngestionDataSourceSettings_AzureEventHubs{
52+
ResourceGroup: resourceGroup,
53+
Namespace: namespace,
54+
EventHub: eventHub,
55+
ClientId: clientID,
56+
TenantId: tenantID,
57+
SubscriptionId: subID,
58+
GcpServiceAccount: gcpSA,
59+
},
60+
},
61+
},
62+
}
63+
topic, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
64+
if err != nil {
65+
return fmt.Errorf("CreateTopic: %w", err)
66+
}
67+
fmt.Fprintf(w, "Created topic with Azure Event Hubs ingestion: %v\n", topic)
68+
return nil
69+
}
70+
71+
// [END pubsub_create_topic_with_azure_event_hubs_ingestion]
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package topics
16+
17+
// [START pubsub_create_topic_with_confluent_cloud_ingestion]
18+
import (
19+
"context"
20+
"fmt"
21+
"io"
22+
23+
"cloud.google.com/go/pubsub/v2"
24+
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
25+
)
26+
27+
func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, bootstrapServer, clusterID, confluentTopic, poolID, gcpSA string) error {
28+
// projectID := "my-project-id"
29+
// topicID := "my-topic"
30+
31+
// // Confluent Cloud ingestion settings.
32+
// bootstrapServer := "bootstrap-server"
33+
// clusterID := "cluster-id"
34+
// confluentTopic := "confluent-topic"
35+
// poolID := "identity-pool-id"
36+
// gcpSA := "gcp-service-account"
37+
38+
ctx := context.Background()
39+
client, err := pubsub.NewClient(ctx, projectID)
40+
if err != nil {
41+
return fmt.Errorf("pubsub.NewClient: %w", err)
42+
}
43+
defer client.Close()
44+
45+
topicpb := &pubsubpb.Topic{
46+
Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
47+
IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
48+
Source: &pubsubpb.IngestionDataSourceSettings_ConfluentCloud_{
49+
ConfluentCloud: &pubsubpb.IngestionDataSourceSettings_ConfluentCloud{
50+
BootstrapServer: bootstrapServer,
51+
ClusterId: clusterID,
52+
Topic: confluentTopic,
53+
IdentityPoolId: poolID,
54+
GcpServiceAccount: gcpSA,
55+
},
56+
},
57+
},
58+
}
59+
topic, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
60+
if err != nil {
61+
return fmt.Errorf("CreateTopic: %w", err)
62+
}
63+
fmt.Fprintf(w, "Created topic with Confluent Cloud ingestion: %v\n", topic)
64+
return nil
65+
}
66+
67+
// [END pubsub_create_topic_with_confluent_cloud_ingestion]

pubsub/topics/topics_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,65 @@ func TestCreateTopicWithSMT(t *testing.T) {
376376
})
377377
}
378378

379+
func TestTopicAmazonMSKIngestion(t *testing.T) {
380+
tc := testutil.SystemTest(t)
381+
buf := new(bytes.Buffer)
382+
383+
srv := pstest.NewServer()
384+
t.Setenv("PUBSUB_EMULATOR_HOST", srv.Addr)
385+
386+
clusterARN := "cluster-arn"
387+
mskTopic := "msk-topic"
388+
awsRoleARN := "aws-role-arn"
389+
gcpSA := "gcp-service-account"
390+
391+
if err := createTopicWithAWSMSKIngestion(buf, tc.ProjectID, topicID, clusterARN, mskTopic, awsRoleARN, gcpSA); err != nil {
392+
t.Fatalf("failed to create a topic with AWS MSK ingestion: %v", err)
393+
}
394+
}
395+
396+
func TestTopicAzureEventHubsIngestion(t *testing.T) {
397+
tc := testutil.SystemTest(t)
398+
buf := new(bytes.Buffer)
399+
400+
srv := pstest.NewServer()
401+
t.Setenv("PUBSUB_EMULATOR_HOST", srv.Addr)
402+
403+
resourceGroup := "resource-group"
404+
namespace := "namespace"
405+
eventHub := "event-hub"
406+
clientID := "client-id"
407+
tenantID := "tenant-id"
408+
subID := "subscription-id"
409+
gcpSA := "gcp-service-account"
410+
411+
err := createTopicWithAzureEventHubsIngestion(buf, tc.ProjectID, topicID, resourceGroup,
412+
namespace, eventHub, clientID, tenantID, subID, gcpSA)
413+
if err != nil {
414+
t.Fatalf("failed to create a topic with event hubs ingestion: %v", err)
415+
}
416+
}
417+
418+
func TestTopicConfluentCloudIngestion(t *testing.T) {
419+
tc := testutil.SystemTest(t)
420+
buf := new(bytes.Buffer)
421+
422+
srv := pstest.NewServer()
423+
t.Setenv("PUBSUB_EMULATOR_HOST", srv.Addr)
424+
425+
bootstrapServer := "bootstrap-server"
426+
clusterID := "cluster-id"
427+
confluentTopic := "confluent-topic"
428+
poolID := "identity-pool-id"
429+
gcpSA := "gcp-service-account"
430+
431+
err := createTopicWithConfluentCloudIngestion(buf, tc.ProjectID, topicID,
432+
bootstrapServer, clusterID, confluentTopic, poolID, gcpSA)
433+
if err != nil {
434+
t.Fatalf("failed to create a topic with confluent cloud ingestion: %v", err)
435+
}
436+
}
437+
379438
func createTopic(ctx context.Context, client *pubsub.Client, topicName string) error {
380439
_, err := client.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{
381440
Name: topicName,

pubsub/v1samples/topics/create_topic_aws_msk_ingestion.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
package topics
1616

17-
// [START pubsub_create_topic_with_aws_msk_ingestion]
17+
// [START pubsub_old_version_create_topic_with_aws_msk_ingestion]
1818
import (
1919
"context"
2020
"fmt"
@@ -58,4 +58,4 @@ func createTopicWithAWSMSKIngestion(w io.Writer, projectID, topicID, clusterARN,
5858
return nil
5959
}
6060

61-
// [END pubsub_create_topic_with_aws_msk_ingestion]
61+
// [END pubsub_old_version_create_topic_with_aws_msk_ingestion]

pubsub/v1samples/topics/create_topic_azure_event_hubs_ingestion.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
package topics
1616

17-
// [START pubsub_create_topic_with_azure_event_hubs_ingestion]
17+
// [START pubsub_old_version_create_topic_with_azure_event_hubs_ingestion]
1818
import (
1919
"context"
2020
"fmt"
@@ -64,4 +64,4 @@ func createTopicWithAzureEventHubsIngestion(w io.Writer, projectID, topicID, res
6464
return nil
6565
}
6666

67-
// [END pubsub_create_topic_with_azure_event_hubs_ingestion]
67+
// [END pubsub_old_version_create_topic_with_azure_event_hubs_ingestion]

pubsub/v1samples/topics/create_topic_confluent_cloud_ingestion.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
package topics
1616

17-
// [START pubsub_create_topic_with_confluent_cloud_ingestion]
17+
// [START pubsub_old_version_create_topic_with_confluent_cloud_ingestion]
1818
import (
1919
"context"
2020
"fmt"
@@ -60,4 +60,4 @@ func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, boo
6060
return nil
6161
}
6262

63-
// [END pubsub_create_topic_with_confluent_cloud_ingestion]
63+
// [END pubsub_old_version_create_topic_with_confluent_cloud_ingestion]

0 commit comments

Comments
 (0)