Skip to content

Commit 47752e0

Browse files
Dandandan and kszucs
authored and committed
ARROW-11268: [Rust][DataFusion] MemTable::load output partition support
I think the feature to be able to repartition an in memory table is useful, as the repartitioning only needs to be applied once, and repartition itself is cheap (at the same node). Doing this when loading data is very useful for in-memory analytics as we can benefit from multiple cores after loading the data. The speed up from repartitioning is very big (mainly on aggregates), on my (8-core machine): ~5-7x on query 1 and 12 versus a single partition, and a smaller (~30%) difference for query 5 when using 16 partitions. q1/q12 also have very high cpu utilization. @jorgecarleitao maybe this is of interest to you, as you mentioned you are looking into multi-threading. I think this would be a "high level" way to get more parallelism, also in the logical plan. I think in some optimizer rules and/or dynamically we can do repartitions, similar to what's described here https://issues.apache.org/jira/browse/ARROW-9464 Benchmarks after repartitioning (16 partitions): PR (16 partitions) ``` Query 12 iteration 0 took 33.9 ms Query 12 iteration 1 took 34.3 ms Query 12 iteration 2 took 36.9 ms Query 12 iteration 3 took 33.6 ms Query 12 iteration 4 took 35.1 ms Query 12 iteration 5 took 38.8 ms Query 12 iteration 6 took 35.8 ms Query 12 iteration 7 took 34.4 ms Query 12 iteration 8 took 34.2 ms Query 12 iteration 9 took 35.3 ms Query 12 avg time: 35.24 ms ``` Master (1 partition): ``` Query 12 iteration 0 took 245.6 ms Query 12 iteration 1 took 246.4 ms Query 12 iteration 2 took 246.1 ms Query 12 iteration 3 took 247.9 ms Query 12 iteration 4 took 246.5 ms Query 12 iteration 5 took 248.2 ms Query 12 iteration 6 took 247.8 ms Query 12 iteration 7 took 246.4 ms Query 12 iteration 8 took 246.6 ms Query 12 iteration 9 took 246.5 ms Query 12 avg time: 246.79 ms ``` PR (16 partitions): ``` Query 1 iteration 0 took 138.6 ms Query 1 iteration 1 took 142.2 ms Query 1 iteration 2 took 125.8 ms Query 1 iteration 3 took 102.4 ms Query 1 iteration 4 took 105.9 ms Query 1 iteration 5 took 107.0 
ms Query 1 iteration 6 took 109.3 ms Query 1 iteration 7 took 109.9 ms Query 1 iteration 8 took 108.8 ms Query 1 iteration 9 took 112.0 ms Query 1 avg time: 116.19 ms ``` Master (1 partition): ``` Query 1 iteration 0 took 640.6 ms Query 1 iteration 1 took 640.0 ms Query 1 iteration 2 took 632.9 ms Query 1 iteration 3 took 634.6 ms Query 1 iteration 4 took 630.7 ms Query 1 iteration 5 took 630.7 ms Query 1 iteration 6 took 631.9 ms Query 1 iteration 7 took 635.5 ms Query 1 iteration 8 took 639.0 ms Query 1 iteration 9 took 638.3 ms Query 1 avg time: 635.43 ms ``` PR (16 partitions) ``` Query 5 iteration 0 took 465.8 ms Query 5 iteration 1 took 428.0 ms Query 5 iteration 2 took 435.0 ms Query 5 iteration 3 took 407.3 ms Query 5 iteration 4 took 435.7 ms Query 5 iteration 5 took 437.4 ms Query 5 iteration 6 took 411.2 ms Query 5 iteration 7 took 432.0 ms Query 5 iteration 8 took 436.8 ms Query 5 iteration 9 took 435.6 ms Query 5 avg time: 432.47 ms ``` Master (1 partition) ``` Query 5 iteration 0 took 660.6 ms Query 5 iteration 1 took 634.4 ms Query 5 iteration 2 took 626.4 ms Query 5 iteration 3 took 628.0 ms Query 5 iteration 4 took 635.3 ms Query 5 iteration 5 took 631.1 ms Query 5 iteration 6 took 631.3 ms Query 5 iteration 7 took 639.4 ms Query 5 iteration 8 took 634.3 ms Query 5 iteration 9 took 639.0 ms Query 5 avg time: 635.97 ms ``` Closes #9214 from Dandandan/mem_table_repartition Lead-authored-by: Heres, Daniel <[email protected]> Co-authored-by: Daniël Heres <[email protected]> Signed-off-by: Jorge C. Leitao <[email protected]>
1 parent 2de39d1 commit 47752e0

File tree

3 files changed

+49
-5
lines changed

3 files changed

+49
-5
lines changed

rust/benchmarks/src/bin/tpch.rs

Lines changed: 11 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -70,6 +70,10 @@ struct BenchmarkOpt {
7070
/// Load the data into a MemTable before executing the query
7171
#[structopt(short = "m", long = "mem-table")]
7272
mem_table: bool,
73+
74+
/// Number of partitions to create when using MemTable as input
75+
#[structopt(short = "n", long = "partitions", default_value = "8")]
76+
partitions: usize,
7377
}
7478

7579
#[derive(Debug, StructOpt)]
@@ -138,8 +142,12 @@ async fn benchmark(opt: BenchmarkOpt) -> Result<Vec<arrow::record_batch::RecordB
138142
println!("Loading table '{}' into memory", table);
139143
let start = Instant::now();
140144

141-
let memtable =
142-
MemTable::load(table_provider.as_ref(), opt.batch_size).await?;
145+
let memtable = MemTable::load(
146+
table_provider.as_ref(),
147+
opt.batch_size,
148+
Some(opt.partitions),
149+
)
150+
.await?;
143151
println!(
144152
"Loaded table '{}' into memory in {} ms",
145153
table,
@@ -1593,6 +1601,7 @@ mod tests {
15931601
path: PathBuf::from(path.to_string()),
15941602
file_format: "tbl".to_string(),
15951603
mem_table: false,
1604+
partitions: 16,
15961605
};
15971606
let actual = benchmark(opt).await?;
15981607

rust/datafusion/benches/sort_limit_query_sql.rs

Lines changed: 6 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -70,8 +70,13 @@ fn create_context() -> Arc<Mutex<ExecutionContext>> {
7070

7171
let ctx_holder: Arc<Mutex<Vec<Arc<Mutex<ExecutionContext>>>>> =
7272
Arc::new(Mutex::new(vec![]));
73+
74+
let partitions = 16;
75+
7376
rt.block_on(async {
74-
let mem_table = MemTable::load(&csv, 16 * 1024).await.unwrap();
77+
let mem_table = MemTable::load(&csv, 16 * 1024, Some(partitions))
78+
.await
79+
.unwrap();
7580

7681
// create local execution context
7782
let mut ctx = ExecutionContext::new();

rust/datafusion/src/datasource/memory.rs

Lines changed: 32 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -19,20 +19,24 @@
1919
//! queried by DataFusion. This allows data to be pre-loaded into memory and then
2020
//! repeatedly queried without incurring additional file I/O overhead.
2121
22+
use futures::StreamExt;
2223
use log::debug;
2324
use std::any::Any;
2425
use std::sync::Arc;
2526

2627
use arrow::datatypes::{Field, Schema, SchemaRef};
2728
use arrow::record_batch::RecordBatch;
2829

29-
use crate::datasource::datasource::Statistics;
3030
use crate::datasource::TableProvider;
3131
use crate::error::{DataFusionError, Result};
3232
use crate::logical_plan::Expr;
3333
use crate::physical_plan::common;
3434
use crate::physical_plan::memory::MemoryExec;
3535
use crate::physical_plan::ExecutionPlan;
36+
use crate::{
37+
datasource::datasource::Statistics,
38+
physical_plan::{repartition::RepartitionExec, Partitioning},
39+
};
3640

3741
use super::datasource::ColumnStatistics;
3842

@@ -102,7 +106,11 @@ impl MemTable {
102106
}
103107

104108
/// Create a mem table by reading from another data source
105-
pub async fn load(t: &dyn TableProvider, batch_size: usize) -> Result<Self> {
109+
pub async fn load(
110+
t: &dyn TableProvider,
111+
batch_size: usize,
112+
output_partitions: Option<usize>,
113+
) -> Result<Self> {
106114
let schema = t.schema();
107115
let exec = t.scan(&None, batch_size, &[])?;
108116
let partition_count = exec.output_partitioning().partition_count();
@@ -126,6 +134,28 @@ impl MemTable {
126134
data.push(result);
127135
}
128136

137+
let exec = MemoryExec::try_new(&data, schema.clone(), None)?;
138+
139+
if let Some(num_partitions) = output_partitions {
140+
let exec = RepartitionExec::try_new(
141+
Arc::new(exec),
142+
Partitioning::RoundRobinBatch(num_partitions),
143+
)?;
144+
145+
// execute and collect results
146+
let mut output_partitions = vec![];
147+
for i in 0..exec.output_partitioning().partition_count() {
148+
// execute this *output* partition and collect all batches
149+
let mut stream = exec.execute(i).await?;
150+
let mut batches = vec![];
151+
while let Some(result) = stream.next().await {
152+
batches.push(result?);
153+
}
154+
output_partitions.push(batches);
155+
}
156+
157+
return MemTable::try_new(schema.clone(), output_partitions);
158+
}
129159
MemTable::try_new(schema.clone(), data)
130160
}
131161
}

0 commit comments

Comments
 (0)