@@ -20,7 +20,7 @@ import java.lang.Math.toIntExact

import scala.collection.JavaConverters._

import ai.rapids.cudf.{ColumnVector => CudfColumnVector, Table}
import ai.rapids.cudf.{ColumnVector => CudfColumnVector, OrderByArg, Scalar, Table}
import com.nvidia.spark.rapids.{GpuBoundReference, GpuColumnVector, GpuExpression, GpuLiteral, RapidsHostColumnVector, SpillableColumnarBatch, SpillPriorities}
import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource}
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingSeq
@@ -54,12 +54,9 @@ class GpuIcebergPartitioner(val spec: PartitionSpec,
private val partitionExprs: Seq[GpuExpression] = spec.fields().asScala.map(getPartitionExpr).toSeq

private val keyColNum: Int = spec.fields().size()
private val inputColNum: Int = dataSparkType.fields.length

// key column indices in the table: [key columns, input columns]
private val keyColIndices: Array[Int] = (0 until keyColNum).toArray
// input column indices in the table: [key columns, input columns]
private val inputColumnIndices: Array[Int] = (keyColNum until (keyColNum + inputColNum)).toArray

/**
* Make a new table: [key columns, input columns]
@@ -109,7 +106,7 @@ class GpuIcebergPartitioner(val spec: PartitionSpec,
// note: the result does not contain the key columns
val splitRet = withResource(keysAndInputTable) { _ =>
keysAndInputTable.groupBy(keyColIndices: _*)
.contiguousSplitGroupsAndGenUniqKeys(inputColumnIndices)
.contiguousSplitGroupsAndGenUniqKeys()
}

// generate results
@@ -182,4 +179,86 @@ object GpuIcebergPartitioner {
}).toArray
}
}

/** Returns a new table with all columns of `table` plus a trailing INT32 row-index column (0 until rowCount). */
private def addRowIdxToTable(table: Table): Table = {
Collaborator: Please update after this PR is merged: #13688

val cols = new Array[CudfColumnVector](table.getNumberOfColumns + 1)

val rowIdxCol = withResource(Scalar.fromInt(0)) { zero =>
CudfColumnVector.sequence(zero, table.getRowCount.toInt)
}
cols(table.getNumberOfColumns) = rowIdxCol

withResource(cols) { _ =>
for (idx <- 0 until table.getNumberOfColumns) {
cols(idx) = table.getColumn(idx).incRefCount()
}

new Table(cols: _*)
}
}

/**
 * Splits `values` into one contiguous batch per distinct key in `keys`, pairing each batch
 * with its Iceberg partition key. Rows are ordered by key first so that every partition's
 * rows are contiguous before splitting.
 */
def partitionBy(keys: ColumnarBatch,
keyType: Types.StructType,
keySparkType: StructType,
values: ColumnarBatch,
valueSparkType: Array[DataType]): Seq[ColumnarBatchWithPartition] = {
require(keys.numRows() == values.numRows(),
s"Keys row count ${keys.numRows()} not matching with values row count ${values.numRows()}")

val keySortOrders = (0 until keys.numCols()).map(OrderByArg.asc(_, true))
val keyAggCols = (0 until keys.numCols()).toArray

withResource(GpuColumnVector.from(keys)) { keysTable =>
withResource(GpuColumnVector.from(values)) { valuesTable =>

val sortedKeysWithRowIdx = withResource(addRowIdxToTable(keysTable)) { t =>
t.orderBy(keySortOrders: _*)
}

val (partitionKeys, splits) = withResource(sortedKeysWithRowIdx) { _ =>
val sortedUniqueKeyTable = {
val uniqueKeyTable = keysTable.groupBy(keyAggCols: _*)
.aggregate()
withResource(uniqueKeyTable) { _ =>
uniqueKeyTable.orderBy(keySortOrders: _*)
}
}

withResource(sortedUniqueKeyTable) { _ =>
val partKeys = toPartitionKeys(keyType, keySparkType, sortedUniqueKeyTable)

val splitIdCv = sortedKeysWithRowIdx.upperBound(sortedUniqueKeyTable, keySortOrders: _*)
val splitIds = withResource(splitIdCv) { cv =>
GpuColumnVector.toIntArray(cv)
}

val sortedRowIdxCol = sortedKeysWithRowIdx
.getColumn(keys.numCols())

val splits = withResource(valuesTable.gather(sortedRowIdxCol)) { sortedValuesTable =>
sortedValuesTable.contiguousSplit(splitIds: _*)
}

val (leftSplits, last) = (splits.init, splits.last)
withResource(last) { _ =>
closeOnExcept(leftSplits) { _ =>
require(partKeys.length == leftSplits.length,
s"Partition key count ${partKeys.length} " +
s"does not match the number of column batches ${leftSplits.length}")
require(last.getRowCount == 0, s"Expecting last split to be empty, but it has " +
s"${last.getRowCount} rows")
(partKeys, leftSplits)
}
}
}
}

partitionKeys.zip(splits).map {
case (partKey, split) => ColumnarBatchWithPartition(
SpillableColumnarBatch(split, valueSparkType, SpillPriorities
.ACTIVE_BATCHING_PRIORITY), partKey)
}
}
}
}
}
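
To make the split computation above concrete, here is a small CPU-side analogue of partitionBy's logic (an illustration with made-up data, not part of the PR): rows are sorted by key, the distinct keys are extracted, and each key's upper bound in the sorted order becomes a split point, mirroring the upperBound and contiguousSplit calls on the GPU. The trailing piece is always empty, which is what the last.getRowCount == 0 check asserts.

// CPU-side sketch of the split-point computation used by partitionBy above.
object SplitSketch {
  def main(args: Array[String]): Unit = {
    val keys   = Seq("b", "a", "b", "a", "c")
    val values = Seq(10, 20, 30, 40, 50)

    // Sort values by key (the GPU code sorts a row-index column and gathers).
    val sortedPairs = keys.zip(values).sortBy(_._1)
    val sortedKeys  = sortedPairs.map(_._1)
    val uniqueKeys  = sortedKeys.distinct                       // a, b, c

    // Upper bound of each distinct key -> split points 2, 4, 5.
    val splitIds = uniqueKeys.map(k => sortedKeys.lastIndexOf(k) + 1)

    // Splitting at 2, 4, 5 over 5 rows yields 4 pieces; the last one is empty,
    // matching the require(last.getRowCount == 0) check in the GPU code.
    val starts = 0 +: splitIds
    val ends   = splitIds :+ sortedPairs.length
    val pieces = starts.zip(ends).map { case (s, e) => sortedPairs.slice(s, e).map(_._2) }

    println(uniqueKeys.zip(pieces))
    // List((a,List(20, 40)), (b,List(10, 30)), (c,List(50)))
  }
}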
@@ -20,16 +20,18 @@ import scala.reflect.ClassTag
import scala.util.Try

import com.nvidia.spark.rapids.{AppendDataExecMeta, AtomicCreateTableAsSelectExecMeta, AtomicReplaceTableAsSelectExecMeta, FileFormatChecks, GpuExec, GpuExpression, GpuRowToColumnarExec, GpuScan, IcebergFormatType, OverwriteByExpressionExecMeta, OverwritePartitionsDynamicExecMeta, RapidsConf, ScanMeta, ScanRule, ShimReflectionUtils, SparkPlanMeta, StaticInvokeMeta, TargetSize, WriteFileOp}
import com.nvidia.spark.rapids.shims.ReplaceDataExecMeta
import com.nvidia.spark.rapids.shims.{ReplaceDataExecMeta, WriteDeltaExecMeta}
import org.apache.iceberg.spark.GpuTypeToSparkType.toSparkType
import org.apache.iceberg.spark.functions.{BucketFunction, GpuBucketExpression}
import org.apache.iceberg.spark.source.{GpuSparkScan, GpuSparkWrite}
import org.apache.iceberg.spark.source.{GpuSparkPositionDeltaWrite, GpuSparkScan, GpuSparkWrite}
import org.apache.iceberg.spark.source.GpuSparkPositionDeltaWrite.tableOf
import org.apache.iceberg.spark.supportsCatalog

import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec, GpuAppendDataExec, GpuOverwriteByExpressionExec, GpuOverwritePartitionsDynamicExec, GpuReplaceDataExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, AtomicCreateTableAsSelectExec, AtomicReplaceTableAsSelectExec, GpuAppendDataExec, GpuOverwriteByExpressionExec, GpuOverwritePartitionsDynamicExec, GpuReplaceDataExec, GpuWriteDeltaExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec, WriteDeltaExec}
import org.apache.spark.sql.execution.datasources.v2.rapids.{GpuAtomicCreateTableAsSelectExec, GpuAtomicReplaceTableAsSelectExec}


@@ -252,6 +254,8 @@ class IcebergProviderImpl extends IcebergProvider {
cpuExec match {
case replaceData: ReplaceDataExec =>
tagForGpu(replaceData, meta.asInstanceOf[ReplaceDataExecMeta])
case writeDelta: WriteDeltaExec =>
tagForGpu(writeDelta, meta.asInstanceOf[WriteDeltaExecMeta])
case appendData: AppendDataExec =>
tagForGpu(appendData, meta.asInstanceOf[AppendDataExecMeta])
case createTable: AtomicCreateTableAsSelectExec =>
@@ -271,6 +275,8 @@ class IcebergProviderImpl extends IcebergProvider {
cpuExec match {
case replaceData: ReplaceDataExec =>
convertToGpu(replaceData, meta.asInstanceOf[ReplaceDataExecMeta])
case writeDelta: WriteDeltaExec =>
convertToGpu(writeDelta, meta.asInstanceOf[WriteDeltaExecMeta])
case appendData: AppendDataExec =>
convertToGpu(appendData, meta.asInstanceOf[AppendDataExecMeta])
case createTable: AtomicCreateTableAsSelectExec =>
@@ -313,4 +319,34 @@ class IcebergProviderImpl extends IcebergProvider {
cpuExec.refreshCache,
GpuSparkWrite.convert(cpuExec.write))
}

private def tagForGpu(cpuExec: WriteDeltaExec, meta: WriteDeltaExecMeta): Unit = {
if (!meta.conf.isIcebergEnabled) {
meta.willNotWorkOnGpu("Iceberg input and output has been disabled. To enable set " +
s"${RapidsConf.ENABLE_ICEBERG} to true")
}

if (!meta.conf.isIcebergWriteEnabled) {
meta.willNotWorkOnGpu("Iceberg output has been disabled. To enable set " +
s"${RapidsConf.ENABLE_ICEBERG_WRITE} to true")
}

val outputSchema = toSparkType(tableOf(cpuExec.write).schema())
FileFormatChecks.tag(meta, outputSchema,
IcebergFormatType, WriteFileOp)

GpuSparkPositionDeltaWrite.tagForGpu(cpuExec.write, meta)
}

private def convertToGpu(cpuExec: WriteDeltaExec, meta: WriteDeltaExecMeta): GpuExec = {
var child: SparkPlan = meta.childPlans.head.convertIfNeeded()
if (!child.supportsColumnar) {
child = GpuRowToColumnarExec(child, TargetSize(meta.conf.gpuTargetBatchSizeBytes))
}
GpuWriteDeltaExec(
child,
cpuExec.refreshCache,
cpuExec.projections,
GpuSparkPositionDeltaWrite.convert(cpuExec.write))
}
}
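
Both tagging checks above gate the GPU write path on plugin configuration. Below is a hedged sketch of a session that enables them; the string keys are assumptions inferred from the RapidsConf.ENABLE_ICEBERG and RapidsConf.ENABLE_ICEBERG_WRITE references in the messages, not confirmed by this diff.

import org.apache.spark.sql.SparkSession

// Sketch only: the config keys below are assumptions based on the
// RapidsConf.ENABLE_ICEBERG / ENABLE_ICEBERG_WRITE messages above.
val spark = SparkSession.builder()
  .appName("iceberg-gpu-write-sketch")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.format.iceberg.enabled", "true")
  .config("spark.rapids.sql.format.iceberg.write.enabled", "true")
  .getOrCreate()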
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.iceberg.utils

import scala.collection.JavaConverters._

import com.nvidia.spark.rapids.Arm.closeOnExcept
import com.nvidia.spark.rapids.GpuColumnVector
import org.apache.iceberg.types.Types.StructType

import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

/**
* Gpu version of {@link org.apache.iceberg.util.StructProjection}
*/
class GpuStructProjection(val positionMap: Array[Int]) {
def project(batch: ColumnarBatch): ColumnarBatch = {
val cols = new Array[ColumnVector](positionMap.length)
closeOnExcept(cols) { _ =>
for (idx <- positionMap.indices) {
cols(idx) = batch.column(positionMap(idx))
.asInstanceOf[GpuColumnVector]
.incRefCount()
}

new ColumnarBatch(cols, batch.numRows())
}
}
}

object GpuStructProjection {
def apply(dataType: StructType, projectType: StructType): GpuStructProjection = {
if (projectType.fields().asScala.exists(!_.`type`().isPrimitiveType)) {
throw new IllegalArgumentException(
"Gpu struct projection currently only supports primitive types")
}
val positionMap = new Array[Int](projectType.fields().size())

for (idx <- positionMap.indices) {
val projectedField = projectType.fields().get(idx)
dataType.fields()
.asScala
.zipWithIndex
.find(p => p._1.fieldId() == projectedField.fieldId()) match {
case Some(f) => positionMap(idx) = f._2
case None => throw new IllegalArgumentException(
s"Field ${projectedField.name()} not found in projection source")
}
}

new GpuStructProjection(positionMap)
}
}
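
A minimal usage sketch for the projection utility above (hypothetical schemas and field ids; the input batch is assumed to hold GpuColumnVectors laid out like dataType):

import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.NestedField.required

// Hypothetical schemas: project (id, name) out of a wider (id, data, name) row.
val dataType = Types.StructType.of(
  required(1, "id", Types.LongType.get()),
  required(2, "data", Types.StringType.get()),
  required(3, "name", Types.StringType.get()))

val projectType = Types.StructType.of(
  required(1, "id", Types.LongType.get()),
  required(3, "name", Types.StringType.get()))

// positionMap resolves to Array(0, 2): columns are matched by Iceberg field id.
val projection = GpuStructProjection(dataType, projectType)

// `batch` would be a ColumnarBatch of GpuColumnVectors laid out like dataType;
// project() re-uses (ref-counts) columns 0 and 2, and the caller must close the result.
// val projected: ColumnarBatch = projection.project(batch)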

This file was deleted.
