-
Notifications
You must be signed in to change notification settings - Fork 262
Add delete support for iceberg's merge on read mode. #13725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: liurenjie1024 <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR adds GPU acceleration support for Iceberg's merge-on-read DELETE operations by implementing position delete file writes on GPU.
Key Changes
-
New
GpuSparkPositionDeltaWrite: Core implementation that handles DELETE operations by writing position delete files instead of rewriting data files. Uses reflection to extract context from CPU implementation and converts operations to GPU-accelerated columnar processing. -
GpuDeleteOnlyDeltaWriter: Processes delete records by extracting metadata (spec ID, partition, file path, position), filtering by partition spec, and writing position deletes through delegated writers. Handles both partitioned and unpartitioned tables. -
Rolling file writers: New
GpuRollingPositionDeleteWriterandGpuRollingDataWriterthat automatically split output into multiple files when target file size is reached, matching Iceberg's file size management strategy. -
GPU execution pipeline: Added
GpuWriteDeltaExecand delta writing tasks that filter operations by type (DELETE/UPDATE/INSERT) and apply projections on GPU before delegating to appropriate writers. -
Partitioning utilities: Enhanced
GpuIcebergPartitionerwithpartitionBymethod and newGpuStructProjectionfor extracting partition values from nested structs on GPU.
Issues Found
- Resource leak (line 382-387 in GpuSparkPositionDeltaWrite.scala):
SpillableColumnarBatchobjects in successfully written partitions are not closed, causing GPU memory leak. ThecloseOnExceptonly handles exception cases.
Test Coverage
Comprehensive tests cover both merge-on-read and copy-on-write modes across multiple scenarios: partitioned/unpartitioned tables, complex predicates, unsupported partition transforms, unsupported file formats, and configuration-based fallbacks.
Confidence Score: 3/5
- This PR requires a fix for a GPU memory leak before merging
- The implementation is well-structured with comprehensive test coverage and follows established patterns from the copy-on-write implementation. However, there is a critical resource management bug in GpuDeleteOnlyDeltaWriter.delete() where SpillableColumnarBatch objects are not properly closed on the success path (line 382-387). This will cause GPU memory leaks in production. Once this issue is fixed, the PR would be safe to merge.
- Pay close attention to
iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala- the resource management issue at lines 382-387 must be fixed
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala | 4/5 | New file implementing GPU-accelerated position delete writes for Iceberg merge-on-read DELETE operations. Handles metadata extraction, partition filtering, and position delete file writing. |
| iceberg/src/main/scala/org/apache/iceberg/io/rolling.scala | 5/5 | New rolling file writer implementation for GPU operations. Handles automatic file rolling when target size is reached and manages position delete file metadata. |
| sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala | 4/5 | Added GpuWriteDeltaExec for merge-on-read operations and GPU delta writing tasks that apply projections and filter operations by DELETE/UPDATE/INSERT type. |
| integration_tests/src/main/python/iceberg/iceberg_delete_test.py | 5/5 | Comprehensive test coverage for both copy-on-write and merge-on-read DELETE modes, including fallback scenarios for unsupported features. |
Sequence Diagram
sequenceDiagram
participant User
participant GpuWriteDeltaExec
participant GpuDeltaWithMetadataWritingSparkTask
participant GpuDeleteOnlyDeltaWriter
participant GpuIcebergPartitioner
participant GpuRollingPositionDeleteWriter
participant GpuPositionDeleteFileWriter
User->>GpuWriteDeltaExec: DELETE FROM table WHERE condition
GpuWriteDeltaExec->>GpuDeltaWithMetadataWritingSparkTask: process columnar batch
GpuDeltaWithMetadataWritingSparkTask->>GpuDeltaWithMetadataWritingSparkTask: filterByOperation(DELETE_OPERATION)
GpuDeltaWithMetadataWritingSparkTask->>GpuDeltaWithMetadataWritingSparkTask: apply rowIdProjection
GpuDeltaWithMetadataWritingSparkTask->>GpuDeltaWithMetadataWritingSparkTask: apply metadataProjection
GpuDeltaWithMetadataWritingSparkTask->>GpuDeltaWithMetadataWritingSparkTask: filter by DELETE rows
GpuDeltaWithMetadataWritingSparkTask->>GpuDeleteOnlyDeltaWriter: delete(metadata, rowId)
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: extract partition values
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: extract position deletes (file_path, pos)
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: get unique spec IDs
loop For each spec ID
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: filter by spec ID
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: apply partition projection
alt Partitioned table
GpuDeleteOnlyDeltaWriter->>GpuIcebergPartitioner: partitionBy(keys, values)
GpuIcebergPartitioner->>GpuIcebergPartitioner: sort by partition keys
GpuIcebergPartitioner->>GpuIcebergPartitioner: group and split
GpuIcebergPartitioner-->>GpuDeleteOnlyDeltaWriter: partitioned batches
else Unpartitioned table
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: create single partition
end
loop For each partition
GpuDeleteOnlyDeltaWriter->>GpuRollingPositionDeleteWriter: write(batch, spec, partition)
GpuRollingPositionDeleteWriter->>GpuPositionDeleteFileWriter: write position deletes
GpuPositionDeleteFileWriter->>GpuPositionDeleteFileWriter: track referenced data files
alt File size >= target
GpuRollingPositionDeleteWriter->>GpuRollingPositionDeleteWriter: close current file
GpuRollingPositionDeleteWriter->>GpuRollingPositionDeleteWriter: open new file
end
end
end
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: commit()
GpuDeleteOnlyDeltaWriter->>GpuRollingPositionDeleteWriter: close and get result
GpuRollingPositionDeleteWriter-->>GpuDeleteOnlyDeltaWriter: DeleteWriteResult
GpuDeleteOnlyDeltaWriter-->>GpuWriteDeltaExec: DeltaTaskCommit
GpuWriteDeltaExec-->>User: DELETE complete
26 files reviewed, 1 comment
| closeOnExcept(partitions.to[mutable.Queue]) { buffer => | ||
| while (buffer.nonEmpty) { | ||
| val p = buffer.dequeue() | ||
| delegate.write(p.batch, spec, p.partition) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Resource leak: partitions sequence not closed on success path
When delegate.write succeeds for all partitions, the closeOnExcept only protects against exceptions. The SpillableColumnarBatch objects in the successfully written partitions are never closed, causing a GPU memory leak.
| closeOnExcept(partitions.to[mutable.Queue]) { buffer => | |
| while (buffer.nonEmpty) { | |
| val p = buffer.dequeue() | |
| delegate.write(p.batch, spec, p.partition) | |
| } | |
| } | |
| partitions.foreach { p => | |
| try { | |
| delegate.write(p.batch, spec, p.partition) | |
| } catch { | |
| case t: Throwable => | |
| partitions.foreach(_.close()) | |
| throw t | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
Implements GPU-accelerated DELETE support for Iceberg tables in merge-on-read mode by writing position delete files.
Key Changes
- Core Implementation:
GpuSparkPositionDeltaWriteandGpuDeleteOnlyDeltaWriterhandle merge-on-read DELETE operations by writing position delete files (file path + row position) instead of rewriting entire data files - Partitioning Support: New
partitionBymethod inGpuIcebergPartitionerto partition delete records by partition keys - File Writers:
GpuRollingPositionDeleteWriterandGpuFanoutPositionDeleteWriterfor writing delete files with rolling file behavior - Integration: Wires up
WriteDeltaExecsupport inIcebergProviderImplto enable GPU execution path - Test Coverage: Comprehensive tests covering both copy-on-write and merge-on-read modes with parametrized tests for various scenarios (unpartitioned, partitioned, complex predicates, fallback cases)
Critical Issue Found
Resource leak in GpuSparkPositionDeltaWrite.scala:382-387 where ColumnarBatchWithPartition objects containing SpillableColumnarBatch are not closed on the success path, leading to GPU memory leak.
Confidence Score: 2/5
- This PR has a critical GPU memory leak that must be fixed before merge
- Score reflects a confirmed resource management bug in the core delete path (line 382-387) that will leak GPU memory on every successful delete operation. While test coverage is comprehensive and the overall architecture is sound, this memory leak is a blocking issue that will cause GPU OOM errors in production workloads.
- Critical:
iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala- fix resource leak at lines 382-387
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| integration_tests/src/main/python/iceberg/iceberg_delete_test.py | 5/5 | Adds comprehensive test coverage for merge-on-read DELETE operations with parametrized tests for both copy-on-write and merge-on-read modes |
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala | 2/5 | Core implementation of GPU-accelerated merge-on-read DELETE operations. Contains resource leak at line 382-387 where SpillableColumnarBatch objects are not properly closed. |
| iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala | 4/5 | Adds partitionBy method for partitioning deletes by key, properly managing GPU memory through SpillableColumnarBatch |
| iceberg/src/main/scala/org/apache/iceberg/io/rolling.scala | 4/5 | Implements GPU position delete file writers with rolling file behavior for merge-on-read mode |
Sequence Diagram
sequenceDiagram
participant User
participant Spark
participant GpuWriteDeltaExec
participant GpuDeleteOnlyDeltaWriter
participant GpuIcebergPartitioner
participant GpuFanoutPositionDeleteWriter
participant GpuRollingPositionDeleteWriter
participant ParquetWriter
User->>Spark: DELETE FROM iceberg_table WHERE condition
Spark->>GpuWriteDeltaExec: Execute delete in merge-on-read mode
GpuWriteDeltaExec->>GpuDeleteOnlyDeltaWriter: delete(metadata, rowId)
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: Extract partition values & position deletes
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: Get unique spec IDs from metadata
loop For each spec ID
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: Filter by spec ID
GpuDeleteOnlyDeltaWriter->>GpuIcebergPartitioner: partitionBy(keys, values)
GpuIcebergPartitioner->>GpuIcebergPartitioner: Sort and group by partition keys
GpuIcebergPartitioner-->>GpuDeleteOnlyDeltaWriter: Seq[ColumnarBatchWithPartition]
loop For each partition
GpuDeleteOnlyDeltaWriter->>GpuFanoutPositionDeleteWriter: write(batch, spec, partition)
GpuFanoutPositionDeleteWriter->>GpuRollingPositionDeleteWriter: write(SpillableColumnarBatch)
GpuRollingPositionDeleteWriter->>ParquetWriter: Write delete file (file_path, position)
Note over GpuDeleteOnlyDeltaWriter: Resource leak: ColumnarBatchWithPartition not closed
end
end
GpuDeleteOnlyDeltaWriter->>GpuFanoutPositionDeleteWriter: close()
GpuFanoutPositionDeleteWriter-->>GpuDeleteOnlyDeltaWriter: DeleteWriteResult
GpuWriteDeltaExec-->>Spark: Commit message with delete files
Spark-->>User: Delete complete
2 files reviewed, 1 comment
| closeOnExcept(partitions.to[mutable.Queue]) { buffer => | ||
| while (buffer.nonEmpty) { | ||
| val p = buffer.dequeue() | ||
| delegate.write(p.batch, spec, p.partition) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Resource leak confirmed: ColumnarBatchWithPartition objects not closed on success
When all writes succeed, the closeOnExcept only handles exceptions. Each ColumnarBatchWithPartition in the buffer wraps a SpillableColumnarBatch that must be closed to free GPU memory. Currently these are never closed on the success path.
Need to ensure proper cleanup regardless of success/failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR implements GPU-accelerated DELETE operations for Apache Iceberg tables in merge-on-read mode. The implementation adds position delete file writing capabilities that complement the existing copy-on-write support.
Key changes:
- Added
GpuSparkPositionDeltaWriteclass to handle merge-on-read DELETE operations using position delete files - Implemented
GpuDeleteOnlyDeltaWriterwith proper resource management using spillable batches and GPU retry support - Added support for both fanout and clustered position delete writers (
GpuFanoutPositionDeleteWriter,GpuClusteredPositionDeleteWriter) - Implemented
GpuPositionDeleteFileWriterandGpuRollingPositionDeleteWriterfor writing position delete files with rolling file behavior - Extended
GpuIcebergPartitionerwithpartitionBymethod for partitioning delete records - Added comprehensive integration tests covering both copy-on-write and merge-on-read modes
Resource management:
The resource leak initially identified in previous comments was fixed in commit 9113cbbd. The implementation now properly uses SpillableColumnarBatch with withRetryNoSplit to handle GPU memory pressure and retries. All batches are properly closed either by the parquet appender on success or by closeOnExcept handlers on failure.
Confidence Score: 5/5
- This PR is safe to merge with the resource leak fix already applied
- The implementation follows established patterns from the existing copy-on-write support, includes proper resource management with spillable batches and GPU retry handling, and has comprehensive test coverage for both modes. The previously identified resource leak was fixed in commit 9113cbb.
- No files require special attention - the resource leak has been resolved
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala | 5/5 | Implements GPU-accelerated position delete writes for Iceberg merge-on-read DELETE operations; resource leak fixed in commit 9113cbb with proper spillable batch management |
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala | 5/5 | No changes to core logic; existing GPU write operations remain stable |
| integration_tests/src/main/python/iceberg/iceberg_delete_test.py | 5/5 | Comprehensive test coverage for both copy-on-write and merge-on-read DELETE operations with various scenarios including partitioning, fallback cases, and edge cases |
Sequence Diagram
sequenceDiagram
participant User
participant Spark
participant GpuWriteDeltaExec
participant GpuPositionDeltaWriterFactory
participant GpuDeleteOnlyDeltaWriter
participant GpuPartitioner
participant GpuFanout/ClusteredWriter
participant GpuRollingPositionDeleteWriter
participant GpuPositionDeleteFileWriter
participant ParquetAppender
User->>Spark: DELETE FROM table WHERE condition
Spark->>GpuWriteDeltaExec: execute delete operation
GpuWriteDeltaExec->>GpuPositionDeltaWriterFactory: createWriter(partitionId, taskId)
GpuPositionDeltaWriterFactory->>GpuDeleteOnlyDeltaWriter: new writer
loop For each delete batch
GpuWriteDeltaExec->>GpuDeleteOnlyDeltaWriter: delete(metadata, rowId)
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: extract partition values & position deletes
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: create spillable batches with retry support
loop For each unique spec ID
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: filter by spec ID
GpuDeleteOnlyDeltaWriter->>GpuPartitioner: partitionBy(keys, values)
GpuPartitioner-->>GpuDeleteOnlyDeltaWriter: Seq[ColumnarBatchWithPartition]
loop For each partition
GpuDeleteOnlyDeltaWriter->>GpuFanout/ClusteredWriter: write(batch, spec, partition)
GpuFanout/ClusteredWriter->>GpuRollingPositionDeleteWriter: write(batch)
GpuRollingPositionDeleteWriter->>GpuPositionDeleteFileWriter: write(spillableBatch)
GpuPositionDeleteFileWriter->>ParquetAppender: add(spillableBatch)
Note over ParquetAppender: Closes spillable batch after write
end
end
end
GpuWriteDeltaExec->>GpuDeleteOnlyDeltaWriter: commit()
GpuDeleteOnlyDeltaWriter->>GpuFanout/ClusteredWriter: close()
GpuFanout/ClusteredWriter->>GpuRollingPositionDeleteWriter: close()
GpuRollingPositionDeleteWriter->>GpuPositionDeleteFileWriter: close()
GpuPositionDeleteFileWriter-->>GpuDeleteOnlyDeltaWriter: DeleteWriteResult
GpuDeleteOnlyDeltaWriter-->>Spark: DeltaTaskCommit
3 files reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR fixes a build break by adding the missing positionDeleteSparkType parameter to GpuSparkFileWriterFactory in the regular write path. This aligns the signature with merge-on-read delete operations.
Key changes:
- Added
SparkPositionDeltaWritesupport inGpuSparkWrite.supports()to enable GPU acceleration for Iceberg merge-on-read DELETE operations - Refactored
tagForGpuWrite()to accept optional parameters for both data and delete file formats, allowing validation of delete file format (Parquet only) - Added missing
positionDeleteSparkTypeparameter toGpuSparkFileWriterFactoryconstructor call at line 330, fixing the build break introduced when the factory signature was updated
The changes are minimal and focused on fixing the compilation error while maintaining consistency across write paths.
Confidence Score: 4/5
- Safe to merge with minimal risk - fixes build break with focused parameter addition
- Score reflects that this is a straightforward build fix adding a required constructor parameter. The changes are minimal and align the regular write path with the merge-on-read delete path. However, the PR is part of a larger feature addition, and there's an existing resource leak issue (from previous comments) in related code that should be addressed separately.
- No files in this PR require special attention - the change is a simple parameter addition to fix a build break
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkWrite.scala | 4/5 | Adds SparkPositionDeltaWrite support and refactors tagForGpuWrite to handle delete file formats; adds missing positionDeleteSparkType parameter to GpuSparkFileWriterFactory |
Sequence Diagram
sequenceDiagram
participant Client
participant GpuSparkWrite
participant GpuSparkPositionDeltaWrite
participant GpuWriterFactory
participant GpuSparkFileWriterFactory
participant GpuPositionDeltaWriterFactory
Note over Client,GpuPositionDeltaWriterFactory: Delete Command for Merge-on-Read Mode
Client->>GpuSparkWrite: supports(SparkPositionDeltaWrite)
GpuSparkWrite-->>Client: true
Client->>GpuSparkWrite: tagForGpuWrite(dataFormat, deleteFormat, ...)
Note over GpuSparkWrite: Check Parquet support for both<br/>data and delete formats
GpuSparkWrite-->>Client: GPU support validated
alt Regular Write (Append/Overwrite)
Client->>GpuSparkWrite: toBatch()
GpuSparkWrite->>GpuWriterFactory: createDataWriterFactory()
GpuWriterFactory->>GpuSparkFileWriterFactory: new(table, format, dsSchema, ..., positionDeleteSparkType)
Note over GpuSparkFileWriterFactory: Now includes positionDeleteSparkType<br/>parameter (fixes build break)
else Merge-on-Read Delete
Client->>GpuSparkPositionDeltaWrite: toBatch()
GpuSparkPositionDeltaWrite->>GpuPositionDeltaWriterFactory: createDeltaWriterFactory()
GpuPositionDeltaWriterFactory->>GpuSparkFileWriterFactory: new(table, dataFormat, ..., positionDeleteSparkType)
Note over GpuSparkFileWriterFactory: Uses positionDeleteSparkType<br/>for delete file schema
end
1 file reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR adds GPU support for Iceberg DELETE operations in merge-on-read mode by implementing position delete file writers.
Key Changes
- New GPU Writers: Implements
GpuSparkPositionDeltaWrite,GpuDeleteOnlyDeltaWriter, and rolling/fanout/clustered position delete writers - Partitioning Support: Adds
GpuIcebergPartitioner.partitionByto partition position deletes by data file and partition spec - Integration: Connects
WriteDeltaExecto GPU execution path throughIcebergProviderImpl - Utilities: Adds
GpuStructProjectionfor field projection in partition metadata
Critical Issues Found
- Resource Leak (GpuSparkPositionDeltaWrite.scala:382):
SpillableColumnarBatchobjects in successfully written partitions are never closed, causing GPU memory leak - NPE Risk (rolling.scala:65):
currentWriter.getcalled without checking if initialized - Reference Counting (GpuIcebergPartitioner.scala:222): Column references assigned without
incRefCount(), risking double-free or leaks
Architecture
The implementation follows Iceberg's layered writer pattern:
- Query executor calls
GpuDeleteOnlyDeltaWriter.delete() - Metadata and row IDs are extracted and partitioned by spec ID
- Each partition is written through fanout/clustered delegates to rolling writers
- Rolling writers manage file size targets and create multiple delete files as needed
- Position delete files track which rows to delete from data files
Confidence Score: 2/5
- This PR has critical GPU memory leaks that will cause production failures
- Score of 2 reflects three critical resource management bugs that will cause GPU memory leaks in production. The partition batches leak (line 382) will accumulate with every delete operation, the NPE risk could crash tasks, and the reference counting issue could cause memory corruption. These must be fixed before merging.
- Pay critical attention to GpuSparkPositionDeltaWrite.scala (resource leak at line 382), rolling.scala (NPE at line 65), and GpuIcebergPartitioner.scala (reference counting at line 222)
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala | 2/5 | Adds GPU position delete writer for merge-on-read deletes. Critical resource leak at line 382 where successfully written partitions aren't closed. |
| iceberg/src/main/scala/org/apache/iceberg/io/rolling.scala | 2/5 | New rolling file writer implementation. NPE risk at line 65 if currentWriter not initialized before first write call. |
| iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala | 2/5 | Adds partitionBy function for position deletes. Resource management issue at line 222 - column references assigned without incRefCount. |
Sequence Diagram
sequenceDiagram
participant Spark as Spark Executor
participant Writer as GpuDeleteOnlyDeltaWriter
participant Partitioner as GpuIcebergPartitioner
participant Delegate as PartitioningWriter
participant RollingWriter as GpuRollingPositionDeleteWriter
participant FileWriter as GpuPositionDeleteFileWriter
participant GPU as GPU Memory
Spark->>Writer: delete(metadata, rowId)
Writer->>Writer: extractToStruct(metadata, partitionOrdinal)
Writer->>Writer: extractPositionDeletes(rowId)
Writer->>GPU: SpillableColumnarBatch (partition values)
Writer->>GPU: SpillableColumnarBatch (position deletes)
Writer->>Writer: uniqueSpecIds(metadata)
loop For each unique spec ID
Writer->>Writer: filter by spec ID
Writer->>Partitioner: partitionBy(keys, values)
Partitioner->>Partitioner: addRowIdxToTable(keysTable)
Partitioner->>Partitioner: orderBy & group by keys
Partitioner->>Partitioner: contiguousSplit by partition
Partitioner-->>Writer: Seq[ColumnarBatchWithPartition]
Note over Writer: ISSUE: Partitions not closed after write!
loop For each partition
Writer->>Delegate: write(batch, spec, partition)
Delegate->>RollingWriter: write(SpillableColumnarBatch)
RollingWriter->>FileWriter: write(batch)
FileWriter->>FileWriter: extract file paths to referencedDataFiles
FileWriter->>FileWriter: appender.add(batch)
alt File size >= targetFileSize
RollingWriter->>RollingWriter: closeCurrentWriter()
RollingWriter->>RollingWriter: openCurrentWriter()
end
end
Note over Writer,GPU: Memory leak: partition batches<br/>never closed on success path
end
Writer->>Delegate: close()
Delegate->>RollingWriter: close()
RollingWriter->>FileWriter: close()
FileWriter-->>RollingWriter: DeleteWriteResult
RollingWriter-->>Delegate: DeleteWriteResult
Delegate-->>Writer: DeleteWriteResult
Writer-->>Spark: WriterCommitMessage
1 file reviewed, 3 comments
iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala
Outdated
Show resolved
Hide resolved
| throw new IllegalStateException("Cannot write to a closed writer") | ||
| } | ||
|
|
||
| currentWriter.get.write(batch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Potential NPE: currentWriter.get can fail if writer not initialized
The write method calls currentWriter.get without checking if currentWriter is defined. If openCurrentWriter() wasn't called during object construction, this will throw NoSuchElementException.
| currentWriter.get.write(batch) | |
| if (currentWriter.isEmpty) { | |
| openCurrentWriter() | |
| } | |
| currentWriter.get.write(batch) |
iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
Adds GPU-accelerated delete command support for Iceberg's merge-on-read mode by implementing position delete file writes. The implementation includes GpuSparkPositionDeltaWrite, GpuDeleteOnlyDeltaWriter, and supporting infrastructure for partitioning and writing position deletes.
Key Changes:
- Implements
GpuSparkPositionDeltaWritefor handling DELETE operations in merge-on-read mode - Adds
GpuDeleteOnlyDeltaWriterthat partitions position deletes and writes them via clustered/fanout writers - Creates
GpuRollingPositionDeleteWriterandGpuPositionDeleteFileWriterfor rolling file writes - Extends
GpuIcebergPartitionerwithpartitionBymethod for partitioning delete records - Adds support for
WriteDeltaExecinIcebergProviderImpl
Critical Issues Found:
- Resource leak at GpuSparkPositionDeltaWrite.scala:386-391 -
SpillableColumnarBatchobjects not closed after writes
Confidence Score: 2/5
- This PR has a critical GPU memory leak that will cause resource exhaustion in production
- Score of 2 reflects a confirmed resource leak where
SpillableColumnarBatchobjects are not closed after successful writes. Previous reviewers have identified this issue multiple times. The leak occurs becausecloseOnExceptonly handles exception paths, leaving successfully written batches unclosed, causing GPU memory to leak with each delete operation. - iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala - Fix resource leak at lines 386-391
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala | 2/5 | Implements GPU-accelerated position delete writes for Iceberg merge-on-read deletes. Critical resource leak confirmed at line 386-391 where SpillableColumnarBatch objects are not closed after successful writes. |
| iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala | 4/5 | Adds partitionBy method and addRowIdxToTable helper for partitioning position deletes. Column references handled correctly in addRowIdxToTable since CUDF Tables don't increment refcounts when constructed. |
Sequence Diagram
sequenceDiagram
participant User
participant WriteDeltaExec
participant GpuSparkPositionDeltaWrite
participant GpuDeleteOnlyDeltaWriter
participant GpuIcebergPartitioner
participant PartitioningWriter
participant GpuRollingPositionDeleteWriter
participant GpuPositionDeleteFileWriter
participant ParquetAppender
User->>WriteDeltaExec: DELETE operation
WriteDeltaExec->>GpuSparkPositionDeltaWrite: create delta write
GpuSparkPositionDeltaWrite->>GpuDeleteOnlyDeltaWriter: create writer
loop For each delete batch
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: extract metadata & rowIds
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: get unique spec IDs
loop For each spec ID
GpuDeleteOnlyDeltaWriter->>GpuDeleteOnlyDeltaWriter: filter by spec ID
GpuDeleteOnlyDeltaWriter->>GpuIcebergPartitioner: partitionBy(keys, values)
GpuIcebergPartitioner-->>GpuDeleteOnlyDeltaWriter: partitioned batches
loop For each partition
GpuDeleteOnlyDeltaWriter->>PartitioningWriter: write(batch, spec, partition)
PartitioningWriter->>GpuRollingPositionDeleteWriter: route to partition writer
GpuRollingPositionDeleteWriter->>GpuPositionDeleteFileWriter: write(batch)
GpuPositionDeleteFileWriter->>ParquetAppender: add(batch)
ParquetAppender->>ParquetAppender: writeSpillableAndClose(batch)
Note right of GpuDeleteOnlyDeltaWriter: ⚠️ Batch not explicitly closed<br/>causing GPU memory leak
end
end
end
GpuDeleteOnlyDeltaWriter->>PartitioningWriter: close()
PartitioningWriter-->>GpuDeleteOnlyDeltaWriter: DeleteWriteResult
GpuDeleteOnlyDeltaWriter-->>User: commit message
2 files reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR implements GPU-accelerated support for Iceberg's merge-on-read DELETE operations by adding position delete file writing capabilities.
Key Changes
- Adds
GpuSparkPositionDeltaWritefor handling merge-on-read DELETE operations that write position delete files instead of rewriting data files - Implements
GpuWriteDeltaExecand delta writing tasks (GpuDeltaWritingSparkTask,GpuDeltaWithMetadataWritingSparkTask) to handle row-level operations - Creates rolling file writers (
GpuRollingPositionDeleteWriter) that manage file size limits and write multiple delete files as needed - Adds
GpuIcebergPartitionerfor GPU-based partitioning of delete records according to Iceberg partition specs - Implements proper integration with Iceberg's file format (Parquet) through
GpuIcebergParquetAppender
Issues Found
- Critical GPU memory leaks in
WriteToDataSourceV2Exec.scalawhere filtered batches are not closed when row count is zero (lines 468-480, 485-503) - The leaks occur in both delete and update paths when using
closeOnExceptinstead ofwithResourcefor resource management
Confidence Score: 2/5
- Not safe to merge - contains critical GPU memory leaks that will cause resource exhaustion
- Score reflects critical resource management bugs in the delta writing implementation. The leaks in
WriteToDataSourceV2Exec.scalawill cause GPU memory to accumulate whenever delete or update operations produce zero rows after filtering, leading to OOM errors in production workloads. The core Iceberg integration logic appears sound, but the memory leaks must be fixed before merge. - Pay close attention to
sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala- contains critical GPU memory leaks inGpuDeltaWithMetadataWritingSparkTask.writemethod
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| sql-plugin/src/main/spark350/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala | 2/5 | Adds GPU delta write support for Iceberg; contains critical GPU memory leaks in zero-row edge cases |
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala | 4/5 | New file implementing GPU position delete writes for merge-on-read; resource management appears correct |
| iceberg/src/main/scala/org/apache/iceberg/io/rolling.scala | 5/5 | Implements rolling file writers for GPU; clean implementation with proper resource handling |
Sequence Diagram
sequenceDiagram
participant Spark as Spark Executor
participant GpuWriteDelta as GpuWriteDeltaExec
participant DeltaWriter as GpuDeltaWithMetadata<br/>WritingSparkTask
participant PositionWriter as GpuDeleteOnlyDeltaWriter
participant RollingWriter as GpuRollingPosition<br/>DeleteWriter
participant FileWriter as GpuPositionDelete<br/>FileWriter
participant Appender as GpuIcebergParquet<br/>Appender
Spark->>GpuWriteDelta: execute()
GpuWriteDelta->>DeltaWriter: write(batch)
Note over DeltaWriter: Filter by operation type<br/>(DELETE/UPDATE/INSERT)
DeltaWriter->>DeltaWriter: filterByOperation(DELETE)
DeltaWriter->>DeltaWriter: project rowIds + metadata
DeltaWriter->>PositionWriter: delete(metadata, rowIds)
Note over PositionWriter: Extract partition spec<br/>and partition values
PositionWriter->>PositionWriter: partition by spec
loop For each partition
PositionWriter->>RollingWriter: write(batch, spec, partition)
Note over RollingWriter: Check if file size<br/>exceeds target
alt New file needed
RollingWriter->>RollingWriter: closeCurrentWriter()
RollingWriter->>RollingWriter: openCurrentWriter()
end
RollingWriter->>FileWriter: write(SpillableColumnarBatch)
Note over FileWriter: Track referenced<br/>data files
FileWriter->>Appender: add(SpillableColumnarBatch)
Note over Appender: Write to Parquet file<br/>and close batch
end
PositionWriter->>PositionWriter: commit()
PositionWriter->>RollingWriter: close()
RollingWriter->>FileWriter: close()
FileWriter-->>PositionWriter: DeleteWriteResult
PositionWriter-->>Spark: WriterCommitMessage
4 files reviewed, 2 comments
| val rowIds = withResource(rowIdProjection.project(batch)) { rowIds => | ||
| GpuColumnVector.filter(rowIds, rowIdDataTypes, deleteFilter) | ||
| } | ||
|
|
||
| closeOnExcept(rowIds) { _ => | ||
| if (rowIds.numRows() > 0) { | ||
| val metadataBatch = withResource(metadataProjection.project(batch)) { metaBatch => | ||
| GpuColumnVector.filter(metaBatch, metadataDataTypes, deleteFilter) | ||
| } | ||
|
|
||
| writer.delete(metadataBatch, rowIds) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: GPU memory leak: rowIds not closed when numRows() == 0
When the delete filter produces zero rows, rowIds is never closed because closeOnExcept only handles exceptions. Use withResource instead.
| val rowIds = withResource(rowIdProjection.project(batch)) { rowIds => | |
| GpuColumnVector.filter(rowIds, rowIdDataTypes, deleteFilter) | |
| } | |
| closeOnExcept(rowIds) { _ => | |
| if (rowIds.numRows() > 0) { | |
| val metadataBatch = withResource(metadataProjection.project(batch)) { metaBatch => | |
| GpuColumnVector.filter(metaBatch, metadataDataTypes, deleteFilter) | |
| } | |
| writer.delete(metadataBatch, rowIds) | |
| } | |
| } | |
| withResource(rowIdProjection.project(batch)) { rowIds => | |
| val filteredRowIds = GpuColumnVector.filter(rowIds, rowIdDataTypes, deleteFilter) | |
| withResource(filteredRowIds) { _ => | |
| if (filteredRowIds.numRows() > 0) { | |
| val metadataBatch = withResource(metadataProjection.project(batch)) { metaBatch => | |
| GpuColumnVector.filter(metaBatch, metadataDataTypes, deleteFilter) | |
| } | |
| writer.delete(metadataBatch, filteredRowIds) | |
| } | |
| } | |
| } |
| val rowIds = withResource(rowIdProjection.project(batch)) { rowIds => | ||
| GpuColumnVector.filter(rowIds, rowIdDataTypes, updateFilter) | ||
| } | ||
|
|
||
| closeOnExcept(rowIds) { _ => | ||
| if (rowIds.numRows() > 0) { | ||
| val rows = withResource(rowProjection.project(batch)) { rows => | ||
| GpuColumnVector.filter(rows, rowDataTypes, updateFilter) | ||
| } | ||
|
|
||
| closeOnExcept(rows) { _ => | ||
| val metadataBatch = withResource(metadataProjection.project(batch)) { metadata => | ||
| GpuColumnVector.filter(metadata, metadataDataTypes, updateFilter) | ||
| } | ||
|
|
||
| writer.update(metadataBatch, rowIds, rows) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: GPU memory leak: rowIds and rows not closed when numRows() == 0
Same issue as delete path - resources leaked on zero-row case. Use withResource pattern.
| val rowIds = withResource(rowIdProjection.project(batch)) { rowIds => | |
| GpuColumnVector.filter(rowIds, rowIdDataTypes, updateFilter) | |
| } | |
| closeOnExcept(rowIds) { _ => | |
| if (rowIds.numRows() > 0) { | |
| val rows = withResource(rowProjection.project(batch)) { rows => | |
| GpuColumnVector.filter(rows, rowDataTypes, updateFilter) | |
| } | |
| closeOnExcept(rows) { _ => | |
| val metadataBatch = withResource(metadataProjection.project(batch)) { metadata => | |
| GpuColumnVector.filter(metadata, metadataDataTypes, updateFilter) | |
| } | |
| writer.update(metadataBatch, rowIds, rows) | |
| } | |
| } | |
| } | |
| withResource(rowIdProjection.project(batch)) { rowIds => | |
| val filteredRowIds = GpuColumnVector.filter(rowIds, rowIdDataTypes, updateFilter) | |
| withResource(filteredRowIds) { _ => | |
| if (filteredRowIds.numRows() > 0) { | |
| val rows = withResource(rowProjection.project(batch)) { rows => | |
| GpuColumnVector.filter(rows, rowDataTypes, updateFilter) | |
| } | |
| withResource(rows) { _ => | |
| val metadataBatch = withResource(metadataProjection.project(batch)) { metadata => | |
| GpuColumnVector.filter(metadata, metadataDataTypes, updateFilter) | |
| } | |
| writer.update(metadataBatch, filteredRowIds, rows) | |
| } | |
| } | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
This PR adds GPU-accelerated delete command support for Iceberg's merge-on-read mode by implementing position delete file writes.
Key Changes
- Added
GpuSparkPositionDeltaWriteto handle merge-on-read DELETE operations with GPU acceleration - Implemented
GpuDeleteOnlyDeltaWriterthat writes position delete files instead of rewriting data files - Created GPU file writers (
GpuRollingPositionDeleteWriter,GpuClusteredPositionDeleteWriter,GpuFanoutPositionDeleteWriter) for different write patterns - Added
GpuStructProjectionutility for efficient columnar batch field projection - Enhanced
GpuIcebergPartitionerwithpartitionBymethod for partitioning position deletes - Extended
GpuTypeToSparkTypewith Iceberg metadata column support
Critical Issues Found
- GPU Memory Leak (GpuSparkPositionDeltaWrite.scala:386-391):
SpillableColumnarBatchobjects not closed after successful writes - only exception path is protected - Resource Management Issue (GpuIcebergPartitioner.scala:220-226): Column view references may be incorrectly closed on exception path
Confidence Score: 1/5
- Critical GPU memory leak will cause OOM errors in production
- Score reflects critical resource leak at line 386-391 in GpuSparkPositionDeltaWrite.scala that will cause GPU memory exhaustion under load. The
closeOnExceptpattern only protects exception paths, leavingSpillableColumnarBatchobjects unclosed on successful writes. This is a blocking issue for production use. - GpuSparkPositionDeltaWrite.scala requires immediate attention to fix resource leak at lines 386-391. GpuIcebergPartitioner.scala needs review of resource ownership at lines 220-226.
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| iceberg/src/main/scala/org/apache/iceberg/spark/GpuTypeToSparkType.scala | 5/5 | Added GPU type conversion utilities - straightforward implementation with proper metadata handling |
| iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala | 1/5 | Implements GPU position delete writes for merge-on-read deletes - critical GPU memory leak at line 386-391 |
Sequence Diagram
sequenceDiagram
participant Exec as GpuWriteDeltaExec
participant Write as GpuSparkPositionDeltaWrite
participant Factory as GpuPositionDeltaWriterFactory
participant Writer as GpuDeleteOnlyDeltaWriter
participant Partitioner as GpuIcebergPartitioner
participant Delegate as PartitioningWriter
Exec->>Write: toBatch()
Write->>Write: createDeltaWriterFactory()
Write-->>Exec: GpuPositionDeltaBatchWrite
Exec->>Factory: createWriter(partitionId, taskId)
Factory->>Writer: new GpuDeleteOnlyDeltaWriter()
Factory-->>Exec: DeltaWriter
Exec->>Writer: delete(metadata, rowId)
Writer->>Writer: extractToStruct(metadata)
Writer->>Writer: extractPositionDeletes(rowId)
Writer->>Writer: uniqueSpecIds(metadata)
loop For each unique spec ID
Writer->>Writer: Filter by spec ID
Writer->>Partitioner: partitionBy(keys, values)
Partitioner->>Partitioner: Sort and group keys
Partitioner->>Partitioner: Split values by partition
Partitioner-->>Writer: Seq[ColumnarBatchWithPartition]
loop For each partition
Writer->>Delegate: write(batch, spec, partition)
Note right of Writer: ISSUE: SpillableColumnarBatch<br/>not closed on success
end
end
Writer->>Delegate: close()
Delegate->>Delegate: Write position delete files
Writer-->>Exec: WriterCommitMessage
2 files reviewed, 2 comments
iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala
Outdated
Show resolved
Hide resolved
iceberg/src/main/scala/com/nvidia/spark/rapids/iceberg/GpuIcebergPartitioner.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
Outdated
Show resolved
Hide resolved
sql-plugin/src/main/scala/com/nvidia/spark/rapids/SchemaUtils.scala
Outdated
Show resolved
Hide resolved
...lugin/src/main/spark350/scala/org/apache/spark/sql/catalyst/GpuProjectingColumnarBatch.scala
Outdated
Show resolved
Hide resolved
...n/spark350/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Outdated
Show resolved
Hide resolved
...n/spark350/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Outdated
Show resolved
Hide resolved
...n/spark350/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Outdated
Show resolved
Hide resolved
...n/spark350/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Outdated
Show resolved
Hide resolved
...n/spark350/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Show resolved
Hide resolved
...n/spark350/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
22 files reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
22 files reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
23 files reviewed, 1 comment
| closeOnExcept(mutable.Queue(partitions: _*)) { buffer => | ||
| while (buffer.nonEmpty) { | ||
| val p = buffer.dequeue() | ||
| delegate.write(p.batch, spec, p.partition) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: GPU memory leak: SpillableColumnarBatch in partitions not closed on success
closeOnExcept only protects against exceptions. When all writes succeed, each ColumnarBatchWithPartition in the queue contains a SpillableColumnarBatch that must be closed to free GPU memory, but they're never closed.
| closeOnExcept(mutable.Queue(partitions: _*)) { buffer => | |
| while (buffer.nonEmpty) { | |
| val p = buffer.dequeue() | |
| delegate.write(p.batch, spec, p.partition) | |
| } | |
| } | |
| withResource(mutable.Queue(partitions: _*)) { buffer => | |
| while (buffer.nonEmpty) { | |
| val p = buffer.dequeue() | |
| withResource(p.batch) { _ => | |
| delegate.write(p.batch, spec, p.partition) | |
| } | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
23 files reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
23 files reviewed, 1 comment
| closeOnExcept(mutable.Queue(partitions: _*)) { buffer => | ||
| while (buffer.nonEmpty) { | ||
| val p = buffer.dequeue() | ||
| delegate.write(p.batch, spec, p.partition) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logic: Resource leak: SpillableColumnarBatch in partitions not closed on success path
closeOnExcept only handles exceptions. When delegate.write succeeds for all partitions, the SpillableColumnarBatch inside each ColumnarBatchWithPartition (p.batch) is never closed, causing GPU memory leaks.
| closeOnExcept(mutable.Queue(partitions: _*)) { buffer => | |
| while (buffer.nonEmpty) { | |
| val p = buffer.dequeue() | |
| delegate.write(p.batch, spec, p.partition) | |
| } | |
| } | |
| withResource(mutable.Queue(partitions: _*)) { buffer => | |
| while (buffer.nonEmpty) { | |
| val p = buffer.dequeue() | |
| withResource(p.batch) { _ => | |
| delegate.write(p.batch, spec, p.partition) | |
| } | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
23 files reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
23 files reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
23 files reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
23 files reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
23 files reviewed, no comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
23 files reviewed, 1 comment
iceberg/src/main/scala/org/apache/iceberg/spark/source/GpuSparkPositionDeltaWrite.scala
Show resolved
Hide resolved
res-life
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
build |
|
#13688 merged, could you please update this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
22 files reviewed, no comments
|
build |
Fixes #13634
Description
Add delete command support for iceberg's merge on read mode.
Checklists
(Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)