Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 2
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 2
"modification": 1
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_SQL.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run ",
"modification": 3
"modification": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ class BeamModulePlugin implements Plugin<Project> {
def gax_version = "2.79.0"
def google_ads_version = "33.0.0"
def google_clients_version = "2.0.0"
def google_cloud_bigdataoss_version = "3.1.16"
def google_cloud_bigdataoss_version = "2.2.26"
def google_code_gson_version = "2.10.1"
def google_oauth_clients_version = "1.34.1"
// [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
Expand Down Expand Up @@ -702,9 +702,9 @@ class BeamModulePlugin implements Plugin<Project> {
aws_java_sdk2_profiles : "software.amazon.awssdk:profiles:$aws_java_sdk2_version",
azure_sdk_bom : "com.azure:azure-sdk-bom:1.2.14",
bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version",
bigdataoss_gcs_connector : "com.google.cloud.bigdataoss:gcs-connector:$google_cloud_bigdataoss_version",
bigdataoss_gcs_connector : "com.google.cloud.bigdataoss:gcs-connector:hadoop2-$google_cloud_bigdataoss_version",
bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version",
bigdataoss_util_hadoop : "com.google.cloud.bigdataoss:util-hadoop:$google_cloud_bigdataoss_version",
bigdataoss_util_hadoop : "com.google.cloud.bigdataoss:util-hadoop:hadoop2-$google_cloud_bigdataoss_version",
byte_buddy : "net.bytebuddy:byte-buddy:1.17.7",
cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
cassandra_driver_mapping : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.storage.Storage;
Expand All @@ -52,6 +53,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.AccessDeniedException;
Expand Down Expand Up @@ -265,12 +267,8 @@ public boolean shouldRetry(IOException e) {
.setReadChannelOptions(gcsReadOptions)
.setGrpcEnabled(shouldUseGrpc)
.build();
try {
googleCloudStorage =
createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials);
} catch (IOException e) {
throw new RuntimeException(e);
}
googleCloudStorage =
createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials);
Comment on lines +270 to +271
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The createGoogleCloudStorage method still needs to handle or declare IOException because the GoogleCloudStorageImpl constructor in gcsio 2.x throws it. The try-catch block should be restored here to maintain proper error handling and ensure the code compiles.

    try {
      googleCloudStorage =
          createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

this.batchRequestSupplier =
() -> {
// Capture reference to this so that the most recent storageClient and initializer
Expand Down Expand Up @@ -727,16 +725,48 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO
}

GoogleCloudStorage createGoogleCloudStorage(
GoogleCloudStorageOptions options, Storage storage, Credentials credentials)
throws IOException {
return GoogleCloudStorageImpl.builder()
.setOptions(options)
.setHttpTransport(storage.getRequestFactory().getTransport())
.setCredentials(credentials)
// gcsio 3 expects httpRequestInitializer to be either absent or
// com.google.cloud.hadoop.util.RetryHttpInitializer when credentials not provided
.setHttpRequestInitializer(credentials != null ? httpRequestInitializer : null)
.build();
GoogleCloudStorageOptions options, Storage storage, Credentials credentials) {
Comment on lines 727 to +728
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The GoogleCloudStorageImpl constructor in gcsio 2.x throws a checked IOException. Removing throws IOException from this method signature will cause a compilation error when using the 2.x library. Please restore the exception declaration.

  GoogleCloudStorage createGoogleCloudStorage(
      GoogleCloudStorageOptions options, Storage storage, Credentials credentials)
      throws IOException {

try {
return new GoogleCloudStorageImpl(options, storage, credentials);
} catch (NoSuchMethodError e) {
// gcs-connector 3.x drops the direct constructor and exclusively uses Builder
// TODO eliminate reflection once Beam drops Java 8 support and upgrades to gcsio 3.x
try {
final Method builderMethod = GoogleCloudStorageImpl.class.getMethod("builder");
Object builder = builderMethod.invoke(null);
final Class<?> builderClass =
Class.forName(
"com.google.cloud.hadoop.gcsio.AutoBuilder_GoogleCloudStorageImpl_Builder");
Comment on lines +737 to +739
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Hardcoding the internal AutoBuilder class name is fragile and depends on implementation details of the gcsio library. It is safer to obtain the class directly from the builder instance returned by GoogleCloudStorageImpl.builder().

Suggested change
final Class<?> builderClass =
Class.forName(
"com.google.cloud.hadoop.gcsio.AutoBuilder_GoogleCloudStorageImpl_Builder");
final Class<?> builderClass = builder.getClass();


final Method setOptionsMethod =
builderClass.getMethod("setOptions", GoogleCloudStorageOptions.class);
setOptionsMethod.setAccessible(true);
builder = setOptionsMethod.invoke(builder, options);

final Method setHttpTransportMethod =
builderClass.getMethod("setHttpTransport", HttpTransport.class);
setHttpTransportMethod.setAccessible(true);
builder =
setHttpTransportMethod.invoke(builder, storage.getRequestFactory().getTransport());

final Method setCredentialsMethod =
builderClass.getMethod("setCredentials", Credentials.class);
setCredentialsMethod.setAccessible(true);
builder = setCredentialsMethod.invoke(builder, credentials);

final Method setHttpRequestInitializerMethod =
builderClass.getMethod("setHttpRequestInitializer", HttpRequestInitializer.class);
setHttpRequestInitializerMethod.setAccessible(true);
builder = setHttpRequestInitializerMethod.invoke(builder, httpRequestInitializer);
Comment on lines +757 to +760
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The reflection logic for gcsio 3.x should preserve the original behavior of passing null for the httpRequestInitializer when credentials are null. This was specifically required for gcsio 3.x as noted in the previous implementation's comments.

Suggested change
final Method setHttpRequestInitializerMethod =
builderClass.getMethod("setHttpRequestInitializer", HttpRequestInitializer.class);
setHttpRequestInitializerMethod.setAccessible(true);
builder = setHttpRequestInitializerMethod.invoke(builder, httpRequestInitializer);
final Method setHttpRequestInitializerMethod =
builderClass.getMethod("setHttpRequestInitializer", HttpRequestInitializer.class);
setHttpRequestInitializerMethod.setAccessible(true);
builder = setHttpRequestInitializerMethod.invoke(
builder, credentials != null ? httpRequestInitializer : null);


final Method buildMethod = builderClass.getMethod("build");
buildMethod.setAccessible(true);
return (GoogleCloudStorage) buildMethod.invoke(builder);
} catch (Exception reflectionError) {
throw new RuntimeException(
"Failed to construct GoogleCloudStorageImpl from gcsio 3.x Builder", reflectionError);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,19 +184,16 @@ public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Excep
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder()
.setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO)
.setGzipEncodingSupportEnabled(true)
.setFastFailOnNotFoundEnabled(false)
.setSupportGzipEncoding(true)
.setFastFailOnNotFound(false)
.build();

GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
pipelineOptions.setGoogleCloudStorageReadOptions(readOptions);

GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class);
Mockito.when(
googleCloudStorageMock.open(
Mockito.any(StorageResourceId.class),
Mockito.any(GoogleCloudStorageReadOptions.class)))
Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(SeekableByteChannel.class));
gcsUtil.delegate.setCloudStorageImpl(googleCloudStorageMock);

Expand Down Expand Up @@ -1009,7 +1006,7 @@ public void testGCSChannelCloseIdempotent() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder().setFastFailOnNotFoundEnabled(false).build();
GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(false).build();

gcsUtil.delegate.setCloudStorageImpl(
GoogleCloudStorageOptions.builder()
Expand All @@ -1029,7 +1026,7 @@ public void testGCSReadMetricsIsSet() {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder().setFastFailOnNotFoundEnabled(true).build();
GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(true).build();
gcsUtil.delegate.setCloudStorageImpl(
GoogleCloudStorageOptions.builder()
.setAppName("Beam")
Expand Down Expand Up @@ -1676,10 +1673,8 @@ public static GcsUtilV1Mock createMockWithMockStorage(
.thenReturn(Channels.newChannel(new ByteArrayOutputStream()));
} else {
SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readPayload);
Mockito.when(googleCloudStorageMock.open(Mockito.any(StorageResourceId.class)))
.thenReturn(seekableByteChannel);
Mockito.when(
googleCloudStorageMock.open(Mockito.any(StorageResourceId.class), Mockito.any()))
Mockito.when(googleCloudStorageMock.open(Mockito.any())).thenReturn(seekableByteChannel);
Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any()))
.thenReturn(seekableByteChannel);
}
return gcsUtilMock;
Expand Down
1 change: 0 additions & 1 deletion sdks/java/extensions/sql/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ dependencies {
testImplementation library.java.google_api_services_bigquery
testImplementation "org.apache.iceberg:iceberg-api:1.9.2"
testImplementation "org.apache.iceberg:iceberg-core:1.9.2"
testRuntimeOnly library.java.bigdataoss_util_hadoop
testImplementation project(":sdks:java:io:google-cloud-platform")
testImplementation project(":sdks:java:extensions:google-cloud-platform-core")
}
Expand Down
18 changes: 6 additions & 12 deletions sdks/java/io/expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,6 @@ configurations.runtimeClasspath {

// Pin nimbus-jose-jwt to 9.37.4 to fix CVE-2025-53864 (transitive via hadoop-auth)
resolutionStrategy.force 'com.nimbusds:nimbus-jose-jwt:9.37.4'

// [iceberg]
// TODO(https://github.com/apache/beam/issues/38515):
// Remove below pins when parquet-hadoop upgrades to hadoop-common:3.4.2
resolutionStrategy.force 'org.apache.hadoop:hadoop-common:3.3.6'
resolutionStrategy.force 'org.apache.hadoop:hadoop-client:3.3.6'
resolutionStrategy.force 'org.apache.hadoop:hadoop-hdfs:3.3.6'
resolutionStrategy.force 'org.apache.hadoop:hadoop-hdfs-client:3.3.6'
}

shadowJar {
Expand All @@ -91,10 +83,12 @@ dependencies {
permitUnusedDeclared project(":sdks:java:extensions:kafka-factories")
runtimeOnly project(":sdks:java:io:amazon-web-services2") // FileSystem may be used by Iceberg AddFiles

runtimeOnly project(":sdks:java:io:iceberg")
runtimeOnly project(":sdks:java:io:iceberg:hive")
runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")
runtimeOnly library.java.bigdataoss_util_hadoop
if (JavaVersion.current().compareTo(JavaVersion.VERSION_11) >= 0 && project.findProperty('testJavaVersion') != '8') {
// iceberg ended support for Java 8 in 1.7.0
runtimeOnly project(":sdks:java:io:iceberg")
runtimeOnly project(":sdks:java:io:iceberg:hive")
runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow")
}

runtimeOnly library.java.kafka_clients
runtimeOnly library.java.slf4j_jdk14
Expand Down
9 changes: 1 addition & 8 deletions sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ dependencies {
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation "org.apache.iceberg:iceberg-data:$iceberg_version"
implementation "org.apache.hadoop:hadoop-common:3.3.6"
implementation library.java.hadoop_common
// TODO(https://github.com/apache/beam/issues/21156): Determine how to build without this dependency
provided "org.immutables:value:2.8.8"
permitUnusedDeclared "org.immutables:value:2.8.8"
Expand All @@ -70,7 +70,6 @@ dependencies {
runtimeOnly "org.apache.iceberg:iceberg-azure:$iceberg_version"
runtimeOnly "org.apache.iceberg:iceberg-azure-bundle:$iceberg_version"
runtimeOnly library.java.bigdataoss_gcs_connector
runtimeOnly library.java.bigdataoss_util_hadoop
runtimeOnly library.java.hadoop_client

testImplementation project(":sdks:java:managed")
Expand Down Expand Up @@ -118,12 +117,6 @@ dependencies {
configurations.all {
// iceberg-core needs avro:1.12.0
resolutionStrategy.force 'org.apache.avro:avro:1.12.0'
// TODO(https://github.com/apache/beam/issues/38515):
// Remove below pins when parquet-hadoop upgrades to hadoop-common:3.4.2
resolutionStrategy.force 'org.apache.hadoop:hadoop-common:3.3.6'
resolutionStrategy.force 'org.apache.hadoop:hadoop-client:3.3.6'
resolutionStrategy.force 'org.apache.hadoop:hadoop-hdfs:3.3.6'
resolutionStrategy.force 'org.apache.hadoop:hadoop-hdfs-client:3.3.6'
}

hadoopVersions.each {kv ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets.newHashSet;
import static org.apache.hadoop.util.Sets.newHashSet;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Beam prefers using its vendored Guava dependencies (org.apache.beam.vendor.guava...) over external utility classes like org.apache.hadoop.util.Sets to ensure classpath stability and avoid version conflicts with user-provided Hadoop environments.

Suggested change
import static org.apache.hadoop.util.Sets.newHashSet;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets.newHashSet;


import com.google.auto.value.AutoValue;
import java.io.Serializable;
Expand Down
Loading