diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 7ab7bcd9a9c6..b73af5e61a43 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 1 } diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json index 3a009261f4f9..5abe02fc09c7 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 2 + "modification": 1 } diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json index 5df3841d2363..833fd9b0d174 100644 --- a/.github/trigger_files/beam_PostCommit_SQL.json +++ b/.github/trigger_files/beam_PostCommit_SQL.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run ", - "modification": 3 + "modification": 2 } diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 27b9cef9637a..0ca35b08c520 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -611,7 +611,7 @@ class BeamModulePlugin implements Plugin { def gax_version = "2.79.0" def google_ads_version = "33.0.0" def google_clients_version = "2.0.0" - def google_cloud_bigdataoss_version = "3.1.16" + def google_cloud_bigdataoss_version = "2.2.26" def google_code_gson_version = "2.10.1" def google_oauth_clients_version = "1.34.1" // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom @@ -702,9 +702,9 @@ class BeamModulePlugin implements Plugin { aws_java_sdk2_profiles : "software.amazon.awssdk:profiles:$aws_java_sdk2_version", azure_sdk_bom : "com.azure:azure-sdk-bom:1.2.14", bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version", - bigdataoss_gcs_connector : "com.google.cloud.bigdataoss:gcs-connector:$google_cloud_bigdataoss_version", + bigdataoss_gcs_connector : "com.google.cloud.bigdataoss:gcs-connector:hadoop2-$google_cloud_bigdataoss_version", bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version", - bigdataoss_util_hadoop : "com.google.cloud.bigdataoss:util-hadoop:$google_cloud_bigdataoss_version", + bigdataoss_util_hadoop : "com.google.cloud.bigdataoss:util-hadoop:hadoop2-$google_cloud_bigdataoss_version", byte_buddy : "net.bytebuddy:byte-buddy:1.17.7", cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version", cassandra_driver_mapping : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version", diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java index 97778ac4e1df..1ade4be6fdb5 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV1.java @@ -30,6 +30,7 @@ import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpStatusCodes; +import com.google.api.client.http.HttpTransport; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.storage.Storage; @@ -52,6 +53,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Method; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.AccessDeniedException; @@ -265,12 +267,8 @@ public boolean shouldRetry(IOException e) { .setReadChannelOptions(gcsReadOptions) .setGrpcEnabled(shouldUseGrpc) .build(); - try { - googleCloudStorage = - createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials); - } catch (IOException e) { - throw new RuntimeException(e); - } + googleCloudStorage = + createGoogleCloudStorage(googleCloudStorageOptions, storageClient, credentials); this.batchRequestSupplier = () -> { // Capture reference to this so that the most recent storageClient and initializer @@ -727,16 +725,48 @@ public WritableByteChannel create(GcsPath path, CreateOptions options) throws IO } GoogleCloudStorage createGoogleCloudStorage( - GoogleCloudStorageOptions options, Storage storage, Credentials credentials) - throws IOException { - return GoogleCloudStorageImpl.builder() - .setOptions(options) - .setHttpTransport(storage.getRequestFactory().getTransport()) - .setCredentials(credentials) - // gcsio 3 expects httpRequestInitializer to be either absent or - // com.google.cloud.hadoop.util.RetryHttpInitializer when credentials not provided - .setHttpRequestInitializer(credentials != null ? httpRequestInitializer : null) - .build(); + GoogleCloudStorageOptions options, Storage storage, Credentials credentials) { + try { + return new GoogleCloudStorageImpl(options, storage, credentials); + } catch (NoSuchMethodError e) { + // gcs-connector 3.x drops the direct constructor and exclusively uses Builder + // TODO eliminate reflection once Beam drops Java 8 support and upgrades to gcsio 3.x + try { + final Method builderMethod = GoogleCloudStorageImpl.class.getMethod("builder"); + Object builder = builderMethod.invoke(null); + final Class builderClass = + Class.forName( + "com.google.cloud.hadoop.gcsio.AutoBuilder_GoogleCloudStorageImpl_Builder"); + + final Method setOptionsMethod = + builderClass.getMethod("setOptions", GoogleCloudStorageOptions.class); + setOptionsMethod.setAccessible(true); + builder = setOptionsMethod.invoke(builder, options); + + final Method setHttpTransportMethod = + builderClass.getMethod("setHttpTransport", HttpTransport.class); + setHttpTransportMethod.setAccessible(true); + builder = + setHttpTransportMethod.invoke(builder, storage.getRequestFactory().getTransport()); + + final Method setCredentialsMethod = + builderClass.getMethod("setCredentials", Credentials.class); + setCredentialsMethod.setAccessible(true); + builder = setCredentialsMethod.invoke(builder, credentials); + + final Method setHttpRequestInitializerMethod = + builderClass.getMethod("setHttpRequestInitializer", HttpRequestInitializer.class); + setHttpRequestInitializerMethod.setAccessible(true); + builder = setHttpRequestInitializerMethod.invoke(builder, httpRequestInitializer); + + final Method buildMethod = builderClass.getMethod("build"); + buildMethod.setAccessible(true); + return (GoogleCloudStorage) buildMethod.invoke(builder); + } catch (Exception reflectionError) { + throw new RuntimeException( + "Failed to construct GoogleCloudStorageImpl from gcsio 3.x Builder", reflectionError); + } + } } /** diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index d32ca162e3fd..a2b0e0af502b 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -184,8 +184,8 @@ public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Excep GoogleCloudStorageReadOptions readOptions = GoogleCloudStorageReadOptions.builder() .setFadvise(GoogleCloudStorageReadOptions.Fadvise.AUTO) - .setGzipEncodingSupportEnabled(true) - .setFastFailOnNotFoundEnabled(false) + .setSupportGzipEncoding(true) + .setFastFailOnNotFound(false) .build(); GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); @@ -193,10 +193,7 @@ public void testCreationWithExplicitGoogleCloudStorageReadOptions() throws Excep GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); GoogleCloudStorage googleCloudStorageMock = Mockito.spy(GoogleCloudStorage.class); - Mockito.when( - googleCloudStorageMock.open( - Mockito.any(StorageResourceId.class), - Mockito.any(GoogleCloudStorageReadOptions.class))) + Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any())) .thenReturn(Mockito.mock(SeekableByteChannel.class)); gcsUtil.delegate.setCloudStorageImpl(googleCloudStorageMock); @@ -1009,7 +1006,7 @@ public void testGCSChannelCloseIdempotent() throws IOException { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); GoogleCloudStorageReadOptions readOptions = - GoogleCloudStorageReadOptions.builder().setFastFailOnNotFoundEnabled(false).build(); + GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(false).build(); gcsUtil.delegate.setCloudStorageImpl( GoogleCloudStorageOptions.builder() @@ -1029,7 +1026,7 @@ public void testGCSReadMetricsIsSet() { GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); GoogleCloudStorageReadOptions readOptions = - GoogleCloudStorageReadOptions.builder().setFastFailOnNotFoundEnabled(true).build(); + GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(true).build(); gcsUtil.delegate.setCloudStorageImpl( GoogleCloudStorageOptions.builder() .setAppName("Beam") @@ -1676,10 +1673,8 @@ public static GcsUtilV1Mock createMockWithMockStorage( .thenReturn(Channels.newChannel(new ByteArrayOutputStream())); } else { SeekableByteChannel seekableByteChannel = new SeekableInMemoryByteChannel(readPayload); - Mockito.when(googleCloudStorageMock.open(Mockito.any(StorageResourceId.class))) - .thenReturn(seekableByteChannel); - Mockito.when( - googleCloudStorageMock.open(Mockito.any(StorageResourceId.class), Mockito.any())) + Mockito.when(googleCloudStorageMock.open(Mockito.any())).thenReturn(seekableByteChannel); + Mockito.when(googleCloudStorageMock.open(Mockito.any(), Mockito.any())) .thenReturn(seekableByteChannel); } return gcsUtilMock; diff --git a/sdks/java/extensions/sql/iceberg/build.gradle b/sdks/java/extensions/sql/iceberg/build.gradle index 893a485e7d86..1e319c97a8e2 100644 --- a/sdks/java/extensions/sql/iceberg/build.gradle +++ b/sdks/java/extensions/sql/iceberg/build.gradle @@ -48,7 +48,6 @@ dependencies { testImplementation library.java.google_api_services_bigquery testImplementation "org.apache.iceberg:iceberg-api:1.9.2" testImplementation "org.apache.iceberg:iceberg-core:1.9.2" - testRuntimeOnly library.java.bigdataoss_util_hadoop testImplementation project(":sdks:java:io:google-cloud-platform") testImplementation project(":sdks:java:extensions:google-cloud-platform-core") } diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 1045ad4aeed0..4e4c15f367cb 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -57,14 +57,6 @@ configurations.runtimeClasspath { // Pin nimbus-jose-jwt to 9.37.4 to fix CVE-2025-53864 (transitive via hadoop-auth) resolutionStrategy.force 'com.nimbusds:nimbus-jose-jwt:9.37.4' - - // [iceberg] - // TODO(https://github.com/apache/beam/issues/38515): - // Remove below pins when parquet-hadoop upgrades to hadoop-common:3.4.2 - resolutionStrategy.force 'org.apache.hadoop:hadoop-common:3.3.6' - resolutionStrategy.force 'org.apache.hadoop:hadoop-client:3.3.6' - resolutionStrategy.force 'org.apache.hadoop:hadoop-hdfs:3.3.6' - resolutionStrategy.force 'org.apache.hadoop:hadoop-hdfs-client:3.3.6' } shadowJar { @@ -91,10 +83,12 @@ dependencies { permitUnusedDeclared project(":sdks:java:extensions:kafka-factories") runtimeOnly project(":sdks:java:io:amazon-web-services2") // FileSystem may be used by Iceberg AddFiles - runtimeOnly project(":sdks:java:io:iceberg") - runtimeOnly project(":sdks:java:io:iceberg:hive") - runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") - runtimeOnly library.java.bigdataoss_util_hadoop + if (JavaVersion.current().compareTo(JavaVersion.VERSION_11) >= 0 && project.findProperty('testJavaVersion') != '8') { + // iceberg ended support for Java 8 in 1.7.0 + runtimeOnly project(":sdks:java:io:iceberg") + runtimeOnly project(":sdks:java:io:iceberg:hive") + runtimeOnly project(path: ":sdks:java:io:iceberg:bqms", configuration: "shadow") + } runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 8142c5f5b90b..bbd55fee2fc8 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -59,7 +59,7 @@ dependencies { implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" implementation "org.apache.iceberg:iceberg-data:$iceberg_version" - implementation "org.apache.hadoop:hadoop-common:3.3.6" + implementation library.java.hadoop_common // TODO(https://github.com/apache/beam/issues/21156): Determine how to build without this dependency provided "org.immutables:value:2.8.8" permitUnusedDeclared "org.immutables:value:2.8.8" @@ -70,7 +70,6 @@ dependencies { runtimeOnly "org.apache.iceberg:iceberg-azure:$iceberg_version" runtimeOnly "org.apache.iceberg:iceberg-azure-bundle:$iceberg_version" runtimeOnly library.java.bigdataoss_gcs_connector - runtimeOnly library.java.bigdataoss_util_hadoop runtimeOnly library.java.hadoop_client testImplementation project(":sdks:java:managed") @@ -118,12 +117,6 @@ dependencies { configurations.all { // iceberg-core needs avro:1.12.0 resolutionStrategy.force 'org.apache.avro:avro:1.12.0' - // TODO(https://github.com/apache/beam/issues/38515): - // Remove below pins when parquet-hadoop upgrades to hadoop-common:3.4.2 - resolutionStrategy.force 'org.apache.hadoop:hadoop-common:3.3.6' - resolutionStrategy.force 'org.apache.hadoop:hadoop-client:3.3.6' - resolutionStrategy.force 'org.apache.hadoop:hadoop-hdfs:3.3.6' - resolutionStrategy.force 'org.apache.hadoop:hadoop-hdfs-client:3.3.6' } hadoopVersions.each {kv -> diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 45ecc7cf71c3..56f21d2bc8b5 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -19,7 +19,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets.newHashSet; +import static org.apache.hadoop.util.Sets.newHashSet; import com.google.auto.value.AutoValue; import java.io.Serializable;