Add missing handling for the 0 input partition count in computing partition count in GpuOptimizeWriteExchangeExec #13780
base: main
…eExchangeExec Signed-off-by: Jihoon Son <[email protected]>
Greptile Overview
Greptile Summary
This PR fixes a divide-by-zero
Changes:
Confidence Score: 5/5
Important Files Changed
File Analysis
Sequence Diagram
sequenceDiagram
participant Client as Write Operation
participant GOWE as GpuOptimizeWriteExchangeExec
participant RDD as inputRDD
participant Shuffle as Shuffle/Partition Logic
Client->>GOWE: executeColumnar()
GOWE->>RDD: getNumPartitions()
RDD-->>GOWE: childNumPartitions (could be 0)
alt childNumPartitions == 0
GOWE->>GOWE: actualNumPartitions = 0
GOWE->>GOWE: mapOutputStatisticsFuture = Future.successful(null)
GOWE->>Shuffle: Create shuffle with 0 partitions
Shuffle-->>Client: Empty result (no divide-by-zero)
else childNumPartitions > 0
GOWE->>GOWE: Calculate actualNumPartitions<br/>(targetShuffleBlocks / childNumPartitions)
GOWE->>Shuffle: submitMapStage(shuffleDependency)
GOWE->>Shuffle: rebalancePartitions(stats)
Shuffle-->>Client: Optimized partitioned result
end
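The two branches in the diagram above can be sketched in a few lines of Scala. This is a minimal illustration, not the code from the PR: the object and method names are hypothetical, and the only formula assumed is the `targetShuffleBlocks / childNumPartitions` division shown in the diagram.

```scala
object PartitionCountSketch {
  // Hypothetical helper mirroring the unguarded logic: integer division
  // throws java.lang.ArithmeticException when childNumPartitions is 0.
  def unguarded(targetShuffleBlocks: Int, childNumPartitions: Int): Int =
    targetShuffleBlocks / childNumPartitions

  // Hypothetical helper mirroring the guarded logic: an empty input
  // short-circuits to 0 partitions and never reaches the division.
  def guarded(targetShuffleBlocks: Int, childNumPartitions: Int): Int =
    if (childNumPartitions == 0) 0
    else targetShuffleBlocks / childNumPartitions
}
```

With these definitions, `PartitionCountSketch.guarded(100, 0)` returns 0, while `PartitionCountSketch.unguarded(100, 0)` throws an `ArithmeticException`.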
3 files reviewed, 2 comments
build
@transient lazy val inputRDD: RDD[ColumnarBatch] = child.executeColumnar()

private lazy val childNumPartitions = inputRDD.getNumPartitions
It is a potential source of inconsistency to serialize an attribute or computation whose underlying input might change after deserialization.
Suggested change:
- private lazy val childNumPartitions = inputRDD.getNumPartitions
+ @transient private lazy val childNumPartitions = inputRDD.getNumPartitions
I'm not sure. How can this code be executed before serialization? inputRDD should never be used outside the task.
It may be correct, but looking at this code I would always have doubts, given that one thing is transient and the other thing that depends on it is not.
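The concern in this thread can be illustrated with a small, self-contained Scala sketch using plain Java serialization. Everything here is hypothetical: `Node` is a stand-in rather than the real `GpuOptimizeWriteExchangeExec`, and `source` is a `@transient val` (null after deserialization) rather than a `@transient lazy val` as `inputRDD` is in the PR, so that the difference is easy to observe.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientLazyValSketch {
  // Hypothetical stand-in for a plan node; not the real exec class.
  class Node extends Serializable {
    // Plays the role of the transient input: it is not serialized,
    // so it is null on the deserialized copy.
    @transient val source: Seq[Int] = Seq(1, 2, 3)

    private def count: Int = if (source == null) -1 else source.length

    // Not transient: once forced, the cached value travels with the object.
    lazy val cachedCount: Int = count

    // Transient: the cached value is dropped and recomputed after deserialization.
    @transient lazy val transientCount: Int = count
  }

  // Serialize and deserialize a node through an in-memory buffer.
  def roundTrip(node: Node): Node = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(node) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[Node] finally in.close()
  }

  // Force both lazy vals, round-trip the node, and read them from the copy.
  def demo(): (Int, Int) = {
    val node = new Node
    val forced = (node.cachedCount, node.transientCount)
    val copy = roundTrip(node)
    (copy.cachedCount, copy.transientCount)
  }
}
```

In this sketch, the copy's `cachedCount` keeps the value computed before serialization, while `transientCount` is recomputed from whatever `source` is on the deserialized side. Whether that difference can ever matter for `childNumPartitions` depends on whether the value is forced before the plan is serialized, which is the point under discussion.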
Fixes #13779.
Description
GpuOptimizeWriteExchangeExec computes actualNumPartitions from the partition count of its input and uses the result to dynamically optimize the partitioning. This logic currently does not handle an input partition count of 0, which can cause an ArithmeticException. This PR adds handling for that case.
Checklists
(Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)