
Conversation

@hll1213181368
Contributor

Use multiple threads to improve the slotRange migration speed of sendSnapshotByRawKV.

@git-hulk
Member

git-hulk commented Nov 1, 2025

@hll1213181368 Did you test how much it improves when you increase the number of threads?

@hll1213181368
Contributor Author

hll1213181368 commented Nov 1, 2025

@git-hulk @PragmaTwice

Test case: a Kvrocks cluster with 4 shards, each shard holding 1 TB of data.
Migration parameters: migrate-type is raw-key-value, migrate-batch-rate-limit-mb is 256 MB/s, and migrate-batch-size-kb is 20 MB.
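Expressed as a kvrocks.conf excerpt, the test setup would look roughly like this (the option names are quoted from the discussion above; the exact values, e.g. 20 MB written as 20480 KB, are my reading of the units):

# kvrocks.conf excerpt used for the test (assumed rendering)
migrate-type raw-key-value
# 256 MB/s rate limit for migration batches
migrate-batch-rate-limit-mb 256
# 20 MB batch size, expressed in KB
migrate-batch-size-kb 20480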

Test results:
1) The slotRange migration time is essentially all spent in sendSnapshotByRawKV.
2) The time consumed by sendSnapshotByRawKV can be divided into four steps (see the loop sketch after the comparison image below):
1. iterating over the RocksDB data;
2. the rate limiter;
3. SockSend: sending data to the target node (APPLYBATCH);
4. SockReadLine: checking whether the data was migrated to the target node successfully.
3) migrate-batch-rate-limit-mb is the budget the rate limiter draws tokens from. At 256 MB/s the rate limiter's time cost is almost negligible, and raising it further cannot speed up SockSend or SockReadLine, so the migration parameters are already optimal.
4) Therefore, try using multiple threads to optimize sendSnapshotByRawKV.
5) Tested 10 slotRanges. The following is a comparison and analysis of the time consumption before and after using multithreading.

[image: before/after timing comparison for the 10 tested slotRanges]
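To make the four steps concrete, here is a minimal single-threaded sketch of the per-batch loop in sendSnapshotByRawKV. It is not the Kvrocks implementation: the kvs input and the rate_limit, sock_send, and sock_read_line callbacks are hypothetical stand-ins for the RocksDB iterator, the rate limiter, and the socket helpers.

#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

bool SendSnapshotLoop(
    const std::vector<std::pair<std::string, std::string>> &kvs,  // step 1 input
    const std::function<void(size_t)> &rate_limit,                // step 2
    const std::function<bool(const std::string &)> &sock_send,    // step 3
    const std::function<bool()> &sock_read_line,                  // step 4
    size_t batch_size) {
  std::string batch;
  auto flush = [&]() -> bool {
    rate_limit(batch.size());              // step 2: wait for rate-limit tokens
    if (!sock_send(batch)) return false;   // step 3: send APPLYBATCH to the target
    if (!sock_read_line()) return false;   // step 4: confirm the batch was applied
    batch.clear();
    return true;
  };
  for (const auto &[key, value] : kvs) {   // step 1: iterate the snapshot data
    batch += key;
    batch += value;
    if (batch.size() >= batch_size && !flush()) return false;
  }
  return batch.empty() || flush();         // flush the final partial batch
}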


// Wait til finish
for (auto &result : results) {
  auto s = result.get();
Member


What if one of the threads fails? Others are still migrating?

Member


@hll1213181368 Seems this comment was resolved? Did I miss anything?

auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
int total_slots = slot_range.end - slot_range.start + 1;
unsigned int max_parallelism = std::thread::hardware_concurrency();
Member


better to have an option instead of just querying nproc.

@PragmaTwice PragmaTwice changed the title from "chore(cluster): parallel to sendSnapshotByRawKV" to "perf(cluster): use multithreads to optimize sendSnapshotByRawKV" on Nov 2, 2025
Member

@PragmaTwice PragmaTwice left a comment


Since this is a performance improvement, I think it would be great if a benchmark (before and after this patch) were attached.

@git-hulk
Member

git-hulk commented Nov 5, 2025

@hll1213181368 There are still two comments to be resolved, and you won't be able to mark them as resolved without any answer or actions.

For #3240 (comment), we need to handle the partial failure case.

For #3240 (comment), you should support a configuration, or explain what you think.

@hll1213181368
Contributor Author

@hll1213181368 There are still two comments to be resolved, and you won't be able to mark them as resolved without any answer or actions.

For #3240 (comment), we need to handle the partial failure case.

For #3240 (comment), you should support a configuration, or explain what you think.

@git-hulk @PragmaTwice I have responded to these two comments. Do you have any better ideas?

@git-hulk
Member

git-hulk commented Nov 17, 2025

@hll1213181368 I didn't see that. Please check if your reply comments are in pending status.

@hll1213181368
Contributor Author

hll1213181368 commented Nov 18, 2025

If one of the threads fails, the other threads keep migrating, but the total result is NotOK and the slotRange migration is marked as failed. We then need to analyze the problem or retry migrating this slotRange.

@git-hulk Does that sound right, or do you have a better idea?

@hll1213181368
Contributor Author

hll1213181368 commented Nov 18, 2025

@PragmaTwice Regarding how to set the number of threads for this migration task: I don't have a better idea than adding configuration parameters to Kvrocks. What do you think of this approach?

For example: migrate-fixed-threads, migrate-min-threads, migrate-max-threads, used as follows:

#include <algorithm>
#include <thread>

struct SlotRange {
  int start;
  int end;
};

struct ThreadConfig {
  int fixed_threads = 0;  // migrate-fixed-threads: >0 forces this thread count
  int min_threads = 1;    // migrate-min-threads
  int max_threads = 0;    // migrate-max-threads: >0 caps the detected value
};

int calculate_parallelism(const SlotRange &slot_range, const ThreadConfig &config) {
  int total_slots = slot_range.end - slot_range.start + 1;

  // Prefer migrate-fixed-threads when it is set explicitly.
  if (config.fixed_threads > 0) {
    return std::min(config.fixed_threads, total_slots);
  }

  unsigned int hardware_threads = std::thread::hardware_concurrency();
  int detected_threads = hardware_threads > 0 ? static_cast<int>(hardware_threads) : 1;

  // Constrain the detected value between migrate-min-threads and migrate-max-threads.
  int parallelism = detected_threads;
  if (config.max_threads > 0) {
    parallelism = std::min(parallelism, config.max_threads);
  }
  parallelism = std::max(parallelism, config.min_threads);

  // Never use more threads than there are slots in the range.
  return std::min(parallelism, total_slots);
}
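For example, migrate-fixed-threads=4 on a 10-slot range yields min(4, 10) = 4 threads; with no fixed value, migrate-max-threads=8 on a 16-core machine yields 8 threads, and a 3-slot range would still be capped at 3.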

@git-hulk @PragmaTwice Later I will submit another PR that parallelizes the AsyncScanDBSize task using the same approach.

@git-hulk
Member

@hll1213181368 I didn't see that. Please check if your reply comments are in pending status.

@PragmaTwice Regarding how to set the number of threads for this migration task: I don't have a better idea than adding configuration parameters to Kvrocks.

@git-hulk @PragmaTwice Later I will submit another PR that parallelizes the AsyncScanDBSize task using the same approach.

Those comments are pending and not visible to others. Please release them via the comment button.


@hll1213181368
Contributor Author

hll1213181368 commented Nov 19, 2025

@git-hulk I have replied to the comments above. The best way to handle a failed thread is to retry migrating the failed slotRange.

@git-hulk
Member

If one of the threads fails, the other threads keep migrating, but the total result is NotOK and the slotRange migration is marked as failed. We then need to analyze the problem or retry migrating this slotRange.

@git-hulk Does that sound right, or do you have a better idea?

I'm fine with allowing partial failure during parallel migration, but the current implementation doesn't respect this behavior. We should collect and wait for all migration threads to finish, then check whether the result is a full success, a partial success, or a failure.
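A minimal sketch of that wait-then-aggregate pattern, assuming hypothetical Status and SlotRange types and a caller-supplied migrate_sub_range callback in place of the real per-thread migration routine:

#include <functional>
#include <future>
#include <string>
#include <vector>

struct Status {
  bool ok;
  std::string msg;
};
struct SlotRange {
  int start;
  int end;
};

Status MigrateParallel(const std::vector<SlotRange> &sub_ranges,
                       const std::function<Status(const SlotRange &)> &migrate_sub_range) {
  // Launch one worker per sub-range.
  std::vector<std::future<Status>> results;
  results.reserve(sub_ranges.size());
  for (const auto &range : sub_ranges) {
    results.push_back(std::async(std::launch::async, migrate_sub_range, range));
  }

  // Wait for ALL workers before deciding, so a failure in one sub-range
  // cannot leave the others in an unknown state.
  size_t failed = 0;
  std::string first_error;
  for (auto &result : results) {
    Status s = result.get();  // blocks until this worker is done
    if (!s.ok && failed++ == 0) first_error = s.msg;
  }

  if (failed == 0) return {true, "all sub-ranges migrated"};
  if (failed < sub_ranges.size()) return {false, "partial failure: " + first_error};
  return {false, "all sub-ranges failed: " + first_error};
}

Because every future's get() is called before any verdict is produced, a failing worker cannot end the loop early, and the final Status distinguishes full success, partial failure, and total failure.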

@hll1213181368
Contributor Author

If one of the threads fails, the other threads keep migrating, but the total result is NotOK and the slotRange migration is marked as failed. We then need to analyze the problem or retry migrating this slotRange.
@git-hulk Does that sound right, or do you have a better idea?

I'm fine with allowing partial failure during parallel migration, but the current implementation doesn't respect this behavior. We should collect and wait for all migration threads to finish, then check whether the result is a full success, a partial success, or a failure.

@git-hulk The kvrocks-controller sends a slot-range migration request, and the kvrocks node returns either a success or a failure result for the entire range. For a failed range, the kvrocks-controller retries the entire range: slots within the range that already migrated successfully are reported as migrated, while failed slots are migrated again. So I think it's unnecessary for the kvrocks node to report which slots in the range succeeded and which failed.
