---
slug: multi-round-local-merge
title: "Multi-Round Lazy Start Merge"
authors: [duanmeng, xiaoxmeng, pedroerp]
tags: [tech-blog, spill, operator]
---

## Background

Efficiently merging sorted data partitions at scale is crucial for a variety of training data preparation
workloads, especially for Generative Recommenders (GRs), a new paradigm introduced in the paper
*[Actions Speak Louder than Words: Trillion-Parameter Sequential Transducers for Generative Recommendations](https://arxiv.org/pdf/2402.17152)*.
A key requirement is to merge training data across partitions—for example, merging hourly partitions into
daily ones—while ensuring that all rows sharing the same primary key are stored consecutively. Training data is
typically partitioned and bucketed by primary key, with rows sharing the same key stored consecutively within
each partition, so merging across partitions essentially becomes a [multi-way merge problem](https://en.wikipedia.org/wiki/K-way_merge_algorithm).

Normally, [Apache Spark](https://spark.apache.org/) can be used for this sort-merge requirement — for example, via `CLUSTER BY`.
However, training datasets for a single job can often reach the PB scale, which in turn generates shuffle data at PB scale.
Since we typically apply bucketing and ordering by key when preparing training data in production,
Spark can eliminate the shuffle when merging training data from multiple hourly partitions.
However, each Spark task can only read the files planned from the various partitions within a split
sequentially, placing them into the sorter and spilling as needed. Only after all files have been read
does Spark perform a sort-merge of the spilled files. This process produces a large number of small
spill files, which further degrades efficiency.

Moreover, Spark’s spill is row-based with a low compression ratio, resulting in approximately 4x size
amplification compared to the original columnar training data in the data lake. These factors
significantly degrade task stability and performance. Velox, by contrast, offers a `LocalMerge` operator, which can be
introduced into Apache Spark via [Gluten](https://gluten.apache.org/) or [PySpark on Velox](https://www.youtube.com/watch?v=oq5M2861WaQ).

*Note: To keep the focus on merging, the remainder of this article also assumes that each partition’s
training data is already sorted by primary key—a common setup in training data pipelines.*

## LocalMerge Operator

The `LocalMerge` operator consolidates its sources’ outputs into a single, sorted stream of rows.
It runs single-threaded, while its upstream sources may run multi-threaded within the same task,
producing multiple sorted inputs concurrently. For example, when merging 24 hourly partitions into
a single daily partition (as shown in the figure below), the merge plan fragment is split into two pipelines:

- Pipeline 0: contains two operators, `TableScan` and `CallbackSink`. 24 drivers are instantiated to scan the 24 hourly partitions.
- Pipeline 1: contains only a single operator, `LocalMerge`, with one driver responsible for performing the sort merge.

A `CallbackSink` operator is installed at the end of each driver in Pipeline 0. It pushes the `TableScan`
operator’s output vectors into the queues backing the merge streams. Inside `LocalMerge`, a `TreeOfLosers`
performs a k-way merge over the 24 merge streams supplied by the Pipeline 0 drivers, producing a single,
globally sorted output stream.
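
To make the k-way merge concrete, here is a small, self-contained C++ sketch. It uses a
`std::priority_queue` for brevity, whereas Velox’s `TreeOfLosers` does the same job with a loser
tree, which needs fewer comparisons per output row; the `kWayMerge` function and the `int64_t`
runs are illustrative stand-ins for merge streams of vectors, not Velox APIs.

```cpp
#include <cstdint>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

// Merge k individually sorted runs into one sorted output, the same job
// LocalMerge performs over the queues fed by its Pipeline 0 drivers.
std::vector<int64_t> kWayMerge(const std::vector<std::vector<int64_t>>& runs) {
  // Each entry is (value, run index, offset within that run).
  using Entry = std::tuple<int64_t, size_t, size_t>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<>> minHeap;

  for (size_t i = 0; i < runs.size(); ++i) {
    if (!runs[i].empty()) {
      minHeap.emplace(runs[i][0], i, 0);
    }
  }

  std::vector<int64_t> out;
  while (!minHeap.empty()) {
    auto [value, run, offset] = minHeap.top();
    minHeap.pop();
    out.push_back(value);
    // Advance only the run that produced the smallest value.
    if (offset + 1 < runs[run].size()) {
      minHeap.emplace(runs[run][offset + 1], run, offset + 1);
    }
  }
  return out;
}
```

Each output row costs O(log k) comparisons, so merging 24 streams is cheap relative to the I/O;
the loser tree mainly trims the constant factor.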

<figure>
  <img src="/img/merge.png" height="100%" width="100%"/>
</figure>

## Multi-Round Spill Merge

Although `LocalMerge` minimizes comparisons during merging, preserves row-ordering guarantees, and cleanly
isolates the single-threaded merge from the multi-threaded scan phase for predictable performance, it can
cause substantial memory pressure—particularly in training-data pipelines. In these workloads, extremely
wide tables are common, and even after column pruning, thousands of columns may remain.

Moreover, training data is typically stored in [PAX-style formats such as Parquet, ORC, or DWRF](https://www.vldb.org/pvldb/vol17/p148-zeng.pdf).
Using Parquet as an example, the reader often needs to keep at least one page per column in memory. As a result,
simply opening a Parquet file with thousands of columns can consume significant memory even before any
merging occurs; at roughly 1 MB of page data per column, for instance, a 3,000-column file can pin around
3 GB on its own. Wide schemas further amplify per-column metadata, dictionary pages, and decompression buffers,
inflating the overall footprint. In addition, the k-way merge must hold input vectors from multiple sources
concurrently, which drives peak memory usage even higher.

To cap memory usage and avoid OOM when merging a large number of partitions, we extend `LocalMerge` to
process fewer local sources at a time, leverage existing spill facilities to persist intermediate
results, and introduce lazy-start activation for merge inputs. Using the case of merging 24 hourly
partitions into a single daily partition, the process is organized into two phases:

**Phase 1**

1. Break the scan-and-merge into multiple rounds (e.g., 3 rounds).
2. In each round, lazily start a limited number of drivers (e.g., drivers 0–7, eight at a time).
3. The started drivers scan data and push it into the queues backing their respective merge streams.
4. Perform an in-memory k-way merge and spill the results, producing a spill-file group (one or more spill files per group).
5. After all inputs from drivers 0–7 are consumed and spilled, close the drivers and the file streams opened by their `TableScan` operators, and release the associated memory.
6. Repeat the above steps for the remaining rounds (drivers 8–15, then drivers 16–23), ensuring peak memory stays within budget (a toy sketch of this round loop follows the list).
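
Below is a toy, self-contained model of that round loop. Vectors of `int64_t` stand in for the
sorted merge streams and spill files, and a pairwise `std::merge` stands in for the in-memory
k-way merge; `Run`, `SpillGroup`, and `runPhase1` are illustrative names, not Velox internals.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <vector>

using Run = std::vector<int64_t>;        // One driver's sorted output.
using SpillGroup = std::vector<int64_t>; // The sorted spill output of one round.

// Phase 1: merge the sources in rounds of at most `maxPerRound`
// (cf. local_merge_max_num_merge_sources), "spilling" each round's
// merged result as one group.
std::vector<SpillGroup> runPhase1(
    const std::vector<Run>& sources, size_t maxPerRound) {
  std::vector<SpillGroup> groups;
  for (size_t begin = 0; begin < sources.size(); begin += maxPerRound) {
    const size_t end = std::min(begin + maxPerRound, sources.size());
    // Lazy start: only sources [begin, end) are active in this round.
    SpillGroup merged;
    for (size_t i = begin; i < end; ++i) {
      SpillGroup next;
      std::merge(
          merged.begin(), merged.end(),
          sources[i].begin(), sources[i].end(),
          std::back_inserter(next));
      merged.swap(next);
    }
    // Spill the round's result; the real operator also closes the
    // round's drivers and file streams here to release memory.
    groups.push_back(std::move(merged));
  }
  return groups;
}
```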

**Phase 2**

1. Create a concatenated file stream for each spill-file group produced in Phase 1.
2. Schedule one async callback for each concatenated stream to prefetch and push data into a merge stream.
3. Merge the outputs of the three merge streams using a k-way merge (e.g., a loser tree), and begin streaming the final, globally sorted results to downstream operators.
4. Cap the number of rows in each output batch adaptively, estimating each merge stream’s row size from the average row size of its first batch (a sketch follows the list).
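
A minimal sketch of the batch-size adaptation from step 4, assuming a fixed byte target per
output batch (the 10 MB value below is an illustrative knob, not an actual Velox setting):

```cpp
#include <algorithm>
#include <cstddef>

// Assumed target output batch size; not a real Velox config.
constexpr size_t kTargetBatchBytes = 10 << 20; // 10 MB

// Derive the per-batch row cap from the average row size observed in a
// merge stream's first batch: wide rows mean fewer rows per batch.
size_t maxOutputRows(size_t firstBatchBytes, size_t firstBatchRows) {
  if (firstBatchRows == 0 || firstBatchBytes == 0) {
    return 1; // Nothing observed yet; emit conservatively.
  }
  const size_t avgRowSize =
      std::max<size_t>(1, firstBatchBytes / firstBatchRows);
  return std::max<size_t>(1, kTargetBatchBytes / avgRowSize);
}
```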

<figure>
  <img src="/img/spill.merge.png" height="100%" width="100%"/>
</figure>

## How To Use

Set `local_merge_spill_enabled` to `true` to enable spilling for the `LocalMerge`
operator (it is `false` by default). Then, set `local_merge_max_num_merge_sources` to
control the number of merge sources per round according to your memory management strategy.

*Note: A spill executor must be configured, because `LocalMerge` schedules an asynchronous
callback for each concatenated stream to prefetch data and push it into the merge stream.*
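
As a rough sketch, the two settings and a spill executor could be prepared as below. The property
names come from this post; how the config map and executor are attached to a query (e.g., Velox’s
`QueryCtx`, or the corresponding Gluten session settings) varies by integration and version, so
that wiring is left as a comment.

```cpp
#include <folly/executors/CPUThreadPoolExecutor.h>

#include <memory>
#include <string>
#include <unordered_map>

int main() {
  // LocalMerge spill settings described in this post.
  std::unordered_map<std::string, std::string> queryConfig{
      {"local_merge_spill_enabled", "true"},
      // Merge at most 8 sources per round; tune to your memory budget.
      {"local_merge_max_num_merge_sources", "8"},
  };

  // Thread pool to serve the per-stream async prefetch callbacks.
  auto spillExecutor = std::make_shared<folly::CPUThreadPoolExecutor>(4);

  // ... attach queryConfig and spillExecutor to the query/task here;
  // the exact API depends on your Velox integration and version ...
  return 0;
}
```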

## Future Work

We plan to adjust the number of merge sources dynamically based on available memory, rather than
determining it solely by the `local_merge_max_num_merge_sources` parameter. The process would start
with a small number of sources, such as 2, and incrementally increase this number for subsequent
rounds (e.g., to 4) as long as sufficient memory is available, stopping once it reaches a
memory-constrained limit.
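
One possible shape of that ramp-up, sketched under assumptions (the doubling policy and the
`hasMemoryHeadroom` signal are illustrative, not a committed design):

```cpp
#include <algorithm>
#include <cstddef>

// Propose the number of sources for the next round: start small, grow
// while memory allows, and stop at the memory-constrained cap.
size_t nextRoundSources(
    size_t current, size_t remainingSources,
    bool hasMemoryHeadroom, size_t memoryCap) {
  if (!hasMemoryHeadroom) {
    return std::min(current, remainingSources); // Stop growing.
  }
  // e.g., 2 -> 4 -> 8 -> ... until the cap or the remaining sources.
  return std::min({current * 2, memoryCap, remainingSources});
}
```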

## Acknowledgements

Thanks to [Xiaoxuan Meng](https://www.linkedin.com/in/xiaoxuanmeng/) and [Pedro Pedreira](https://www.linkedin.com/in/pedro-pedreira/) for their guidance, review, and brainstorming.
I also appreciate the excellent collaboration and work from my colleagues,
[Xiang Yao](https://www.linkedin.com/in/%E7%BF%94-%E5%A7%9A-1b513a359/),
[Gang Wang](https://github.com/zjuwangg),
and [Weixin Xu](https://www.linkedin.com/in/xu-weixin-75b06786/).