Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ run.out
clickhouse/etc_sudoers.bak
workdir/
timeout-exit-codes.out
*/target
*.lock
13 changes: 13 additions & 0 deletions datafusion/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# db-benchmark solution crate for Apache Arrow DataFusion.
[package]
name = "db-benchmark"
version = "0.1.0"
edition = "2018"

# src/main.rs guards the snmalloc global allocator with
# #[cfg(feature = "snmalloc")]; declare that feature here (on by default)
# so the snmalloc-rs dependency below is actually used.
[features]
default = ["snmalloc"]
snmalloc = []

[dependencies]
# Track DataFusion/Arrow from git master, with SIMD compute kernels enabled.
datafusion = { git = "https://github.com/apache/arrow.git", features = ["simd"] }
arrow = { git = "https://github.com/apache/arrow.git", features = ["simd"] }
tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] }
snmalloc-rs = "0.2"

[profile.release]
# Link-time optimization for the benchmark binary.
lto = true
5 changes: 5 additions & 0 deletions datafusion/setup-datafusion.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
# One-time setup for the datafusion solution: install the Rust toolchain
# via rustup, non-interactively (-y accepts the default installation).
set -e

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y

78 changes: 78 additions & 0 deletions datafusion/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::{CsvFile, MemTable};
use datafusion::error::Result;
use datafusion::prelude::*;
use std::env;
use std::time::Instant;

// Use snmalloc as the global allocator when the "snmalloc" feature is enabled.
// NOTE(review): Cargo.toml declares the snmalloc-rs dependency but no
// "snmalloc" feature, so as written this cfg never fires — confirm a
// [features] entry exists (or is added) for it.
#[cfg(feature = "snmalloc")]
#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[tokio::main]
async fn main() -> Result<()> {
    // db-benchmark groupby entry point: load ../data/<SRC_DATANAME>.csv into an
    // in-memory table `t`, then run the five groupby queries q1-q5, printing
    // the wall-clock milliseconds each one took.
    let mut ctx = ExecutionContext::new();
    let dataname = env::var("SRC_DATANAME").expect("SRC_DATANAME env var must be set");
    let data = format!("../data/{}.csv", dataname);

    // Explicit schema for the benchmark CSV: id1-id3 strings, id4-id6 and
    // v1-v2 32-bit ints, v3 a 64-bit float; none nullable.
    let schema = Schema::new(vec![
        Field::new("id1", DataType::Utf8, false),
        Field::new("id2", DataType::Utf8, false),
        Field::new("id3", DataType::Utf8, false),
        Field::new("id4", DataType::Int32, false),
        Field::new("id5", DataType::Int32, false),
        Field::new("id6", DataType::Int32, false),
        Field::new("v1", DataType::Int32, false),
        Field::new("v2", DataType::Int32, false),
        Field::new("v3", DataType::Float64, false),
    ]);
    let options = CsvReadOptions::new().schema(&schema).has_header(true);

    // Materialize the CSV fully in memory first, so per-query timings measure
    // query execution only and not CSV parsing. Errors propagate via `?`
    // (was `.unwrap()`) so failures surface through main's Result.
    let csv = CsvFile::try_new(&data, options)?;
    let batch_size = 65536;
    let memtable = MemTable::load(&csv, batch_size).await?;
    ctx.register_table("t", Box::new(memtable));

    // The five groupby queries of the benchmark, with their labels.
    let queries = [
        ("q1", "SELECT id1, SUM(v1) AS v1 FROM t GROUP BY id1"),
        ("q2", "SELECT id1, id2, SUM(v1) AS v1 FROM t GROUP BY id1, id2"),
        ("q3", "SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM t GROUP BY id3"),
        ("q4", "SELECT id4, AVG(v1) AS v1, AVG(v2) AS v2, AVG(v3) AS v3 FROM t GROUP BY id4"),
        ("q5", "SELECT id6, SUM(v1) AS v1, SUM(v2) AS v2, SUM(v3) AS v3 FROM t GROUP BY id6"),
    ];
    for &(label, sql) in queries.iter() {
        let start = Instant::now();
        // collect() drives the otherwise-lazy plan to completion; the result
        // batches (`ans`) stay alive until after the timing is taken.
        let ans = ctx.sql(sql)?.collect().await?;
        println!("{} took {} ms", label, start.elapsed().as_millis());
        let _ = ans;
    }

    Ok(())
}
8 changes: 8 additions & 0 deletions datafusion/upg-datafusion.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash
# Upgrade the datafusion solution: refresh Cargo.lock so the git-tracked
# arrow/datafusion dependencies move to their latest revisions.
# Invoked from the repo root by run.sh, hence the cd into the crate dir.
set -e

cd datafusion

cargo update

cd ../
1 change: 1 addition & 0 deletions datafusion/ver-datafusion.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
cargo tree | grep "├── datafusion (.*)"
2 changes: 1 addition & 1 deletion run.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# task, used in init-setup-iteration.R
export RUN_TASKS="groupby join"
# solution, used in init-setup-iteration.R
export RUN_SOLUTIONS="data.table pydatatable dplyr pandas spark dask juliadf cudf clickhouse polars"
export RUN_SOLUTIONS="data.table pydatatable dplyr pandas spark dask juliadf cudf clickhouse polars datafusion"

# flag to upgrade tools, used in run.sh on init
export DO_UPGRADE=true
Expand Down
2 changes: 2 additions & 0 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/upg-h2o.
if [[ "$RUN_SOLUTIONS" =~ "h2o" ]]; then ./h2o/ver-h2o.sh; fi;
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/upg-polars.sh; fi;
if [[ "$RUN_SOLUTIONS" =~ "polars" ]]; then ./polars/ver-polars.sh; fi;
if [[ "$DO_UPGRADE" == true && "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/upg-datafusion.sh; fi;
# was: =~ "polars" — copy-paste bug; the datafusion version check must be gated on "datafusion"
if [[ "$RUN_SOLUTIONS" =~ "datafusion" ]]; then ./datafusion/ver-datafusion.sh; fi;

# run
if [[ -f ./stop ]]; then echo "# Benchmark run $BATCH has been interrupted after $(($(date +%s)-$BATCH))s due to 'stop' file" && rm -f ./stop && rm -f ./run.lock && exit; fi;
Expand Down