Skip to content

[FEA] Hash aggregation DECIMAL128 SUM #19251

@revans2

Description

@revans2

Is your feature request related to a problem? Please describe.
This is related to #19243, but doing a decimal128 SUM aggregation in CUDF requires that we use a sort based aggregation instead of a hash based one. This can slow things down a lot.

Spark supports an ANSI mode. In this mode if an overflow is detected it should result in an error. When ANSI mode is disabled, for integer values an overflow just overflows, but decimal values replace the overflowed data with a null. It is great how consistent Spark is.

But this has resulted in us having several different implementations for SUM aggregations.

For a non-ANSI long SUM we just used CUDF for the processing.

(0 until 5).foreach { _ =>
   spark.time(spark.range(6000000000L).selectExpr("id % 2 as k", "id % 100 as v").groupBy("k").agg(sum(col("v"))).show())
}

When I profiled this took about 850 ms to 950 ms for each iteration of the loop.

The kernel that took the most time was % MOD at 1.519s, with single_pass_shmem_aggs_kernel the second at 1.293s, and then some others around hash aggregation mapping_indices_kernel - 895ms and DeviceSelectSweepKernel 462ms. There were more, but you get the idea.

For ANSI mode long SUM I decided to try and use decimal128 as the aggregation. This way I could detect the overflow without a lot of work and throw an exception if we saw it.

spark.conf.set("spark.sql.ansi.enabled", true)
(0 until 5).foreach { _ =>
   spark.time(spark.range(6000000000L).selectExpr("id % 2 as k", "id % 100 as v").groupBy("k").agg(sum(col("v"))).show())
}

When profiled this took about 4.3 seconds per iteration of the loop.

nsys reported that radix_sort took the most time at 8.729s followed by UniqueAgent at 3.847s, then cast to decimal128 from the long at 2.267 seconds, MOD at 1.405s, and then finally the SUM operations itself at 1.056s.

The sort is clearly expensive. Much more than creating and using a hash map.

For DECIMAL128 aggregations we play some games and split the DECIMAL128 into 4 INT32s. Then we do a long aggregation on each of the int 32 values, and finally combine them back together with another kernel which lets us detect if any of the 32 bit values overflowed.

(0 until 5).foreach { _ =>
  spark.time(spark.range(6000000000L).selectExpr("id % 2 as k", "CAST(id % 100 as DECIMAL(38,0)) as v").groupBy("k").agg(sum(col("v"))).show())
}

When profiled this took about 2.4 seconds per loop.

nsys reported that now single_pass_shmem_aggs_kernel took the most time at 5.899s followed by the kernel that extracts the 32-bit chunks at 3.538s, then MOD at 1.638s, mapping_indices_kernel at 1.244s, CAST TO DECIMAL128 at 1.093s and finally DeviceSelectSweepKernel at 0.435s

It is clear that our hacky game makes it possible to do a DECIMAL128 SUM using a hash table instead of sorting the data, and it is faster than what exists today.

Describe the solution you'd like

But I think we can do even better. atomicAdd always returns old or the value before doing the addition operation. With this information we can do the atomic add as if it were 4 32-bit add operations, or 2 64-bit add operations. We would have to calculate any carry bit ourselves after the lower order atomic operations succeeds before moving to a higher order atomicAdd.

128-bit atomicCAS is also supported on devices of compute capability 9.x and higher. So we could have two implementations. One that uses atomicCAS 128 for those that support it and another that uses a 32 or 64 bit atomicCAS/atomicAdd that effectively does the same thing, but in a lot more steps.

When I asked Gemini to write it for me it used an atomicCAS I am no cuda expert, but the code looks okay to me at first glance, but it might need a synchronization to avoid some thread divergence.

// Helper function to calculate the carry-out for a 32-bit addition.
// This function determines if adding 'add_val' and 'carry_in' to 'old_val'
// would result in a carry beyond the 32-bit range.
// Parameters:
//   old_val: The 32-bit value before the addition.
//   add_val: The 32-bit value being added.
//   carry_in: The carry-in from the previous (less significant) 32-bit addition.
// Returns:
//   1 if a carry-out is generated, 0 otherwise.
__device__ unsigned int calculate_carry(unsigned int old_val, unsigned int add_val, unsigned int carry_in) {
    // Use unsigned long long to safely perform the addition and detect overflow
    // within the 32-bit range without wrapping around prematurely.
    unsigned long long sum = (unsigned long long)old_val + add_val + carry_in;
    // A carry is generated if the sum exceeds the maximum 32-bit unsigned value (0xFFFFFFFFU).
    return (sum > 0xFFFFFFFFU) ? 1 : 0;
}

// CUDA device function for atomic 128-bit addition.
// This function atomically adds a 128-bit value to another 128-bit value
// located in global or shared memory. It achieves this by performing a sequence
// of four 32-bit atomic compare-and-swap (CAS) operations, propagating carries.
//
// Parameters:
//   target_128_ptr: Pointer to the 128-bit value (represented as an array of 4 unsigned ints)
//                   in global/shared memory that will be atomically updated.
//                   target_128_ptr[0] holds the least significant 32 bits (LSB),
//                   and target_128_ptr[3] holds the most significant 32 bits (MSB).
//   add_128_ptr:    Pointer to the 128-bit value (array of 4 unsigned ints) to be added.
//                   add_128_ptr[0] is LSB, add_128_ptr[3] is MSB.
//
// Returns:
//   true if a 128-bit unsigned overflow occurred (i.e., a carry-out was generated
//   from the most significant 32-bit part), false otherwise.
__device__ bool atomicAdd128(unsigned int* target_128_ptr, const unsigned int* add_128_ptr) {
    unsigned int carry_in = 0; // Initialize carry-in for the LSB part to 0.
    bool overflow = false;     // Flag to indicate a 128-bit overflow.

    // Iterate through the 4 32-bit parts, from least significant (index 0) to most significant (index 3).
    for (int i = 0; i < 4; ++i) {
        unsigned int old_part_val;      // Stores the value read from memory before a successful CAS.
        unsigned int new_part_val;      // Stores the calculated new value to be written.
        unsigned int expected_part_val; // Stores the value we expect to find in memory for CAS.

        // Get the 32-bit value to add for the current part.
        unsigned int current_add_part = add_128_ptr[i];

        // This loop ensures atomicity for the current 32-bit part.
        // It retries the operation if another thread modifies the memory location
        // between our read and the atomicCAS attempt.
        do {
            // 1. Read the current value of this 32-bit part from global/shared memory.
            expected_part_val = target_128_ptr[i];

            // 2. Calculate the new value for this part. This calculation includes
            //    the current value from memory, the part of the value to add,
            //    and any carry-in from the previous (less significant) part.
            new_part_val = expected_part_val + current_add_part + carry_in;

            // 3. Attempt to atomically compare and swap.
            //    atomicCAS(&address, old_value, new_value) works as follows:
            //    - It reads the value at 'address'.
            //    - If the read value is equal to 'old_value', it writes 'new_value' to 'address'.
            //    - It returns the value that was *originally* at 'address' before the operation.
            //    So, 'old_part_val' will contain 'expected_part_val' if the CAS was successful,
            //    or the new value if another thread updated it before our CAS.
            old_part_val = atomicCAS(&target_128_ptr[i], expected_part_val, new_part_val);

            // 4. Check if the CAS was successful.
            //    If 'old_part_val' is not equal to 'expected_part_val', it means another thread
            //    modified 'target_128_ptr[i]' between our initial read (step 1) and this CAS (step 3).
            //    In this case, we need to retry the entire loop for this part with the new
            //    'expected_part_val' (which is now stored in 'old_part_val').
        } while (old_part_val != expected_part_val);

        // At this point, 'old_part_val' contains the value that was successfully replaced
        // by 'new_part_val' in memory. This is the value *before* our successful atomic update
        // for this specific 32-bit part.
        // We use this 'old_part_val' (the value before our addition) along with the
        // 'current_add_part' and the 'carry_in' to calculate the carry-out for the next part.
        carry_in = calculate_carry(old_part_val, current_add_part, carry_in);
    }

    // After the loop has processed all 4 parts, if there is still a carry-in
    // from the most significant part (index 3), it indicates that a 128-bit
    // unsigned overflow has occurred.
    overflow = (carry_in > 0);

    return overflow;
}

Describe alternatives you've considered
I can do the same hack that we have been doing for decimal128 SUM. I would just cut it in half and do it all for an INT64 as input instead. But I would rather have CUDF get faster, and use less memory if possible.

Metadata

Metadata

Assignees

Labels

PerformancePerformance related issueSparkFunctionality that helps Spark RAPIDSfeature requestNew feature or request

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions